Pular para o conteúdo principal

execução a DLT pipeline in a fluxo de trabalho

O senhor pode executar um DLT pipeline como parte de um fluxo de trabalho de processamento de dados com Databricks Job, Apache Airflow, ou Azure Data Factory.

Empregos

O senhor pode orquestrar várias tarefas em um Databricks Job para implementar um fluxo de trabalho de processamento de dados. Para incluir um DLT pipeline em um trabalho, use o pipeline tarefa quando o senhor criar um trabalho. Consulte DLT pipeline tarefa for Job.

Apache Airflow

Apache Airflow é uma solução de código aberto para gerenciar e programar o fluxo de trabalho de dados. Airflow representa o fluxo de trabalho como um gráfico acíclico direcionado (DAGs) de operações. O senhor define um fluxo de trabalho em um arquivo Python e Airflow gerencia a programação e a execução. Para obter informações sobre como instalar e usar o Airflow com o Databricks, consulte Orquestrar o trabalho Databricks com o Apache Airflow .

Para executar um DLT pipeline como parte de um fluxo de trabalho Airflow, use o DatabricksSubmitRunOperator.

Requisitos

Os seguintes itens são necessários para usar o suporte do Airflow para DLT:

Exemplo

O exemplo a seguir cria um DAG do Airflow que aciona uma atualização para o pipeline DLT com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b:

Python
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
'owner': 'airflow'
}

with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:

opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)

Substitua CONNECTION_ID pelo identificador de uma conexãoAirflow com o seu workspace.

Salve este exemplo no diretório airflow/dags e use a interface do usuário Airflow para view e acionar o DAG. Use a DLT UI para view os detalhes da atualização pipeline.

Fábrica de dados do Azure

nota

O DLT e o Azure Data Factory incluem opções para configurar o número de novas tentativas quando ocorre uma falha. Se os valores de nova tentativa forem configurados em seu pipeline DLT e na atividade do Azure Data Factory que chama o pipeline, o número de novas tentativas será o valor de nova tentativa do Azure Data Factory multiplicado pelo valor de nova tentativa do DLT.

Por exemplo, se uma atualização do site pipeline falhar, a DLT tentará novamente a atualização até cinco vezes pelo site default. Se o retry do Azure Data Factory estiver configurado para três e o seu DLT pipeline usar o default de cinco retries, o DLT pipeline que estiver falhando poderá ser repetido até quinze vezes. Para evitar tentativas excessivas de nova tentativa quando as atualizações do pipeline falharem, a Databricks recomenda limitar o número de novas tentativas ao configurar o pipeline DLT ou a atividade do Azure Data Factory que chama o pipeline.

Para alterar a configuração de repetição do pipeline DLT, use a configuração pipelines.numUpdateRetryAttempts ao configurar o pipeline.

Azure O Data Factory é um serviço ETL baseado em nuvem que permite ao senhor orquestrar a integração de dados e as transformações do fluxo de trabalho. Azure O Data Factory suporta diretamente a execução da tarefa Databricks em um fluxo de trabalho, incluindo o Notebook, a tarefa JAR e os scripts Python. O senhor também pode incluir um pipeline em um fluxo de trabalho chamando a API DLT de uma atividade Web do Azure Data Factory. Por exemplo, para acionar uma atualização de pipeline do Azure Data Factory:

  1. Crie uma fábrica de dados ou abra uma fábrica de dados existente.

  2. Quando a criação for concluída, abra a página da sua fábrica de dados e clique no bloco Open Azure Data Factory Studio . A interface de usuário do Azure Data Factory é exibida.

  3. Crie um novo Azure Data Factory pipeline selecionando pipeline no menu suspenso New (Novo ) na interface do usuário do Azure Data Factory Studio.

  4. Na caixa de ferramentas Activities , expanda General e arraste a atividade da Web para a tela do pipeline. Clique em Settings (Configurações ) tab e digite os seguintes valores:

nota

Como prática recomendada de segurança ao se autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, a Databricks recomenda que você use tokens OAuth.

Se o senhor usar a autenticação de tokens de acesso pessoal, a Databricks recomenda usar o acesso pessoal tokens pertencente à entidade de serviço em vez de usuários workspace. Para criar tokens o site para uma entidade de serviço, consulte gerenciar tokens para uma entidade de serviço.

  • URL : https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

    Substitua <get-workspace-instance>.

    Substitua <pipeline-id> pelo identificador do pipeline.

  • Método : Selecione POST no menu suspenso.

  • Cabeçalhos : clique em + Novo . Na caixa de texto Nome , digite Authorization. Na caixa de texto Valor , digite Bearer <personal-access-token>.

    Substitua <personal-access-token> por um Databricks tokens de acesso pessoal.

  • Body : para passar parâmetros de solicitação adicionais, insira um documento JSON contendo os parâmetros. Por exemplo, para começar uma atualização e reprocessar todos os dados do site pipeline: {"full_refresh": "true"}. Se não houver parâmetros adicionais de solicitação, insira chaves vazias ({}).

Para testar a atividade da Web, clique em Debug na barra de ferramentas do pipeline na interface do usuário do Data Factory. A saída e o status da execução, incluindo os erros, são exibidos na saída tab do Azure Data Factory pipeline. Use a DLT UI para view os detalhes da atualização pipeline.

dica

Um requisito comum de fluxo de trabalho é começar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação DLT updates é assíncrona - a solicitação retorna após o início da atualização, mas antes da conclusão da atualização -, a tarefa em seu Azure Data Factory pipeline com uma dependência da atualização DLT deve aguardar a conclusão da atualização. Uma opção para aguardar a conclusão da atualização é adicionar uma atividade Until após a atividade Web que aciona a atualização DLT. Na atividade Até:

  1. Adicione uma atividade Wait para aguardar um número configurado de segundos até a conclusão da atualização.
  2. Adicione uma atividade na Web após a atividade Wait que usa a solicitação de detalhes de atualização DLT para obter o status da atualização. O campo state na resposta retorna o estado atual da atualização, inclusive se ela foi concluída.
  3. Use o valor do campo state para definir a condição de encerramento da atividade Until. O senhor também pode usar uma atividade Set Variable para adicionar uma variável de pipeline com base no valor state e usar essa variável para a condição de encerramento.