注
Apache Airflow ジョブは、Apache Airflow を使用しています。
dbt (データ ビルド ツール) は、データ ウェアハウス内のデータの変換とモデル化に役立つオープンソースのコマンド ライン インターフェイス (CLI) です。 複雑な SQL コードを構造化された保守可能な方法で管理するため、データ チームは分析パイプラインに対して信頼性の高いテスト可能な変換を作成できます。
dbt と Apache エアフローを組み合わせると、両方の長所が得られます。 dbt は変換を処理し、エアフローはスケジューリング、オーケストレーション、タスク管理を管理します。 このアプローチにより、効率的で堅牢なパイプラインが作成され、より迅速で洞察に富んだデータドリブンの意思決定が行われます。
このチュートリアルでは、dbt を使用して Microsoft Fabric Data Warehouse に格納されているデータを変換する Apache エアフロー DAG を作成する方法について説明します。
前提条件
開始する前に、次の前提条件を満たす必要があります。
サービス プリンシパルを作成します。 データ ウェアハウスを作成するワークスペースに、サービス プリンシパルを
Contributorとして追加します。用意していない場合は、Fabric Warehouse を作成します。 パイプラインを使用して、サンプル データをウェアハウスに取り込みます。 このチュートリアルでは、NYC Taxi-Green サンプルを使用します。
dbt を使用して Fabric ウェアハウス内のデータを変換する
dbt 変換を設定するには、次の手順に従います。
- 要件を指定する
- Apache エアフロー ジョブによって提供される Fabric マネージド ストレージに dbt プロジェクトを作成する
- オーケストレーション用の Apache エアフロー DAG を作成する
要件を指定する
requirements.txtフォルダーにdagsという名前のファイルを作成します。Apache エアフローの要件として、次のパッケージを追加します。
- astronomer-cosmos: このパッケージは、Dbt コア プロジェクトを Apache Airflow dags およびタスク グループとして実行します。
- dbt-fabric: このパッケージは、 Fabric Data Warehouse にデプロイできる dbt プロジェクトを作成します。
astronomer-cosmos==1.10.1 dbt-fabric==1.9.5
Apache エアフロー ジョブによって提供される Fabric マネージド ストレージに dbt プロジェクトを作成する
次のディレクトリ構造で
nyc_taxi_greenデータセットの Apache エアフロー ジョブにサンプル dbt プロジェクトを作成します。dags |-- my_cosmos_dag.py |-- nyc_taxi_green | |-- profiles.yml | |-- dbt_project.yml | |-- models | | |-- nyc_trip_count.sql | |-- targetnyc_taxi_greenファイルを使用して、dagsフォルダーにprofiles.ymlという名前のフォルダーを作成します。 このフォルダーには、dbt プロジェクトに必要なすべてのファイルが含まれています。
次の内容を
profiles.ymlファイルにコピーします。 この構成ファイルには、dbt が使用するデータベース接続の詳細とプロファイルが含まれています。 プレースホルダーの値を更新し、ファイルを保存します。config: partial_parse: true nyc_taxi_green: target: fabric-dev outputs: fabric-dev: type: fabric driver: "ODBC Driver 18 for SQL Server" server: <sql connection string of your data warehouse> port: 1433 database: "<name of the database>" schema: dbo threads: 4 authentication: ServicePrincipal tenant_id: <Tenant ID of your service principal> client_id: <Client ID of your service principal> client_secret: <Client Secret of your service principal>dbt_project.ymlファイルを作成し、次の内容をコピーします。 このファイルは、プロジェクト レベルの構成を指定します。name: "nyc_taxi_green" config-version: 2 version: "0.1" profile: "nyc_taxi_green" model-paths: ["models"] seed-paths: ["seeds"] test-paths: ["tests"] analysis-paths: ["analysis"] macro-paths: ["macros"] target-path: "target" clean-targets: - "target" - "dbt_modules" - "logs" require-dbt-version: [">=1.0.0", "<2.0.0"] models: nyc_taxi_green: materialized: tablemodelsフォルダーに、nyc_taxi_greenフォルダーを作成します。 このチュートリアルでは、ベンダーごとの 1 日あたりの乗車数を示すテーブルを作成するnyc_trip_count.sqlという名前のファイルにサンプル モデルを作成します。 次の内容をファイルにコピーします。with new_york_taxis as ( select * from nyctlc ), final as ( SELECT vendorID, CAST(lpepPickupDatetime AS DATE) AS trip_date, COUNT(*) AS trip_count FROM [contoso-data-warehouse].[dbo].[nyctlc] GROUP BY vendorID, CAST(lpepPickupDatetime AS DATE) ORDER BY vendorID, trip_date; ) select * from final
オーケストレーション用の Apache エアフロー DAG を作成する
my_cosmos_dag.pyフォルダーにdagsという名前のファイルを作成し、次の内容を貼り付けます。import os from pathlib import Path from datetime import datetime from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig from airflow import DAG DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) profile_config = ProfileConfig( profile_name="nyc_taxi_green", target_name="fabric-dev", profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml", ) dbt_fabric_dag = DbtDag( project_config=ProjectConfig(DBT_ROOT_PATH,), operator_args={"install_deps": True}, profile_config=profile_config, schedule_interval="@daily", start_date=datetime(2023, 9, 10), catchup=False, dag_id="dbt_fabric_dag", )
DAG を実行する
Apache Airflow の UI で読み込まれた DAG を表示するには、[Apache エアフローで監視] を選択します。
正常に実行された後、Fabric データ ウェアハウスの "nyc_trip_count.sql" という名前の新しいテーブルを確認して、データを検証します。