execução de um pipeline Delta Live Tables em um fluxo de trabalho

Você pode executar um pipeline Delta Live Tables como parte de um fluxo de trabalho de processamento de dados com Databricks Job, Apache Airflow ou Azure Data Factory.

Empregos

O senhor pode orquestrar várias tarefas em um Databricks Job para implementar um fluxo de trabalho de processamento de dados. Para incluir um Delta Live Tables pipeline em um trabalho, use a tarefa de pipeline quando o senhor criar um trabalho. Consulte Delta Live Tables pipeline tarefa for Job.

Apache Airflow

Apache Airflow é uma solução de código aberto para gerenciar e programar o fluxo de trabalho de dados. Airflow representa o fluxo de trabalho como um gráfico acíclico direcionado (DAGs) de operações. O senhor define um fluxo de trabalho em um arquivo Python e Airflow gerencia a programação e a execução. Para obter informações sobre como instalar e usar o Airflow com o Databricks, consulte Orquestrar o trabalho Databricks com o Apache Airflow .

Para executar um pipeline Delta Live Tables como parte de um fluxo de trabalho do Airflow, use o DatabricksSubmitRunOperator.

Requisitos

Os itens a seguir são necessários para usar o suporte Airflow para Delta Live Tables:

Exemplo

O exemplo a seguir cria um Airflow DAG que aciona uma atualização para o pipeline Delta Live Tables com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Substitua CONNECTION_ID pelo identificador de uma conexãoAirflow com o site workspace.

Salve este exemplo no diretório airflow/dags e use a interface do usuário Airflow para view e acionar o DAG. Use a interface do usuário Delta Live Tables para view os detalhes da atualização pipeline.

Fábrica de dados do Azure

Azure O Data Factory é cloudum ETL serviço baseado no site que permite que o senhor orquestre a integração de dados e as transformações do fluxo de trabalho. Azure O Data Factory suporta diretamente a execução da tarefa Databricks em um fluxo de trabalho, incluindo o Notebook, a tarefa JAR e os scripts Python. O senhor também pode incluir um pipeline em um fluxo de trabalho chamando a API Delta Live Tables de uma atividade da Web do Azure Data Factory. Por exemplo, para acionar uma atualização de pipeline do Azure Data Factory:

  1. Crie um data factory ou abra um data factory existente.

  2. Quando a criação for concluída, abra a página do seu data factory e clique no bloco Open Azure Data Factory Studio . A interface de usuário do Azure Data Factory é exibida.

  3. Crie um novo pipeline do Azure Data Factory selecionando Pipeline no menu suspenso Novo na interface do usuário do Azure Data Factory Studio.

  4. Na caixa de ferramentas Atividades , expanda Geral e arraste a atividade da Web para a tela do pipeline. Clique na tab Configurações e insira os seguintes valores:

    Observação

    Como prática recomendada de segurança ao se autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, a Databricks recomenda que você use tokens OAuth.

    Se o senhor usar a autenticação pessoal access token, a Databricks recomenda o uso de pessoal access tokens pertencente à entidade de serviço em vez de usuários workspace. Para criar o site tokens para uma entidade de serviço, consulte gerenciar tokens para uma entidade de serviço.

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Substitua <get-workspace-instance>.

      Substitua <pipeline-id> pelo identificador do pipeline.

    • Método: Selecione POST no menu suspenso.

    • Cabeçalhos: Clique em + Novo. Na caixa de texto Nome , digite Authorization. Na caixa de texto Valor , digite Bearer <personal-access-token>.

      Substitua <personal-access-token> por um access tokenpessoal do Databricks.

    • Corpo: para passar parâmetros de solicitação adicionais, insira um documento JSON contendo os parâmetros. Por exemplo, para iniciar uma atualização e reprocessar todos os dados para o pipeline: {"full_refresh": "true"}. Se não houver parâmetros de solicitação adicionais, insira chaves vazias ({}).

Para testar a atividade da Web, clique em Depurar na barra de ferramentas do pipeline na interface do usuário do Data Factory. A saída e o status da execução, incluindo erros, são exibidos na tab Saída do pipeline do Azure Data Factory. Use a IU do Delta Live Tables para view os detalhes da atualização do pipeline.

Dica

Um requisito de fluxo de trabalho comum é iniciar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação Delta Live Tables updates é assíncrona — a solicitação retorna depois de iniciar a atualização, mas antes da conclusão da atualização — as tarefas em seu pipeline do Azure Data Factory com dependência da atualização Delta Live Tables devem aguardar a conclusão da atualização. Uma opção para aguardar a conclusão da atualização é adicionar uma atividade Until após a atividade da Web que aciona a atualização Delta Live Tables. Na atividade Até:

  1. Adicione uma atividade Aguardar para aguardar um número configurado de segundos para a conclusão da atualização.

  2. Adicione uma atividade da Web após a atividade Wait que usa a solicitação de detalhes de atualização do Delta Live Tables para obter o status da atualização. O campo state na resposta retorna o estado atual da atualização, inclusive se ela foi concluída.

  3. Use o valor do campo state para definir a condição de encerramento da atividade Até. Você também pode usar uma atividade Definir variável para adicionar uma variável de pipeline com base no valor state e usar essa variável para a condição de encerramento.