execução LakeFlow Pipeline declarativo em um fluxo de trabalho
Você pode executar o pipeline declarativo LakeFlow como parte de um fluxo de trabalho de processamento de dados com LakeFlow Jobs, Apache Airflow ou Azure Data Factory.
Empregos
Você pode orquestrar múltiplas tarefas em um trabalho Databricks para implementar um fluxo de trabalho de processamento de dados. Para incluir um pipeline em um Job, use a tarefa pipeline ao criar um Job. Veja tarefa de pipeline para Job.
Apache Airflow
Apache Airflow é uma solução de código aberto para gerenciamento e programação de fluxo de trabalho de dados. Airflow representa fluxo de trabalho como gráficos acíclicos direcionados (DAGs) de operações. Você define um fluxo de trabalho em um arquivo Python e Airflow gerencia a programação e execução. Para obter informações sobre como instalar e usar Airflow com Databricks, consulte Orquestrar trabalhos LakeFlow com Apache Airflow.
Para executar um pipeline como parte de um fluxo de trabalho Airflow , use o DatabricksSubmitRunOperator.
Requisitos
Os seguintes itens são necessários para usar o suporte Airflow para o pipeline declarativo LakeFlow :
- Versão 2.1.0 do Airflow ou mais tarde.
- O pacote do provedor Databricks versão 2.1.0 ou mais tarde.
Exemplo
O exemplo a seguir cria um DAG Airflow que aciona uma atualização para o pipeline com o identificador 8279d543-063c-4d63-9926-dae38e35ce8b
:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('ldp',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Substitua CONNECTION_ID
pelo identificador de uma conexãoAirflow com seu workspace.
Salve este exemplo no diretório airflow/dags
e use a interface do usuário Airflow para view e acionar o DAG. Use a interface do usuário do pipeline declarativo LakeFlow para view os detalhes da atualização pipeline .
Fábrica de Dados do Azure
O pipeline declarativo LakeFlow e Azure Data Factory incluem opções para configurar o número de tentativas quando ocorre uma falha. Se os valores de nova tentativa estiverem configurados no seu pipeline e na atividade do Azure Data Factory que chama o pipeline, o número de novas tentativas será o valor de nova tentativa Azure Data Factory multiplicado pelo valor de nova tentativa do pipeline declarativo LakeFlow .
Por exemplo, se uma atualização pipeline falhar, o pipeline declarativo LakeFlow tentará a atualização novamente até cinco vezes por default. Se a nova tentativa Azure Data Factory estiver definida como três e seu pipeline usar o default de cinco tentativas, seu pipeline com falha poderá ser repetido até quinze vezes. Para evitar tentativas excessivas quando as atualizações do pipeline falham, a Databricks recomenda limitar o número de tentativas ao configurar o pipeline ou a atividade do Azure Data Factory que chama o pipeline.
Para alterar a configuração de nova tentativa do seu pipeline, use a configuração pipelines.numUpdateRetryAttempts
ao configurar o pipeline.
Azure Data Factory é um serviço ETL baseado em cloudque permite orquestrar integração de dados e transformações de fluxo de trabalho. Azure Data Factory oferece suporte direto à execução de tarefas Databricks em um fluxo de trabalho, incluindo Notebook, tarefa JAR e scripts Python . Você também pode incluir um pipeline em um fluxo de trabalho chamando a API de pipeline declarativa LakeFlow de uma atividade da Web Azure Data Factory. Por exemplo, para acionar uma atualização de pipeline do Azure Data Factory:
-
Crie uma fábrica de dados ou abra uma fábrica de dados existente.
-
Quando a criação for concluída, abra a página do seu data factory e clique no bloco Abrir Azure Data Factory Studio . A interface do usuário do Azure Data Factory é exibida.
-
Crie um novo pipeline Azure Data Factory selecionando pipeline no menu suspenso Novo na interface do usuário do Azure Data Factory Studio.
-
Na caixa de ferramentas Atividades , expanda Geral e arraste a atividade da Web para a tela do pipeline. Clique na tab Configurações e insira os seguintes valores:
Como prática recomendada de segurança, ao autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, Databricks recomenda que você use access tokens pessoais pertencentes à entidade de serviço em vez de usuários workspace . Para criar tokens para entidade de serviço, consulte gerenciar tokens para uma entidade de serviço.
-
URL :
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Substitua
<get-workspace-instance>
.Substitua
<pipeline-id>
pelo identificador do pipeline. -
Método : Selecione POST no menu suspenso.
-
Cabeçalhos : Clique em + Novo . Na caixa de texto Nome , digite
Authorization
. Na caixa de texto Valor , insiraBearer <personal-access-token>
.Substitua
<personal-access-token>
por um access tokenpessoal Databricks . -
Corpo : Para passar parâmetros de solicitação adicionais, insira um documento JSON contendo os parâmetros. Por exemplo, para iniciar uma atualização e reprocessar todos os dados do pipeline:
{"full_refresh": "true"}
. Se não houver parâmetros de solicitação adicionais, insira chaves vazias ({}
).
Para testar a atividade da Web, clique em Depurar na barra de ferramentas do pipeline na interface do usuário do Data Factory. A saída e o status da execução, incluindo erros, são exibidos na tab Saída do pipeline do Azure Data Factory. Use a interface do usuário do pipeline declarativo LakeFlow para view os detalhes da atualização pipeline .
Um requisito comum do fluxo de trabalho é começar uma tarefa após a conclusão de uma tarefa anterior. Como a solicitação do pipeline declarativo LakeFlow updates
é assíncrona (a solicitação retorna após o início da atualização, mas antes da conclusão da atualização), a tarefa no pipeline do Azure Data Factory com uma dependência na atualização do pipeline declarativo LakeFlow deve aguardar a conclusão da atualização. Uma opção para aguardar a conclusão da atualização é adicionar uma atividade Until após a atividade Web que aciona a atualização do pipeline Declarativo LakeFlow . Na atividade Até:
- Adicione uma atividade Wait para aguardar um número configurado de segundos para a conclusão da atualização.
- Adicione uma atividade Web após a atividade Wait que usa a solicitação de detalhes de atualização do pipeline declarativo LakeFlow para obter o status da atualização. O campo
state
na resposta retorna o estado atual da atualização, incluindo se ela foi concluída. - Use o valor do campo
state
para definir a condição de término da atividade Until. Você também pode usar uma atividade Definir variável para adicionar uma variável de pipeline com base no valorstate
e usar essa variável para a condição de término.