次の方法で共有


Spark Connect と Spark クラシックの比較

Spark Connect は、クライアント アプリケーションがリモート Spark サーバーと通信する方法を指定する Apache Spark 内の gRPC ベースのプロトコルです。 これにより、DataFrame API を使用して Spark ワークロードをリモートで実行できます。

Spark Connect は次で使用されます。

  • Databricks Runtime バージョン 13.3以降の標準および専用コンピューティング環境で使用するScalaノートブック
  • 標準コンピュートおよび専用コンピュート上で動作する、Databricks Runtime バージョン 14.3 以降の Python ノートブック
  • サーバーレス コンピューティング
  • Databricks Connect

Spark Connect と Spark クラシックの両方が変換に遅延実行を利用しますが、Spark クラシックから Spark Connect に既存のコードを移行するとき、または両方で動作する必要があるコードを記述するときに、予期しない動作とパフォーマンスの問題を回避するために知っておくべき重要な違いがあります。

遅延と熱心

Spark Connect と Spark クラシックの主な違いは、次の表に示すように、Spark Connect が分析と名前解決を実行時間に延期することです。

特徴 Spark クラシック Spark Connect
クエリの実行 怠惰 怠惰
スキーマ分析 熱心な 怠惰
スキーマへのアクセス ローカル RPC をトリガーする
一時ビュー 組み込み計画 名前の参照
UDF のシリアル化 作成時 実行時

クエリの実行

Spark クラシックと Spark Connect はどちらも、クエリ実行に同じ遅延実行モデルに従います。

Spark クラシックでは、DataFrame 変換 ( filterlimitなど) は遅延です。 つまり、これらは直ちに実行されるのではなく、論理プランに記録されます。 実際の計算は、アクション ( show()collect() など) が呼び出された場合にのみトリガーされます。

Spark Connect は、同様の遅延評価モデルに従います。 変換はクライアント側で構築され、未解決の proto プランとしてサーバーに送信されます。 その後、サーバーは、アクションが呼び出されたときに必要な分析と実行を実行します。

特徴 Spark クラシック Spark Connect
変換: df.filter(...)df.select(...)df.limit(...) 遅延実行 遅延実行
SQL クエリ: spark.sql("select …") 遅延実行 遅延実行
アクション: df.collect()df.show() 即時実行 即時実行
SQL コマンド: spark.sql("insert …")spark.sql("create …") 即時実行 即時実行

スキーマ分析

Spark クラシックは、論理プラン構築フェーズ中にスキーマ分析を熱心に実行します。 変換を定義すると、Spark はすぐに DataFrame のスキーマを分析して、参照されているすべての列とデータ型が有効であることを確認します。 たとえば、spark.sql("select 1 as a, 2 as b").filter("c > 1") を実行するとエラーが即座に発生し、列 c が見つからないことが通知されます。

Spark Connectは、代わりに変換中に未解決のプロトプランを構築します。 スキーマにアクセスするとき、またはアクションを実行するとき、クライアントは RPC (リモート プロシージャ コール) を介して未解決のプランをサーバーに送信します。 その後、サーバーは分析と実行を実行します。 この設計では、スキーマ分析が延期されます。 たとえば、未解決のプランはクライアント側のみであるため、 spark.sql("select 1 as a, 2 as b").filter("c > 1") はエラーをスローしませんが、 df.columns または df.show() では、未解決のプランが分析のためにサーバーに送信されるため、エラーがスローされます。

クエリの実行とは異なり、Spark クラシックと Spark Connect はスキーマ分析の発生時に異なります。

特徴 Spark クラシック Spark Connect
変換: df.filter(...)df.select(...)df.limit(...) 熱心な 怠惰
スキーマ アクセス: df.columnsdf.schemadf.isStreaming 熱心な 熱心な
Spark クラシックとは異なり、分析 RPC 要求をトリガーします
アクション: df.collect()df.show() 熱心な 熱心な
依存セッション状態: UDF、一時ビュー、設定 熱心な 怠惰
実行中に評価される

ベスト プラクティス

