Pular para o conteúdo principal

Atualizar o Job quando o usuário atualizar o espaço de trabalho legado para Unity Catalog

Ao atualizar o espaço de trabalho legado para Unity Catalog, você pode precisar atualizar os trabalhos existentes para que façam referência às tabelas e aos caminhos de arquivo atualizados. A tabela principal desta página lista cenários típicos e sugestões para atualizar seu trabalho. Os cenários que requerem exemplos de código contêm links para a seção Cenários detalhados .

Para uma demonstração da atualização de Job para Unity Catalog, consulte Atualizando um Job para Unity Catalog.

Visão geral dos cenários

Cenário

soluções

Job utiliza uma biblioteca personalizada através de um init script ou de uma biblioteca definida cluster .

Uma biblioteca personalizada seria definida como um pacote pip ou jar não disponível publicamente que executa operações de leitura ou gravação do Apache Spark ou SQL incorporadas em seu código.

Modifique a biblioteca personalizada para garantir que:

  • Os nomes dos bancos de dados têm namespaces de três níveis.
  • Os pontos de montagem não são usados no código.

Job lê ou escreve em uma tabela Hive metastore .

  • Avalie a configuração do catálogo default na configuração do Job clustering Spark: spark.databricks.sql.initial.catalog.name my_catalog
  • Avalie se o catálogo workspace default pode ser definido como diferente de hive_metastore para que o código do trabalho não precise ser alterado.
  • Caso contrário, altere o código do trabalho para renomear os namespaces de dois níveis para namespaces de três níveis da tabela apropriada.
  • Se o trabalho estiver usando puramente SQL, considere adicionar uma declaração USE CATALOG.

Job lê ou grava em caminhos que são subpastas de tabelas (não suportado no Unity Catalog).

  • Altere o código para ler da tabela com um predicado na coluna de partição.
  • Altere o código para escrever na tabela com overwriteByPartition ou outra opção apropriada.

Job lê ou grava em caminhos de montagem que são tabelas Unity Catalog .

  • Altere o código para se referir à tabela correta com namespaces de três níveis.
  • Se a tabela não estiver registrada ou não estiver, o código ainda precisará ser modificado para ser gravado em um caminho de volume em vez de em um caminho de montagem.

Job lê ou grava arquivos (não tabelas) usando caminhos de montagem.

Em vez disso, altere o código para gravar em um local de volume.

Job é um Job de transmissão que usa applyInPandasWithState.

Não há suporte no momento. Considere a possibilidade de reescrever, se possível, ou não tente refatorar esse trabalho até que o suporte seja fornecido.

Job é um Job de transmissão que usa o modo de processamento contínuo.

O modo de processamento contínuo é experimental no Spark e não é compatível com o Unity Catalog. Refatorar o trabalho para usar a transmissão estruturada. Se isso não for possível, considere manter o trabalho em execução no site Hive metastore.

Job é um trabalho de transmissão que usa diretórios de ponto de verificação.

  • Mova os diretórios de pontos de verificação para volumes.
  • Altere o código no Notebook para usar um caminho de volume.
  • Job O proprietário deve ter permissão de leitura e gravação nesse caminho.
  • Parar o trabalho.
  • Mova o ponto de verificação para o novo local do volume.
  • Reiniciar o trabalho.

Job tem uma definição de clustering abaixo de Databricks Runtime 11.3.

  • Altere a definição de clustering de trabalho para Databricks Runtime 11.3 ou acima.
  • Altere a definição de clustering de trabalhos para usar um modo de acesso designado ou padrão.

Job tem Notebook que interage com armazenamento ou tabelas.

A entidade de serviço na qual o trabalho foi executado deve ter acesso de leitura e gravação aos recursos necessários em Unity Catalog, como volumes, tabelas, locais externos e assim por diante.

Job é um pipeline declarativo LakeFlow Spark .

  • Altere o Job clustering para Databricks Runtime 13.1 ou acima.
  • Interrompa a tarefa do pipeline declarativo LakeFlow Spark .
  • Mova os dados para uma tabela Unity Catalog .
  • Altere a definição do Job do pipeline declarativo LakeFlow Spark para usar a nova tabela de gerenciamento Unity Catalog .
  • Reinicie o trabalho do pipeline declarativo LakeFlow Spark .

Job utiliza um serviço cloud que não seja de armazenamento (como AWS Kinesis) com um instance profile para autenticação.

