次の方法で共有


型の拡大

重要

この機能は、Databricks Runtime 15.4 LTS 以降の パブリック プレビュー 段階にあります。

型拡大が有効になっているテーブルを使用すると、基になるデータ ファイルを書き換えることなく、列データ型をより広い型に変更できます。 列の型を手動で変更するか、スキーマの進化を使用して列型を進化させることができます。

重要

型拡大は、Databricks Runtime 15.4 LTS 以降で使用できます。 型拡大が有効になっているテーブルは、Databricks Runtime 15.4 LTS 以降でのみ読み取ることができます。

型の拡大には Delta Lake が必要です。 すべての Unity カタログ マネージド テーブルでは、既定で Delta Lake が使用されます。

サポートされている型の変更

以下の規則に従って、型を拡大することができます。

ソースの種類 サポートされるより拡大された型
byte shortintlongdecimaldouble
short intlongdecimaldouble
int longdecimaldouble
long decimal
float double
decimal decimal より高い精度と規模で
date timestampNTZ

整数値を誤って 10 進数に昇格しないようにするには、byteshort、またはintからlongまたはdecimalに型の変更をdoubleする必要があります。 整数型を decimal または doubleに昇格する場合、ダウンストリーム インジェストによってこの値が整数列に書き戻された場合、Spark は既定で値の小数部を切り捨てます。

何らかの数値型を decimal に変更する場合、合計の有効桁数は元の有効桁数以上となる必要があります。 スケールも大きくする場合は、合計有効桁数を対応する分だけ増やす必要があります。

byteshortint 型の最小のターゲットは、decimal(10,0) です。 long の最小ターゲットは decimal(20,0) です。

decimal(10,1) を持つフィールドに 2 個の桁を追加したい場合、最小のターゲットは decimal(12,3) になります。

構造体、マップ、配列の中で入れ子になったフィールドおよび最上位レベルの列に対する型の変更がサポートされています。

型の拡大を有効にする

既存のテーブルで型拡大を有効にするには、 delta.enableTypeWidening テーブルプロパティを trueに設定します。

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')

テーブルの作成時に型拡大を有効にすることもできます。

  CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')

重要

型拡大を有効にすると、テーブル機能 typeWideningが設定され、リーダー プロトコルとライター プロトコルがアップグレードされます。 型拡大が有効になっているテーブルを操作するには、Databricks Runtime 15.4 以降を使用する必要があります。 外部クライアントもテーブルと対話する場合は、このテーブル機能がサポートされていることを確認します。 Delta Lake の機能の互換性とプロトコルに関する記事を参照してください。

型の変更を手動で適用する

以下のように、ALTER COLUMN コマンドを使用して、型を手動で変更します。

ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>

この操作では、基になるデータ ファイルを書き換えずにテーブル スキーマを更新します。

スキーマの自動進化を使用して型を拡大する

スキーマの進化は、入力データの種類と一致するようにターゲット テーブルのデータ型を更新する拡大型と連携します。

型拡大が有効になっていない場合、スキーマの進化は常に、ターゲット テーブル内の列型と一致するようにデータをダウンキャストしようとします。 ターゲット テーブルのデータ型を自動的に拡大しない場合は、スキーマの進化を有効にしてワークロードを実行する前に、型の拡大を無効にします。

インジェスト中にスキーマの進化を使用して列のデータ型を拡大するには、次の条件を満たす必要があります。

  • 書き込みコマンドは、スキーマの自動進化を有効にして実行されます。
  • ターゲット テーブルで、型拡大が有効になっています。
  • ソース列の種類は、ターゲット列の種類よりも広くなります。
  • 型の拡大が型の変更をサポートしている。
  • 型の変更は、byteshortint、またはlongからdecimalまたはdoubleへの変更ではありません。 これらの型の変更は、整数を誤って 10 進数に昇格しないように、 ALTER TABLE を使用してのみ手動で適用できます。

これらの条件をすべて満たしていない型の不一致は、通常のスキーマ適用規則に従います。 「スキーマの適用」を参照してください。

タイプ拡大テーブル フィーチャを無効にする

プロパティを false に設定することで、有効なテーブルで誤って型拡大を防ぐことができます。

  ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')

この設定により、テーブルに対する将来の型の変更は防止されますが、拡大テーブルの型機能や変更された元に戻す型は削除されません。

タイプ拡大テーブル フィーチャを完全に削除する必要がある場合は、次の例に示すように DROP FEATURE コマンドを使用できます。

 ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]

Databricks Runtime 15.4 LTS を使用して型拡大を有効にしたテーブルでは、代わりに機能 typeWidening-preview を削除する必要があります。

