次の方法で共有


AUTO CDC API: パイプラインを使用して変更データ キャプチャを簡略化する

Lakeflow Spark 宣言パイプライン (SDP) は、 AUTO CDC API と AUTO CDC FROM SNAPSHOT API を使用して変更データ キャプチャ (CDC) を簡略化します。

AUTO CDC API は、APPLY CHANGES API を置き換え、同じ構文を持ちます。 APPLY CHANGES API は引き続き使用できますが、Databricks では、AUTO CDC API を代わりに使用することをお勧めします。

使用するインターフェイスは、変更データのソースによって異なります。

  • 変更データ フィード (CDF) からの変更を処理するには、 AUTO CDC を使用します。
  • AUTO CDC FROM SNAPSHOT (パブリック プレビュー、および Python でのみ使用可能) を使用して、データベース スナップショットの変更を処理します。

以前は、 MERGE INTO ステートメントは、Azure Databricks で CDC レコードを処理するために一般的に使用されていました。 ただし、 MERGE INTO は、レコードの順序が正しくないために正しくない結果が生成される場合や、レコードの順序を再作成するための複雑なロジックが必要になる場合があります。

AUTO CDC API は、パイプライン SQL および Python インターフェイスでサポートされています。 AUTO CDC FROM SNAPSHOT API は Python インターフェイスでサポートされています。 AUTO CDC API は、Apache Spark 宣言パイプラインではサポートされていません。

AUTO CDCAUTO CDC FROM SNAPSHOTの両方で、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新がサポートされます。

  • SCD タイプ 1 を使用して、レコードを直接更新します。 更新されたレコードの履歴は保持されません。
  • SCD タイプ 2 を使用して、すべての更新または指定された列セットの更新時にレコードの履歴を保持します。

構文やその他の参照については、AUTO CDC(SQL)パイプライン用AUTO CDC(Python)パイプライン用、およびスナップショットからの AUTO CDC(Python)パイプライン用を参照してください。

この記事では、ソース データの変更に基づいてパイプライン内のテーブルを更新する方法について説明します。 Delta テーブルの行レベルの変更情報を記録およびクエリする方法については、「 Azure Databricks での Delta Lake 変更データ フィードの使用」を参照してください。

Requirements

CDC API を使用するには、 サーバーレス SDP または SDPPro または Advanceditions を使用するようにパイプラインを構成する必要があります。

CDC は AUTO CDC API でどのように実装されますか?

AUTO CDC API は、シーケンス外のレコードを自動的に処理することで、CDC レコードの正しい処理を保証し、シーケンス外のレコードを処理するための複雑なロジックを開発する必要がなくなります。 レコードをシーケンス処理するソース データの列を指定する必要があります。この列は、API がソース データの適切な順序を単調に増加させる表現として解釈されます。 パイプラインは、順不同で到着するデータを自動的に処理します。 SCD タイプ 2 の変更の場合、パイプラインは適切なシーケンス値をターゲット テーブルの __START_AT 列と __END_AT 列に伝達します。 各シーケンス値にはキーごとに 1 つの個別の更新が必要であり、NULL シーケンス値はサポートされていません。

AUTO CDCを使用して CDC 処理を実行するには、まずストリーミング テーブルを作成し、次に SQL の AUTO CDC ... INTO ステートメントまたは Python の create_auto_cdc_flow() 関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 ターゲット ストリーミング テーブルを作成するには、SQL の CREATE OR REFRESH STREAMING TABLE ステートメントまたは Python の create_streaming_table() 関数を使用します。 SCD タイプ 1 およびタイプ 2 の処理例を参照してください。

構文の詳細については、パイプライン の SQL リファレンス または Python リファレンスを参照してください

API を使用して、どのように AUTO CDC FROM SNAPSHOT で CDC が実装されるか。

Important

AUTO CDC FROM SNAPSHOT API はパブリック プレビュー段階です

AUTO CDC FROM SNAPSHOT は、一連のインオーダー スナップショットを比較してソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。 AUTO CDC FROM SNAPSHOT は Python パイプライン インターフェイスでのみサポートされています。

