次の方法で共有


dbt を使用したデータの変換

Apache Airflow ジョブは、Apache Airflow を使用しています。

dbt (データ ビルド ツール) は、データ ウェアハウス内のデータの変換とモデル化に役立つオープンソースのコマンド ライン インターフェイス (CLI) です。 複雑な SQL コードを構造化された保守可能な方法で管理するため、データ チームは分析パイプラインに対して信頼性の高いテスト可能な変換を作成できます。

dbt と Apache エアフローを組み合わせると、両方の長所が得られます。 dbt は変換を処理し、エアフローはスケジューリング、オーケストレーション、タスク管理を管理します。 このアプローチにより、効率的で堅牢なパイプラインが作成され、より迅速で洞察に富んだデータドリブンの意思決定が行われます。

このチュートリアルでは、dbt を使用して Microsoft Fabric Data Warehouse に格納されているデータを変換する Apache エアフロー DAG を作成する方法について説明します。

前提条件

開始する前に、次の前提条件を満たす必要があります。

dbt を使用して Fabric ウェアハウス内のデータを変換する

dbt 変換を設定するには、次の手順に従います。

  1. 要件を指定する
  2. Apache エアフロー ジョブによって提供される Fabric マネージド ストレージに dbt プロジェクトを作成する
  3. オーケストレーション用の Apache エアフロー DAG を作成する

要件を指定する

  1. requirements.txt フォルダーに dags という名前のファイルを作成します。

  2. Apache エアフローの要件として、次のパッケージを追加します。

    • astronomer-cosmos: このパッケージは、Dbt コア プロジェクトを Apache Airflow dags およびタスク グループとして実行します。
    • dbt-fabric: このパッケージは、 Fabric Data Warehouse にデプロイできる dbt プロジェクトを作成します。
    astronomer-cosmos==1.10.1
    dbt-fabric==1.9.5   
    

Apache エアフロー ジョブによって提供される Fabric マネージド ストレージに dbt プロジェクトを作成する

  1. 次のディレクトリ構造で nyc_taxi_green データセットの Apache エアフロー ジョブにサンプル dbt プロジェクトを作成します。

       dags
       |-- my_cosmos_dag.py
       |-- nyc_taxi_green
       |  |-- profiles.yml
       |  |-- dbt_project.yml
       |  |-- models
       |  |   |-- nyc_trip_count.sql
       |  |-- target
    
  2. nyc_taxi_green ファイルを使用して、dags フォルダーに profiles.yml という名前のフォルダーを作成します。 このフォルダーには、dbt プロジェクトに必要なすべてのファイルが含まれています。 dbt プロジェクトのファイルを作成するスクリーンショット。

  3. 次の内容を profiles.yml ファイルにコピーします。 この構成ファイルには、dbt が使用するデータベース接続の詳細とプロファイルが含まれています。 プレースホルダーの値を更新し、ファイルを保存します。

    config:
      partial_parse: true
    nyc_taxi_green:
      target: fabric-dev
      outputs:
        fabric-dev:
          type: fabric
          driver: "ODBC Driver 18 for SQL Server"
          server: <sql connection string of your data warehouse>
          port: 1433
          database: "<name of the database>"
          schema: dbo
          threads: 4
          authentication: ServicePrincipal
          tenant_id: <Tenant ID of your service principal>
          client_id: <Client ID of your service principal>
          client_secret: <Client Secret of your service principal>
    
  4. dbt_project.yml ファイルを作成し、次の内容をコピーします。 このファイルは、プロジェクト レベルの構成を指定します。

    name: "nyc_taxi_green"
    
    config-version: 2
    version: "0.1"
    
    profile: "nyc_taxi_green"
    
    model-paths: ["models"]
    seed-paths: ["seeds"]
    test-paths: ["tests"]
    analysis-paths: ["analysis"]
    macro-paths: ["macros"]
    
    target-path: "target"
    clean-targets:
      - "target"
      - "dbt_modules"
      - "logs"
    
    require-dbt-version: [">=1.0.0", "<2.0.0"]
    
    models:
      nyc_taxi_green:
        materialized: table
    
  5. models フォルダーに、nyc_taxi_greenフォルダーを作成します。 このチュートリアルでは、ベンダーごとの 1 日あたりの乗車数を示すテーブルを作成する nyc_trip_count.sql という名前のファイルにサンプル モデルを作成します。 次の内容をファイルにコピーします。

       with new_york_taxis as (
           select * from nyctlc
       ),
       final as (
         SELECT
           vendorID,
           CAST(lpepPickupDatetime AS DATE) AS trip_date,
           COUNT(*) AS trip_count
         FROM
             [contoso-data-warehouse].[dbo].[nyctlc]
         GROUP BY
             vendorID,
             CAST(lpepPickupDatetime AS DATE)
         ORDER BY
             vendorID,
             trip_date;
       )
       select * from final
    

    dbt プロジェクトのモデルのスクリーンショット。

オーケストレーション用の Apache エアフロー DAG を作成する

  1. my_cosmos_dag.py フォルダーに dags という名前のファイルを作成し、次の内容を貼り付けます。

     import os
     from pathlib import Path
     from datetime import datetime
     from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
     from airflow import DAG
    
     DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green"
     DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
     profile_config = ProfileConfig(
         profile_name="nyc_taxi_green",
         target_name="fabric-dev",
         profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml",
     )
    
     dbt_fabric_dag = DbtDag(
         project_config=ProjectConfig(DBT_ROOT_PATH,),
         operator_args={"install_deps": True},
         profile_config=profile_config,
         schedule_interval="@daily",
         start_date=datetime(2023, 9, 10),
         catchup=False,
         dag_id="dbt_fabric_dag",
     )
    

DAG を実行する

  1. Apache Airflow ジョブ内で DAG を実行します。 dag の実行のスクリーンショット。

  2. Apache Airflow の UI で読み込まれた DAG を表示するには、[Apache エアフローで監視] を選択します。 dbt dag の監視のスクリーンショット。 dag の実行が成功したスクリーンショット。

  3. 正常に実行された後、Fabric データ ウェアハウスの "nyc_trip_count.sql" という名前の新しいテーブルを確認して、データを検証します。 成功した dbt dag のスクリーンショット。

クイック スタート: Apache Airflow ジョブを作成する