Pular para o conteúdo principal

Atualizar o Job quando o usuário atualizar o espaço de trabalho legado para Unity Catalog

Quando o senhor faz upgrade do espaço de trabalho legado para Unity Catalog, talvez seja necessário atualizar o trabalho existente para fazer referência às tabelas e aos caminhos de arquivo atualizados. A tabela a seguir lista cenários típicos e sugestões para atualizar seu Job.

Cenário

soluções

Job está usando um Notebook que tem referências a uma biblioteca personalizada por meio de um init script ou de uma biblioteca definida por clustering.

Uma biblioteca personalizada seria definida como um pacote pip ou jar não disponível publicamente que executa operações de leitura ou gravação do Apache Spark ou SQL incorporadas em seu código.

Modifique a biblioteca personalizada para garantir que:

  • Os nomes dos bancos de dados têm namespaces de três níveis.
  • Os pontos de montagem não são usados no código.

Job está usando um Notebook que está lendo ou gravando em uma tabela do site Hive metastore.

  • Avalie a configuração do catálogo default na configuração do Job clustering Spark: spark.databricks.sql.initial.catalog.name my_catalog
  • Avalie se o catálogo workspace default pode ser definido como diferente de hive_metastore para que o código do trabalho não precise ser alterado.
  • Caso contrário, altere o código do trabalho para renomear os namespaces de dois níveis para namespaces de três níveis da tabela apropriada.
  • Se o trabalho estiver usando puramente SQL, considere adicionar uma declaração USE CATALOG.

Job está usando um Notebook que está lendo ou gravando em caminhos que são subpastas de tabelas. Isso não é possível no Unity Catalog.

  • Altere o código para ler da tabela com um predicado na coluna de partição.
  • Altere o código para escrever na tabela com overwriteByPartition ou outra opção apropriada.

Job está usando um Notebook que está lendo ou gravando em caminhos de montagem que são tabelas registradas em Unity Catalog

  • Altere o código para se referir à tabela correta com namespaces de três níveis.
  • Se a tabela não estiver registrada ou não estiver, o código ainda precisará ser modificado para ser gravado em um caminho de volume em vez de em um caminho de montagem.

Job está usando um Notebook que lê ou grava arquivos, não tabelas, por meio de caminhos de montagem.

Em vez disso, altere o código para gravar em um local de volume.

Job é um Job de transmissão que usa applyInPandasWithState.

Não há suporte no momento. Considere a possibilidade de reescrever, se possível, ou não tente refatorar esse trabalho até que o suporte seja fornecido.

Job é um Job de transmissão que usa o modo de processamento contínuo.

O modo de processamento contínuo é experimental no Spark e não é compatível com o Unity Catalog. Refatorar o trabalho para usar a transmissão estruturada. Se isso não for possível, considere manter o trabalho em execução no site Hive metastore.

Job é um trabalho de transmissão que usa diretórios de ponto de verificação.

  • Mova os diretórios de pontos de verificação para volumes.
  • Altere o código no Notebook para usar um caminho de volume.
  • Job O proprietário deve ter permissão de leitura e gravação nesse caminho.
  • Parar o trabalho.
  • Mova o ponto de verificação para o novo local do volume.
  • Reiniciar o trabalho.

Job tem uma definição de clustering abaixo de Databricks Runtime 11.3.

  • Altere a definição de clustering de trabalho para Databricks Runtime 11.3 ou acima.
  • Altere a definição de clustering de trabalhos para usar um modo de acesso designado ou padrão.

Job tem Notebook que interage com armazenamento ou tabelas.

A entidade de serviço na qual o trabalho foi executado deve ter acesso de leitura e gravação aos recursos necessários em Unity Catalog, como volumes, tabelas, locais externos e assim por diante.

Job é um pipeline declarativo LakeFlow.

  • Altere o Job clustering para Databricks Runtime 13.1 ou acima.
  • Interrompa o trabalho do pipeline declarativo LakeFlow.
  • Mova os dados para uma tabela gerenciadora do Unity Catalog.
  • Altere a definição de trabalho do pipeline declarativo LakeFlow para usar a nova tabela gerenciar Unity Catalog.
  • Reinicie o trabalho do pipeline declarativo LakeFlow.

Job O senhor tem um Notebook que usa um serviço de nuvem sem armazenamento, como o AWSKinesis, e a configuração usada para se conectar usa um instance profile.

  • Modifique o código para usar as credenciais do serviço Unity Catalog, que regem as credenciais capazes de interagir com o serviço de nuvem sem armazenamento, gerando credenciais temporárias utilizáveis pelos SDKs.