AUTO CDC FROM SNAPSHOT では、複数のソースの種類からのスナップショットの取り込みがサポートされています。

  • 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットを取り込みます。 AUTO CDC FROM SNAPSHOT には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするシンプルで合理化されたインターフェイスがあります。 パイプラインの更新ごとに新しいスナップショットが取り込まれており、取り込み時間がスナップショット バージョンとして使用されます。 パイプラインが連続モードで実行されると、処理を含むフローのAUTO CDC FROM SNAPSHOT設定によって決定された期間に、パイプラインの更新ごとに複数のスナップショットが取り込まれます。
  • 履歴スナップショット インジェストを使用して、Oracle または MySQL データベースまたはデータ ウェアハウスから生成されたスナップショットなど、データベース スナップショットを含むファイルを処理します。

AUTO CDC FROM SNAPSHOTを使用して任意のソースの種類から CDC 処理を実行するには、まずストリーミング テーブルを作成してから、Python で create_auto_cdc_from_snapshot_flow() 関数を使用して、処理を実装するために必要なスナップショット、キー、およびその他の引数を指定します。 定期的なスナップショット インジェストスナップショット インジェストの履歴の例を参照してください。

API に渡されるスナップショットは、バージョン別の昇順である必要があります。 SDP で順序が誤ったスナップショットが検出されると、エラーが発生します。

構文の詳細については、パイプラインの Python リファレンスを参照してください

シーケンス処理に複数の列を使用する

複数の列(例えば、タイムスタンプとIDを使用して同順位を解消する場合など)で順序を決めることができます。それらを組み合わせるためにSTRUCTを使用できます。STRUCTはまず最初のフィールドで並べ、その後同順位があれば2番目のフィールドを考慮するといった具合です。

SQL の例:

SEQUENCE BY STRUCT(timestamp_col, id_col)

Python の例:

sequence_by = struct("timestamp_col", "id_col")

制限事項

シーケンス処理に使用する列は、並べ替え可能なデータ型である必要があります。

例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理

次のセクションでは、変更データ フィードからのソース イベントに基づいてターゲット テーブルを更新する SCD 型 1 および種類 2 のクエリの例を示します。

  1. 新しいユーザー レコードを作成します。
  2. ユーザー レコードを削除します。
  3. ユーザー レコードを更新します。 SCD タイプ 1 の例では、最後の UPDATE 操作は遅れて到着し、ターゲット テーブルから削除され、順序が不順のイベントの処理を示しています。

次の例では、パイプラインの構成と更新に関する知識を前提としています。 「 チュートリアル: 変更データ キャプチャを使用して ETL パイプラインを構築する」を参照してください。

これらの例を実行するには、まずサンプル データセットを作成する必要があります。 「テスト データの生成」を参照してください。

これらの例の入力レコードを次に示します。

userId 名前 city 操作 シーケンス番号
124 ラウル Oaxaca INSERT 1
123 Isabel モンテレー INSERT 1
125 メルセデス ティファナ INSERT 2
126 リリー カンクン INSERT 2
123 null 値 null 値 DELETE 6
125 メルセデス Guadalajara UPDATE 6
125 メルセデス メヒカリ UPDATE 5
123 Isabel Chihuahua UPDATE 5

サンプル データの最後の行のコメントを解除すると、レコードを切り捨てる場所を指定する次のレコードが挿入されます。

userId 名前 city 操作 シーケンス番号
null 値 null 値 null 値 切り捨てる 3

次の例には、 DELETE 操作と TRUNCATE 操作の両方を指定するオプションが含まれていますが、それぞれは省略可能です。

SCD タイプ 1 の更新を処理する

次の例では、SCD タイプ 1 の更新の処理を示します。

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flowname AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

SCD タイプ 1 の例を実行した後、ターゲット テーブルには次のレコードが含まれます。

userId 名前 city
124 ラウル Oaxaca
125 メルセデス Guadalajara
126 リリー カンクン

追加のTRUNCATE レコードで SCD タイプ 1 の例を実行すると、124での126操作により、レコードTRUNCATEおよびsequenceNum=3が切り捨てられ、ターゲット テーブルに次のレコードが含まれます。

userId 名前 city
125 メルセデス Guadalajara

SCD タイプ 2 の更新を処理する

次の例は、SCD タイプ 2 の更新の処理を示しています。

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

SCD タイプ 2 の例を実行した後、ターゲット テーブルには次のレコードが含まれます。

userId 名前 city __START_AT __END_AT
123 Isabel モンテレー 1 5
123 Isabel Chihuahua 5 6
124 ラウル Oaxaca 1 null 値
125 メルセデス ティファナ 2 5
125 メルセデス メヒカリ 5 6
125 メルセデス Guadalajara 6 null 値
126 リリー カンクン 2 null 値

