Pular para o conteúdo principal

Migre da compute clássica para a compute serverless

Migre suas cargas de trabalho da compute clássica para a compute serverless . compute sem servidor lida automaticamente com o provisionamento, o escalonamento, as atualizações de tempo de execução e a otimização.

A maioria das cargas de trabalho clássicas pode ser migrada com alterações mínimas ou nenhuma alteração de código. Esta página se concentra nessas cargas de trabalho. Alguns recursos, como df.cache, ainda não são suportados em serverless, mas não exigirão alterações de código quando estiverem disponíveis. Determinadas cargas de trabalho que dependem de R ou Scala Notebook exigem compute clássica e não poderão ser migradas para serverless. Para obter uma lista completa das limitações atuais, consulte Limitações compute sem servidor.

Migração os passos

Para migrar suas cargas de trabalho da compute clássica para a compute serverless , siga estes passos:

  1. Verifique os pré-requisitos : Certifique-se de que seu workspace, rede e acesso ao armazenamento cloud atendam aos requisitos. Consulte a seção " Antes de começar".
  2. Atualizar código : Faça todas as alterações necessárias no código e na configuração. Consulte Atualizar seu código.
  3. Teste suas cargas de trabalho : Valide a compatibilidade e a correção antes da migração. Consulte a seção "Teste suas cargas de trabalho".
  4. Escolha um modo de desempenho : Selecione o modo de desempenho que melhor se adapte aos requisitos da sua carga de trabalho. Consulte Escolher um modo de desempenho.
  5. Migre em fases : implemente a serverless de forma incremental, começando com cargas de trabalho novas e de baixo risco. Veja Migrar em fases.
  6. Monitore os custos : acompanhe o consumo DBU serverless e configure alertas. Consulte a seção Custos do Monitor.

Antes de começar

Antes de iniciar a migração, talvez seja necessário atualizar algumas configurações antigas em seu workspace.

Pré-requisito

Ação

Detalhes

O espaço de trabalho está habilitado para Unity Catalog

Migre do Hive metastore se necessário.

Atualize um workspace Databricks para Unity Catalog

Rede configurada

Substitua o peering de VPC por NCCs, Private Link ou regras de firewall.

Rede de plano de compute serverless

acesso ao armazenamento em nuvem

Substitua os padrões legados de acesso a dados por locais externos do Unity Catalog.

Conectar ao armazenamento de objetos na cloud usando o Unity Catalog

Atualize seu código

As seções a seguir listam as alterações de código e configuração necessárias para tornar suas cargas de trabalho compatíveis com serverless.

Acesso a dados

Os padrões de acesso a dados legados não são suportados em serverless. Atualize seu código para usar o Unity Catalog.

Padrão clássico

substituição sem servidor

Detalhes

Caminhos DBFS (dbfs:/...)

Volumes Unity Catalog

O que são volumes Unity Catalog ?

Tabelas Hive metastore

Tabelas Unity Catalog (ou Federação HMS )

Atualize um workspace Databricks para Unity Catalog

Credenciais account de armazenamento

Locais externos Unity Catalog

Conectar ao armazenamento de objetos na cloud usando o Unity Catalog

JARs JDBC personalizados

Federação lakehouse

O que é federação de consultas?

atenção

O acesso DBFS é limitado em serverless. Atualize todos os caminhos dbfs:/ para volumes Unity Catalog antes da migração. Para obter mais informações, consulte Migrar arquivos armazenados em DBFS.

Exemplo: Substituir caminhos DBFS e referências Hive metastore

Python
# Classic
df = spark.read.csv("dbfs:/mnt/datalake/data.csv", header=True)
df.write.parquet("dbfs:/mnt/output/results")
df = spark.table("my_database.my_table")

# Serverless
df = spark.read.csv("/Volumes/main/sales/raw_data/data.csv", header=True)
df.write.parquet("/Volumes/main/analytics/output/results")
df = spark.table("main.my_database.my_table") # three-level namespace

APIs e código

Determinadas APIs e padrões de código não são suportados em serverless. Consulte esta tabela para verificar se o seu código precisa ser atualizado.

Padrão clássico

substituição sem servidor

Detalhes

APIs RDD (sc.parallelize, rdd.map)

APIs de DataFrame

Compare o Spark Connect com o Spark Classic.

df.cache(), df.persist()

Remover chamadas de cache

limitações compute sem servidor

spark.sparkContext, sqlContext

Use spark (SparkSession) diretamente

Compare o Spark Connect com o Spark Classic.

Variáveis do Hive (${var})

SQL DECLARE VARIABLE ou f-strings do Python

DECLARE VARIABLE

Configurações do Spark não suportadas

Remover configurações não suportadas. O modelo sem servidor ajusta automaticamente a maioria das configurações.

Configure as propriedades Spark para Notebooks e Jobs serverless .

Exemplo: Substitua operações de RDD por DataFrames

Python
from pyspark.sql import functions as F

# sc.parallelize + rdd.map
# Classic: rdd = sc.parallelize([1, 2, 3]); rdd.map(lambda x: x * 2).collect()
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
result = df.select((F.col("value") * 2).alias("value")).collect()

# rdd.flatMap
# Classic: sc.parallelize(["hello world"]).flatMap(lambda l: l.split(" ")).collect()
df = spark.createDataFrame([("hello world",)], ["line"])
words = df.select(F.explode(F.split("line", " ")).alias("word")).collect()

# rdd.groupByKey
# Classic: rdd.groupByKey().mapValues(list).collect()
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])
grouped = df.groupBy("key").agg(F.collect_list("value").alias("values")).collect()

# rdd.mapPartitions → applyInPandas
import pandas as pd
def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame({"total": [pdf["id"].sum()]})
result = (spark.range(100).repartition(4)
.groupBy(F.spark_partition_id())
.applyInPandas(process_group, schema="total long").collect())

# sc.textFile → spark.read.text
df = spark.read.text("/Volumes/catalog/schema/volume/file.txt")

Exemplo: Substitua SparkContext e cache.

Python
from pyspark.sql.functions import broadcast

# sc.broadcast → broadcast join
result = main_df.join(broadcast(lookup_df), "key")

# sc.accumulator → DataFrame aggregation
total = df.agg(F.sum("amount")).collect()[0][0]

# sqlContext.sql → spark.sql
result = spark.sql("SELECT * FROM main.db.table")

# df.cache() → remove caching calls
# Materialize expensive intermediate results to Delta as a workaround:
df = spark.read.parquet(path)
result = df.filter("status = 'active'")
expensive_df.write.format("delta").mode("overwrite").saveAsTable("main.scratch.temp")
result = spark.table("main.scratch.temp")

biblioteca e ambientes

Você pode gerenciar bibliotecas e ambientes no nível workspace usando ambientes base e no nível do Notebook usando o ambienteserverless do Notebook.

Padrão clássico

substituição sem servidor

Detalhes

Init scripts

ambientes sem servidor

Configure o ambiente serverless .

biblioteca com escopo de cluster

Notebook- biblioteca de escopo ou ambiente

Configure o ambiente serverless .

Maven/ BibliotecaJAR

Suporte a tarefas JAR para Job; PyPI para Notebook

Tarefa JAR para o trabalho

contêineres Docker

Ambientes sem servidor para necessidades de bibliotecas

Configure o ambiente serverless .

fixe o pacote Python em requirements.txt para ambientes reproduzíveis. Consulte as Melhores práticas para compute serverless.

transmissão

As cargas de trabalho do Transmissão são suportadas em serverless, mas certos gatilhos não são suportados. Atualize seu código para usar os gatilhos compatíveis.

Gatilho Spark

Apoiado

Notas

Trigger.AvailableNow()

Sim

Recomendado

Trigger.Once()

Sim

Isso está obsoleto. Use Trigger.AvailableNow() em vez disso.

Trigger.ProcessingTime(interval)

Não

Devolve INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED

Trigger.Continuous(interval)

Não

Em vez disso, use o modo contínuo do pipeline declarativo LakeFlow Spark .

padrão (não definir .trigger())

Não

Omitir .trigger() assume o valor padrão ProcessingTime("0 seconds"), o que não é suportado em serverless. Defina sempre .trigger(availableNow=True) explicitamente.

Para transmissão contínua, migre para o pipeline declarativoSpark no modo contínuo ou use o Job de programação contínua com AvailableNow. Para fontes grandes, defina maxFilesPerTrigger ou maxBytesPerTrigger para evitar erros de falta de memória.

Exemplo: Corrigir gatilhos de transmissão

Python
# Classic (not supported on serverless — default trigger is ProcessingTime)
query = df.writeStream.format("delta").outputMode("append").start()

# Serverless (explicit AvailableNow trigger)
query = (df.writeStream.format("delta").outputMode("append")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))
query.awaitTermination()

# With OOM prevention for large sources
query = (spark.readStream.format("delta")
.option("maxFilesPerTrigger", 100)
.option("maxBytesPerTrigger", "10g")
.load(input_path)
.writeStream.format("delta")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))

Teste suas cargas de trabalho

  1. Teste rápido de compatibilidade : execute a carga de trabalho em compute clássico com o modo de acesso padrão e Databricks Runtime 14.3 ou superior. Se a execução for bem-sucedida, a carga de trabalho poderá migrar para serverless sem qualquer alteração de código.
  2. Comparação A/B (recomendada para produção): execução da mesma carga de trabalho em um ambiente clássico (controle) e em serverless (experimento). Compare as tabelas de saída e verifique se estão corretas. Repita o processo até que as saídas coincidam.
  3. Configurações temporárias : Você pode definir temporariamente configurações compatíveis com o Spark durante os testes. Remova-os assim que estiverem estáveis.

Selecione um modo de desempenho

Os recursos de tarefas e pipelines sem servidor oferecem suporte a dois modos de desempenho: padrão e otimizado para desempenho. O modo de desempenho que você escolher dependerá dos requisitos da sua carga de trabalho.

Mode

Disponibilidade

startup

Melhor para

Standard

Empregos, pipeline declarativo LakeFlow Spark

4-6 minutos

lotes sensíveis ao custo

Otimizado para desempenho

Notebook, Jobs, LakeFlow Spark Pipeline declarativo

Segundos

Interativo, sensível à latência

Migrar em fases

  1. Novas cargas de trabalho : inicie todos os novos Notebooks e Jobs em serverless.
  2. Cargas de trabalho de baixo risco : Migre cargas de trabalho PySpark/SQL que já estejam no modo de acesso padrão e com Databricks Runtime 14.3 ou superior.
  3. Cargas de trabalho complexas : Migrar cargas de trabalho que necessitam de alterações de código (reescritas de RDD, atualizações de DBFS, correções de gatilhos).
  4. Carga de trabalho restante : Revisar periodicamente à medida que as capacidades se expandem.

Monitorar custos

A cobrança de serviços sem servidor é baseada no consumo DBU , não no tempo de atividade cluster . Antes de migrar em grande escala, valide as expectativas de custo com cargas de trabalho representativas.

Recursos adicionais

Você também pode consultar a seguinte postagem no blog para mais informações: