Pular para o conteúdo principal

Compare o Spark Connect com o Spark Classic.

O Spark Connect é um protocolo baseado em gRPC dentro do Apache Spark que especifica como um aplicativo cliente pode se comunicar com um servidor Spark remoto. Permite a execução remota de cargas de trabalho do Spark usando a API DataFrame.

O Spark Connect é utilizado nos seguintes casos:

  • Scala Notebook com Databricks Runtime versão 13.3 ou superior, em computepadrão e dedicados.
  • Notebook Python com Databricks Runtime versão 14.3 ou superior, em computepadrão e dedicados.
  • computação sem servidor
  • Databricks Connect

Embora o Spark Connect e o Spark Classic utilizem execução preguiçosa para transformações, existem diferenças importantes que você precisa conhecer para evitar comportamentos inesperados e problemas de desempenho ao migrar código existente do Spark Classic para o Spark Connect ou ao escrever código que precisa funcionar com ambos.

Preguiçoso vs. ansioso

A key diferença entre Spark Connect e Spark Classic é que Spark Connect adia a análise e a resolução de nomes para o momento da execução, conforme resumido na tabela a seguir.

Aspecto

Spark Classic

Spark Connect

Execução de consulta

Preguiçoso

Preguiçoso

Análise de esquemas

Ansioso

Preguiçoso

Acesso ao esquema

Local

Aciona RPC

Vista temporária

Plano incorporado

Pesquisa de nome

Serialização UDF

Na criação

Na execução

Execução de consulta

Tanto o Spark Classic quanto o Spark Connect seguem o mesmo modelo de execução preguiçosa para a execução de consultas.

No Spark Classic, as transformações de DataFrame (como filter e limit) são preguiçosas. Isso significa que elas não são executadas imediatamente, mas sim registradas em um plano lógico. O cálculo real é acionado apenas quando uma ação (como show(), collect()) é invocada.

O Spark Connect segue um modelo de avaliação preguiçosa semelhante. As transformações são construídas no lado do cliente e enviadas como planos proto não resolvidos para o servidor. Em seguida, o servidor realiza a análise e a execução necessárias quando uma ação é chamada.

Aspecto

Spark Classic

Spark Connect

transformações: df.filter(...), df.select(...), df.limit(...)

Execução preguiçosa

Execução preguiçosa

Consultas SQL: spark.sql("select …")

Execução preguiçosa

Execução preguiçosa

Ações: df.collect(), df.show()

Execução ansiosa

Execução ansiosa

Comando SQL : spark.sql("insert …"), spark.sql("create …")

Execução ansiosa

Execução ansiosa

Análise de esquemas

O Spark Classic realiza a análise de esquema de forma eficiente durante a fase de construção do plano lógico. Ao definir transformações, o Spark analisa imediatamente o esquema do DataFrame para garantir que todas as colunas e tipos de dados referenciados sejam válidos. Por exemplo, executar spark.sql("select 1 as a, 2 as b").filter("c > 1") lançará um erro imediatamente, indicando que a coluna c não pode ser encontrada.

Em vez disso, Spark Connect constrói planos proto não resolvidos durante as transformações. Ao acessar um esquema ou executar uma ação, o cliente envia os planos não resolvidos para o servidor via RPC (chamada de procedimento remoto). Em seguida, o servidor realiza a análise e a execução. Este projeto adia a análise do esquema. Por exemplo, spark.sql("select 1 as a, 2 as b").filter("c > 1") não lançará nenhum erro porque o plano não resolvido é apenas do lado do cliente, mas em df.columns ou df.show() um erro será lançado porque o plano não resolvido é enviado para o servidor para análise.

Ao contrário da execução de consultas, o Spark Classic e o Spark Connect diferem no momento em que a análise de esquema ocorre.

Aspecto

Spark Classic

Spark Connect

transformações: df.filter(...), df.select(...), df.limit(...)

Ansioso

Preguiçoso

Acesso ao esquema: df.columns, df.schema, df.isStreaming

Ansioso

Ansioso

Aciona uma solicitação RPC de análise, diferentemente do Spark Classic.

Ações: df.collect(), df.show()

Ansioso

Ansioso

Estado da sessão dependente: UDFs, visualização temporária, configurações

Ansioso

Preguiçoso

