execução a DLT pipeline in a fluxo de trabalho
O senhor pode executar um DLT pipeline como parte de um fluxo de trabalho de processamento de dados com Databricks Job, Apache Airflow, ou Azure Data Factory.
Empregos
O senhor pode orquestrar várias tarefas em um Databricks Job para implementar um fluxo de trabalho de processamento de dados. Para incluir um DLT pipeline em um trabalho, use o pipeline tarefa quando o senhor criar um trabalho. Consulte DLT pipeline tarefa for Job.
Apache Airflow
Apache Airflow é uma solução de código aberto para gerenciar e programar o fluxo de trabalho de dados. Airflow representa o fluxo de trabalho como um gráfico acíclico direcionado (DAGs) de operações. O senhor define um fluxo de trabalho em um arquivo Python e Airflow gerencia a programação e a execução. Para obter informações sobre como instalar e usar o Airflow com o Databricks, consulte Orquestrar o trabalho Databricks com o Apache Airflow .
Para executar um DLT pipeline como parte de um fluxo de trabalho Airflow, use o DatabricksSubmitRunOperator.
Requisitos
Os seguintes itens são necessários para usar o suporte do Airflow para DLT:
- Airflow versão 2.1.0 ou mais tarde.
- O pacote do provedor Databricks versão 2.1.0 ou mais tarde.
Exemplo
O exemplo a seguir cria um DAG do Airflow que aciona uma atualização para o pipeline DLT com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b
:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Substitua CONNECTION_ID
pelo identificador de uma conexãoAirflow com o seu workspace.
Salve este exemplo no diretório airflow/dags
e use a interface do usuário Airflow para view e acionar o DAG. Use a DLT UI para view os detalhes da atualização pipeline.
Fábrica de dados do Azure
O DLT e o Azure Data Factory incluem opções para configurar o número de novas tentativas quando ocorre uma falha. Se os valores de nova tentativa forem configurados em seu pipeline DLT e na atividade do Azure Data Factory que chama o pipeline, o número de novas tentativas será o valor de nova tentativa do Azure Data Factory multiplicado pelo valor de nova tentativa do DLT.
Por exemplo, se uma atualização do site pipeline falhar, a DLT tentará novamente a atualização até cinco vezes pelo site default. Se o retry do Azure Data Factory estiver configurado para três e o seu DLT pipeline usar o default de cinco retries, o DLT pipeline que estiver falhando poderá ser repetido até quinze vezes. Para evitar tentativas excessivas de nova tentativa quando as atualizações do pipeline falharem, a Databricks recomenda limitar o número de novas tentativas ao configurar o pipeline DLT ou a atividade do Azure Data Factory que chama o pipeline.
Para alterar a configuração de repetição do pipeline DLT, use a configuração pipelines.numUpdateRetryAttempts
ao configurar o pipeline.
Azure O Data Factory é um serviço ETL baseado em nuvem que permite ao senhor orquestrar a integração de dados e as transformações do fluxo de trabalho. Azure O Data Factory suporta diretamente a execução da tarefa Databricks em um fluxo de trabalho, incluindo o Notebook, a tarefa JAR e os scripts Python. O senhor também pode incluir um pipeline em um fluxo de trabalho chamando a API DLT de uma atividade Web do Azure Data Factory. Por exemplo, para acionar uma atualização de pipeline do Azure Data Factory:
-
Crie uma fábrica de dados ou abra uma fábrica de dados existente.
-
Quando a criação for concluída, abra a página da sua fábrica de dados e clique no bloco Open Azure Data Factory Studio . A interface de usuário do Azure Data Factory é exibida.
-
Crie um novo Azure Data Factory pipeline selecionando pipeline no menu suspenso New (Novo ) na interface do usuário do Azure Data Factory Studio.
-
Na caixa de ferramentas Activities , expanda General e arraste a atividade da Web para a tela do pipeline. Clique em Settings (Configurações ) tab e digite os seguintes valores:
Como prática recomendada de segurança ao se autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, a Databricks recomenda que você use tokens OAuth.
Se o senhor usar a autenticação de tokens de acesso pessoal, a Databricks recomenda usar o acesso pessoal tokens pertencente à entidade de serviço em vez de usuários workspace. Para criar tokens o site para uma entidade de serviço, consulte gerenciar tokens para uma entidade de serviço.
-
URL :
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Substitua
<get-workspace-instance>
.Substitua
<pipeline-id>
pelo identificador do pipeline. -
Método : Selecione POST no menu suspenso.
-
Cabeçalhos : clique em + Novo . Na caixa de texto Nome , digite
Authorization
. Na caixa de texto Valor , digiteBearer <personal-access-token>
.Substitua
<personal-access-token>
por um Databricks tokens de acesso pessoal. -
Body : para passar parâmetros de solicitação adicionais, insira um documento JSON contendo os parâmetros. Por exemplo, para começar uma atualização e reprocessar todos os dados do site pipeline:
{"full_refresh": "true"}
. Se não houver parâmetros adicionais de solicitação, insira chaves vazias ({}
).
Para testar a atividade da Web, clique em Debug na barra de ferramentas do pipeline na interface do usuário do Data Factory. A saída e o status da execução, incluindo os erros, são exibidos na saída tab do Azure Data Factory pipeline. Use a DLT UI para view os detalhes da atualização pipeline.
Um requisito comum de fluxo de trabalho é começar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação DLT updates
é assíncrona - a solicitação retorna após o início da atualização, mas antes da conclusão da atualização -, a tarefa em seu Azure Data Factory pipeline com uma dependência da atualização DLT deve aguardar a conclusão da atualização. Uma opção para aguardar a conclusão da atualização é adicionar uma atividade Until após a atividade Web que aciona a atualização DLT. Na atividade Até:
- Adicione uma atividade Wait para aguardar um número configurado de segundos até a conclusão da atualização.
- Adicione uma atividade na Web após a atividade Wait que usa a solicitação de detalhes de atualização DLT para obter o status da atualização. O campo
state
na resposta retorna o estado atual da atualização, inclusive se ela foi concluída. - Use o valor do campo
state
para definir a condição de encerramento da atividade Until. O senhor também pode usar uma atividade Set Variable para adicionar uma variável de pipeline com base no valorstate
e usar essa variável para a condição de encerramento.