Spark を使用してデータ ファイルを操作する

完了

ノートブックを設定してクラスターにアタッチした後、Spark を使用してデータ ファイルの読み取りと処理を行うことができます。 Spark では、CSV、JSON、Parquet、ORC、Avro、Delta などのさまざまな形式がサポートされており、Databricks には、ワークスペース、Azure Data Lake または Blob Storage、またはその他の外部システムに格納されているファイルにアクセスするための組み込みのコネクタが用意されています。

ワークフローは通常、次の 3 つの手順に従います。

  1. 正しい形式とパスを使用して spark.read を使用して Spark DataFrame にファイルを読取ります。 CSV や JSON などの生のテキスト形式を読み取る場合、Spark はスキーマ (列名とデータ型) を推測できますが、低速または信頼性の低い場合があります。 運用環境では、データが一貫して効率的に読み込まれるようにスキーマを明示的に定義することをお勧めします。

  2. SQL 操作または DataFrame 操作 (行のフィルター処理、列の選択、値の集計など) を使用して、DataFrame を探索して変換します。

  3. 選択した形式で結果をストレージに書き戻します。

Spark でのファイルの操作は、小規模および大規模なデータセット間で一貫するように設計されています。 小規模な CSV ファイルのテストに使用されるのと同じコードは、Spark によってクラスター全体に作業が分散されるため、はるかに大きなデータセットでも機能します。 これにより、迅速な探索からより複雑なデータ処理へのスケールアップが容易になります。

データフレームにデータを読み込む

仮説の例を見て、データフレームを使用してデータを操作する方法を確認しましょう。 Databricks File System (DBFS) ストレージのデータ フォルダーに 、products.csv という名前のコンマ区切りのテキスト ファイルに次の データ があるとします。

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Spark ノートブックでは、次の PySpark コードを使用してデータフレームにデータを読み込み、最初の 10 行を表示できます。

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

先頭の %pyspark 行は "マジック" と呼ばれ、このセルで使用される言語が PySpark であることを Spark に伝えます。 製品データの例と同等の Scala コードを次に示します。

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

マジック %spark は Scala を指定するために使用されます。

ヒント

Notebook インターフェイスの各セルに使用する言語を選択することもできます。

前に示した両方の例では、次のような出力が生成されます。

製品 ID 製品名 カテゴリ ListPrice
771 Mountain-100 Silver、38 マウンテン バイク 3399.9900
772 Mountain-100 Silver,42 マウンテン バイク 3399.9900
773 Mountain-100 Silver,44 マウンテン バイク 3399.9900
... ... ... ...

データフレーム スキーマを指定する

前の例では、CSV ファイルの最初の行に列名が含まれており、Spark により、含まれているデータから各列のデータ型を推論できました。 また、データの明示的なスキーマを指定することもできます。これは、次の CSV の例のように、データ ファイルに列名が含まれていない場合に便利です。

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

次の PySpark の例は、 product-data.csvという名前 のファイルから読み込まれるデータフレームのスキーマをこの形式で指定する方法を示しています。

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

ここでも、結果は次のようになります。

製品 ID 製品名 カテゴリ ListPrice
771 Mountain-100 Silver、38 マウンテン バイク 3399.9900
772 Mountain-100 Silver,42 マウンテン バイク 3399.9900
773 Mountain-100 Silver,44 マウンテン バイク 3399.9900
... ... ... ...

データフレームのフィルター処理とグループ化を行う

Dataframe クラスのメソッドを使用して、含まれているデータをフィルター処理、並べ替え、グループ化、操作できます。 たとえば、次のコード例では、select メソッドを使用して、前の例の製品データを含む df データフレームから ProductName 列と ListPrice 列を取得します。

pricelist_df = df.select("ProductID", "ListPrice")

このコード例の結果は次のようになります。

製品 ID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

ほとんどのデータ操作メソッドと共通して、 select は新しいデータフレーム オブジェクトを返します。

ヒント

データフレームから列のサブセットを選択することは一般的な操作であり、次の短い構文を使用して実現することもできます。

