次の方法で共有


チュートリアル: Lakeflow Spark 宣言パイプラインを使用して ETL パイプラインを構築する

このチュートリアルでは、Lakeflow Spark 宣言パイプラインと自動ローダーを使用して、データ オーケストレーション用の ETL (抽出、変換、読み込み) パイプラインを作成してデプロイする方法について説明します。 ETL パイプラインは、ソース システムからデータを読み取り、データ品質チェックや重複除去の記録などの要件に基づいてデータを変換し、データ ウェアハウスやデータ レイクなどのターゲット システムにデータを書き込む手順を実装します。

このチュートリアルでは、パイプラインと自動ローダーを使用して次の操作を行います。

  • ターゲット テーブルに生のソース データを取り込みます。
  • 生のソース データを変換し、変換されたデータを 2 つのターゲットマテリアライズド ビューに書き込みます。
  • 変換されたデータに対してクエリを実行します。
  • Databricks ジョブを使用して ETL パイプラインを自動化します。

パイプラインと自動ローダーの詳細については、「Lakeflow Spark 宣言型パイプライン」および「自動ローダーとは」を参照してください。

必要条件

このチュートリアルを完了するには、次の要件を満たす必要があります。

データセットについて

この例で使用するデータセットは、現代音楽トラックの特徴とメタデータのコレクションである Million Song Dataset のサブセットです。 このデータセットは、Azure Databricks ワークスペースに含まれているサンプル データセット内にあります。

手順 1: パイプラインを作成する

まず、パイプライン構文を使用してファイル内のデータセット ( ソース コードと呼ばれます) を定義してパイプラインを作成します。 各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインに複数の言語固有のファイルを追加できます。 詳細については、「Lakeflow Spark 宣言型パイプライン」を参照してください。

このチュートリアルでは、サーバーレス コンピューティングと Unity カタログを使用します。 指定されていないすべての構成オプションで、既定の設定を使用します。 ワークスペースでサーバーレス コンピューティングが有効になっていないか、サポートされていない場合は、既定のコンピューティング設定を使用して記述されたチュートリアルを完了できます。

新しいパイプラインを作成するには、次の手順に従います。

  1. ワークスペースで、[プラス] アイコンをクリックします。サイドバーの新機能を選択し、[ETL パイプライン] を選択します。
  2. パイプラインに一意の名前を付けます。
  3. 名前のすぐ下で、生成するデータの既定のカタログとスキーマを選択します。 変換では他の変換先を指定できますが、このチュートリアルではこれらの既定値を使用します。 作成するカタログとスキーマに対するアクセス許可が必要です。 要件を参照してください。
  4. このチュートリアルでは、[ 空のファイルで開始] を選択します。
  5. [ フォルダー パス] で、ソース ファイルの場所を指定するか、既定値 (ユーザー フォルダー) をそのまま使用します。
  6. 最初のソース ファイルの言語として Python または SQL を選択します (パイプラインは言語を混在させ、一致させることができますが、各ファイルは 1 つの言語である必要があります)。
  7. [選択] をクリックします。

新しいパイプラインのパイプライン エディターが表示されます。 言語の空のソース ファイルが作成され、最初の変換の準備が整います。

手順 2: パイプライン ロジックを開発する

この手順では、 Lakeflow Pipelines エディター を使用して、パイプラインのソース コードを対話的に開発および検証します。

このコードでは、増分データ インジェストに自動ローダーを使用します。 自動ローダーは、クラウド オブジェクト ストレージに到着した新しいファイルを自動的に検出して処理します。 詳細については、「自動ローダーとは」を参照してください。

