次の方法で共有


チュートリアル: 変更データ キャプチャを使用して ETL パイプラインを構築する

データ オーケストレーションと自動ローダーに Lakeflow Spark 宣言パイプライン (SDP) を使用して、変更データ キャプチャ (CDC) を使用して ETL (抽出、変換、読み込み) パイプラインを作成してデプロイする方法について説明します。 ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックや重複除去の記録などの要件に基づいてデータを変換し、データ ウェアハウスやデータ レイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、MySQL データベースの customers テーブルのデータを使用して、次の操作を行います。

  • Debezium または別のツールを使用してトランザクション データベースから変更を抽出し、クラウド オブジェクト ストレージ (S3、ADLS、または GCS) に保存します。 このチュートリアルでは、外部 CDC システムの設定をスキップし、代わりに偽のデータを生成してチュートリアルを簡略化します。
  • 自動ローダーを使用して、クラウド オブジェクト ストレージからメッセージを増分読み込みし、生メッセージを customers_cdc テーブルに格納します。 自動ローダーはスキーマを推論し、スキーマの進化を処理します。
  • 期待値を使用してデータ品質を確認する customers_cdc_clean テーブルを作成します。 たとえば、アップサート操作の実行に使用されるため、 idnull しないでください。
  • クリーンアップされた CDC データに対して AUTO CDC ... INTO を実行して、最終的な customers テーブルに変更をアップサートします。
  • パイプラインでタイプ 2 の緩やかに変化するディメンション (SCD2) テーブルを作成して、すべての変更を追跡する方法を示します。

目標は、生データをほぼリアルタイムで取り込み、データ品質を確保しながらアナリスト チームのテーブルを構築することです。

このチュートリアルでは medallion Lakehouse アーキテクチャを使用します。このアーキテクチャでは、ブロンズ レイヤーを介して生データを取り込み、シルバー レイヤーでデータをクリーンアップおよび検証し、ゴールド レイヤーを使用してディメンション モデリングと集計を適用します。 詳細については、「 medallion lakehouse のアーキテクチャとは」 を参照してください。

実装されたフローは次のようになります。

CDC を使用したパイプライン

パイプライン、自動ローダー、および CDC の詳細については、「Lakeflow Spark 宣言パイプライン」、自動ローダーとは、および変更データ キャプチャ (CDC) とは

Requirements

このチュートリアルを完了するには、次の要件を満たす必要があります。

ETL パイプラインでのデータ キャプチャの変更

変更データ キャプチャ (CDC) は、トランザクション データベース (MySQL や PostgreSQL など) またはデータ ウェアハウスに対して行われたレコードの変更をキャプチャするプロセスです。 CDC は、データの削除、追加、更新などの操作をキャプチャします。通常は、外部システムのテーブルを再具体化するためのストリームとしてキャプチャされます。 CDC を使用すると、増分読み込み機能が実現し、一括読み込みの更新が不要になります。

このチュートリアルを簡略化するには、外部 CDC システムの設定をスキップしてください。 CDC データが実行され、クラウド オブジェクト ストレージ (S3、ADLS、または GCS) に JSON ファイルとして保存されているとします。 このチュートリアルでは、 Faker ライブラリを使用して、チュートリアルで使用されるデータを生成します。

CDCをキャプチャする

さまざまな CDC ツールを使用できます。 主要なオープン ソース ソリューションの 1 つが Debezium ですが、Fivetran、Qlik Replicate、StreamSets、Talend、Oracle GoldenGate、AWS DMS など、データ ソースを簡略化する他の実装が存在します。

このチュートリアルでは、Debezium や DMS などの外部システムの CDC データを使用します。 Debezium は、変更されたすべての行をキャプチャします。 通常、データ変更の履歴を Kafka トピックに送信するか、ファイルとして保存します。

CDC 情報を customers テーブル (JSON 形式) から取り込み、それが正しいことを確認してから、Lakehouse で顧客テーブルを具体化する必要があります。

Debezium からの CDC 入力

変更ごとに、更新される行のすべてのフィールド (idfirstnamelastnameemailaddress) を含む JSON メッセージが表示されます。 メッセージには、追加のメタデータも含まれています。

  • operation: 通常、操作コード (DELETEAPPENDUPDATE)。
  • operation_date: 各操作アクションのレコードの日付とタイムスタンプ。

Debezium などのツールでは、変更前の行の値など、より高度な出力を生成できますが、このチュートリアルではわかりやすくするために省略します。

手順 1: パイプラインを作成する