SCD タイプ 2 クエリでは、ターゲット テーブルの履歴に対して追跡する出力列のサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例では、 city 列を追跡から除外する方法を示します。

次の例では、SCD タイプ 2 でトラック履歴を使用する方法を示します。

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("cdc_data.users")

dp.create_streaming_table("target")

dp.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

追加の TRUNCATE レコードなしでこの例を実行した後、ターゲット テーブルには次のレコードが含まれます。

userId 名前 city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 ラウル Oaxaca 1 null 値
125 メルセデス Guadalajara 2 null 値
126 リリー カンクン 2 null 値

テスト データを生成する

次のコードは、このチュートリアルに存在するサンプル クエリで使用するサンプル データセットを生成するために提供されています。 新しいスキーマを作成して新しいテーブルを作成するための適切な資格情報があると仮定すると、ノートブックまたは Databricks SQL でこれらのステートメントを実行できます。 次のコードは、パイプライン定義の一部として実行することを意図 していません

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

例: 定期的なスナップショット処理

次の例では、 mycatalog.myschema.mytableに格納されているテーブルのスナップショットを取り込む SCD タイプ 2 の処理を示します。 処理の結果は、 targetという名前のテーブルに書き込まれます。

mycatalog.myschema.mytable タイムスタンプ 2024-01-01 00:00:00 のレコード

Key 価値
1 a1
2 a2

mycatalog.myschema.mytable 2024年1月1日12時00分のタイムスタンプにあるレコード

Key 価値
2 b2
3 a3
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。

Key 価値 __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024年01月01日 00時00分00秒 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null 値
3 a3 2024-01-01 12:00:00 null 値

例: スナップショット処理の履歴

次の例は、クラウド ストレージ システムに格納されている 2 つのスナップショットのソース イベントに基づいてターゲット テーブルを更新する SCD タイプ 2 の処理を示しています。

timestampに格納されているスナップショット/<PATH>/filename1.csv

Key TrackingColumn 非追跡列
1 a1 b1
2 a2 b2
4 a4 b4

timestamp + 5に格納されているスナップショット/<PATH>/filename2.csv

Key TrackingColumn 非追跡列
2 a2_new b2
3 a3 b3
4 a4 b4_new

次のコード例は、これらのスナップショットを使用した SCD タイプ 2 の更新の処理を示しています。

from pyspark import pipelines as dp

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dp.create_streaming_live_table("target")

dp.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。

Key TrackingColumn 非追跡列 __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null 値
3 a3 b3 2 null 値
4 a4 b4_new 1 null 値

ターゲット ストリーミング テーブル内のデータを追加、変更、または削除する

パイプラインでテーブルを Unity Catalog に発行する場合は、挿入、更新、削除、マージステートメントなどの データ操作言語 (DML) ステートメントを使用して、 AUTO CDC ... INTO ステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。

  • ストリーミング テーブルのテーブル スキーマを変更する DML ステートメントはサポートされていません。 DML ステートメントがテーブル スキーマの進化を試みないことを確認します。
  • ストリーミング テーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以降を使用する共有 Unity Catalog クラスターまたは SQL ウェアハウスでのみ実行できます。
  • ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグを設定します。 skipChangeCommits が設定されていれば、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。 処理にストリーミング テーブルが必要ない場合は、具体化されたビュー (追加専用の制限がない) をターゲット テーブルとして使用できます。

Lakeflow Spark 宣言パイプラインでは、指定した SEQUENCE BY 列が使用され、ターゲット テーブルの __START_AT 列と __END_AT 列に適切なシーケンス値が伝達されるため (SCD 型 2 の場合)、DML ステートメントでこれらの列の有効な値を使用して、レコードの適切な順序を維持する必要があります。 「AUTO CDC API を使用して CDC を実装する方法」を参照してください。

ストリーミング テーブルで DML ステートメントを使用する方法の詳細については、「ストリーミング テーブルのデータの追加、変更、または削除」を参照してください。

次の例では、開始シーケンスが 5 のアクティブ なレコードを挿入します。

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

AUTO CDC ターゲット テーブルから変更データ フィードを読み取る

