重要
この機能はパブリック プレビュー段階にあります。
Databricks Runtime 15.3 以降では、VARIANT
型を使用して半構造化データを取り込むことができます。 この記事では、動作について説明し、自動ローダーと COPY INTO
を使用してクラウド オブジェクト ストレージからデータを取り込み、Kafka からレコードをストリーミングするためのパターン例を示し、バリアント データを含む新しいテーブルを作成し、バリアント型を使用して新しいレコードを挿入するための SQL コマンドを紹介します。 次の表は、サポートされているファイル形式と Databricks Runtime バージョンのサポートをまとめたものです。
ファイル形式 | サポートされている Databricks ランタイムのバージョン |
---|---|
JSON | 15.3 以降 |
XML | 16.4 以降 |
CSV | 16.4 以降 |
「バリアント データにクエリを実行する」を参照してください。
バリアント列を持つテーブルを作成する
VARIANT
は Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってバックアップされるテーブルでサポートされています。 Azure Databricks のマネージド テーブルでは、既定で Delta Lake が使用されるため、次の構文を使用して 1 つの VARIANT
列を含む空のテーブルを作成できます。
CREATE TABLE table_name (variant_column VARIANT)
または、JSON 文字列の PARSE_JSON
関数または XML 文字列の FROM_XML
関数を使用して、CTAS ステートメントを使用してバリアント列を含むテーブルを作成することもできます。 次の例では、2 つの列を含むテーブルを作成します。
- JSON 文字列から
id
型として抽出されたSTRING
列。 -
variant_column
列には、VARIANT
型としてエンコードされた JSON 文字列全体が含まれます。
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
注
Databricks では、フィールドを、クエリの高速化とストレージ レイアウトの最適化のために使用しようとしている非バリアント列として抽出および保存することをお勧めしています。
VARIANT
列は、クラスタリング キー、パーティション、または Z オーダー キーには使用できません。
VARIANT
データ型は、比較、グループ化、順序付け、集合の操作には使用できません。 制限事項の詳細な一覧については、「制限事項」を参照してください。
parse_json
を使用してデータを挿入する
既にターゲット テーブルに VARIANT
としてエンコードされた列が含まれている場合は、次の例のように、parse_json
を使用して JSON 文字列レコードを VARIANT
として挿入できます。
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
from_xml
を使用してデータを挿入する
ターゲット テーブルに既に VARIANT
としてエンコードされた列が含まれている場合は、 from_xml
を使用して XML 文字列レコードを VARIANT
として挿入できます。 例えば次が挙げられます。
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Python
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
を使用してデータを挿入する from_csv
ターゲット テーブルに既に VARIANT
としてエンコードされた列が含まれている場合は、 from_csv
を使用して XML 文字列レコードを VARIANT
として挿入できます。 例えば次が挙げられます。
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Python
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
バリアントとしてクラウド オブジェクト ストレージからデータを取り込む
自動ローダーを使用すると、サポートされているファイル ソースのすべてのデータをターゲット テーブルの 1 つの VARIANT
列として読み込むことができます。
VARIANT
は、スキーマと型の変更に対して柔軟であり、データ ソースに存在する大文字と小文字の区別と NULL
の値を維持するので、このパターンはほとんどのインジェスト シナリオで信頼性が高くなります。ただし、次の注意事項があります。
- 形式が正しくないレコードは、
VARIANT
型を使用してエンコードできません。 -
VARIANT
型に保持できるレコードのサイズは 16 MB までです。
注
バリアント型では、異常に大きなレコードは破損したレコードと同様に扱われます。 既定の PERMISSIVE
処理モードでは、過度に大きいレコードが corruptRecordColumn
にキャプチャされます。
レコード全体が 1 つの VARIANT
列として記録されるため、インジェスト中にスキーマの進化は発生せず、 rescuedDataColumn
はサポートされません。 次の例では、1 つの VARIANT
列を含むターゲット テーブルが既に存在することを想定しています。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
また、スキーマを定義、または VARIANT
を渡すときに schemaHints
を指定することもできます。 参照元フィールドのデータには、有効なレコードが含まれている必要があります。 この構文の例を次に示します。
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
バリアントで COPY INTO
を使用する
Databricks では、使用可能であれば COPY INTO
に対して自動ローダーを使用することをお勧めしています。
COPY INTO
では、サポートされているデータ ソースの内容全体を 1 つの列として取り込む機能がサポートされています。 次の例では、1 つの VARIANT
列を持つ新しいテーブルを作成し、COPY INTO
を使用して JSON ファイル ソースからレコードを取り込みます。
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
Kafka データをバリアントとしてストリームする
多くの Kafka ストリームでは、JSON を使用してペイロードがエンコードされます。
VARIANT
を使用して Kafka ストリームを取り込むと、スキーマが変更されてもこれらのワークロードの信頼性は高くなります。
次の例では、Kafka ストリーミング ソースを読み取り、key
を STRING
として、さらに value
を VARIANT
としてキャストし、ターゲット テーブルに書き出す方法を示しています。
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)