遅延分析と一括分析の違いは、予期しない動作やパフォーマンスの問題 (特に、一時ビュー名の上書き、UDF での外部変数のキャプチャ、遅延エラー検出、新しい DataFrame での過剰なスキーマ アクセスによって発生する問題) を回避するために従うベスト プラクティスがあることを意味します。

一意の一時ビュー名を作成する

Spark Connect では、DataFrame は一時ビューへの参照のみを名前で格納します。 その結果、一時ビューが後で置き換えられると、実行時にビューが名前で検索されるため、DataFrame 内のデータも変更されます。

この動作は、一時ビューの論理プランが作成時にデータ フレームのプランに埋め込まれる Spark クラシックとは異なります。 一時ビューの後続の置換は、元のデータ フレームには影響しません。

この違いを軽減するには、常に一意の一時ビュー名を作成します。 たとえば、ビュー名に UUID を含めます。 これにより、以前に登録された一時ビューを参照する既存の DataFrame への影響を回避できます。

Python

import uuid
def create_temp_view_and_create_dataframe(x):
  temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(temp_view_name)
  return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100

Scala

import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)

UDF 定義をラップする

Spark Connect では、Python UDF は遅延です。 シリアル化と登録は、実行時間まで延期されます。 次の例では、UDF はシリアル化され、 show() が呼び出されたときに実行するために Spark クラスターにアップロードされます。

from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
  return x


df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

この動作は、UDF が熱心に作成される Spark クラシックとは異なります。 Spark クラシックでは、UDF 作成時の x の値がキャプチャされるため、 x に対する後続の変更は、既に作成されている UDF には影響しません。

UDF が依存する外部変数の値を変更する必要がある場合は、関数ファクトリ (早期バインディングによるクロージャ) を使用して変数値を正しくキャプチャします。 具体的には、UDF の作成をヘルパー関数でラップして、従属変数の値をキャプチャします。

Python

from pyspark.sql.functions import udf

def make_udf(value):
  def foo():
    return value
  return udf(foo)


x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Scala

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x)  // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected

UDF 定義を別の関数 (make_udf) 内にラップすることで、 x の現在の値が引数として渡される新しいスコープを作成します。 これにより、生成された各 UDF に、UDF の作成時にバインドされたフィールドの独自のコピーが確実に含まれます。

積極的な分析をトリガーしてエラー検出を行う

次のエラー処理は Spark クラシックで役立ちます。これは、一括分析を実行するためです。これにより、例外をすぐにスローできます。 ただし、Spark Connect では、分析をトリガーせずにローカルの未解決の proto プランのみを構築しているため、このコードでは問題は発生しません。

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
  df = df.select("name", "age")
  df = df.withColumn(
      "age_group",
      when(col("age") < 18, "minor").otherwise("adult"))
  df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
  print(f"Error: {repr(e)}")

コードが分析例外に依存していて、それをキャッチする場合は、 df.columnsdf.schemadf.collect()など、一括分析をトリガーできます。

Python

try:
  df = ...
  df.columns # This will trigger eager analysis
except Exception as e:
  print(f"Error: {repr(e)}")

Scala

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}

積極的すぎる分析要求を避けるようにする

一括分析 ( df.columnsdf.schema など) をトリガーする呼び出しの過剰な使用を回避することで、多数の分析要求を回避すると、パフォーマンスが向上します。

これを回避できず、新しいデータ フレームの列を頻繁にチェックする必要がある場合は、分析要求を回避するために列名を追跡するセットを維持してください。

Python

df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
  new_column_name = str(i)
  if new_column_name not in columns: # Check the set
    df = df.withColumn(new_column_name, F.col("id") + i)
    columns.add(new_column_name)
df.show()

Scala

import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  if (!columns.contains(newColumnName)) {
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()

もう 1 つの同様のケースは、多数の不要な中間データフレームを作成して分析することです。 代わりに、中間の DataFrame を作成するのではなく、DataFrame のスキーマから直接 StructType フィールド情報を取得します。

Python

from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    column_schema.name: [f.name for f in column_schema.dataType.fields]
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)

Scala

import org.apache.spark.sql.types.StructType

df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
  }
  .toMap
println(structColumnFields)