Databricks Runtime 15.2 以降では、他の Delta テーブルから変更データ フィードを読み取るのと同じ方法で、 AUTO CDC クエリまたは AUTO CDC FROM SNAPSHOT クエリのターゲットであるストリーミング テーブルから変更データ フィードを読み取ることができます。 ターゲット ストリーミング テーブルから変更データ フィードを読み取る場合は、次のものが必要です。

  • ターゲット ストリーミング テーブルは、Unity カタログに発行する必要があります。 「パイプラインで Unity カタログを使用する」を参照してください。
  • ターゲット ストリーミング テーブルから変更データ フィードを読み取る場合は、Databricks Runtime 15.2 以降を使用する必要があります。 別のパイプラインで変更データ フィードを読み取るために、Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。

他の Delta テーブルから変更データ フィードを読み取るのと同じ方法で、Lakeflow Spark 宣言パイプラインで作成されたターゲット ストリーミング テーブルから変更データ フィードを読み取ります。 PythonとSQLの例を含むDelta変更データフィードの使用法の詳細については、Azure DatabricksでのDelta Lake変更データフィードの使用を参照してください。

変更データ フィード レコードには、変更イベントの種類を識別する メタデータ が含まれています。 テーブル内のレコードが更新されると、関連付けられている変更レコードのメタデータには、通常、_change_typeイベントとupdate_preimageイベントに設定されたupdate_postimage値が含まれます。

ただし、 _change_type 値は、主キー値の変更を含むターゲット ストリーミング テーブルに対して更新が行われる場合は異なります。 変更に主キーの更新が含まれている場合、 _change_type メタデータ フィールドはイベントの insertdelete に設定されます。 主キーに対する変更は、 UPDATE または MERGE ステートメントを使用していずれかのキー フィールドに対して手動で更新を行った場合、または SCD タイプ 2 テーブルの場合、 __start_at フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。

AUTO CDC クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なる主キー値を決定します。

  • SCD タイプ 1 の処理とパイプライン Python インターフェイスの場合、主キーは keys 関数の create_auto_cdc_flow() パラメーターの値です。 SQL インターフェイスの主キーは、KEYS ステートメントの AUTO CDC ... INTO 句によって定義された列です。
  • SCD 型 2 の場合、主キーは、 keys パラメーターまたは KEYS 句に coalesce(__START_AT, __END_AT) 操作からの戻り値を加えたものになります。ここで、 __START_AT__END_AT はターゲット ストリーミング テーブルの対応する列です。

パイプライン内の CDC クエリによって処理されたレコードに関するデータを取得する

次のメトリックは、AUTO CDC クエリではなく、AUTO CDC FROM SNAPSHOT クエリによってのみキャプチャされます。

次のメトリックは、 AUTO CDC クエリによってキャプチャされます。

  • num_upserted_rows: 更新中にデータセットにアップサートされた出力行の数。
  • num_deleted_rows: 更新中にデータセットから削除された既存の出力行の数。

num_output_rowsメトリックは非CDCフローの出力であり、AUTO CDCクエリではキャプチャされません。

パイプラインでの CDC 処理に使用されるデータ オブジェクトは何ですか?

  • これらのデータ構造は、AUTO CDC処理ではなく、AUTO CDC FROM SNAPSHOT処理にのみ適用されます。
  • これらのデータ構造は、ターゲット テーブルが Hive メタストアに発行されている場合にのみ適用されます。 パイプラインが Unity カタログに発行されると、内部バッキング テーブルにユーザーがアクセスできなくなります。

Hive メタストアでターゲット テーブルを宣言すると、次の 2 つのデータ構造が作成されます。

  • ターゲット テーブルに割り当てられた名前を使用するビュー。
  • CDC 処理を管理するためにパイプラインによって使用される内部バッキング テーブル。 このテーブルの名前は、ターゲット テーブル名に対するプリペンド __apply_changes_storage_ によって指定されます。

たとえば、 dp_cdc_targetという名前のターゲット テーブルを宣言すると、 dp_cdc_target という名前のビューと、メタストアに __apply_changes_storage_dp_cdc_target という名前のテーブルが表示されます。 ビューを作成すると、Lakeflow Spark 宣言型パイプラインは、順序が不正なデータを処理するために必要な追加情報(削除マーカやバージョンなど)をフィルタリングできます。 処理されたデータを表示するには、ターゲット ビューに対してクエリを実行します。 __apply_changes_storage_ テーブルのスキーマは、将来の機能や機能強化をサポートするために変更される可能性があるため、運用環境で使用するためにテーブルのクエリを実行しないでください。 テーブルにデータを手動で追加した場合、バージョン列がないため、レコードは他の変更よりも前にあると見なされます。

その他のリソース