Compare o Spark Connect com o Spark Classic.
O Spark Connect é um protocolo baseado em gRPC dentro do Apache Spark que especifica como um aplicativo cliente pode se comunicar com um servidor Spark remoto. Permite a execução remota de cargas de trabalho do Spark usando a API DataFrame.
O Spark Connect é utilizado nos seguintes casos:
- Scala Notebook com Databricks Runtime versão 13.3 ou superior, em computepadrão e dedicados.
- Notebook Python com Databricks Runtime versão 14.3 ou superior, em computepadrão e dedicados.
- computação sem servidor
- Databricks Connect
Embora o Spark Connect e o Spark Classic utilizem execução preguiçosa para transformações, existem diferenças importantes que você precisa conhecer para evitar comportamentos inesperados e problemas de desempenho ao migrar código existente do Spark Classic para o Spark Connect ou ao escrever código que precisa funcionar com ambos.
Preguiçoso vs. ansioso
A key diferença entre Spark Connect e Spark Classic é que Spark Connect adia a análise e a resolução de nomes para o momento da execução, conforme resumido na tabela a seguir.
Aspecto | Spark Classic | Spark Connect |
|---|---|---|
Execução de consulta | Preguiçoso | Preguiçoso |
Análise de esquemas | Ansioso | Preguiçoso |
Acesso ao esquema | Local | Aciona RPC |
Vista temporária | Plano incorporado | Pesquisa de nome |
Serialização UDF | Na criação | Na execução |
Execução de consulta
Tanto o Spark Classic quanto o Spark Connect seguem o mesmo modelo de execução preguiçosa para a execução de consultas.
No Spark Classic, as transformações de DataFrame (como filter e limit) são preguiçosas. Isso significa que elas não são executadas imediatamente, mas sim registradas em um plano lógico. O cálculo real é acionado apenas quando uma ação (como show(), collect()) é invocada.
O Spark Connect segue um modelo de avaliação preguiçosa semelhante. As transformações são construídas no lado do cliente e enviadas como planos proto não resolvidos para o servidor. Em seguida, o servidor realiza a análise e a execução necessárias quando uma ação é chamada.
Aspecto | Spark Classic | Spark Connect |
|---|---|---|
transformações: | Execução preguiçosa | Execução preguiçosa |
Consultas SQL: | Execução preguiçosa | Execução preguiçosa |
Ações: | Execução ansiosa | Execução ansiosa |
Comando SQL : | Execução ansiosa | Execução ansiosa |
Análise de esquemas
O Spark Classic realiza a análise de esquema de forma eficiente durante a fase de construção do plano lógico. Ao definir transformações, o Spark analisa imediatamente o esquema do DataFrame para garantir que todas as colunas e tipos de dados referenciados sejam válidos. Por exemplo, executar spark.sql("select 1 as a, 2 as b").filter("c > 1") lançará um erro imediatamente, indicando que a coluna c não pode ser encontrada.
Em vez disso, Spark Connect constrói planos proto não resolvidos durante as transformações. Ao acessar um esquema ou executar uma ação, o cliente envia os planos não resolvidos para o servidor via RPC (chamada de procedimento remoto). Em seguida, o servidor realiza a análise e a execução. Este projeto adia a análise do esquema. Por exemplo, spark.sql("select 1 as a, 2 as b").filter("c > 1") não lançará nenhum erro porque o plano não resolvido é apenas do lado do cliente, mas em df.columns ou df.show() um erro será lançado porque o plano não resolvido é enviado para o servidor para análise.
Ao contrário da execução de consultas, o Spark Classic e o Spark Connect diferem no momento em que a análise de esquema ocorre.
Aspecto | Spark Classic | Spark Connect |
|---|---|---|
transformações: | Ansioso | Preguiçoso |
Acesso ao esquema: | Ansioso | Ansioso Aciona uma solicitação RPC de análise, diferentemente do Spark Classic. |
Ações: | Ansioso | Ansioso |
Estado da sessão dependente: UDFs, visualização temporária, configurações | Ansioso | Preguiçoso Avaliado durante a execução |
Melhores práticas
A diferença entre análise preguiçosa e análise ávida significa que existem algumas boas práticas a serem seguidas para evitar comportamentos inesperados e problemas de desempenho, especificamente aqueles causados pela sobrescrita de nomes view temporários, captura de variáveis externas em UDFs, detecção tardia de erros e acesso excessivo ao esquema em novos DataFrames.
Crie nomes view temporários exclusivos.
No Spark Connect, o DataFrame armazena apenas uma referência à view temporária pelo nome. Consequentemente, se a view temporária for substituída posteriormente, os dados no DataFrame também serão alterados, pois a consulta da view é feita pelo nome em tempo de execução.
Esse comportamento difere do Spark Classic, onde o plano lógico da view temporária é incorporado ao plano do dataframe no momento da criação. Qualquer substituição subsequente da view temporária não afeta o quadro de dados original.
Para minimizar a diferença, crie sempre nomes view temporários únicos. Por exemplo, inclua um UUID no nome view . Isso evita afetar quaisquer DataFrames existentes que façam referência a uma view temporária previamente registrada.
- Python
- Scala
import uuid
def create_temp_view_and_create_dataframe(x):
temp_view_name = f"`temp_view_{uuid.uuid4()}`" # Use a random name to avoid conflicts.
spark.range(x).createOrReplaceTempView(temp_view_name)
return spark.table(temp_view_name)
df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10
df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100
import java.util.UUID
def createTempViewAndDataFrame(x: Int) = {
val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
spark.range(x).createOrReplaceTempView(tempViewName)
spark.table(tempViewName)
}
val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)
val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)
Envolver definições UDF
No Spark Connect, as UDFs em Python são preguiçosas. Sua serialização e registro são adiados até o momento da execução. No exemplo a seguir, a UDF é serializada e carregada no cluster Spark para execução somente quando show() é chamado.
from pyspark.sql.functions import udf
x = 123
@udf("INT")
def foo():
return x
df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456
Esse comportamento difere do Spark Classic, onde as UDFs são criadas imediatamente. No Spark Classic, o valor de x no momento da criação da UDF é capturado, portanto, alterações subsequentes em x não afetam a UDF já criada.
Se precisar modificar o valor de variáveis externas das quais uma UDF depende, use uma fábrica de funções (closure com vinculação antecipada) para capturar corretamente os valores das variáveis. Especificamente, envolva a criação da UDF em uma função auxiliar para capturar o valor de uma variável dependente.
- Python
- Scala
from pyspark.sql.functions import udf
def make_udf(value):
def foo():
return value
return udf(foo)
x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected
def makeUDF(value: Int) = udf(() => value)
var x = 123
val fooUDF = makeUDF(x) // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected
Ao envolver a definição UDF dentro de outra função (make_udf), criamos um novo escopo onde o valor atual de x é passado como argumento. Isso garante que cada UDF gerada tenha sua própria cópia do campo, vinculada no momento em que a UDF é criada.
Acione a análise imediata para detecção de erros.
O seguinte tratamento de erros é útil no Spark Classic porque realiza uma análise imediata, o que permite que as exceções sejam lançadas prontamente. No entanto, no Spark Connect, esse código não causa nenhum problema, pois apenas constrói um plano proto local não resolvido sem acionar nenhuma análise.
df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
try:
df = df.select("name", "age")
df = df.withColumn(
"age_group",
when(col("age") < 18, "minor").otherwise("adult"))
df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
print(f"Error: {repr(e)}")
Se o seu código depende da exceção de análise e você deseja capturá-la, você pode acionar a análise imediata, por exemplo com df.columns, df.schema ou df.collect().
- Python
- Scala
try:
df = ...
df.columns # This will trigger eager analysis
except Exception as e:
print(f"Error: {repr(e)}")
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")
try {
val df2 = df.select("name", "age")
.withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
.filter(col("age_with_typo") > 6)
df2.columns // Trigger eager analysis to catch the error
} catch {
case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}
Evite muitos pedidos de análise ansiosos
O desempenho pode ser melhorado se você evitar um grande número de solicitações de análise, evitando o uso excessivo de chamadas que acionam análises imediatas (como df.columns, df.schema).
Se não for possível evitar isso e você precisar verificar frequentemente as colunas de novos data frames, mantenha um conjunto de registros para rastrear os nomes das colunas e evitar solicitações de análise.
- Python
- Scala
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
new_column_name = str(i)
if new_column_name not in columns: # Check the set
df = df.withColumn(new_column_name, F.col("id") + i)
columns.add(new_column_name)
df.show()
import org.apache.spark.sql.functions._
var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
val newColumnName = i.toString
if (!columns.contains(newColumnName)) {
df = df.withColumn(newColumnName, col("id") + i)
columns.add(newColumnName)
}
}
df.show()
Outro caso semelhante é a criação de um grande número de DataFrames intermediários desnecessários e sua posterior análise. Em vez disso, obtenha informações do campo StructType diretamente do esquema do DataFrame em vez de criar DataFrames intermediários.
- Python
- Scala
from pyspark.sql.types import StructType
df = ...
struct_column_fields = {
column_schema.name: [f.name for f in column_schema.dataType.fields]
for column_schema in df.schema
if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
import org.apache.spark.sql.types.StructType
df = ...
val structColumnFields = df.schema.fields
.filter(_.dataType.isInstanceOf[StructType])
.map { field =>
field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
}
.toMap
println(structColumnFields)