次の方法で共有


変更データ キャプチャ (CDC) とは

変更データ キャプチャ (CDC) は、挿入、更新、削除など、ソース システム内のデータに加えられた変更をキャプチャするデータ統合パターンです。 これらの変更はリストとして表され、一般に CDC フィードと呼ばれます。 ソース データセット全体を読み取る代わりに、CDC フィードを操作する場合は、データをより高速に処理できます。 SQL Server、MySQL、Oracle などのトランザクション データベースでは、CDC フィードが生成されます。 デルタテーブルは、変更データフィード (CDF) として知られる独自の CDC フィードを生成します。

次の図は、従業員データを含むソース テーブルの行が更新されると、変更 のみを 含む新しい一連の行を CDC フィードに生成することを示しています。 CDC フィードの各行には、通常、 UPDATE などの操作や、CDC フィードの各行の順序を決定論的に並べ替えるために使用できる列など、追加のメタデータが含まれているため、順序が正しく変更されていない更新を処理できます。 たとえば、次の図の sequenceNum 列によって、CDC フィードの行の順序が決まります。

変更データ キャプチャの概要。

変更データ フィードの処理: 最新のデータのみを保持する場合と、履歴バージョンのデータを保持する

変更されたデータ フィードの処理は、 緩やかに変化するディメンション (SCD) と呼ばれます。 CDC フィードを処理するときは、次の項目を選択できます。

  • 最新のデータ (つまり、既存のデータを上書き) のみを保持しますか? これは SCD Type 1 と呼ばれます。
  • または、データの変更履歴を保持しますか? これは SCD タイプ 2 と呼ばれます。

SCD タイプ 1 の処理では、変更が発生するたびに古いデータを新しいデータで上書きします。 つまり、変更の履歴は保持されません。 最新バージョンのデータのみが使用できます。 これは簡単なアプローチであり、エラーの修正や顧客のメール アドレスなどの重要でないフィールドの更新など、変更履歴が重要でない場合によく使用されます。

チェンジデータキャプチャ SCD タイプ 1 の概要。

SCD タイプ 2 の処理では、時間の経過に伴って異なるバージョンのデータをキャプチャする追加のレコードを作成することによって、データ変更の履歴レコードが保持されます。 データの各バージョンには、タイムスタンプが付けられます。また、メタデータがタグ付けされ、変更が発生したときにユーザーがトレースできるようになります。 これは、分析目的で時間の経過と同時に顧客アドレスの変化を追跡するなど、データの進化を追跡することが重要な場合に役立ちます。

変更データのキャプチャ SCD タイプ 2 の概要。

Lakeflow Spark 宣言パイプラインを使用した SCD タイプ 1 およびタイプ 2 処理の例

このセクションの例では、SCD タイプ 1 とタイプ 2 の使用方法を示します。

手順 1: サンプル データを準備する

この例では、サンプル CDC フィードを生成します。 まず、ノートブックを作成し、次のコードを貼り付けます。 コード ブロックの先頭にある変数を、テーブルとビューを作成する権限があるカタログとスキーマに更新します。

このコードは、複数の変更レコードを含む新しい Delta テーブルを作成します。 スキーマは次のとおりです。

  • id - この従業員の整数、一意識別子
  • name - 文字列、従業員の名前
  • role - 文字列、従業員の役割
  • country - 文字列、国コード、従業員の勤務先
  • operation - 型の変更 ( INSERTUPDATEDELETEなど)
  • sequenceNum - 整数。ソース データ内の CDC イベントの論理順序を識別します。 Lakeflow Spark 宣言型パイプラインでは、このシーケンス処理を使用して、順不同に到着した変更イベントを処理します。
# update these to the catalog and schema where you have permissions
# to create tables and views.

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"

def write_employees_cdf_to_delta():
 data = [
   (1, "Alex", "chef", "FR", "INSERT", 1),
   (2, "Jessica", "owner", "US", "INSERT", 2),
   (3, "Mikhail", "security", "UK", "INSERT", 3),
   (4, "Gary", "cleaner", "UK", "INSERT", 4),
   (5, "Chris", "owner", "NL", "INSERT", 6),
   # out of order update, this should be dropped from SCD Type 1
   (5, "Chris", "manager", "NL", "UPDATE", 5),
   (6, "Pat", "mechanic", "NL", "DELETE", 8),
   (6, "Pat", "mechanic", "NL", "INSERT", 7)
 ]
 columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
 df = spark.createDataFrame(data, columns)
 df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")

write_employees_cdf_to_delta()

このデータは、次の SQL コマンドを使用してプレビューできます。

SELECT *
FROM mycatalog.myschema.employees_cdf

手順 2: SCD タイプ 1 を使用して最新のデータのみを保持する