CDC データ ソースに対してクエリを実行し、ワークスペースにテーブルを生成する新しい ETL パイプラインを作成します。

  1. ワークスペースで、[プラス] アイコンをクリックします。左上隅の新機能。

  2. [ ETL パイプライン] をクリックします。

  3. パイプラインのタイトルを Pipelines with CDC tutorial または希望する名前に変更します。

  4. タイトルの下で、書き込みアクセス許可があるカタログとスキーマを選択します。

    コードでカタログまたはスキーマを指定しない場合、このカタログとスキーマは既定で使用されます。 コードでは、完全なパスを指定することで、任意のカタログまたはスキーマに書き込むことができます。 このチュートリアルでは、ここで指定する既定値を使用します。

  5. [詳細設定] オプションで、[空のファイルで開始] を選択します。

  6. コードのフォルダーを選択します。 [ 参照 ] を選択すると、ワークスペース内のフォルダーの一覧を参照できます。 書き込みアクセス許可を持つ任意のフォルダーを選択できます。

    バージョン管理を使用するには、Git フォルダーを選択します。 新しいフォルダーを作成する必要がある場合は、[ プラス] アイコン を選択します。

  7. チュートリアルで使用する言語に基づいて、ファイルの言語として Python または SQL を選択します。

  8. [ 選択 ] をクリックしてこれらの設定でパイプラインを作成し、Lakeflow パイプライン エディターを開きます。

これで、既定のカタログとスキーマを含む空のパイプラインが作成されました。 次に、チュートリアルでインポートするサンプル データを設定します。

手順 2: このチュートリアルでインポートするサンプル データを作成する

既存のソースから独自のデータをインポートする場合、この手順は必要ありません。 このチュートリアルでは、チュートリアルの例として偽のデータを生成します。 Python データ生成スクリプトを実行するノートブックを作成します。 このコードはサンプル データを生成するために 1 回だけ実行する必要があるため、パイプラインの更新の一部として実行されないパイプラインの explorations フォルダー内に作成します。