Modifique o código para usar as credenciais do serviço Unity Catalog, que regem as credenciais capazes de interagir com o serviço de nuvem sem armazenamento, gerando credenciais temporárias utilizáveis pelos SDKs.

Job utiliza Scala.

  • Se o senhor estiver abaixo de Databricks Runtime 13.3, execute-o no site dedicado compute.
  • O clustering padrão é suportado em Databricks Runtime 13.3 e acima.

Job possui um Notebook que utiliza UDFs Scala .

  • Se o senhor estiver abaixo de Databricks Runtime 13.3, execute-o no site dedicado compute.
  • O clustering padrão é compatível com o site Databricks Runtime 14.2.

Job possui tarefas que utilizam MLR.

execução no site dedicado compute.

Job tem configuração de clustering que depende do script de inicialização global.

  • Use o site Databricks Runtime 13.3 ou o acima para obter suporte total.
  • Modifique para usar o script de inicialização com escopo clusterou a política de cluster. Os scripts, arquivos e pacotes devem ser instalados nos volumes Unity Catalog para serem executados.

Job utiliza jars/Maven, extensões Spark ou fonte de dados personalizada (do Spark).

  • Use o site Databricks Runtime 13.3 ou o acima.
  • Use a política de cluster para instalar a biblioteca.

Job inclui um Notebook com UDFs PySpark .

Use o site Databricks Runtime 13.2 ou o acima.

Job consiste em um Notebook com código Python que realiza chamadas de rede.

Use o site Databricks Runtime 12.2 ou o acima.

Job possui um Notebook com UDFs (escalares) Pandas .

Use o site Databricks Runtime 13.2 ou o acima.

Job usará o site Unity Catalog Volumes.

Use o site Databricks Runtime 13.3 ou o acima.

Job usa spark.catalog.X (tableExists, listTables, setDefaultCatalog) em um cluster compartilhado.

Consulte Job Notebook para usar spark.catalog.X em um clustercompartilhado.

Job usa dbutils...getContext().toJson() em um cluster compartilhado.

Use .safeToJson() em vez de .toJson() ao acessar o contexto do comando (por exemplo, para recuperar um ID de trabalho). Isso fornece um subconjunto de informações que podem ser compartilhadas com segurança em um cluster compartilhado.

Requer o Databricks Runtime 13.3 LTS+

Job usa spark.udf.registerJavaFunction em um cluster compartilhado.

  • Utilize Databricks Runtime 14.3 LTS ou superior.
  • Para Notebook e Job, use uma célula %scala para registrar a UDF Scala usando spark.udf.register. Como Python e Scala compartilham o contexto de execução, a UDF em Scala também está disponível em Python.
  • Para clientes que utilizam IDEs (como o Databricks Connect v2), a única opção é reescrever a UDF como uma UDF Python do Unity Catalog. A Databricks planeja estender o suporte para UDFs do Unity Catalog para Scala.

Job usa sc.parallelize e spark.read.json() em um cluster compartilhado.

Consulte Job Notebook para usar sc.parallelize e read.json() em um clustercompartilhado.

Job usa sc.emptyRDD() para criar DataFrames vazios em um cluster compartilhado.

Consulte Job Notebook para criar DataFrames vazios usando sc.emptyRDD() em um clustercompartilhado.

Job usa RDD mapPartitions em um cluster compartilhado.

Os clusters compartilhados do Unity Catalog usam o Spark Connect para comunicação entre programas Python e Scala e o servidor Spark, tornando os RDDs inacessíveis.

Um caso de uso típico para RDDs é executar uma lógica de inicialização dispendiosa apenas uma vez e, em seguida, realizar operações mais baratas por linha, como chamar um serviço externo ou inicializar a lógica de criptografia.

Reescreva as operações de RDD usando a API DataFrame e as UDFs Arrow nativas do PySpark.

Job usa SparkContext (sc) e sqlContext em um cluster compartilhado.

sc e sqlContext não estão disponíveis por design devido à arquitetura de cluster compartilhado do Unity Catalog e do SparkConnect. Use a variável spark para interagir com a instância SparkSession.

A JVM Spark não pode ser acessada diretamente do REPL do Python ou Scala — somente através do comando Spark . O comando sc._jvm falhará por design.

