Compare o Spark Connect com o Spark Classic.
O Spark Connect é um protocolo baseado em gRPC dentro do Apache Spark que especifica como um aplicativo cliente pode se comunicar com um servidor Spark remoto. Permite a execução remota de cargas de trabalho do Spark usando a API DataFrame.
O Spark Connect é utilizado nos seguintes casos:
- Notebook Scala com Databricks Runtime versão 13.3 ou superior, em computepadrão.
- Notebook Python com Databricks Runtime versão 14.3 ou superior, em computepadrão.
- computação sem servidor
- Databricks Connect
Embora o Spark Connect e o Spark Classic utilizem execução preguiçosa para transformações, existem diferenças importantes que você precisa conhecer para evitar comportamentos inesperados e problemas de desempenho ao migrar código existente do Spark Classic para o Spark Connect ou ao escrever código que precisa funcionar com ambos.
Preguiçoso vs. ansioso
A key diferença entre Spark Connect e Spark Classic é que Spark Connect adia a análise e a resolução de nomes para o momento da execução, conforme resumido na tabela a seguir.
Aspecto | Spark Classic | Spark Connect |
|---|---|---|
Execução de consulta | Preguiçoso | Preguiçoso |
Análise de esquemas | Ansioso | Preguiçoso |
Acesso ao esquema | Local | Aciona RPC e armazena o esquema em cache no primeiro acesso. |
Vista temporária | Plano incorporado | Pesquisa de nome |
Serialização UDF | Na criação | Na execução |
Execução de consulta
Tanto o Spark Classic quanto o Spark Connect seguem o mesmo modelo de execução preguiçosa para a execução de consultas.
No Spark Classic, as transformações de DataFrame (como filter e limit) são preguiçosas. Isso significa que elas não são executadas imediatamente, mas estão codificadas em um plano lógico. O cálculo propriamente dito só é acionado com uma ação (como show(), collect()).
O Spark Connect segue um modelo de avaliação preguiçosa semelhante. As transformações são construídas no lado do cliente e enviadas como planos não resolvidos para o servidor. Em seguida, o servidor realiza a análise e a execução necessárias quando uma ação é chamada.
Aspecto | Spark Classic | Spark Connect |
|---|---|---|
transformações: | Execução preguiçosa | Execução preguiçosa |
Consultas SQL: | Execução preguiçosa | Execução preguiçosa |
Ações: | Execução ansiosa | Execução ansiosa |
Comando SQL : | Execução ansiosa | Execução ansiosa |
Análise de esquemas
Spark Classic realiza análises com agilidade durante a construção do plano lógico. Esta fase de análise converte o plano não resolvido em um plano lógico totalmente resolvido e verifica se as operações podem ser executadas pelo Spark. Um dos key benefícios de realizar esse trabalho com entusiasmo é que os usuários recebem feedback imediato quando um erro é cometido. Por exemplo, executar spark.sql("select 1 as a, 2 as b").filter("c > 1") lançará um erro imediatamente, indicando que a coluna c não pode ser encontrada.
Spark Connect difere do Classic porque o cliente constrói planos não resolvidos durante as transformações e adia sua análise. Qualquer operação que exija um plano resolvido — como acessar um esquema, explicar o plano, persistir um DataFrame ou executar uma ação — faz com que o cliente envie os planos não resolvidos para o servidor via RPC. Em seguida, o servidor realiza uma análise completa para obter seu plano lógico definido e executar as operações. Por exemplo, spark.sql("select 1 as a, 2 as b").filter("c > 1") não lançará nenhum erro porque o plano não resolvido é apenas do lado do cliente, mas em df.columns ou df.show() um erro será lançado porque o plano não resolvido é enviado para o servidor para análise.
Ao contrário da execução de consultas, o Spark Classic e o Spark Connect diferem no momento em que a análise de esquema ocorre.
Aspecto | Spark Classic | Spark Connect |
|---|---|---|
transformações: | Ansioso | Preguiçoso |
Acesso ao esquema: | Ansioso | Ansioso Aciona uma solicitação RPC de análise, diferentemente do Spark Classic. |
Ações: | Ansioso | Ansioso |
Estado de sessão dependente de DataFrames: UDFs, visualização temporária, configurações | Ansioso | Preguiçoso Avaliado durante a execução do plano do DataFrame |
Estado da sessão dependente da visualização temporária: UDFs, outra visualização temporária, configurações | Ansioso | Ansioso A análise é acionada imediatamente ao criar a viewtemporária. |
Melhores práticas
A diferença entre análise preguiçosa e análise ávida significa que existem algumas boas práticas a serem seguidas para evitar comportamentos inesperados e problemas de desempenho, especificamente aqueles causados pela sobrescrita de nomes view temporários, captura de variáveis externas em UDFs, detecção tardia de erros e acesso excessivo ao esquema em novos DataFrames.
Crie nomes view temporários exclusivos.
No Spark Connect, o DataFrame armazena apenas uma referência à view temporária pelo nome. Consequentemente, se a view temporária for substituída posteriormente, os dados no DataFrame também serão alterados, pois a consulta da view é feita pelo nome em tempo de execução.
Esse comportamento difere do Spark Classic, onde o plano lógico da view temporária é incorporado ao plano do dataframe no momento da criação. Qualquer substituição subsequente da view temporária não afeta o quadro de dados original.
Para minimizar a diferença, crie sempre nomes view temporários únicos. Por exemplo, inclua um UUID no nome view . Isso evita afetar quaisquer DataFrames existentes que façam referência a uma view temporária previamente registrada.
- Python
- Scala
import uuid
def create_temp_view_and_create_dataframe(x):
temp_view_name = f"`temp_view_{uuid.uuid4()}`" # Use a random name to avoid conflicts.
spark.range(x).createOrReplaceTempView(temp_view_name)
return spark.table(temp_view_name)
df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10
df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100
import java.util.UUID
def createTempViewAndDataFrame(x: Int) = {
val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
spark.range(x).createOrReplaceTempView(tempViewName)
spark.table(tempViewName)
}
val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)
val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)
Envolver definições UDF
Geralmente é considerada uma má prática que as UDFs dependam de variáveis externas mutáveis, pois isso introduz dependências implícitas, pode levar a comportamentos não determinísticos e reduz a capacidade de composição. No entanto, se você tiver um padrão desse tipo, fique atento à seguinte armadilha:
No Spark Connect, as UDFs em Python são preguiçosas. Sua serialização e registro são adiados até o momento da execução. No exemplo a seguir, a UDF é serializada e carregada no cluster Spark para execução somente quando show() é chamado.
from pyspark.sql.functions import udf
x = 123
@udf("INT")
def foo():
return x
df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456
Esse comportamento difere do Spark Classic, onde as UDFs são criadas imediatamente. No Spark Classic, o valor de x no momento da criação da UDF é capturado, portanto, alterações subsequentes em x não afetam a UDF já criada.
Se precisar modificar o valor de variáveis externas das quais uma UDF depende, use uma fábrica de funções (closure com vinculação antecipada) para capturar corretamente os valores das variáveis. Especificamente, envolva a criação da UDF em uma função auxiliar para capturar o valor de uma variável dependente.
- Python
- Scala
from pyspark.sql.functions import udf
def make_udf(value):
def foo():
return value
return udf(foo)
x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected
def makeUDF(value: Int) = udf(() => value)
var x = 123
val fooUDF = makeUDF(x) // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected
Ao envolver a definição UDF dentro de outra função (make_udf), criamos um novo escopo onde o valor atual de x é passado como argumento. Isso garante que cada UDF gerada tenha sua própria cópia do campo, vinculada no momento em que a UDF é criada.
Acione a análise imediata para detecção de erros.
O seguinte tratamento de erros é útil no Spark Classic porque realiza uma análise imediata, o que permite que as exceções sejam lançadas prontamente. No entanto, no Spark Connect, esse código não causa nenhum problema, pois apenas constrói um plano local não resolvido sem acionar nenhuma análise.
df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
try:
df = df.select("name", "age")
df = df.withColumn(
"age_group",
when(col("age") < 18, "minor").otherwise("adult"))
df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
print(f"Error: {repr(e)}")
Se o seu código depende da exceção de análise e você deseja capturá-la, você pode acionar a análise imediata, por exemplo com df.columns, df.schema ou df.collect().
- Python
- Scala
try:
df = ...
df.columns # This will trigger eager analysis
except Exception as e:
print(f"Error: {repr(e)}")
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")
try {
val df2 = df.select("name", "age")
.withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
.filter(col("age_with_typo") > 6)
df2.columns // Trigger eager analysis to catch the error
} catch {
case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}
Evite muitos pedidos de análise ansiosos
O desempenho pode ser melhorado se você evitar solicitações de análise em um grande número de DataFrames.
Criando novos DataFrames passo a passo e acessando seus esquemas em cada iteração.
Ao criar um grande número de novos DataFrames, evite o uso excessivo de chamadas que desencadeiam análises imediatas neles (como df.columns, df.schema). Você pode acessar o esquema do mesmo DataFrame várias vezes, mas acionar análises em muitos DataFrames recém-criados afetará o desempenho.
Por exemplo, ao adicionar colunas iterativamente a um DataFrame dentro de um loop e verificar se cada coluna já existe antes de adicioná-la, chamar df.columns em cada DataFrame recém-criado aciona uma solicitação de análise em cada iteração. Para evitar isso, mantenha um conjunto de registros para controlar os nomes das colunas em vez de acessar repetidamente o esquema do DataFrame.
- Python
- Scala
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
new_column_name = str(i)
# if new_column_name not in df.columns: # Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
if new_column_name not in columns: # Check the set without triggering analysis
df = df.withColumn(new_column_name, F.col("id") + i)
columns.add(new_column_name)
df.show()
import org.apache.spark.sql.functions._
var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
val newColumnName = i.toString
// if (!df.columns.contains(newColumnName)) { // Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
if (!columns.contains(newColumnName)) { // Check the set without triggering analysis
df = df.withColumn(newColumnName, col("id") + i)
columns.add(newColumnName)
}
}
df.show()
Evite acessar esquemas para um grande número de DataFrames intermediários.
Outro caso semelhante é a criação de um grande número de DataFrames intermediários desnecessários e sua posterior análise. No caso a seguir, para extrair os nomes dos campos de cada coluna de um tipo struct, obtenha as informações do campo StructType diretamente do esquema do DataFrame em vez de criar DataFrames intermediários.
- Python
- Scala
from pyspark.sql.types import StructType
df = ...
struct_column_fields = {
# column_schema.name: df.select(column_schema.name + ".*").columns # Bad practice. This creates an intermediate DataFrame and triggers an analysis request for each StructType column.
column_schema.name: [f.name for f in column_schema.dataType.fields] # Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
for column_schema in df.schema
if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
import org.apache.spark.sql.types.StructType
df = ...
val structColumnFields = df.schema.fields
.filter(_.dataType.isInstanceOf[StructType])
.map { field =>
// field.name -> df.select(field.name + ".*").columns // Bad practice. This creates an intermediate DataFrame and triggers analysis for each StructType column.
field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name) // Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
}
.toMap
println(structColumnFields)