次の方法で共有


半構造化バリアント型としてデータを取り込む

重要

この機能はパブリック プレビュー段階にあります。

Databricks Runtime 15.3 以降では、VARIANT 型を使用して半構造化データを取り込むことができます。 この記事では、動作について説明し、自動ローダーと COPY INTO を使用してクラウド オブジェクト ストレージからデータを取り込み、Kafka からレコードをストリーミングするためのパターン例を示し、バリアント データを含む新しいテーブルを作成し、バリアント型を使用して新しいレコードを挿入するための SQL コマンドを紹介します。 次の表は、サポートされているファイル形式と Databricks Runtime バージョンのサポートをまとめたものです。

ファイル形式 サポートされている Databricks ランタイムのバージョン
JSON 15.3 以降
XML 16.4 以降
CSV 16.4 以降

バリアント データにクエリを実行する」を参照してください。

バリアント列を持つテーブルを作成する

VARIANT は Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってバックアップされるテーブルでサポートされています。 Azure Databricks のマネージド テーブルでは、既定で Delta Lake が使用されるため、次の構文を使用して 1 つの VARIANT 列を含む空のテーブルを作成できます。

CREATE TABLE table_name (variant_column VARIANT)

または、JSON 文字列の PARSE_JSON 関数または XML 文字列の FROM_XML 関数を使用して、CTAS ステートメントを使用してバリアント列を含むテーブルを作成することもできます。 次の例では、2 つの列を含むテーブルを作成します。

  • JSON 文字列から id 型として抽出された STRING 列。
  • variant_column 列には、VARIANT 型としてエンコードされた JSON 文字列全体が含まれます。
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Databricks では、フィールドを、クエリの高速化とストレージ レイアウトの最適化のために使用しようとしている非バリアント列として抽出および保存することをお勧めしています。

VARIANT 列は、クラスタリング キー、パーティション、または Z オーダー キーには使用できません。 VARIANT データ型は、比較、グループ化、順序付け、集合の操作には使用できません。 制限事項の詳細な一覧については、「制限事項」を参照してください。

parse_json を使用してデータを挿入する

既にターゲット テーブルに VARIANT としてエンコードされた列が含まれている場合は、次の例のように、parse_json を使用して JSON 文字列レコードを VARIANT として挿入できます。

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

from_xml を使用してデータを挿入する

ターゲット テーブルに既に VARIANTとしてエンコードされた列が含まれている場合は、 from_xml を使用して XML 文字列レコードを VARIANTとして挿入できます。 例えば次が挙げられます。

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Python

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

を使用してデータを挿入する from_csv

ターゲット テーブルに既に VARIANTとしてエンコードされた列が含まれている場合は、 from_csv を使用して XML 文字列レコードを VARIANTとして挿入できます。 例えば次が挙げられます。

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Python

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

バリアントとしてクラウド オブジェクト ストレージからデータを取り込む

自動ローダーを使用すると、サポートされているファイル ソースのすべてのデータをターゲット テーブルの 1 つの VARIANT 列として読み込むことができます。 VARIANT は、スキーマと型の変更に対して柔軟であり、データ ソースに存在する大文字と小文字の区別と NULL の値を維持するので、このパターンはほとんどのインジェスト シナリオで信頼性が高くなります。ただし、次の注意事項があります。

  • 形式が正しくないレコードは、 VARIANT 型を使用してエンコードできません。
  • VARIANT 型に保持できるレコードのサイズは 16 MB までです。

バリアント型では、異常に大きなレコードは破損したレコードと同様に扱われます。 既定の PERMISSIVE 処理モードでは、過度に大きいレコードが corruptRecordColumnにキャプチャされます。

レコード全体が 1 つの VARIANT 列として記録されるため、インジェスト中にスキーマの進化は発生せず、 rescuedDataColumn はサポートされません。 次の例では、1 つの VARIANT 列を含むターゲット テーブルが既に存在することを想定しています。

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

また、スキーマを定義、または VARIANT を渡すときに schemaHints を指定することもできます。 参照元フィールドのデータには、有効なレコードが含まれている必要があります。 この構文の例を次に示します。

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

バリアントで COPY INTO を使用する

Databricks では、使用可能であれば COPY INTO に対して自動ローダーを使用することをお勧めしています。

COPY INTO では、サポートされているデータ ソースの内容全体を 1 つの列として取り込む機能がサポートされています。 次の例では、1 つの VARIANT 列を持つ新しいテーブルを作成し、COPY INTO を使用して JSON ファイル ソースからレコードを取り込みます。

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

Kafka データをバリアントとしてストリームする

多くの Kafka ストリームでは、JSON を使用してペイロードがエンコードされます。 VARIANT を使用して Kafka ストリームを取り込むと、スキーマが変更されてもこれらのワークロードの信頼性は高くなります。

次の例では、Kafka ストリーミング ソースを読み取り、keySTRING として、さらに valueVARIANT としてキャストし、ターゲット テーブルに書き出す方法を示しています。

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)