Spark ConnectとSpark Classicを比較する
Spark Connect は、Apache Spark 内の gRPC ベースのプロトコルであり、クライアント アプリケーションがリモート Spark サーバーと通信する方法を指定します。DataFrame API を使用して Spark ワークロードをリモートで実行できます。
Spark Connect は次のような場合に使用されます。
- Databricks Runtimeバージョン 13.3 以降を備えたScalaノートブック (標準および専用コンピュート上)
- Databricks Runtimeバージョン 14.3 以降を備えた標準および専用コンピュート上のPythonノートブック
- サーバーレスコンピュート
- Databricks Connect
Spark Connect と Spark Classic はどちらも変換に遅延実行を利用しますが、既存のコードを Spark Classic から Spark Connect に移行するとき、または両方で動作する必要があるコードを作成するときに、予期しない動作やパフォーマンスの問題を回避するために知っておくべき重要な違いがあります。
怠惰 vs 熱心
Spark Connect と Spark Classic の主な違いは、次の表にまとめられているように、Spark Connect では分析と名前解決が実行時に延期されることです。
観点 | Sparkクラシック | Sparkコネクト |
|---|---|---|
クエリ実行 | 怠け者 | 怠け者 |
スキーマ分析 | 熱心な | 怠け者 |
スキーマアクセス | ローカル | RPCをトリガーする |
一時的なビュー | 計画が組み込まれている | 名前検索 |
UDFシリアル化 | 創造時 | 実行時 |
クエリ実行
Spark Classic と Spark Connect はどちらも、クエリ実行に同じ遅延実行モデルに従います。
Spark Classic では、DataFrame 変換 ( filterやlimitなど) は遅延されます。つまり、すぐに実行されるのではなく、論理的な計画に記録されます。実際の計算は、アクション ( show() 、 collect()など) が呼び出されたときにのみトリガーされます。
Spark Connect も同様の遅延評価モデルに従います。変換はクライアント側で構築され、未解決のプロト プランとしてサーバーに送信されます。その後、アクションが呼び出されると、サーバーは必要な分析と実行を実行します。
観点 | Sparkクラシック | Sparkコネクト |
|---|---|---|
変換: | 遅延実行 | 遅延実行 |
SQL クエリ: | 遅延実行 | 遅延実行 |
アクション: | 熱心な実行 | 熱心な実行 |
SQLコマンド: | 熱心な実行 | 熱心な実行 |
スキーマ分析
Spark Classic は、論理プラン構築フェーズ中にスキーマ分析を積極的に実行します。変換を定義すると、Spark はすぐに DataFrame のスキーマを分析し、参照されているすべての列とデータ型が有効であることを確認します。たとえば、 spark.sql("select 1 as a, 2 as b").filter("c > 1")を実行すると、列cが見つからないことを示すエラーがすぐにスローされます。
代わりに、Spark Connect は変換中に未解決のプロト プランを構築します。スキーマにアクセスしたりアクションを実行したりする場合、クライアントは未解決のプランを RPC (リモート プロシージャ コール) 経由でサーバーに送信します。その後、サーバーは分析と実行を実行します。この設計ではスキーマ分析が延期されます。たとえば、 spark.sql("select 1 as a, 2 as b").filter("c > 1")では未解決のプランがクライアント側のみにあるためエラーはスローされませんが、 df.columnsまたはdf.show()では未解決のプランが分析のためにサーバーに送信されるためエラーがスローされます。
クエリ実行とは異なり、Spark Classic と Spark Connect ではスキーマ分析がいつ実行されるかが異なります。
観点 | Sparkクラシック | Sparkコネクト |
|---|---|---|
変換: | 熱心な | 怠け者 |
スキーマ アクセス: | 熱心な | 熱心な Spark Classicとは異なり、分析RPCリクエストをトリガーします。 |
アクション: | 熱心な | 熱心な |
依存セッション状態: UDF、一時ビュー、構成 | 熱心な | 怠け者 実行中に評価される |
ベストプラクティス
遅延分析と積極的分析の違いは、予期しない動作やパフォーマンスの問題、具体的には一時ビュー名の上書き、UDF での外部変数のキャプチャ、エラー検出の遅延、新しいDataFramesでの過度のスキーマ アクセスなどによって発生する問題を回避するために従うべきベスト プラクティスがいくつかあることを意味します。
一意の一時ビュー名を作成する
Spark Connect では、DataFrame には名前による一時ビューへの参照のみが保存されます。その結果、後で一時ビューが置き換えられると、実行時に名前でビューが検索されるため、DataFrame 内のデータも変更されます。
この動作は、作成時に一時ビューの論理プランがデータ フレームのプランに埋め込まれる Spark Classic とは異なります。その後の一時ビューの置き換えは、元のデータ フレームに影響しません。
違いを軽減するには、常に一意の一時ビュー名を作成します。たとえば、ビュー名に UUID を含めます。これにより、以前に登録された一時ビューを参照する既存のDataFramesへの影響が回避されます。
- Python
- Scala
import uuid
def create_temp_view_and_create_dataframe(x):
temp_view_name = f"`temp_view_{uuid.uuid4()}`" # Use a random name to avoid conflicts.
spark.range(x).createOrReplaceTempView(temp_view_name)
return spark.table(temp_view_name)
df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10
df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100
import java.util.UUID
def createTempViewAndDataFrame(x: Int) = {
val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
spark.range(x).createOrReplaceTempView(tempViewName)
spark.table(tempViewName)
}
val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)
val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)
UDF定義をラップする
Spark Connect では、Python UDF は遅延します。シリアル化と登録は実行時まで延期されます。次の例では、 show()が呼び出されたときにのみ、UDF がシリアル化され、実行のために Spark クラスターにアップロードされます。
from pyspark.sql.functions import udf
x = 123
@udf("INT")
def foo():
return x
df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456
この動作は、UDF が積極的に作成される Spark Classic とは異なります。Spark Classic では、UDF 作成時のxの値がキャプチャされるため、その後のxの変更は既に作成されている UDF には影響しません。
UDF が依存する外部変数の値を変更する必要がある場合は、関数ファクトリ (早期バインディングによるクロージャ) を使用して変数値を正しくキャプチャします。具体的には、UDF の作成をヘルパー関数でラップして、従属変数の値を取得します。
- Python
- Scala
from pyspark.sql.functions import udf
def make_udf(value):
def foo():
return value
return udf(foo)
x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected
def makeUDF(value: Int) = udf(() => value)
var x = 123
val fooUDF = makeUDF(x) // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected
UDF 定義を別の関数 ( make_udf ) 内にラップすることで、 xの現在の値が引数として渡される新しいスコープを作成します。これにより、生成された各 UDF に、UDF の作成時にバインドされたフィールドの独自のコピーが含まれるようになります。
エラー検出のためのトリガーイーガー分析
次のエラー処理は、例外をすぐにスローできるようにする積極的な分析を実行するため、Spark Classic で役立ちます。ただし、Spark Connect では、このコードは分析をトリガーせずにローカルの未解決の proto プランを構築するだけなので、問題は発生しません。
df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
try:
df = df.select("name", "age")
df = df.withColumn(
"age_group",
when(col("age") < 18, "minor").otherwise("adult"))
df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
print(f"Error: {repr(e)}")
コードが分析例外に依存しており、それをキャッチしたい場合は、たとえばdf.columns 、 df.schema 、またはdf.collect()を使用して、積極的な分析をトリガーできます。
- Python
- Scala
try:
df = ...
df.columns # This will trigger eager analysis
except Exception as e:
print(f"Error: {repr(e)}")
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")
try {
val df2 = df.select("name", "age")
.withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
.filter(col("age_with_typo") > 6)
df2.columns // Trigger eager analysis to catch the error
} catch {
case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}
過剰な分析要求を避ける
積極的な分析をトリガーする呼び出し ( df.columns 、 df.schemaなど) を過度に使用しないようにして、大量の分析要求を回避すると、パフォーマンスが向上します。
これを回避できず、新しいデータ フレームの列を頻繁に確認する必要がある場合は、分析要求を回避するために列名を追跡するセットを維持します。
- Python
- Scala
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
new_column_name = str(i)
if new_column_name not in columns: # Check the set
df = df.withColumn(new_column_name, F.col("id") + i)
columns.add(new_column_name)
df.show()
import org.apache.spark.sql.functions._
var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
val newColumnName = i.toString
if (!columns.contains(newColumnName)) {
df = df.withColumn(newColumnName, col("id") + i)
columns.add(newColumnName)
}
}
df.show()
もう一つの同様のケースは、不要な中間DataFramesを大量に作成して分析することです。 代わりに、中間DataFramesを作成するのではなく、 DataFrameのスキーマから直接StructTypeフィールド 情報 を取得します。
- Python
- Scala
from pyspark.sql.types import StructType
df = ...
struct_column_fields = {
column_schema.name: [f.name for f in column_schema.dataType.fields]
for column_schema in df.schema
if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
import org.apache.spark.sql.types.StructType
df = ...
val structColumnFields = df.schema.fields
.filter(_.dataType.isInstanceOf[StructType])
.map { field =>
field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
}
.toMap
println(structColumnFields)