pricelist_df = df["ProductID", "ListPrice"]

メソッドを "チェーン" して、変換されたデータフレームを作成する一連の操作を実行できます。 たとえば、次のコード例では、selectメソッドと where メソッドを連結して、Mountain Bikes または Road Bikes のカテゴリを持つ製品の ProductName 列と ListPrice 列を含む新しいデータフレームを作成します。

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

このコード例の結果は次のようになります。

製品名 ListPrice
Mountain-100 Silver、38 3399.9900
ロード-750 ブラック、52 539.9900
... ...

データをグループ化して集計するには、 groupby メソッドと集計関数を使用できます。 たとえば、次の PySpark コードでは、各カテゴリの製品数をカウントします。

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

このコード例の結果は次のようになります。

カテゴリ カウント
Headsets 3
ホイール 14
マウンテン バイク 32
... ...

Spark DataFrame は 宣言型であり、不変です。 各変換 ( selectfiltergroupByなど) は、実行方法ではなく、必要なものを表す新しい DataFrame を作成します。 これにより、コードを再利用可能で、最適化可能になり、副作用がなくなります。 ただし、 アクション (たとえばdisplaycollectwrite) をトリガーするまで、これらの変換は実際には実行されません。この時点で、Spark は完全に最適化されたプランを実行します。

Spark で SQL 式を使用する

Dataframe API は Spark SQL という名前の Spark ライブラリの一部であり、データ アナリストは SQL 式を使用してデータのクエリと操作を行います。

Spark カタログでデータベース オブジェクトを作成する

Spark カタログは、ビューやテーブルなどのリレーショナル データ オブジェクトのメタストアです。 Spark ランタイムでは、このカタログを使用して、任意の Spark 対応言語で記述されたコードと、一部のデータ アナリストや開発者にとってより自然な SQL 式をシームレスに統合できます。

Spark カタログでクエリを実行するためにデータフレーム内のデータを使用できるようにする最も簡単な方法の 1 つは、次のコード例に示すように、一時ビューを作成することです。

df.createOrReplaceTempView("products")

"ビュー" は一時的なもので、現在のセッションの終了時に自動的に削除されます。 また、カタログに保持される "テーブル" を作成して、Spark SQL を使用してクエリを実行できるデータベースを定義することもできます。

このモジュールでは Spark カタログ テーブルについて詳しく説明しませんが、いくつかの重要な点を確認しておくことをお勧めします。

  • spark.catalog.createTable メソッドを使用して、空のテーブルを作成できます。 テーブルは、カタログに関連付けられているストレージの場所に、基になるデータを格納するメタデータ構造です。 テーブルを削除すると、基になるデータも削除されます。
  • データフレームをテーブルとして保存するには、saveAsTable メソッドを使用します。
  • メソッドを使用して "外部"spark.catalog.createExternalTable テーブルを作成できます。 外部テーブルではカタログ内のメタデータが定義されますが、外部ストレージの場所 (通常は、データ レイク内のフォルダー) から基になるデータが取得されます。 外部テーブルを削除しても、基になるデータは削除されません。

Spark SQL API を使用してデータのクエリを実行する

任意の言語で記述されたコードで Spark SQL API を使用して、カタログ内のデータに対してクエリを実行できます。 たとえば、次の PySpark コードでは、SQL クエリを使用して 製品 ビューからデータフレームとしてデータを返します。

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

このコード例の結果は、次の表のようになります。

製品名 ListPrice
Mountain-100 Silver、38 3399.9900
ロード-750 ブラック、52 539.9900
... ...

SQL コードを使用する

前の例では、Spark SQL API を使用して、Spark コードに SQL 式を埋め込む方法を示しました。 また、ノートブックで %sql マジックを使用して、次のようにカタログ内のオブジェクトに対してクエリを行う SQL コードを実行することもできます。

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

この SQL コード例では、次のように、ノートブックにテーブルとして自動的に表示される結果セットが返されます。

カテゴリ ProductCount
Bib-Shorts 3
Bike Racks 1
Bike Stands 1
... ...