Pular para o conteúdo principal

Crie um pipeline Unity Catalog clonando um pipeline Hive metastore

Este artigo descreve a solicitação clone a pipeline na API REST Databricks e como você pode usá-la para copiar um pipeline existente que publica no Hive metastore para um novo pipeline que publica no Unity Catalog. Quando você chama a solicitação clone a pipeline , ela:

  • Copia o código-fonte e a configuração do pipeline existente para um novo, aplicando quaisquer substituições de configuração especificadas.
  • Atualiza as definições e referências da view materializada e da tabela de transmissão com as alterações necessárias para que esses objetos sejam gerenciados pelo Unity Catalog.
  • iniciar uma atualização pipeline para migrar os dados e metadados existentes, como pontos de verificação, para quaisquer tabelas de transmissão no pipeline. Isso permite que essas tabelas de transmissão retomem o processamento no mesmo ponto do pipeline original.

Após a conclusão das operações de clone, tanto o pipeline original quanto o novo podem ser executados de forma independente.

Este artigo inclui exemplos de como chamar a solicitação API diretamente e por meio de um script Python de um Databricks Notebook.

Antes de começar

Os seguintes itens são necessários antes de clonar um pipeline:

  • Para clonar um pipeline Hive metastore , as tabelas e a exibição definidas no pipeline devem publicar tabelas em um esquema de destino. Para saber como adicionar um esquema de destino a um pipeline, consulte Configurar um pipeline para publicar no Hive metastore.

  • Referências às tabelas de gerenciamento Hive metastore ou à exibição no pipeline a ser clonado devem ser totalmente qualificadas com o catálogo (hive_metastore), esquema e nome da tabela. Por exemplo, no código a seguir, criando um dataset customers , o argumento do nome da tabela deve ser atualizado para hive_metastore.sales.customers:

    Python
    @dp.table
    def customers():
    return spark.read.table("sales.customers").where(...)
  • Não edite o código-fonte do pipeline Hive metastore enquanto uma operação de clonagem estiver em andamento, incluindo o Notebook configurado como parte do pipeline e quaisquer módulos armazenados em pastas Git ou arquivos workspace .

  • O pipeline Hive metastore de origem não deve estar em execução quando você iniciar as operações de clonagem. Se uma atualização estiver em execução, interrompa-a ou aguarde sua conclusão.

A seguir estão outras considerações importantes antes de clonar um pipeline:

  • Se as tabelas no pipeline Hive metastore especificarem um local de armazenamento usando o argumento path em Python ou LOCATION em SQL, passe a configuração "pipelines.migration.ignoreExplicitPath": "true" para a solicitação de clone. A definição desta configuração está incluída nas instruções abaixo.
  • Se o pipeline Hive metastore incluir uma fonte Auto Loader que especifique um valor para a opção cloudFiles.schemaLocation , e o pipeline Hive metastore permanecer operacional após a criação do clone Unity Catalog , você deverá definir a opção mergeSchema como true no pipeline do Hive metastore e no pipeline Unity Catalog clonado. Adicionar esta opção ao pipeline Hive metastore antes da clonagem copiará a opção para o novo pipeline.

Clonar um pipeline com a API REST do Databricks

O exemplo a seguir usa o comando curl para chamar a solicitação clone a pipeline na API REST do Databricks:

Bash
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json

Substituir:

clone-pipeline.JSON:

JSON
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}

Substituir:

  • <target-catalog-name> com o nome de um catálogo no Unity Catalog no qual o novo pipeline deve publicar. Este deve ser um catálogo existente.
  • <target-schema-name> com o nome de um esquema no Unity Catalog no qual o novo pipeline deve publicar se for diferente do nome do esquema atual. Este parâmetro é opcional e, se não for especificado, o nome do esquema existente será usado.
  • <new-pipeline-name> com um nome opcional para o novo pipeline. Se não for especificado, o novo pipeline será nomeado usando o nome do pipeline de origem com [UC] anexado.

clone_mode especifica o modo a ser usado para as operações de clonagem. MIGRATE_TO_UC é a única opção suportada.

Use o campo configuration para especificar configurações no novo pipeline. Os valores definidos aqui substituem as configurações no pipeline original.

A resposta da solicitação da API REST clone é o ID do pipeline do novo pipeline do Unity Catalog.

Clonar um pipeline de um Databricks Notebook

O exemplo a seguir chama a solicitação create a pipeline de um script Python. Você pode usar um Databricks Notebook para executar este script:

  1. Crie um novo Notebook para o script. Veja Criar um Notebook.

  2. Copie o seguinte script Python na primeira célula do Notebook.

  3. Atualize os valores do espaço reservado no script substituindo:

    • <databricks-instance> com o nome da instânciaworkspace Databricks , por exemplo 1234567890123456.7.gcp.databricks.com
    • <pipeline-id> com o identificador exclusivo do pipeline Hive metastore a ser clonado. Você pode encontrar o ID pipeline na interface do usuário do pipeline declarativoLakeFlow.
    • <target-catalog-name> com o nome de um catálogo no Unity Catalog no qual o novo pipeline deve publicar. Este deve ser um catálogo existente.
    • <target-schema-name> com o nome de um esquema no Unity Catalog no qual o novo pipeline deve publicar se for diferente do nome do esquema atual. Este parâmetro é opcional e, se não for especificado, o nome do esquema existente será usado.
    • <new-pipeline-name> com um nome opcional para o novo pipeline. Se não for especificado, o novo pipeline será nomeado usando o nome do pipeline de origem com [UC] anexado.
  4. executar o script. Consulte execução Databricks Notebook.

Python
import requests

# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"

# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"

# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"

# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}

def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()

def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)

assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"

def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS

data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response

check_source_pipeline_exists()
request_pipeline_clone()

Limitações

As seguintes são limitações da solicitação da API do pipeline declarativo LakeFlow clone a pipeline :

  • Somente a clonagem de um pipeline configurado para usar o Hive metastore para um pipeline Unity Catalog é suportada.

  • Você pode criar um clone somente no mesmo workspace Databricks que o pipeline do qual você está clonando.

  • O pipeline que você está clonando pode incluir apenas as seguintes fontes de transmissão:

  • Se o pipeline Hive metastore você está clonando usar o modo de notificação de arquivo Auto Loader , Databricks recomenda não executar o pipeline Hive metastore após a clonagem. Isso ocorre porque a execução do pipeline Hive metastore resulta na remoção de alguns eventos de notificação de arquivo do clone Unity Catalog . Se o pipeline Hive metastore de origem não for executado após a conclusão das operações de clonagem, você poderá preencher os arquivos ausentes usando Auto Loader com a opção cloudFiles.backfillInterval . Para saber mais sobre o modo de notificação de arquivo Auto Loader , consulte Configurar transmissão Auto Loader no modo de notificação de arquivo. Para saber mais sobre o preenchimento de arquivos com o Auto Loader, consulte Acionar preenchimentos regulares usando cloudFiles.backfillInterval e opções comuns Auto Loader.

  • As tarefas de manutenção do pipeline são pausadas automaticamente para ambos os pipelines enquanto a clonagem está em andamento.

  • O seguinte se aplica às consultas de viagem do tempo em tabelas no pipeline clonado Unity Catalog :

    • Se uma versão de tabela foi originalmente gravada em um objeto Hive metastore gerenciar, as consultas de viagem do tempo que usam uma cláusula timestamp_expression serão indefinidas ao consultar o objeto clonado Unity Catalog .
    • Entretanto, se a versão da tabela foi gravada no objeto clonado Unity Catalog , as consultas viagem do tempo usando uma cláusula timestamp_expression funcionarão corretamente.
    • Consultas de viagem do tempo usando uma cláusula version funcionam corretamente ao consultar um objeto clonado Unity Catalog , mesmo quando a versão foi originalmente gravada no objeto Hive metastore gerenciar.
  • Para outras limitações ao usar o pipeline declarativo LakeFlow com Unity Catalog, consulte Limitações pipeline Unity Catalog.

  • Para limitações Unity Catalog , consulte LimitaçõesUnity Catalog.