Compatibilidade de versão do ambiente
Beta
As versões de ambiente para SDP estão em versão Beta.
pipeline com versão de ambiente definida para execução de código Python através do Spark Connect. Esta página aborda o que é incompatível, o que se comporta de maneira diferente, como analisar um pipeline em busca de padrões afetados e como migrar um pipeline existente.
Limitações
As versões de ambiente ainda não são compatíveis com todas as funcionalidades do pipeline. A execução de um pipeline com uma versão de ambiente definida falha se o código Python do pipeline fizer qualquer uma das seguintes ações:
- Altera o estado da sessão Spark dentro de uma função decorada com um decorador de pipeline. Exemplos incluem
spark.conf.set(...),spark.sql("USE CATALOG ...")ecreateOrReplaceTempView. - Usa APIs PySpark que não estão disponíveis no Spark Connect, incluindo
SparkContext,RDD,SQLContexte quaisquer APIs Py4J. Consulte a seção "O que é compatível com o Spark Connect".
Se habilitar uma versão de ambiente em um pipeline causar falha, desabilitar essa versão fará com que o pipeline retorne ao seu estado anterior.
Mudanças de comportamento
O Spark Connect apresenta algumas pequenas diferenças de comportamento em relação ao ambiente de execução PySpark clássico. Consulte a comparação entre Spark Connect e Spark clássico para obter informações completas. A verificação de compatibilidade detecta esses padrões antecipadamente e bloqueia a ativação até que sejam resolvidos, permitindo que você os encontre e corrija antes que afetem os dados de produção.
Em um pipeline, as situações mais comuns em que o comportamento pode diferir são:
- Construção DataFrame intercalados e mutação de sessão
- UDFs que fazem referência a um estado mutável do Python
Construção DataFrame intercalados e mutação de sessão
Quando um pipeline constrói um DataFrame e, em seguida, modifica o estado da sessão Spark (por exemplo, altera o catálogo ou esquema default , define uma configuração, substitui uma view temporária ou registra novamente uma UDF), ele usa o DataFrame:
- Sem uma versão de ambiente, o DataFrame utiliza o estado da sessão anterior à mutação .
- Com uma versão de ambiente, o DataFrame utiliza o estado da sessão pós-mutação .
Por exemplo:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sem uma versão de ambiente, mytable contém [(1, "Original Row")]. Com uma versão de ambiente, mytable contém [(2, "Replaced Row")].
UDFs que fazem referência a um estado mutável do Python
Quando uma UDF faz referência a uma variável global do Python cujo valor muda após a definição da UDF:
- Sem uma versão de ambiente, a UDF usa o valor mais recente da variável.
- Com uma versão de ambiente, a UDF usa o valor no momento em que a UDF foi definida .
Por exemplo:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sem uma versão de ambiente, my_mv contém [("alex_b",)]. Com uma versão de ambiente, my_mv contém [("alex_a",)].
Se um pipeline depender de algum desses padrões, faça uma auditoria antes de habilitar uma versão do ambiente.
Verificação de compatibilidade
A verificação de compatibilidade ajuda você a encontrar padrões de código em seu pipeline que produziriam resultados diferentes em uma determinada versão de ambiente, antes de você habilitá-la. A realização do exame é opcional. Quando a varredura está habilitada em um pipeline:
- Cada execução do pipeline emite um evento
BehaviorChangeInSparkConnectWARNno log de eventos do pipeline por padrão detectado. - Não é possível ativar uma versão de ambiente no pipeline até que todos os avisos de compatibilidade da atualização anterior, que foi bem-sucedida, sejam resolvidos.
Se a varredura não estiver ativada, nenhum evento será emitido e a ativação de environment_version não será bloqueada. A Databricks recomenda ativar a verificação e resolver quaisquer padrões detectados antes de ativar uma versão de ambiente no pipeline.
Ative a verificação em um pipeline.
Você pode habilitar a verificação de compatibilidade adicionando a configuração de pipeline pipelines.environmentVersion.enableCompatibilityScan Você pode adicionar a configuração por meio da interface do editor de pipeline ou adicionando uma entrada ao JSON de configuração do pipeline.
Através da interface do usuário :
- No editor de pipeline, clique em Configurações .
- Localize a seção Configuração nas configurações do pipeline.
- Clique
Adicionar configuração .
- Insira
pipelines.environmentVersion.enableCompatibilityScancomo key etruecomo valor. - Salve as configurações do pipeline.
No pipeline JSON :
Adicione a seguinte entrada ao bloco configuration :
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
Fluxo de trabalho recomendado
- Ative a verificação no pipeline.
- Acione a execução de um pipeline.
- Consulte o log de eventos do pipeline para eventos
BehaviorChangeInSparkConnectWARN. Consulte a referência de eventos de compatibilidade para obter a lista completa de códigos de problemas, padrões de exemplo e correções sugeridas. - Atualize o código pipeline para remover os padrões detectados e execute o pipeline novamente até que nenhum evento seja emitido.
- Adicione
environment_versionao pipeline usando um dos métodos em Habilitar uma versão de ambiente em um pipeline.
Se você acredita que um aviso de compatibilidade é um falso positivo e deseja habilitar environment_version mesmo assim, remova a entrada pipelines.environmentVersion.enableCompatibilityScan da configuração do pipeline para ignorar a verificação. (Não é permitido definir o valor como false — você deve remover a entrada completamente.)
A verificação prévia não é executada em pipelines que não possuem atualizações anteriores ou em pipelines que já possuem uma versão de ambiente definida.
Migrar um pipeline existente para versões de ambiente
Para migrar um pipeline existente que ainda não utiliza uma versão de ambiente, siga este fluxo de trabalho completo. Este guia orienta você na identificação de padrões de código que podem se comportar de maneira diferente no Spark Connect, na correção desses problemas e na implantação segura da versão do ambiente.
-
Ative a verificação de compatibilidade no pipeline. Ative a verificação no pipeline conforme descrito em Verificação de compatibilidade. É isso que faz com que os padrões detectados apareçam no log de eventos e que permite a verificação prévia que protege sua tentativa de ativação.
-
Acione a execução de um pipeline e verifique os eventos de compatibilidade. Acione uma atualização normal do pipeline. Após a conclusão bem-sucedida, consulte o log de eventos do pipeline para
BehaviorChangeInSparkConnectWARNeventos. Cada evento reporta um padrão detectado. Consulte a referência de eventos de compatibilidade para obter a lista completa de códigos de problemas, padrões de exemplo e correções sugeridas. -
Atualize o código do seu pipeline para abordar os padrões detectados. Para cada padrão detectado, atualize o código do seu pipeline seguindo a correção sugerida. Após cada alteração, acione outra atualização do pipeline e verifique se os eventos correspondentes não aparecem mais. Repita o processo até que o log de eventos não mostre mais nenhum evento de compatibilidade referente a uma atualização bem-sucedida.
-
Habilite a versão do ambiente no pipeline. Após a atualização bem-sucedida mais recente não ter eventos de compatibilidade, adicione
environment_versionao pipeline usando a interface do usuário, a API ou o pacote, conforme descrito em Habilitar uma versão de ambiente em um pipeline. A próxima atualização será executada com o Spark Connect, especificando a versão da linguagem Python e a biblioteca pré-instalada.Se a atualização falhar porque ainda existem avisos de compatibilidade, remova o
environment_version, retorne ao passo 2 e resolva os avisos restantes antes de tentar novamente. -
Verifique a migração. Após a conclusão da primeira atualização com a versão do ambiente, verifique:
- O evento
create_updateno log eventos mostraenvironment_versiondefinido com o valor esperado. - O pipeline produz os dados esperados e nenhum novo evento de erro aparece.
- Verifique pontualmente as tabelas subsequentes para identificar quaisquer diferenças sutis de comportamento descritas em Alterações de comportamento.
- O evento
Reverter
Se o pipeline apresentar comportamento inadequado após a migração, remova o environment_version das configurações do pipeline. A próxima atualização será executada com a configuração de tempo de execução do Python anterior. Use a execução revertida para depurar e, em seguida, repita a migração a partir da etapa 2 depois de identificar e corrigir o problema.
Referência de eventos de compatibilidade
Quando a verificação de compatibilidade está habilitada em um pipeline, o SDP emite um evento BehaviorChangeInSparkConnect WARN no log de eventos do pipeline para cada padrão detectado. Quando a varredura está ativada e a atualização anterior bem-sucedida detectou algum padrão, o SDP também bloqueia a ativação environment_version até que os padrões sejam resolvidos.
Cada evento gera um único código de problema que identifica o que foi detectado. Para consultar um código, localize-o na tabela de códigos de problemas — cada linha contém um link para a seção da categoria que apresenta um padrão de exemplo e a correção sugerida.
Formato do evento
BehaviorChangeInSparkConnect Os eventos seguem o esquema padrão log de eventospipeline:
event_typeébehavior_change_in_spark_connect.leveléWARN.detailscontém o objetobehavior_change_in_spark_connect, que possui um único campoissue. O valor da emissão é um dos códigos listados abaixo.messageÉ uma descrição legível por humanos do padrão detectado.
Códigos de emissão
Categoria | Código do problema | Descrição |
|---|---|---|
| O catálogo default foi alterado após a criação de um DataFrame . O DataFrame existente pode resolver tabelas usando o novo catálogo default . | |
|
| |
| O banco de dados default foi alterado após a criação de um DataFrame . O DataFrame existente pode resolver tabelas usando o novo banco de dados default . | |
|
| |
| A função de fluxo chama um comando de ponto de verificação. | |
| A função de fluxo cria imediatamente uma view DataFrame ( | |
| A função de fluxo cria um perfil de recurso. | |
| A função de fluxo chama | |
| A função de fluxo executa um eager | |
| A função de fluxo realiza uma transação ávida Spark ML . | |
| A função de fluxo registra uma fonte de dados Python . | |
| A função de fluxo opera em um identificador de consulta de transmissão ativo. | |
| A função de fluxo registra ou remove um ouvinte de consulta de transmissão. | |
| A função de fluxo chama | |
| A função de fluxo executa uma | |
| A função de fluxo executa uma | |
| A função de fluxo inicia uma consulta de transmissão ( | |
|
| |
|
| |
|
| |
| Uma view temporária global foi substituída após a criação de um DataFrame que a referenciava. A substituição pode ser refletida no DataFrame existente. | |
| Uma view temporária foi substituída após a criação de um DataFrame que a referenciava. A substituição pode ser refletida no DataFrame existente. | |
| Uma UDF foi registrada novamente com o mesmo nome após a criação de um DataFrame que a referenciava. O DataFrame existente pode usar a nova definição UDF. | |
| Uma UDTF foi registrada novamente com o mesmo nome após a criação de um DataFrame que a referenciava. O DataFrame existente pode usar a nova definição UDTF. | |
| Uma UDF (Função Definida pelo Usuário) faz referência a uma variável global mutável em Python. Com uma versão de ambiente, a UDF usa o valor da variável no momento em que a UDF foi definida, e não no momento da invocação. | |
| Uma UDTF faz referência a uma variável Python global e mutável. Com uma versão de ambiente, a UDTF usa o valor da variável no momento em que a UDTF foi definida, e não no momento da invocação. |
Mutações de banco de dados e catálogo
Esses problemas ocorrem quando o código pipeline modifica o banco de dados ou o catálogo default . Com uma versão de ambiente, os DataFrames construídos antes da mutação podem resolver tabelas usando o novo banco de dados ou catálogo.
Exemplo de padrão que dispara um evento:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Sem uma versão de ambiente, df resolve events do catálogo marketing . Com uma versão de ambiente, df resolve events do catálogo sales .
Sugestão de correção: qualifique totalmente os nomes das tabelas para que a resolução não dependa do catálogo ou banco de dados default e evite alterar o catálogo ou banco de dados default entre a criação e o uso DataFrame .
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
mutações de configuração do Spark
Esses problemas ocorrem quando o código do pipeline modifica a configuração do Spark de maneiras que podem alterar o comportamento do DataFrame em uma determinada versão do ambiente.
Exemplo de padrão que dispara um evento:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
Sem uma versão de ambiente, a conversão usa o valor de configuração (conf) no momento da criação do DataFrame. Com uma versão de ambiente, a conversão usa spark.sql.ansi.enabled=true e pode falhar com entrada inválida.
Solução sugerida: Defina todas as configurações necessárias do Spark no início do arquivo de pipeline, antes da criação de qualquer DataFrame. Para configuração por consulta, use a configuração configuration do pipeline na especificação do pipeline.
Substituições temporárias view
Esses problemas ocorrem quando o código pipeline substitui uma view temporária após a criação de um DataFrame que a referencia. Com uma versão de ambiente, o DataFrame existente pode refletir o novo conteúdo view .
Exemplo de padrão que dispara um evento:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Sem uma versão de ambiente, mytable contém [(1, "Original Row")]. Com uma versão de ambiente, mytable contém [(2, "Replaced Row")].
Solução sugerida: Crie cada view temporária apenas uma vez e não a substitua. Se você precisar de várias visualizações com dados relacionados, dê a cada uma um nome distinto.
Mutações UDF e UDTF
Esses problemas são gerados quando o código do pipeline modifica uma UDF ou UDTF de maneiras que alteram o comportamento em uma determinada versão do ambiente.
Exemplo de padrão que dispara um evento:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Sem uma versão de ambiente, my_mv contém [("alex_b",)]. Com uma versão de ambiente, my_mv contém [("alex_a",)].
Solução sugerida: Passe os valores para a UDF como argumentos em vez de capturá-los de variáveis globais do Python, ou defina a variável global antes de definir a UDF e não a modifique posteriormente.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
Execução ágil dentro de funções de fluxo
Esses problemas são emitidos quando o código pipeline executa um comando Spark imediato dentro de uma função decorada por um decorador de pipeline (@table, @materialized_view, etc). Espera-se que as funções de fluxo definam e retornem um DataFrame; Comando ansioso que grava dados, gerencia transmissão de consultas, registro recurso ou execução de operações ML não é permitido dentro de uma função de fluxo com uma versão de ambiente definida.
Sugestão de solução: Mova as operações eager para fora da função de fluxo e retorne um DataFrame da função de fluxo. Efeitos colaterais, como escrever em uma tabela ou iniciar uma consulta de transmissão, estão fora da definição pipeline ; o mecanismo pipeline lida com a materialização do DataFrame retornado pela função de fluxo.
Encontre eventos de compatibilidade no logde eventos.
A consulta a seguir retorna todos os eventos de compatibilidade para um pipeline, ordenados do mais recente para o mais antigo:
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
Para contabilizar eventos por código de problema em atualizações recentes:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
Para saber como consultar o log de eventos, consulte Consultar o logde eventos.
Veja também
- Configurar versões de ambiente para o pipeline — visão geral do recurso, como habilitar uma versão de ambiente.
- Esquema log de eventos do pipeline — esquema completo log eventos pipeline .
- logde eventos do pipeline — como consultar o log de eventos pipeline .