Job usos Scala

  • Se o senhor estiver abaixo de Databricks Runtime 13.3, execute-o no site dedicado compute.
  • O clustering padrão é suportado em Databricks Runtime 13.3 e acima.

Job tem Notebook que usa Scala UDFs

  • Se o senhor estiver abaixo de Databricks Runtime 13.3, execute-o no site dedicado compute.
  • O clustering padrão é compatível com o site Databricks Runtime 14.2.

Job tem tarefas que usam MLR

execução no site dedicado compute.

Job tem configuração de clustering que depende do script de inicialização global.

  • Use o site Databricks Runtime 13.3 ou o acima para obter suporte total.
  • Modifique para usar o script de inicialização com escopo de cluster ou usar a política de cluster. Os scripts, arquivos e pacotes devem ser instalados nos volumes Unity Catalog para serem executados.

Job tem configuração de clustering ou Notebook que usa jars/Maven, extensões de Spark ou fonte de dados personalizada (de Spark).

  • Use o site Databricks Runtime 13.3 ou o acima.
  • Use a política de cluster para instalar a biblioteca.

Job tem o Notebook que usa PySpark UDFs.

Use o site Databricks Runtime 13.2 ou o acima.

Job tem Notebook que tem Python código que faz chamadas de rede.

Use o site Databricks Runtime 12.2 ou o acima.

Job tem o Notebook que usa Pandas UDFs (escalar).

Use o site Databricks Runtime 13.2 ou o acima.

Job usará o site Unity Catalog Volumes.

Use o site Databricks Runtime 13.3 ou o acima.

Job tem Notebook que usa spark.catalog.X (tableExists, listTables, setDefaultCatalog) e execução usando um clustering compartilhado

  • Use o site Databricks Runtime 14.2 ou o acima.

  • Se não for possível fazer o upgrade do Databricks Runtime, use as etapas a seguir:

Em vez de tableExists, use o seguinte código:

# SQL workaround
def tableExistsSql(tablename):
try:
spark.sql(f"DESCRIBE TABLE {tablename};")
except Exception as e:
return False
return True
tableExistsSql("jakob.jakob.my_table")

Em vez de listTables, use SHOW TABLES (também permite restringir a correspondência de bancos de dados ou padrões):

spark.sql("SHOW TABLES")

Para a execução do setDefaultCatalog

spark.sql("USE CATALOG ")

Job tem um Notebook que usa o DButils interno API: comando Contexto e execução usando um clustering compartilhado.

Cargas de trabalho que tentam acessar o acesso ao comando, por exemplo, para recuperar um ID de trabalho, usando

dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()

Em vez de .toJson(), use .safeToJson(). Isso fornece um subconjunto de todas as informações de contexto de comando que podem ser compartilhadas com segurança em um cluster compartilhado.

Requer o Databricks Runtime 13.3 LTS+

Job tem Notebook que usa PySpark: spark.udf.registerJavaFunction e execução usando um clustering compartilhado

  • Use Databricks Runtime 14.3 LTS ou acima
  • Para Notebook e Job, use uma célula scala %para registrar o Scala UDF usando spark.udf.registro. Como o Python e o Scala compartilham o contexto de execução, o UDF do Scala também estará disponível no Python.
  • Para os clientes que usam IDEs (usando o Databricks Connect v2), a única opção é reescrever o UDF como um UDF Python do Unity Catalog. No futuro, planejamos estender o suporte a UDFs do Unity Catalog para suportar Scala.

Job tem Notebook que usa RDDs: sc.parallelize & spark.read.json() para converter um objeto JSON em um DF e execução usando um clustering compartilhado

  • Em vez disso, use JSON.loads

Exemplo -

Antes:

json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)

Depois de:

from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()

Job tem Notebook que usa RDDs: Empty Dataframe via sc.emptyRDD() e execução usando um clustering compartilhado

Exemplo -

Antes:

val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)

Depois de:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)

Job tem Notebook que usa RDDs: mapPartitions (lógica de inicialização cara + operações mais baratas por linha) e execução usando um clustering compartilhado

  • Motivo -

Unity Catalog clustering compartilhado usando o Spark Connect para a comunicação entre os programas Python / Scala e o servidor Spark, fazendo com que os RDDs não sejam mais acessíveis.

Antes:

Um caso de uso típico de RDDs é executar a lógica de inicialização cara apenas uma vez e, em seguida, realizar operações mais baratas por linha. Esse caso de uso pode ser a chamada de um serviço externo ou a inicialização da lógica de criptografia.

Depois de:

Reescrever operações RDD usando a API Dataframe e usando UDFs Arrow nativas do PySpark.