Os seguintes comandos sc não são suportados: emptyRDD, range, init_batched_serializer, parallelize, pickleFile, textFile, wholeTextFiles, binaryFiles, binaryRecords, sequenceFile, newAPIHadoopFile, newAPIHadoopRDD, hadoopFile, hadoopRDD, union, runJob, setSystemProperty, uiWebUrl, stop, setJobGroup, setLocalProperty, getConf

Job usa sparkContext.getConf em um cluster compartilhado.

sparkContextAPIs df.sparkContext, sc.sparkContext e similares não estão disponíveis por design. Use spark.conf em vez disso.

Job usa sc.setJobDescription() em um cluster compartilhado.

sc.setJobDescription("String") Não está disponível por design devido à arquitetura de cluster compartilhado do Unity Catalog e do SparkConnect.

Use spark.addTag() para anexar uma tag e getTags() e interruptTag(tag) para agir na presença ou ausência de uma tag.

Requer o Databricks Runtime 14.1+

Job usa sc.setLogLevel() em um cluster compartilhado.

Em clusters compartilhados, o contexto do Spark não está acessível para definir os níveis de log diretamente. No Databricks Runtime 14+, o Contexto Spark não está mais disponível.

Defina spark.log.level como DEBUG, WARN, INFO ou ERROR como um valor de configuração do Spark nas configurações do cluster.

Job utiliza expressões ou consultas profundamente aninhadas em um cluster compartilhado.

DataFrames e expressões profundamente aninhadas, criadas recursivamente usando a API DataFrame do PySpark, podem produzir:

  • RecursionError: maximum recursion depth exceeded
  • SparkConnectGrpcException: Protobuf maximum nesting level exceeded

Identifique caminhos de código profundamente aninhados e reescreva-os usando expressões lineares, subconsultas ou visualizações temporárias. Por exemplo, em vez de chamar recursivamente df.withColumn, use df.withColumns(dict) em vez disso.

Job usa input_file_name() em um cluster compartilhado.

Consulte Job Notebook para usar input_file_name() em um clustercompartilhado.

Job realiza operações de dados em DBFS em um cluster compartilhado.

Consulte Job Notebook realizar operações de dados em DBFS em um clustercompartilhado.

Cenários detalhados

Os seguintes cenários requerem exemplos de código.

UsoJob Notebook spark.catalog.X em um clustercompartilhado

Use o site Databricks Runtime 14.2 ou o acima.

Caso não seja possível atualizar o Databricks Runtime, utilize as seguintes soluções alternativas.

Em vez de tableExists, use:

Python
# SQL workaround
def tableExistsSql(tablename):
try:
spark.sql(f"DESCRIBE TABLE {tablename};")
except Exception as e:
return False
return True
tableExistsSql("jakob.jakob.my_table")

Em vez de listTables, use SHOW TABLES (que também suporta restrição por banco de dados ou correspondência de padrões):

Python
spark.sql("SHOW TABLES")

Para setDefaultCatalog, execução:

Python
spark.sql("USE CATALOG <catalog_name>")

Job Notebook usa sc.parallelize e spark.read.json() em um clustercompartilhado

Use json.loads em vez disso.

Antes:

Python
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)

Depois de:

Python
from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()

Job Notebook cria DataFrames vazios usando sc.emptyRDD() em um clustercompartilhado

Antes:

Scala
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)

Depois de:

Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
Python
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)

UsoJob Notebook input_file_name() em um clustercompartilhado

input_file_name() Não é suportado no Unity Catalog para clusters compartilhados.

Para obter o nome do arquivo:

Python
.withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))

Para obter o caminho completo do arquivo:

Python
.withColumn("RECORD_FILE_PATH", col("_metadata.file_path"))

Ambas as opções funcionam com spark.read.

Job Notebook realiza operações de dados em DBFS em um clustercompartilhado

Ao usar o DBFS com um cluster compartilhado por meio do serviço FUSE, o cluster não consegue acessar o sistema de arquivos e gera um erro de arquivo não encontrado.

Os exemplos a seguir falham em um cluster compartilhado:

Bash
with open('/dbfs/test/sample_file.csv', 'r') as file:
ls -ltr /dbfs/test
cat /dbfs/test/sample_file.csv

Utilize uma das seguintes soluções:

  • Utilize um Volume de Catálogo Databricks Unity Catalog em vez do DBFS (preferencialmente).
  • Atualize o código para usar dbutils ou spark, que usam o caminho de acesso direto ao armazenamento e têm acesso concedido ao DBFS de clusters compartilhados.