Lakeflow Spark 宣言パイプラインで AUTO CDC API を使用して、変更データ フィードを SCD Type 1 テーブルに処理することをお勧めします。

  1. 新しいノートブックを作成します。
  2. 次のコードを貼り付けます。
  3. パイプラインを作成して接続します

employees_cdf関数は、上記で作成したテーブルをストリームとして読み取ります。これは、変更データ キャプチャ処理に使用する create_auto_cdc_flow API が、変更のストリームを入力として受け取るためです。 このストリームをテーブルに具体化したくないので、デコレーター @dp.temporary_view でラップします。

次に、 dp.create_target_table を使用して、この変更データ フィードの処理結果を含むストリーミング テーブルを作成します。

最後に、 dp.create_auto_cdc_flow を使用して変更データ フィードを処理します。 各引数を見てみましょう。

  • target - 先に定義したターゲット ストリーミング テーブル。
  • source - 前に定義した変更レコードのストリームに対するビュー。
  • keys - 変更フィード内の一意の行を識別します。 idを一意の識別子として使用しているため、idを唯一の識別列として指定するだけです。
  • sequence_by - ソース データ内の CDC イベントの論理順序を指定する列名。 このシーケンス処理は、順不同で到着する変更イベントを処理するために必要です。 シーケンス列として sequenceNum を指定します。
  • apply_as_deletes - この例のデータには削除操作が含まれているため、 apply_as_deletes を使用して、CDC イベントをアップサートではなく DELETE として扱うタイミングを示します。
  • except_column_list - ターゲット テーブルに含めない列の一覧が含まれます。 この例では、この引数を使用して、 sequenceNumoperationを除外します。
  • stored_as_scd_type - 使用する SCD の種類を示します。
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"

@dp.temporary_view
def employees_cdf():
 return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")

dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")

dp.create_auto_cdc_flow(
 target=f"{catalog}.{schema}.{employees_table_current}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 1
)

[開始] をクリックして、このパイプラインを実行します。

次に、SQL エディターで次のクエリを実行して、変更レコードが正しく処理されたことを確認します。

SELECT *
FROM mycatalog.myschema.employees_current

従業員 Chris に対する順序が異なる更新は、その役割がマネージャーではなく所有者に設定されているため、正しく適用されませんでした。

データ変更キャプチャのSCDタイプ1の例。

手順 3: SCD タイプ 2 を使用して履歴データを保持する

この例では、従業員レコードに対する変更の完全な履歴を含む 2 つ目のターゲット テーブル ( employees_historical) を作成します。

このコードをパイプラインに追加します。 ここでの唯一の違いは、 stored_as_scd_type が 1 ではなく 2 に設定されていることです。

dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")

dp.create_auto_cdc_flow(
 target=f"{catalog}.{schema}.{employees_table_historical}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 2
)

[開始] をクリックして、このパイプラインを実行します。

次に、SQL エディターで次のクエリを実行して、変更レコードが正しく処理されたことを確認します。

SELECT *
FROM mycatalog.myschema.employees_historical

Pat など、削除された従業員を含め、従業員に対するすべての変更が表示されます。

変更データ キャプチャ SCD タイプ 2 の例。

手順 4: リソースをクリーンアップする

完了したら、次の手順に従ってリソースをクリーンアップします。

  1. パイプラインを削除します。

    パイプラインを削除すると、 employees テーブルと employees_historical テーブルが自動的に削除されます。

    1. [ ジョブ] と [パイプライン] をクリックし、削除するパイプラインの名前を見つけます。
    2. [オーバーフロー] アイコン をクリックします。同じ行でパイプライン名を入力し、[削除] をクリックします。
  2. ノートブックを削除します。

  3. 変更データ フィードを含むテーブルを削除します。

    1. 「新しい> クエリ」をクリックします。
    2. 次の SQL コードを貼り付けて実行し、必要に応じてカタログとスキーマを調整します。
DROP TABLE mycatalog.myschema.employees_cdf

変更データ キャプチャに MERGE INTOforeachBatch を使用する場合の欠点

Databricks には、MERGE INTO API で行をデルタ テーブルにアップサートするために使用できるforeachBatch SQL コマンドが用意されています。 このセクションでは、この手法を単純なユース ケースに使用する方法について説明しますが、実際のシナリオに適用すると、この方法はますます複雑になり、脆弱になります。

この例では、前の例で使用したのと同じサンプル変更データ フィードを使用します。

MERGE INTOと単純な実装foreachBatch

ノートブックを作成し、次のコードをコピーします。 必要に応じて、 catalogschema、および employees_table 変数を変更します。 catalog変数とschema変数は、テーブルを作成できる Unity カタログ内の場所に設定する必要があります。

