Orquestre Job do Databricks com o Apache Airflow
Este artigo descreve o suporte do Apache Airflow para orquestrar pipeline de dados com Databricks, tem instruções para instalar e configurar o Airflow localmente e fornece um exemplo de implantação e execução de um fluxo de trabalho do Databricks com Airflow.
Job orquestração em um pipeline de dados
O desenvolvimento e a implantação de um pipeline de processamento de dados geralmente exigem o gerenciamento de dependências complexas entre tarefas. Por exemplo, um pipeline pode ler dados de uma origem, limpar os dados, transformar os dados limpos e gravar os dados transformados em um destino. Você também precisa de suporte para testes, programação e solução de erros ao operacionalizar um pipeline.
Os sistemas de fluxo de trabalho abordam esses desafios permitindo definir dependências entre tarefas, programar quando pipeline é executado e monitorar fluxos de trabalho. Apache Airflow é uma solução open source para gerenciamento e programação de pipeline de dados. Airflow representa pipeline de dados como gráficos acíclicos direcionados (DAGs) de operações. Você define um fluxo de trabalho em um arquivo Python e o Airflow gerencia a programação e a execução. A conexão Airflow Databricks permite aproveitar o mecanismo Spark otimizado oferecido pelo Databricks com o recurso programático do Airflow.
Requisitos
A integração entre o Airflow e o Databricks requer o Airflow versão 2.5.0 e posterior. Os exemplos neste artigo foram testados com o Airflow versão 2.6.1.
O Airflow requer Python 3.8, 3.9, 3.10 ou 3.11. Os exemplos neste artigo são testados com Python 3.8.
As instruções neste artigo para instalação e execução do Airflow requerem pipenv para criar um ambiente virtual Python.
Operadores de fluxo de ar para Databricks
Um Airflow DAG é composto por tarefas, onde cada tarefa executa um Airflow Operator. Os operadores do Airflow que suportam a integração com Databricks são implementados no provedor Databricks.
O provedor Databricks inclui operadores para executar várias tarefas em um Databricks workspace, incluindo a importação de dados para uma tabela, a execução de consultas SQL e o trabalho com pastas Git do Databricks.
O provedor Databricks implementa dois operadores para disparar Job:
O DatabricksRunNowOperator requer um Job do Databricks existente e usa o POST /api/2.1/Job/execução-now Solicitação de API para disparar uma execução. A Databricks recomenda usar
DatabricksRunNowOperator
porque reduz a duplicação de definições Job , e a execução Job acionada com esse operador pode ser encontrada na IU do trabalho.O DatabricksSubmitRunOperator não requer a existência de um Job no Databricks e usa o POST /api/2.1/Job/execução/submit Solicitação de API para enviar a especificação Job e disparar uma execução.
Para criar um novo Job do Databricks ou Reset um Job existente, o provedor do Databricks implementa o DatabricksCreateJobsOperator. O DatabricksCreateJobsOperator
usa o JobPOST /api/2.1/ /create e POST /api/2.1/Job/Reset Solicitações de API. Você pode usar o DatabricksCreateJobsOperator
com o DatabricksRunNowOperator
para criar e executar um Job.
Observação
A utilização dos operadores Databricks para desencadear um Job requer o fornecimento de credenciais na configuração da ligação Databricks. Consulte Criar um access token pessoal do Databricks para o Airflow.
Os operadores do Databricks Airflow gravam a URL da página de execução Job nos logs do Airflow a cada polling_period_seconds
(o default é 30 segundos). Para obter mais informações, consulte a página do pacote apache-airflow-providers-databricks no site do Airflow.
Instale a integração do Airflow Databricks localmente
Para instalar o Airflow e o provedor Databricks localmente para teste e desenvolvimento, use os seguintes passos. Para outras opções de instalação do Airflow, incluindo a criação de uma instalação de produção, consulte instalação na documentação do Airflow.
Abra um terminal e execute o seguinte comando:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
Substitua <firstname>
, <lastname>
e <email>
pelo seu nome de usuário e email. Você será solicitado a inserir uma senha para o usuário administrador. Certifique-se de salvar essa senha, pois ela é necessária para log in na IU do Airflow.
Este script executa os seguintes passos:
Cria um diretório chamado
airflow
e muda para esse diretório.Usa
pipenv
para criar e gerar um ambiente virtual Python. A Databricks recomenda a utilização de um ambiente virtual Python para isolar versões de pacotes e dependências de código para esse ambiente. Esse isolamento ajuda a reduzir incompatibilidades inesperadas de versões de pacotes e colisões de dependência de código.Inicializa uma variável de ambiente chamada
AIRFLOW_HOME
definida como o caminho do diretórioairflow
.Instala o Airflow e o pacote do provedor Airflow Databricks.
Cria um diretório
airflow/dags
. O Airflow usa o diretóriodags
para armazenar definições de DAG.Inicializa um banco de dados SQLite que o Airflow usa para rastrear metadados. Em uma implantação de produção do Airflow, você configuraria o Airflow com um banco de dados padrão. O banco de dados SQLite e a configuração default para sua implantação do Airflow são inicializados no diretório
airflow
.Cria um usuário administrador para o Airflow.
Dica
Para confirmar a instalação do provedor Databricks, execute o seguinte comando no diretório de instalação do Airflow:
airflow providers list
começar o servidor web e programador Airflow
O servidor da web Airflow é necessário para view a IU do Airflow. Para iniciar o servidor web, abra um terminal no diretório de instalação do Airflow e execute o seguinte comando:
Observação
Se o servidor da web do Airflow não reiniciar devido a um conflito de porta, você poderá alterar a porta default na configuração do Airflow.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
O programador é o componente Airflow que programa DAGs. Para iniciar o programador, abra um novo terminal no diretório de instalação do Airflow e execute o seguinte comando:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Teste a instalação do Airflow
Para verificar a instalação do Airflow, você pode executar um dos DAGs de exemplo incluídos no Airflow:
Em uma janela do navegador, abra
http://localhost:8080/home
. log in na interface do Airflow com o nome de usuário e a senha que você criou ao instalar o Airflow. A página Airflow DAGs é exibida.Clique no botão pausar/Reiniciar DAG para retomar um dos DAGs de exemplo, por exemplo, o
example_python_operator
.Acione o DAG de exemplo clicando no botão Acionar DAG .
Clique no nome do DAG para view os detalhes, incluindo o status de execução do DAG.
Criar access tokens pessoal do Databricks para Airflow
O Airflow se conecta ao Databricks usando um access token pessoal (PAT) do Databricks. Para criar um PAT:
Em seu Databricks workspace, clique em seu nome de usuário Databricks na barra superior e selecione Settings (Configurações ) no menu suspenso.
Clique em Desenvolvedor.
Ao lado do access token, clique em gerenciar.
Clique em Gerar novos tokens.
(Opcional) Insira um comentário que o ajude a identificar esse token no futuro e altere o tempo de vida padrão do token de 90 dias. Para criar um token sem vida útil (não recomendado), deixe a caixa Duração (dias) vazia (em branco).
Clique em Gerar.
Copie o token exibido em um local seguro e clique em Concluído.
Observação
Certifique-se de salvar os tokens copiados em um local seguro. Não compartilhe seus tokens copiados com outras pessoas. Se você perder os tokens copiados, não poderá regenerar exatamente os mesmos tokens. Em vez disso, você deve repetir este procedimento para criar novos tokens. Se você perder os tokens copiados ou acreditar que os tokens foram comprometidos, o Databricks recomenda fortemente que você exclua imediatamente esses tokens do seu workspace clicando no ícone da lixeira (Revogar) ao lado dos tokens na página access tokens .
Se não for possível criar ou usar tokens no seu workspace, isso pode ocorrer porque o administrador do workspace desativou os tokens ou não lhe deu permissão para criar ou usar tokens. Consulte o administrador do workspace ou o seguinte:
Observação
Como prática recomendada de segurança ao se autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, a Databricks recomenda que você use tokens OAuth.
Se utilizar a autenticação access token pessoal, a Databricks recomenda a utilização access token pessoal pertencente à entidade de serviço em vez de utilizadores workspace . Para criar tokens para entidades de serviço, consulte gerenciar tokens para uma entidade de serviço.
Você também pode autenticar-se no Databricks usando o Databricks OAuth para entidade de serviço. Consulte Conexão do Databricks na documentação do Airflow.
Configurar uma Databricks Connect
Sua instalação do Airflow contém uma conexão default para Databricks. Para atualizar a conexão para se conectar ao seu workspace usando os access tokens pessoal que você criou acima:
Em uma janela do navegador, abra
http://localhost:8080/connection/list/
. Se for solicitado a fazer login, digite seu nome de usuário e senha de administrador.Em Conn ID, localize databricks_default e clique no botão Editar registro .
Substitua o valor no campo Host pelo nome da instância do workspace da implantação do Databricks, por exemplo,
https://adb-123456789.cloud.databricks.com
.No campo Senha , insira seu access token pessoal do Databricks.
Clique em Salvar.
Exemplo: Criar um Airflow DAG para executar um Jobdo Databricks
O exemplo a seguir demonstra como criar uma implantação simples do Airflow que é executada em sua máquina local e implantou um DAG de exemplo para acionar a execução no Databricks. Neste exemplo, você irá:
Crie um novo Notebook e adicione o código para imprimir uma saudação com base em um parâmetro configurado.
Crie um Job do Databricks com uma única tarefa que executa o Notebook.
Configure uma conexão Airflow para seu workspace Databricks.
Crie um DAG do Airflow para acionar o Job Notebook. Você define o DAG em um script Python usando
DatabricksRunNowOperator
.Use a IU do Airflow para acionar o DAG e view o status da execução.
Criar um Notebook
Este exemplo usa um Notebook contendo duas células:
A primeira célula contém um widget de texto de utilidades do Databricks definindo uma variável chamada
greeting
definida com o valor defaultworld
.A segunda célula imprime o valor da variável
greeting
prefixada porhello
.
Para criar o Notebook:
Vá para o seu workspace do Databricks, clique em Novo na barra lateral e selecione Notebook.
Dê um nome ao seu Notebook , como Hello Airflow e certifique-se de que o idioma default esteja definido como Python.
Copie o seguinte código Python e cole-o na primeira célula do Notebook.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
Adicione uma nova célula abaixo da primeira célula e copie e cole o seguinte código Python na nova célula:
print("hello {}".format(greeting))
Criar um Job
Clique fluxo de trabalho na barra lateral.
Clique .
A aba Tarefas aparece com a caixa de diálogo de criação de tarefas.
Substitua Adicione um nome para o seu Job com o nome do seu Job .
No campo Nome da tarefa , insira um nome para a tarefa, por exemplo, saudação-tarefa.
No menu suspenso Tipo , selecione Notebook.
No menu suspenso Fonte , selecione workspace.
Clique na caixa de texto Caminho e use o navegador de arquivos para localizar o Notebook que você criou, clique no nome Notebook e clique em Confirmar.
Clique em Adicionar em Parâmetros. No campo key , digite
greeting
. No campo Valor , insiraAirflow user
.Clique em Criar tarefa.
No painel DetalhesJob , copie o valor do IDJob . Este valor é necessário para acionar o Job do Airflow.
execução do Job
Para testar seu novo Job na interface de fluxo de trabalho do Databricks, clique em no canto superior direito. Quando a execução for concluída, você poderá verificar a saída visualizando os detalhes da execuçãoJob .
Criar um novo DAG do Airflow
Você define um Airflow DAG em um arquivo Python. Para criar um DAG para acionar o exemplo Notebook Job:
Em um editor de texto ou IDE, crie um novo arquivo chamado
databricks_dag.py
com o seguinte conteúdo:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
Substitua
JOB_ID
pelo valor do ID Job salvo anteriormente.Salve o arquivo no diretório
airflow/dags
. O Airflow lê e instala automaticamente arquivos DAG armazenados emairflow/dags/
.
Instale e verifique o DAG no Airflow
Para acionar e verificar o DAG na IU do Airflow:
Em uma janela do navegador, abra
http://localhost:8080/home
. A tela Airflow DAGs é exibida.Localize
databricks_dag
e clique no botão pausar/Retomar DAG para retomar o DAG.Acione o DAG clicando no botão Trigger DAG .
Clique em uma execução na coluna Execuções para view o status e os detalhes da execução.