このコードでは 、Faker を使用してサンプル CDC データを生成します。 Faker は自動的にインストールできるため、チュートリアルでは %pip install fakerを使用します。 ノートブックにFakerライブラリへの依存関係を設定することもできます。 「 ノートブックへの依存関係の追加」を参照してください。

  1. Lakeflow Pipelines エディター内から、アセット ブラウザーのサイドバーのエディターの左側にある [プラス] アイコンをクリック します。[追加]、[ 探索] の順に選択します。

  2. を付けて、[Python] を選択Setup data。 既定の移動先フォルダー (新しい explorations フォルダー) のままにすることができます。

  3. Create をクリックしてください。 これにより、新しいフォルダーにノートブックが作成されます。

  4. 最初のセルに次のコードを入力します。 前の手順で選択した既定のカタログとスキーマと一致するように、 <my_catalog><my_schema> の定義を変更する必要があります。

    %pip install faker
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = dbName = db = "<my_schema>"
    
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`')
    volume_folder =  f"/Volumes/{catalog}/{db}/raw_data"
    
    try:
      dbutils.fs.ls(volume_folder+"/customers")
    except:
      print(f"folder doesn't exist, generating the data under {volume_folder}...")
      from pyspark.sql import functions as F
      from faker import Faker
      from collections import OrderedDict
      import uuid
      fake = Faker()
      import random
    
      fake_firstname = F.udf(fake.first_name)
      fake_lastname = F.udf(fake.last_name)
      fake_email = F.udf(fake.ascii_company_email)
      fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
      fake_address = F.udf(fake.address)
      operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
      fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
      fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
    
      df = spark.range(0, 100000).repartition(100)
      df = df.withColumn("id", fake_id())
      df = df.withColumn("firstname", fake_firstname())
      df = df.withColumn("lastname", fake_lastname())
      df = df.withColumn("email", fake_email())
      df = df.withColumn("address", fake_address())
      df = df.withColumn("operation", fake_operation())
      df_customers = df.withColumn("operation_date", fake_date())
      df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
    
  5. チュートリアルで使用するデータセットを生成するには、「 Shift + Enter 」と入力してコードを実行します。

  6. Optional. このチュートリアルで使用するデータをプレビューするには、次のセルに次のコードを入力し、コードを実行します。 前のコードのパスと一致するようにカタログとスキーマを更新します。

    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = "<my_schema>"
    
    display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
    

これにより、チュートリアルの残りの部分で使用できる大規模なデータセット (偽の CDC データを含む) が生成されます。 次の手順では、自動ローダーを使用してデータを取り込みます。

手順 3: 自動ローダーを使用してデータを増分的に取り込む

次の手順では、(偽の) クラウド ストレージからブロンズ レイヤーに生データを取り込みます。

これはいくつかの理由で困難になる可能性があり、その理由の一部は以下の通りです:

  • 大規模に動作し、何百万もの小さなファイルを取り込む可能性があります。
  • スキーマと JSON 型を推論します。
  • 不適切な JSON スキーマを使用して不適切なレコードを処理します。
  • スキーマの進化 (たとえば、顧客テーブルの新しい列) に注意してください。

自動ローダーを使用すると、スキーマの推論やスキーマの進化など、このインジェストが簡素化され、数百万もの受信ファイルにスケーリングされます。 自動ローダーは、 cloudFiles を使用する Python と、 SELECT * FROM STREAM read_files(...) を使用した SQL で使用でき、さまざまな形式 (JSON、CSV、Apache Avro など) で使用できます。

テーブルをストリーミング テーブルとして定義すると、新しい受信データのみを使用できます。 ストリーミング テーブルとして定義しない場合は、使用可能なすべてのデータをスキャンして取り込みます。 詳細については、 ストリーミング テーブルを 参照してください。

  1. 自動ローダーを使用して受信 CDC データを取り込むには、パイプラインで作成されたコード ファイル ( my_transformation.py と呼ばれます) に次のコードをコピーして貼り付けます。 パイプラインの作成時に選択した言語に基づいて、Python または SQL を使用できます。 <catalog><schema>は、パイプラインの既定値に設定した値に置き換えてください。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # Replace with the catalog and schema name that
    # you are using:
    path = "/Volumes/<catalog>/<schema>/raw_data/customers"
    
    
    # Create the target bronze table
    dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
    
    # Create an Append Flow to ingest the raw data into the bronze table
    @dp.append_flow(
      target = "customers_cdc_bronze",
      name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
      return (
          spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "json")
              .option("cloudFiles.inferColumnTypes", "true")
              .load(f"{path}")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
      SELECT *
      FROM STREAM read_files(
        -- replace with the catalog/schema you are using:
        "/Volumes/<catalog>/<schema>/raw_data/customers",
        format => "json",
        inferColumnTypes => "true"
      )
    
  2. [ 再生] アイコンをクリックします。ファイル または パイプラインの実行を実行 して、接続されたパイプラインの更新を開始します。 パイプライン内のソース ファイルが 1 つだけの場合、これらは機能的に同等です。

更新が完了すると、エディターがパイプラインに関する情報で更新されます。

  • コードの右側のサイドバーにあるパイプライン グラフ (DAG) には、1 つのテーブル ( customers_cdc_bronze) が表示されます。
  • 更新プログラムの概要は、パイプライン資産ブラウザーの上部に表示されます。
  • 生成されたテーブルの詳細が下部ペインに表示され、テーブルからデータを参照するには、テーブルを選択します。

これは、クラウド ストレージからインポートされた生のブロンズ レイヤー データです。 次の手順では、データをクリーンアップしてシルバー レイヤー テーブルを作成します。

手順 4: データ品質を追跡するためのクリーンアップと期待

ブロンズ レイヤーが定義されたら、データ品質を制御するための期待値を追加して、シルバー レイヤーを作成します。 次の条件を確認します。

  • ID を nullしないでください。
  • CDC 操作タイプは有効でなければなりません。
  • JSON は、自動ローダーによって正しく読み取られる必要があります。

これらの条件を満たしていない行は削除されます。

詳細については、 パイプラインの期待に応えたデータ品質の管理に関する ページを参照してください。

  1. パイプライン資産ブラウザーのサイドバーで、[プラス] アイコンをクリックします。その後、[追加] を選択し、[変換] を選択します。

  2. 名前を入力し、ソース コード ファイルの言語 (Python または SQL) を選択します。 パイプライン内で言語を混在させたり一致させたりできるので、この手順でいずれかを選択できます。

  3. クレンジングされたテーブルを含むシルバー レイヤーを作成し、制約を適用するには、次のコードをコピーして新しいファイルに貼り付けます (ファイルの言語に基づいて Python または SQL を選択します)。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(
      name = "customers_cdc_clean",
      expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
      )
    
    @dp.append_flow(
      target = "customers_cdc_clean",
      name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
      return (
          spark.readStream.table("customers_cdc_bronze")
              .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
      CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;
    
  4. [ 再生] アイコンをクリックします。ファイル または パイプラインの実行を実行 して、接続されたパイプラインの更新を開始します。

    2 つのソース ファイルが存在するため、これらは同じことを行いませんが、この場合、出力は同じです。

    • パイプラインを実行 すると、手順 3 のコードを含め、パイプライン全体が実行されます。 入力データが更新されている場合は、そのソースからの変更がブロンズレイヤーに取り込まれます。 これは、パイプラインのソースの一部ではなく、探索フォルダー内にあるため、データ セットアップ ステップからコードを実行しません。
    • ファイルの実行 では、現在のソース ファイルのみが実行されます。 この場合、入力データが更新されずに、キャッシュされたブロンズ テーブルからシルバー データが生成されます。 パイプライン コードを作成または編集するときに、このファイルだけを実行してイテレーションを高速化すると便利です。

更新が完了すると、パイプライン グラフに 2 つのテーブル (ブロンズ レイヤーに応じてシルバー レイヤーを含む) が表示され、下部のパネルに両方のテーブルの詳細が表示されていることがわかります。 パイプライン資産ブラウザーの上部に複数の実行時間が表示されるようになりましたが、最新の実行の詳細のみが表示されます。

次に、 customers テーブルの最終的なゴールド レイヤー バージョンを作成します。

手順 5: AUTO CDC フローを使用して顧客テーブルを具体化する

ここまで、テーブルは各ステップで CDC データを渡したばかりです。 次に、customers テーブルを作成して、最新のビューを含み、それを作成した CDC 操作の一覧ではなく、元のテーブルのレプリカにします。

これは、手動で実装するのは容易ではありません。 最新の行を保持するには、データ重複除去などを考慮する必要があります。

ただし、Lakeflow Spark 宣言パイプラインは、 AUTO CDC 操作でこれらの課題を解決します。

  1. パイプライン資産ブラウザーのサイドバーで、[プラス] アイコンをクリック します。追加変換

  2. 名前を入力し、新しいソース コード ファイルの言語 (Python または SQL) を選択します。 この手順ではもう一度いずれかの言語を選択できますが、次の正しいコードを使用してください。

  3. Lakeflow Spark 宣言パイプラインで AUTO CDC を使用して CDC データを処理するには、次のコードをコピーして新しいファイルに貼り付けます。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(name="customers", comment="Clean, materialized customers")
    
    dp.create_auto_cdc_flow(
      target="customers",  # The customer table being materialized
      source="customers_cdc_clean",  # the incoming CDC
      keys=["id"],  # what we'll be using to match the rows to upsert
      sequence_by=col("operation_date"),  # de-duplicate by operation date, getting the most recent value
      ignore_null_updates=False,
      apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE condition
      except_column_list=["operation", "operation_date", "_rescued_data"],
    )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers;
    
    CREATE FLOW customers_cdc_flow
    AS AUTO CDC INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;
    
  4. [ 再生] アイコンをクリックします。ファイルを実行 して、接続されたパイプラインの更新を開始します。

更新が完了すると、パイプライン グラフに 3 つのテーブルが表示され、ブロンズからシルバーからゴールドに進んでいることがわかります。

手順 6: 緩やかに変化するディメンション タイプ 2 (SCD2) で更新履歴を追跡する

多くの場合、 APPENDUPDATE、および DELETEに起因するすべての変更を追跡するテーブルを作成する必要があります。

  • 履歴: テーブルに対するすべての変更の履歴を保持する必要があります。
  • 追跡可能性: 発生した操作を確認します。

SCD2 と Lakeflow SDP

Delta では変更データ フロー (CDF) がサポートされており、 table_change は SQL と Python でテーブルの変更を照会できます。 ただし、CDF の主なユース ケースは、最初からテーブルの変更の完全なビューを作成するのではなく、パイプライン内の変更をキャプチャすることです。

順序が整っていないイベントがある場合は、実装が特に複雑になります。 変更をタイムスタンプで並べ替え、過去に発生した変更を受け取る必要がある場合は、SCD テーブルに新しいエントリを追加し、前のエントリを更新する必要があります。

Lakeflow SDP では、この複雑さが排除され、最初からのすべての変更を含む個別のテーブルを作成できます。 このテーブルは、必要に応じて、特定のパーティションまたは ZORDER 列と共に大規模に使用できます。 順序が異なっているフィールドは、 _sequence_byに基づいてすぐに処理されます。

SCD2 テーブルを作成するには、SQL または Python でSTORED AS SCD TYPE 2stored_as_scd_type="2"オプションを使用します。

次のオプションを使用して、フィーチャが追跡する列を制限することもできます。 TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. パイプライン資産ブラウザーのサイドバーで、[プラス] アイコンをクリック します。追加変換

  2. 名前を入力し、新しいソース コード ファイルの言語 (Python または SQL) を選択します。

  3. 次のコードをコピーして新しいファイルに貼り付けます。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # create the table
    dp.create_streaming_table(
        name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )
    
    # store all changes as SCD2
    dp.create_auto_cdc_flow(
        target="customers_history",
        source="customers_cdc_clean",
        keys=["id"],
        sequence_by=col("operation_date"),
        ignore_null_updates=False,
        apply_as_deletes=expr("operation = 'DELETE'"),
        except_column_list=["operation", "operation_date", "_rescued_data"],
        stored_as_scd_type="2",
    )  # Enable SCD2 and store individual updates
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_history;
    
    CREATE FLOW customers_history_cdc
    AS AUTO CDC INTO
      customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;
    
  4. [ 再生] アイコンをクリックします。ファイルを実行 して、接続されたパイプラインの更新を開始します。

更新が完了すると、パイプライン グラフには新しい customers_history テーブルが含まれます。これはシルバー レイヤー テーブルにも依存し、下部のパネルには 4 つのテーブルすべてに関する詳細が表示されます。

手順 7: 情報を最も変更したユーザーを追跡する具体化されたビューを作成する

テーブル customers_history には、ユーザーが自分の情報に加えたすべての履歴変更が含まれています。 情報を最も変更したユーザーを追跡するシンプルな具体化されたビューをゴールド レイヤーに作成します。 これは、実際のシナリオで不正行為の検出分析やユーザーの推奨事項に使用できます。 さらに、SCD2 で変更を適用すると重複が既に削除されているため、ユーザー ID ごとに行を直接カウントできます。

  1. パイプライン資産ブラウザーのサイドバーで、[プラス] アイコンをクリック します。追加変換

  2. 名前を入力し、新しいソース コード ファイルの言語 (Python または SQL) を選択します。

  3. 次のコードをコピーして、新しいソース ファイルに貼り付けます。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    @dp.table(
      name = "customers_history_agg",
      comment = "Aggregated customer history"
    )
    def customers_history_agg():
      return (
        spark.read.table("customers_history")
          .groupBy("id")
          .agg(
              count("address").alias("address_count"),
              count("email").alias("email_count"),
              count("firstname").alias("firstname_count"),
              count("lastname").alias("lastname_count")
          )
      )
    

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
      id,
      count("address") as address_count,
      count("email") AS email_count,
      count("firstname") AS firstname_count,
      count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id
    
  4. [ 再生] アイコンをクリックします。ファイルを実行 して、接続されたパイプラインの更新を開始します。

更新が完了すると、 customers_history テーブルに依存する新しいテーブルがパイプライン グラフに表示され、下部のパネルで表示できます。 これでパイプラインが完了しました。 これをテストするには、完全な 実行パイプラインを実行します。 残っている唯一の手順は、定期的に更新するようにパイプラインをスケジュールすることです。

手順 8: ETL パイプラインを実行するジョブを作成する

次に、Databricks ジョブを使用して、パイプラインのデータ インジェスト、処理、および分析の手順を自動化するワークフローを作成します。

  1. エディターの上部にある [ スケジュール ] ボタンを選択します。
  2. [スケジュール] ダイアログが表示されたら、[スケジュールの追加] を選択します。
  3. [ 新しいスケジュール ] ダイアログが開き、スケジュールに従ってパイプラインを実行するジョブを作成できます。
  4. 必要に応じて、ジョブに名前を付けます。
  5. 既定では、スケジュールは 1 日に 1 回実行するように設定されています。 この既定値をそのまま使用することも、独自のスケジュールを設定することもできます。 [詳細設定] を選択すると、ジョブを実行する特定の時刻を設定できます。 [ その他] オプション を選択すると、ジョブの実行時に通知を作成できます。
  6. [ 作成] を選択して変更を適用し、ジョブを作成します。

これで、ジョブは毎日実行され、パイプラインが常に最新の状態に保たれるでしょう。 もう一度 [スケジュール] を選択すると、スケジュールの一覧を表示できます。 パイプラインのスケジュールは、スケジュールの追加、編集、削除など、そのダイアログから管理できます。

スケジュール (またはジョブ) の名前をクリックすると、[ジョブ とパイプライン ] の一覧のジョブのページに移動します。 そこから、実行履歴など、ジョブの実行に関する詳細を表示したり、[ 今すぐ 実行] ボタンを使用してジョブをすぐに実行したりできます。

ジョブの実行の詳細については、Lakeflow ジョブの監視と可観測性を参照してください。

その他のリソース