ノートブックを実行すると、次の処理が実行されます。

  • create_tableにターゲット テーブルを作成します。 この手順を自動的に処理する create_auto_cdc_flowとは異なり、スキーマを指定する必要があります。
  • 変更データ フィードをストリームとして読み取ります。 各マイクロバッチは、upsertToDelta コマンドを実行する MERGE INTO メソッドを使用して処理されます。
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"

def upsertToDelta(microBatchDF, batchId):
 microBatchDF.createOrReplaceTempView("updates")
 microBatchDF.sparkSession.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)

def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING)
 """)

create_table()

cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")

cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()

結果を表示するには、次の SQL クエリを実行します。

SELECT *
FROM mycatalog.myschema.employees_merge

残念ながら、次に示すように、結果は正しくありません。

データ キャプチャ MERGE INTO 例を変更します。

同じマイクロバッチ内の同じキーに対する複数の更新

最初の問題は、コードが同じマイクロバッチ内の同じキーに対する複数の更新を処理しないという点です。 たとえば、 INSERT を使用して従業員 Chris を挿入し、そのロールを所有者からマネージャーに更新します。 これにより 1 行になりますが、代わりに 2 つの行が存在します。

マイクロバッチ内に同じキーに対して複数の更新がある場合、どの変更が優先されますか?

変更データキャプチャにより、同じマイクロバッチの例で、同じキーに対する複数の更新を記録します。

ロジックがより複雑になります。 次のコード例では、 sequenceNum によって最新の行を取得し、そのデータのみをターゲット テーブルにマージします。

  • id、主キー別にグループ化します。
  • そのキーのバッチ内の最大 sequenceNum を持つ行のすべての列を取得します。
  • 行を元の状態に戻します。

次に示すように upsertToDelta メソッドを更新し、コードを実行します。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)

ターゲット テーブルに対してクエリを実行すると、Chris という名前の従業員が正しいロールを持っていることがわかりますが、ターゲット テーブルに表示されるレコードが削除されているため、解決する必要がある他の問題がまだ残っています。

同じマイクロバッチ内で同じキーの複数更新に対する変更データキャプチャの例の結果。

マイクロバッチ間で順不同に行われる更新

このセクションでは、マイクロバッチ全体の順序が異なる更新の問題について説明します。 次の図は、問題を示しています。Chris の行が最初のマイクロバッチで UPDATE 操作を行い、その後のマイクロバッチに INSERT が続いた場合はどうでしょうか。 コードはこれを正しく処理しません。

複数のマイクロバッチ間で同じキーに対して順序が異なる更新がある場合、どの変更が優先されますか?

マイクロバッチ間でデータ キャプチャの順序が異なる更新を変更する例。

これを解決するには、次のようにコードを展開して各行にバージョンを格納します。

  • 行が最後に更新されたときの sequenceNum を格納します。
  • 新しい行ごとに、タイムスタンプが格納されている行より大きいかどうかを確認し、次のロジックを適用します。
    • 大きい場合は、ターゲットの新しいデータを使用します。
    • それ以外の場合は、データをソースに保持します。

まず、 createTable メソッドを更新して sequenceNum を格納します。これは、各行のバージョン管理に使用するためです。

def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING, sequenceNum INT)
 """)

次に、行のバージョンを処理するように upsertToDelta を更新します。 UPDATE SETMERGE INTO句では、すべての列を個別に処理する必要があります。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

削除の処理

残念ながら、コードにはまだ問題があります。 従業員 Pat がまだターゲット テーブルに含まれているという事実によって証明されているように、 DELETE 操作は処理されません。

削除が同じマイクロバッチに到着するとします。 それらを処理するには、変更データ レコードが次のように削除を示す場合に、 upsertToDelta メソッドをもう一度更新して行を削除します。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

削除後に順不同に到着する更新プログラムの処理

残念ながら、上記のコードは、 DELETE の後にマイクロバッチ間で順不同の UPDATE が続く場合を処理しないため、まだ正しくありません。

削除後に順不同に到着する変更データ キャプチャ処理の更新の例。

このケースを処理するアルゴリズムでは、後続の順序外の更新を処理できるように、削除を記憶する必要があります。 これを行うには:

  • 行をすぐに削除する代わりに、タイムスタンプまたは sequenceNumで論理的に削除します。 論理的に削除された行は 廃棄されます
  • すべてのユーザーを、廃棄石を除外するビューにリダイレクトします。
  • 時間の経過と同時に廃棄石を削除するクリーンアップ ジョブを作成します。

次のコードを使用します。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

ユーザーはターゲット テーブルを直接使用できないため、クエリを実行できるビューを作成します。

CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL

最後に、廃棄された行を定期的に削除するクリーンアップ ジョブを作成します。

DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY