Pular para o conteúdo principal

Mover tabelas entre o pipeline declarativo LakeFlow

Este artigo descreve como mover tabelas de transmissão e visualizações materializadas entre pipelines. Após a movimentação, o pipeline para o qual você move o fluxo atualiza a tabela, em vez do pipeline original. Isso é útil em muitos cenários, incluindo:

  • Divida um pipeline grande em outros menores.
  • mesclar vários pipelines em um único e maior.
  • Alterar a frequência refresh de algumas tabelas em um pipeline.
  • Mova tabelas de um pipeline que usa o modo de publicação legado para o modo de publicação default . Para obter detalhes sobre o modo de publicação legado, consulte Modo de publicação legado para pipeline. Para ver como você pode migrar o modo de publicação de um pipeline inteiro de uma só vez, consulte Habilitar o modo de publicação default em um pipeline.
  • Mova tabelas pelo pipeline em diferentes espaços de trabalho.

Requisitos

A seguir estão os requisitos para mover uma tabela entre pipelines.

  • Você deve usar Databricks Runtime 16.3 ou superior ao executar o comando ALTER ... e Databricks Runtime 17.2 para movimentação de tabelas entreworkspace .

  • Tanto o pipeline de origem quanto o de destino devem ser:

    • De propriedade da account de usuário Databricks ou entidade de serviço que executa as operações
    • Em espaços de trabalho que compartilham um metastore. Para verificar o metastore, consulte a funçãocurrent_metastore.
  • O pipeline de destino deve usar o modo de publicação default . Isso permite que você publique tabelas em vários catálogos e esquemas.

    Como alternativa, ambos os pipelines devem usar o modo de publicação legado e ambos devem ter o mesmo catálogo e valor de destino nas configurações. Para obter informações sobre o modo de publicação legado, consulte Esquema LIVE (legado).

nota

Este recurso não suporta a movimentação de um pipeline usando o modo de publicação default para um pipeline usando o modo de publicação legado.

Mover uma tabela entre pipelines

As instruções a seguir descrevem como mover uma tabela de transmissão ou view materializada de um pipeline para outro.

  1. Pare o pipeline de origem se ele estiver em execução. Espere até que pare completamente.

  2. Remova a definição da tabela do código do pipeline de origem e armazene-a em algum lugar para referência futura.

    Inclua quaisquer consultas ou códigos de suporte necessários para que o pipeline seja executado corretamente.

  3. Em um Notebook ou editor SQL , execute o seguinte comando SQL para reatribuir a tabela do pipeline de origem para o pipeline de destino:

    SQL
    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
    SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");

    Observe que o comando SQL deve ser executado a partir do workspace do pipeline de origem.

    O comando usa ALTER MATERIALIZED VIEW e ALTER STREAMING TABLE para gerenciar visualizações materializadas e tabelas de transmissão Unity Catalog , respectivamente. Para executar a mesma ação em uma tabela Hive metastore , use ALTER TABLE.

    Por exemplo, se você quiser mover uma tabela de transmissão chamada sales para um pipeline com o ID abcd1234-ef56-ab78-cd90-1234efab5678, você executaria o seguinte comando:

    SQL
    ALTER STREAMING TABLE sales
    SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
nota

O pipelineId deve ser um identificador de pipeline válido. O valor null não é permitido.

  1. Adicione a definição da tabela ao código do pipeline de destino.
nota

Se o catálogo ou o esquema de destino forem diferentes entre a origem e o destino, copiar a consulta exatamente pode não funcionar. Tabelas parcialmente qualificadas na definição podem ser resolvidas de forma diferente. Pode ser necessário atualizar a definição ao mover para qualificar completamente os nomes das tabelas.

A mudança está concluída. Agora você pode executar o pipeline de origem e de destino. O pipeline de destino atualiza a tabela.

Solução de problemas

A tabela a seguir descreve erros que podem ocorrer ao mover uma tabela entre pipelines.

Erro

Descrição

DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE

O pipeline de origem está no modo de publicação default , e o destino usa o modo de esquema LIVE (legado). Isso não é suportado. Se a origem usar o modo de publicação default , o destino também deverá usar.

PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE

Somente a movimentação de tabelas entre o pipeline declarativo LakeFlow é suportada. pipeline para tabelas de transmissão e visualização materializada criadas com Databricks SQL não são suportados.

DESTINATION_PIPELINE_NOT_FOUND

O pipelines.pipelineId deve ser um pipeline válido. O pipelineId não pode ser nulo.

A tabela não atualiza no destino após a movimentação.

Para mitigar rapidamente esse caso, mova a tabela de volta para o pipeline de origem seguindo as mesmas instruções.

PIPELINE_PERMISSION_DENIED_NOT_OWNER

Tanto o pipeline de origem quanto o de destino devem pertencer ao usuário que executa as operações de movimentação.

TABLE_ALREADY_EXISTS

A tabela listada na mensagem de erro já existe. Isso pode acontecer se já existir uma tabela de apoio para o pipeline. Neste caso, DROP a tabela listada no erro.

Exemplo com várias tabelas em um pipeline

o pipeline pode conter mais de uma tabela. Você ainda pode mover uma tabela por vez entre pipelines. Neste cenário, há três tabelas (table_a, table_b, table_c) que leem umas das outras sequencialmente no pipeline de origem. Queremos mover uma tabela, table_b, para outro pipeline.

Código do pipeline de origem inicial:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)

@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)

Movemos table_b para outro pipeline copiando e removendo a definição da tabela da origem e atualizando o pipelineId de table_b .

Primeiro, pause qualquer programa e aguarde a conclusão das atualizações no pipeline de origem e de destino. Em seguida, modifique o pipeline de origem para remover o código da tabela que está sendo movida. O código de exemplo do pipeline de origem atualizado se torna:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )

@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)

Vá para o editor SQL para executar o comando ALTER pipelineId .

SQL
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

Em seguida, vá para o pipeline de destino e adicione a definição de table_b. Se o catálogo e o esquema default forem os mesmos nas configurações pipeline , nenhuma alteração de código será necessária.

O código do pipeline de destino:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)

Se o catálogo e o esquema default forem diferentes nas configurações pipeline , você deverá adicionar o nome totalmente qualificado usando o catálogo e o esquema do pipeline .

Por exemplo, o código do pipeline de destino poderia ser:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)

executar (ou reativar programar) para o pipeline de origem e de destino.

Os oleodutos agora estão separados. A consulta para table_c lê de table_b (agora no pipeline de destino) e table_b lê de table_a (no pipeline de origem). Quando você faz uma execução disparada no pipeline de origem, table_b não é atualizado porque não é mais gerenciado pelo pipeline de origem. O pipeline de origem trata table_b como uma tabela externa ao pipeline. Isso é comparável à definição de uma leitura de view materializada de uma tabela Delta no Unity Catalog que não é gerenciada pelo pipeline.

Limitações

A seguir estão as limitações para mover tabelas entre pipelines.

  • Visualizações materializadas e tabelas de transmissão criadas com Databricks SQL não são suportadas.
  • Tabelas ou visualizações privadas não são suportadas.
  • O pipeline de origem e destino deve ser pipeline. Pipelines nulos não são suportados.
  • O pipeline de origem e destino deve estar no mesmo workspace ou em espaços de trabalho diferentes que compartilhem o mesmo metastore.
  • Tanto o pipeline de origem quanto o de destino devem pertencer ao usuário que executa as operações de movimentação.
  • Se o pipeline de origem usar o modo de publicação default , o pipeline de destino também deverá usar o modo de publicação default . Não é possível mover uma tabela de um pipeline usando o modo de publicação default para um pipeline que usa o esquema LIVE (legado). Veja o esquema LIVE (legado).
  • Se o pipeline de origem e de destino estiverem usando o esquema LIVE (legado), eles deverão ter os mesmos valores catalog e target nas configurações.