Atualizar o Job quando o usuário atualizar o espaço de trabalho legado para Unity Catalog
Ao atualizar o espaço de trabalho legado para Unity Catalog, você pode precisar atualizar os trabalhos existentes para que façam referência às tabelas e aos caminhos de arquivo atualizados. A tabela principal desta página lista cenários típicos e sugestões para atualizar seu trabalho. Os cenários que requerem exemplos de código contêm links para a seção Cenários detalhados .
Para uma demonstração da atualização de Job para Unity Catalog, consulte Atualizando um Job para Unity Catalog.
Visão geral dos cenários
Cenário | soluções |
|---|---|
Job utiliza uma biblioteca personalizada através de um init script ou de uma biblioteca definida cluster . Uma biblioteca personalizada seria definida como um pacote | Modifique a biblioteca personalizada para garantir que:
|
Job lê ou escreve em uma tabela Hive metastore . |
|
Job lê ou grava em caminhos que são subpastas de tabelas (não suportado no Unity Catalog). |
|
Job lê ou grava em caminhos de montagem que são tabelas Unity Catalog . |
|
Job lê ou grava arquivos (não tabelas) usando caminhos de montagem. | Em vez disso, altere o código para gravar em um local de volume. |
Job é um Job de transmissão que usa | Não há suporte no momento. Considere a possibilidade de reescrever, se possível, ou não tente refatorar esse trabalho até que o suporte seja fornecido. |
Job é um Job de transmissão que usa o modo de processamento contínuo. | O modo de processamento contínuo é experimental no Spark e não é compatível com o Unity Catalog. Refatorar o trabalho para usar a transmissão estruturada. Se isso não for possível, considere manter o trabalho em execução no site Hive metastore. |
Job é um trabalho de transmissão que usa diretórios de ponto de verificação. |
|
Job tem uma definição de clustering abaixo de Databricks Runtime 11.3. |
|
Job tem Notebook que interage com armazenamento ou tabelas. | A entidade de serviço na qual o trabalho foi executado deve ter acesso de leitura e gravação aos recursos necessários em Unity Catalog, como volumes, tabelas, locais externos e assim por diante. |
Job é um pipeline declarativo LakeFlow Spark . |
|
Job utiliza um serviço cloud que não seja de armazenamento (como AWS Kinesis) com um instance profile para autenticação. | Modifique o código para usar as credenciais do serviço Unity Catalog, que regem as credenciais capazes de interagir com o serviço de nuvem sem armazenamento, gerando credenciais temporárias utilizáveis pelos SDKs. |
Job utiliza Scala. |
|
Job possui um Notebook que utiliza UDFs Scala . |
|
Job possui tarefas que utilizam MLR. | execução no site dedicado compute. |
Job tem configuração de clustering que depende do script de inicialização global. |
|
Job utiliza jars/Maven, extensões Spark ou fonte de dados personalizada (do Spark). |
|
Job inclui um Notebook com UDFs PySpark . | Use o site Databricks Runtime 13.2 ou o acima. |
Job consiste em um Notebook com código Python que realiza chamadas de rede. | Use o site Databricks Runtime 12.2 ou o acima. |
Job possui um Notebook com UDFs (escalares) Pandas . | Use o site Databricks Runtime 13.2 ou o acima. |
Job usará o site Unity Catalog Volumes. | Use o site Databricks Runtime 13.3 ou o acima. |
Job usa | Consulte Job Notebook para usar spark.catalog.X em um clustercompartilhado. |
Job usa | Use Requer o Databricks Runtime 13.3 LTS+ |
Job usa |
|
Job usa | Consulte Job Notebook para usar sc.parallelize e read.json() em um clustercompartilhado. |
Job usa | Consulte Job Notebook para criar DataFrames vazios usando sc.emptyRDD() em um clustercompartilhado. |
Job usa RDD | Os clusters compartilhados do Unity Catalog usam o Spark Connect para comunicação entre programas Python e Scala e o servidor Spark, tornando os RDDs inacessíveis. Um caso de uso típico para RDDs é executar uma lógica de inicialização dispendiosa apenas uma vez e, em seguida, realizar operações mais baratas por linha, como chamar um serviço externo ou inicializar a lógica de criptografia. Reescreva as operações de RDD usando a API DataFrame e as UDFs Arrow nativas do PySpark. |
Job usa SparkContext ( |
A JVM Spark não pode ser acessada diretamente do REPL do Python ou Scala — somente através do comando Spark . O comando Os seguintes comandos |
Job usa |
|
Job usa |
Use Requer o Databricks Runtime 14.1+ |
Job usa | Em clusters compartilhados, o contexto do Spark não está acessível para definir os níveis de log diretamente. No Databricks Runtime 14+, o Contexto Spark não está mais disponível. Defina |
Job utiliza expressões ou consultas profundamente aninhadas em um cluster compartilhado. | DataFrames e expressões profundamente aninhadas, criadas recursivamente usando a API DataFrame do PySpark, podem produzir:
Identifique caminhos de código profundamente aninhados e reescreva-os usando expressões lineares, subconsultas ou visualizações temporárias. Por exemplo, em vez de chamar recursivamente |
Job usa | Consulte Job Notebook para usar input_file_name() em um clustercompartilhado. |
Job realiza operações de dados em DBFS em um cluster compartilhado. | Consulte Job Notebook realizar operações de dados em DBFS em um clustercompartilhado. |
Cenários detalhados
Os seguintes cenários requerem exemplos de código.
UsoJob Notebook spark.catalog.X em um clustercompartilhado
Use o site Databricks Runtime 14.2 ou o acima.
Caso não seja possível atualizar o Databricks Runtime, utilize as seguintes soluções alternativas.
Em vez de tableExists, use:
# SQL workaround
def tableExistsSql(tablename):
try:
spark.sql(f"DESCRIBE TABLE {tablename};")
except Exception as e:
return False
return True
tableExistsSql("jakob.jakob.my_table")
Em vez de listTables, use SHOW TABLES (que também suporta restrição por banco de dados ou correspondência de padrões):
spark.sql("SHOW TABLES")
Para setDefaultCatalog, execução:
spark.sql("USE CATALOG <catalog_name>")
Job Notebook usa sc.parallelize e spark.read.json() em um clustercompartilhado
Use json.loads em vez disso.
Antes:
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)
Depois de:
from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()
Job Notebook cria DataFrames vazios usando sc.emptyRDD() em um clustercompartilhado
Antes:
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)
Depois de:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)
UsoJob Notebook input_file_name() em um clustercompartilhado
input_file_name() Não é suportado no Unity Catalog para clusters compartilhados.
Para obter o nome do arquivo:
.withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))
Para obter o caminho completo do arquivo:
.withColumn("RECORD_FILE_PATH", col("_metadata.file_path"))
Ambas as opções funcionam com spark.read.
Job Notebook realiza operações de dados em DBFS em um clustercompartilhado
Ao usar o DBFS com um cluster compartilhado por meio do serviço FUSE, o cluster não consegue acessar o sistema de arquivos e gera um erro de arquivo não encontrado.
Os exemplos a seguir falham em um cluster compartilhado:
with open('/dbfs/test/sample_file.csv', 'r') as file:
ls -ltr /dbfs/test
cat /dbfs/test/sample_file.csv
Utilize uma das seguintes soluções:
- Utilize um Volume de Catálogo Databricks Unity Catalog em vez do DBFS (preferencialmente).
- Atualize o código para usar
dbutilsouspark, que usam o caminho de acesso direto ao armazenamento e têm acesso concedido ao DBFS de clusters compartilhados.