空のソース コード ファイルが自動的に作成され、パイプライン用に構成されます。 ファイルは、パイプラインの変換フォルダーに作成されます。 既定では、変換フォルダー内のすべての *.py ファイルと *.sql ファイルは、パイプラインのソースの一部です。

  1. 次のコードをコピーしてソース ファイルに貼り付けます。 手順 1 でファイルに選択した言語を必ず使用してください。

    Python

    # Import modules
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to the source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dp.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dp.materialized_view(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dp.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dp.expect("valid_title", "song_title IS NOT NULL")
    @dp.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dp.materialized_view(
      comment="A table summarizing counts of songs released by the artists who released the most songs each year."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
      '/databricks-datasets/songs/data-001/part*',
      format => "csv",
      header => "false",
      delimiter => "\t",
      schema => """
        artist_id STRING,
        artist_lat DOUBLE,
        artist_long DOUBLE,
        artist_location STRING,
        artist_name STRING,
        duration DOUBLE,
        end_of_fade_in DOUBLE,
        key INT,
        key_confidence DOUBLE,
        loudness DOUBLE,
        release STRING,
        song_hotnes DOUBLE,
        song_id STRING,
        start_of_fade_out DOUBLE,
        tempo DOUBLE,
        time_signature INT,
        time_signature_confidence DOUBLE,
        title STRING,
        year INT,
        partial_sequence STRING
      """,
      schemaEvolutionMode => "none");
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs."
    AS SELECT
      artist_name,
      year,
      COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC;
    

    このソースには、3 つのクエリのコードが含まれています。 また、これらのクエリを別々のファイルに配置して、ファイルを整理し、必要に応じてコーディングすることもできます。

  2. [ 再生] アイコンをクリックします。ファイル または パイプラインの実行を実行 して、接続されたパイプラインの更新を開始します。 パイプライン内のソース ファイルが 1 つだけの場合、これらは機能的に同等です。

更新が完了すると、エディターがパイプラインに関する情報で更新されます。

  • コードの右側のサイドバーにあるパイプライン グラフ (DAG) には、 songs_rawsongs_preparedtop_artists_by_yearの 3 つのテーブルが表示されます。
  • 更新プログラムの概要は、パイプライン資産ブラウザーの上部に表示されます。
  • 生成されたテーブルの詳細が下部ペインに表示され、テーブルからデータを参照するには、テーブルからデータを選択します。

これには、生のデータとクリーンアップされたデータ、および年別のトップ アーティストを見つけるための簡単な分析が含まれます。 次の手順では、パイプライン内の別のファイルでさらに分析するためのアドホック クエリを作成します。

手順 3: パイプラインによって作成されたデータセットを調べる

この手順では、ETL パイプラインで処理されたデータに対してアドホック クエリを実行して、Databricks SQL エディターで曲データを分析します。 これらのクエリでは、前の手順で作成した準備済みレコードを使用します。

まず、1990 年以降、毎年最も多くの曲をリリースしているアーティストを検索するクエリを実行します。

  1. パイプライン資産ブラウザーのサイドバーで、[プラス] アイコンをクリックし、[追加]、その後[探索]をクリックします。

  2. 名前を入力し、探索ファイルの SQL を選択します。 SQL ノートブックが新しい explorations フォルダーに作成されます。 explorations フォルダー内のファイルは、既定ではパイプライン更新の一部として実行されません。 SQL ノートブックには、一緒に、または個別に実行できるセルがあります。

  3. 1990 年以降の各年のほとんどの曲をリリースするアーティストのテーブルを作成するには、新しい SQL ファイルに次のコードを入力します (ファイルにサンプル コードがある場合は、置き換えます)。 このノートブックはパイプラインの一部ではないため、既定のカタログとスキーマは使用されません。 <catalog>.<schema>を、パイプラインの既定値として使用したカタログとスキーマに置き換えます。

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.top_artists_by_year
      WHERE year >= 1990
      ORDER BY total_number_of_songs DESC, year DESC;
    
  4. [ 再生] アイコンを クリックするか、 Shift + Enter キーを押してこのクエリを実行します。

次に、4/4ビートと踊り可能なテンポの曲を見つける別のクエリを実行します。

  1. 同じファイル内の次のセルに次のコードを追加します。 ここでも、 <catalog>.<schema> を、パイプラインの既定値として使用したカタログとスキーマに置き換えます。

    -- Find songs with a 4/4 beat and danceable tempo
    SELECT artist_name, song_title, tempo
      -- replace with the catalog/schema you are using:
      FROM <catalog>.<schema>.songs_prepared
      WHERE time_signature = 4 AND tempo between 100 and 140;
    
  2. [ 再生] アイコンを クリックするか、 Shift + Enter キーを押してこのクエリを実行します。

手順 4: パイプラインを実行するジョブを作成する

次に、スケジュールに従って実行される Databricks ジョブを使用して、データインジェスト、処理、および分析の手順を自動化するワークフローを作成します。

  1. エディターの上部にある [ スケジュール ] ボタンを選択します。
  2. [スケジュール] ダイアログが表示されたら、[スケジュールの追加] を選択します。
  3. [ 新しいスケジュール ] ダイアログが開き、スケジュールに従ってパイプラインを実行するジョブを作成できます。
  4. 必要に応じて、ジョブに名前を付けます。
  5. 既定では、スケジュールは 1 日に 1 回実行するように設定されています。 デフォルトを受け入れるか、独自のスケジュールを設定できます。 [詳細設定] を選択すると、ジョブを実行する特定の時刻を設定できます。 [ その他] オプション を選択すると、ジョブの実行時に通知を作成できます。
  6. [ 作成] を選択して変更を適用し、ジョブを作成します。

これで、ジョブが毎日実行され、パイプラインが最新の状態に保たれます。 もう一度 [スケジュール] を選択すると、スケジュールの一覧を表示できます。 パイプラインのスケジュールは、スケジュールの追加、編集、削除など、そのダイアログから管理できます。

スケジュール (またはジョブ) の名前をクリックすると、[ジョブ とパイプライン ] の一覧のジョブのページに移動します。 そこから、実行履歴など、ジョブの実行に関する詳細を表示したり、[ 今すぐ 実行] ボタンを使用してジョブをすぐに実行したりできます。

ジョブの実行の詳細については、Lakeflow ジョブの監視と可観測性を参照してください。

詳細情報