Job tem Notebook que usa SparkContext (sc) & sqlContext e execução usando um clustering compartilhado

  • Motivo -

Spark Context (sc) & sqlContext não estão disponíveis por design devido à arquitetura de clustering compartilhado Unity Catalog e ao SparkConnect.

Como resolver:

Use a variável spark para interagir com a instância SparkSession

Limitações:

O Spark JVM não pode ser acessado diretamente do Python / Scala REPL, somente via Spark comando. Isso significa que o comando sc._jvm falhará por padrão.

Os seguintes comandos sc não são suportados: emptyRDD, range, init_batched_serializer, parallelize, pickleFile, textFile, wholeTextFiles, binaryFiles, binaryRecords, sequenceFile, newAPIHadoopFile, newAPIHadoopRDD, hadoopFile, hadoopRDD, union, runJob, setSystemProperty, uiWebUrl, stop, setJobGroup, setLocalProperty, getConf

Job tem Notebook que usa Spark Conf - sparkContext.getConf e execução usando um clustering compartilhado

  • Motivo -

sparkContext, df.sparkContext, sc.sparkContext e APIs semelhantes não estão disponíveis por padrão.

Como resolver:

Em vez disso, use spark.conf

Job tem Notebook que usa SparkContext - SetJobDescription() e execução usando um clustering compartilhado

  • Motivo -

sc.setJobDescription("strings") não estão disponíveis por design devido à arquitetura de clustering compartilhado Unity Catalog e ao SparkConnect.

Como resolver:

Use tags em vez disso, se possível [PySpark docs].

spark.addTag () pode anexar uma tag, e getTags () e interruptTag (tag) podem ser usados para agir sobre a presença/ausência de uma tag

Requer o Databricks Runtime 14.1+

Job tem o Notebook que define os níveis de registro Spark usando o comando, como sc.setLogLevel("INFO"), e a execução usando um clustering compartilhado

  • Motivo -

Em Single-User e sem clustering de isolamento, é possível acessar o contexto Spark para definir dinamicamente o nível log nos drivers e no executor diretamente. No clustering compartilhado, esse método não era acessível pelo contexto Spark e, em Databricks Runtime 14+, o contexto Spark não está mais disponível.

Como resolver:

Para controlar o nível de log sem fornecer um log4j.conf, agora é possível usar um valor de configuração Spark nas configurações de clustering. Use os níveis de registro do Spark definindo spark.log.level como DEBUG, WARN, INFO, ERROR como um valor de configuração do Spark nas configurações de clustering.

Job tem Notebook que usa expressões/consultas profundamente aninhadas e execução usando um clustering compartilhado

  • Motivo -

RecursionError / Nível máximo de aninhamento do Protobuf excedido (para expressões/consultas profundamente aninhadas)

Ao criar recursivamente expressões e DataFrames profundamente aninhados usando a API DataFrame do PySpark, é possível que, em certos casos, ocorra uma das seguintes situações:

  • Exceção Python: RecursionError: profundidade máxima de recursão excedida
  • SparkConnectGprcException: Protobuf nível máximo de aninhamento excedido

Como resolver:

Para contornar o problema, identifique os caminhos de código profundamente aninhados e reescreva-os usando expressões lineares/subconsultas ou exibições temporárias.

Por exemplo: em vez de chamar recursivamente df.withColumn, chame df.withColumns (dict).

Job tem Notebook que usa input_file_name() no código e execução usando um clustering compartilhado

  • Motivo -

input_file_name() não é suportado em Unity Catalog para clustering compartilhado.

Como resolver:

Para obter o nome do arquivo

.withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))

funcionará para spark.read

Para obter todo o caminho do arquivo

.withColumn("RECORD_FILE_PATH", col("_metadata.file_path"))

funcionará para spark.read

Job possui um Notebook que realiza operações de dados em sistemas de arquivos DBFS e execução usando um clustering compartilhado

  • Motivo -

Ao usar o site DBFS com clustering compartilhado usando o serviço FUSE, ele não consegue acessar o sistema de arquivos e gera um erro de arquivo não encontrado

Exemplo:

Veja a seguir alguns exemplos de falha no acesso ao DBFS por meio de clustering compartilhado

with open('/dbfs/test/sample_file.csv', 'r') as file:
ls -ltr /dbfs/test
cat /dbfs/test/sample_file.csv

Como resolver:

Qualquer um dos usos -

  • Databricks Unity Catalog Volume em vez de usar DBFS (Preferencial)
  • Atualize o código para usar o dbutils ou o spark que passa pelo caminho de acesso direto ao armazenamento e recebe acesso a DBFS a partir do clustering compartilhado