型拡大を削除すると、現在のテーブル スキーマに準拠していないすべてのデータ ファイルが書き換えられます。 Delta Lake テーブル機能の削除とテーブル プロトコルのダウングレードに関する記事を参照してください。

Delta テーブルからのストリーミング

構造化ストリーミングでの Type Widening のサポートは、Databricks Runtime 16.3 以降で利用できます。

Delta テーブルからストリーミングする場合、型の変更は、 列マッピングを使用して列の名前を変更または削除する場合と同様に、非加法スキーマの変更として扱われます。

スキーマ追跡の場所を指定して、種類の変更が適用された Delta Lake テーブルからのストリーミングを有効にすることができます。

データ ソースに対する各ストリーミングの読み取りには、独自の schemaTrackingLocation を指定する必要があります。 指定した schemaTrackingLocation は、ストリーミング書き込み用のターゲット テーブルの checkpointLocation に指定されたディレクトリに含まれている必要があります。

複数のソース Delta テーブルのデータを結合するストリーミング ワークロードの場合は、ソース テーブルごとに checkpointLocation で一意のディレクトリを指定する必要があります。

次のコード例に示すように、schemaTrackingLocation オプションを使用してスキーマ追跡のパスを指定します。

Python(プログラミング言語)

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

スカラ (プログラミング言語)

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

スキーマ追跡の場所を指定すると、ストリームは、型の変更が検出されて停止されるたびに、追跡対象のスキーマを進化させます。 その時点で、ダウンストリーム テーブルで型の拡大を有効にしたり、ストリーミング クエリを更新したりするなど、型の変更を処理するために必要なアクションを実行できます。

処理を再開するには、Spark 構成 spark.databricks.delta.streaming.allowSourceColumnTypeChange または DataFrame リーダー オプションを allowSourceColumnTypeChange設定します。

Python(プログラミング言語)

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  # alternatively to allow all future type changes for this stream:
  # .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

スカラ (プログラミング言語)

val checkpointPath = "/path/to/checkpointLocation"

spark.readStream
  .option("schemaTrackingLocation", checkpointPath)
  .option("allowSourceColumnTypeChange", "<delta_source_table_version>")
  // alternatively to allow all future type changes for this stream:
  // .option("allowSourceColumnTypeChange", "always")
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .toTable("output_table")

SQL

  -- To unblock for this particular stream just for this series of schema change(s):
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
  -- To unblock for this particular stream:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
  -- To unblock for all streams:
  SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"

ストリームが停止すると、チェックポイント ID <checkpoint_id> と Delta Lake ソース テーブルのバージョン <delta_source_table_version> がエラー メッセージに表示されます。

デルタ・シェアリング

デルタ共有での型拡大のサポートは、Databricks Runtime 16.1 以降で利用できます。

Databricks 間での Delta Sharing において、型の拡張が有効となった Delta Lake テーブルの共有がサポートされています。 プロバイダーと受信者は、Databricks Runtime 16.1 以降である必要があります。

Delta Sharing を使用して型拡大が有効になっている Delta Lake テーブルから変更データ フィードを読み取る場合は、応答形式を delta に設定する必要があります。

spark.read
  .format("deltaSharing")
  .option("responseFormat", "delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "<start version>")
  .option("endingVersion", "<end version>")
  .load("<table>")

型の変更間での変更データ フィードの読み取りはサポートされていません。 代わりに、操作を 2 つの個別の読み取りに分割する必要があります。1 つは型の変更を含むテーブル バージョンで終わり、もう 1 つは型の変更を含むバージョンから始まります。

制限事項

Apache Iceberg の互換性

Apache Iceberg では、型の拡大によってカバーされるすべての型変更がサポートされるわけではありません。 Iceberg スキーマの進化に関するページを参照してください。 特に、Azure Databricks では、次の種類の変更はサポートされていません。

  • byteshortintlongまたはdecimaldouble
  • 10進法スケールの拡大
  • date から timestampNTZ

Delta Lake テーブル で UniForm と Iceberg の互換性 が有効になっている場合、これらの種類の変更のいずれかを適用するとエラーが発生します。

これらのサポートされていない型の変更のいずれかを Delta Lake テーブルに適用すると、テーブルで Uniform with Iceberg の互換性 を有効にすると、エラーが発生します。 このエラーを解決するには、 タイプ拡大テーブル フィーチャを削除する必要があります。

その他の制限事項

  • 型の変更を伴う Delta Lake テーブルからのストリーミング時に SQL を使用してスキーマ追跡の場所を提供することはサポートされていません。
  • Databricks以外の消費者に対して、Delta Sharingで型の拡大を有効にしたテーブルの共有はサポートされていません。