Avaliado durante a execução

Melhores práticas

A diferença entre análise preguiçosa e análise ávida significa que existem algumas boas práticas a serem seguidas para evitar comportamentos inesperados e problemas de desempenho, especificamente aqueles causados pela sobrescrita de nomes view temporários, captura de variáveis externas em UDFs, detecção tardia de erros e acesso excessivo ao esquema em novos DataFrames.

Crie nomes view temporários exclusivos.

No Spark Connect, o DataFrame armazena apenas uma referência à view temporária pelo nome. Consequentemente, se a view temporária for substituída posteriormente, os dados no DataFrame também serão alterados, pois a consulta da view é feita pelo nome em tempo de execução.

Esse comportamento difere do Spark Classic, onde o plano lógico da view temporária é incorporado ao plano do dataframe no momento da criação. Qualquer substituição subsequente da view temporária não afeta o quadro de dados original.

Para minimizar a diferença, crie sempre nomes view temporários únicos. Por exemplo, inclua um UUID no nome view . Isso evita afetar quaisquer DataFrames existentes que façam referência a uma view temporária previamente registrada.

Python
import uuid
def create_temp_view_and_create_dataframe(x):
temp_view_name = f"`temp_view_{uuid.uuid4()}`" # Use a random name to avoid conflicts.
spark.range(x).createOrReplaceTempView(temp_view_name)
return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100

Envolver definições UDF

No Spark Connect, as UDFs em Python são preguiçosas. Sua serialização e registro são adiados até o momento da execução. No exemplo a seguir, a UDF é serializada e carregada no cluster Spark para execução somente quando show() é chamado.

Python
from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
return x


df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

Esse comportamento difere do Spark Classic, onde as UDFs são criadas imediatamente. No Spark Classic, o valor de x no momento da criação da UDF é capturado, portanto, alterações subsequentes em x não afetam a UDF já criada.

Se precisar modificar o valor de variáveis externas das quais uma UDF depende, use uma fábrica de funções (closure com vinculação antecipada) para capturar corretamente os valores das variáveis. Especificamente, envolva a criação da UDF em uma função auxiliar para capturar o valor de uma variável dependente.

Python
from pyspark.sql.functions import udf

def make_udf(value):
def foo():
return value
return udf(foo)


x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Ao envolver a definição UDF dentro de outra função (make_udf), criamos um novo escopo onde o valor atual de x é passado como argumento. Isso garante que cada UDF gerada tenha sua própria cópia do campo, vinculada no momento em que a UDF é criada.

Acione a análise imediata para detecção de erros.

O seguinte tratamento de erros é útil no Spark Classic porque realiza uma análise imediata, o que permite que as exceções sejam lançadas prontamente. No entanto, no Spark Connect, esse código não causa nenhum problema, pois apenas constrói um plano proto local não resolvido sem acionar nenhuma análise.

Python
df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
df = df.select("name", "age")
df = df.withColumn(
"age_group",
when(col("age") < 18, "minor").otherwise("adult"))
df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
print(f"Error: {repr(e)}")

Se o seu código depende da exceção de análise e você deseja capturá-la, você pode acionar a análise imediata, por exemplo com df.columns, df.schema ou df.collect().

Python
try:
df = ...
df.columns # This will trigger eager analysis
except Exception as e:
print(f"Error: {repr(e)}")

Evite muitos pedidos de análise ansiosos

O desempenho pode ser melhorado se você evitar um grande número de solicitações de análise, evitando o uso excessivo de chamadas que acionam análises imediatas (como df.columns, df.schema).

Se não for possível evitar isso e você precisar verificar frequentemente as colunas de novos data frames, mantenha um conjunto de registros para rastrear os nomes das colunas e evitar solicitações de análise.

Python
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
new_column_name = str(i)
if new_column_name not in columns: # Check the set
df = df.withColumn(new_column_name, F.col("id") + i)
columns.add(new_column_name)
df.show()

Outro caso semelhante é a criação de um grande número de DataFrames intermediários desnecessários e sua posterior análise. Em vez disso, obtenha informações do campo StructType diretamente do esquema do DataFrame em vez de criar DataFrames intermediários.

Python
from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
column_schema.name: [f.name for f in column_schema.dataType.fields]
for column_schema in df.schema
if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)