Orquestre Job do Databricks com o Apache Airflow

Este artigo descreve o suporte do Apache Airflow para orquestrar pipeline de dados com Databricks, tem instruções para instalar e configurar o Airflow localmente e fornece um exemplo de implantação e execução de um fluxo de trabalho do Databricks com Airflow.

Job orquestração em um pipeline de dados

O desenvolvimento e a implantação de um pipeline de processamento de dados geralmente exigem o gerenciamento de dependências complexas entre tarefas. Por exemplo, um pipeline pode ler dados de uma origem, limpar os dados, transformar os dados limpos e gravar os dados transformados em um destino. Você também precisa de suporte para testes, programação e solução de erros ao operacionalizar um pipeline.

Os sistemas de fluxo de trabalho abordam esses desafios permitindo definir dependências entre tarefas, programar quando pipeline é executado e monitorar fluxos de trabalho. Apache Airflow é uma solução open source para gerenciamento e programação de pipeline de dados. Airflow representa pipeline de dados como gráficos acíclicos direcionados (DAGs) de operações. Você define um fluxo de trabalho em um arquivo Python e o Airflow gerencia a programação e a execução. A conexão Airflow Databricks permite aproveitar o mecanismo Spark otimizado oferecido pelo Databricks com o recurso programático do Airflow.

Requisitos

  • A integração entre o Airflow e o Databricks requer o Airflow versão 2.5.0 e posterior. Os exemplos neste artigo foram testados com o Airflow versão 2.6.1.

  • O Airflow requer Python 3.8, 3.9, 3.10 ou 3.11. Os exemplos neste artigo são testados com Python 3.8.

  • As instruções neste artigo para instalação e execução do Airflow requerem pipenv para criar um ambiente virtual Python.

Operadores de fluxo de ar para Databricks

Um Airflow DAG é composto por tarefas, onde cada tarefa executa um Airflow Operator. Os operadores do Airflow que suportam a integração com Databricks são implementados no provedor Databricks.

O provedor Databricks inclui operadores para executar várias tarefas em um Databricks workspace, incluindo a importação de dados para uma tabela, a execução de consultas SQL e o trabalho com pastas Git do Databricks.

O provedor Databricks implementa dois operadores para disparar Job:

Para criar um novo Job do Databricks ou Reset um Job existente, o provedor do Databricks implementa o DatabricksCreateJobsOperator. O DatabricksCreateJobsOperator usa o JobPOST /api/2.1/ /create e POST /api/2.1/Job/Reset Solicitações de API. Você pode usar o DatabricksCreateJobsOperator com o DatabricksRunNowOperator para criar e executar um Job.

Observação

A utilização dos operadores Databricks para desencadear um Job requer o fornecimento de credenciais na configuração da ligação Databricks. Consulte Criar um access token pessoal do Databricks para o Airflow.

Os operadores do Databricks Airflow gravam a URL da página de execução Job nos logs do Airflow a cada polling_period_seconds (o default é 30 segundos). Para obter mais informações, consulte a página do pacote apache-airflow-providers-databricks no site do Airflow.

Instale a integração do Airflow Databricks localmente

Para instalar o Airflow e o provedor Databricks localmente para teste e desenvolvimento, use os seguintes passos. Para outras opções de instalação do Airflow, incluindo a criação de uma instalação de produção, consulte instalação na documentação do Airflow.

Abra um terminal e execute o seguinte comando:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Substitua <firstname>, <lastname> e <email> pelo seu nome de usuário e email. Você será solicitado a inserir uma senha para o usuário administrador. Certifique-se de salvar essa senha, pois ela é necessária para log in na IU do Airflow.

Este script executa os seguintes passos:

  1. Cria um diretório chamado airflow e muda para esse diretório.

  2. Usa pipenv para criar e gerar um ambiente virtual Python. A Databricks recomenda a utilização de um ambiente virtual Python para isolar versões de pacotes e dependências de código para esse ambiente. Esse isolamento ajuda a reduzir incompatibilidades inesperadas de versões de pacotes e colisões de dependência de código.

  3. Inicializa uma variável de ambiente chamada AIRFLOW_HOME definida como o caminho do diretório airflow .

  4. Instala o Airflow e o pacote do provedor Airflow Databricks.

  5. Cria um diretório airflow/dags . O Airflow usa o diretório dags para armazenar definições de DAG.

  6. Inicializa um banco de dados SQLite que o Airflow usa para rastrear metadados. Em uma implantação de produção do Airflow, você configuraria o Airflow com um banco de dados padrão. O banco de dados SQLite e a configuração default para sua implantação do Airflow são inicializados no diretório airflow.

  7. Cria um usuário administrador para o Airflow.

Dica

Para confirmar a instalação do provedor Databricks, execute o seguinte comando no diretório de instalação do Airflow:

airflow providers list

começar o servidor web e programador Airflow

O servidor da web Airflow é necessário para view a IU do Airflow. Para iniciar o servidor web, abra um terminal no diretório de instalação do Airflow e execute o seguinte comando:

Observação

Se o servidor da web do Airflow não reiniciar devido a um conflito de porta, você poderá alterar a porta default na configuração do Airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

O programador é o componente Airflow que programa DAGs. Para iniciar o programador, abra um novo terminal no diretório de instalação do Airflow e execute o seguinte comando:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Teste a instalação do Airflow

Para verificar a instalação do Airflow, você pode executar um dos DAGs de exemplo incluídos no Airflow:

  1. Em uma janela do navegador, abra http://localhost:8080/home. log in na interface do Airflow com o nome de usuário e a senha que você criou ao instalar o Airflow. A página Airflow DAGs é exibida.

  2. Clique no botão pausar/Reiniciar DAG para retomar um dos DAGs de exemplo, por exemplo, o example_python_operator.

  3. Acione o DAG de exemplo clicando no botão Acionar DAG .

  4. Clique no nome do DAG para view os detalhes, incluindo o status de execução do DAG.

Criar access tokens pessoal do Databricks para Airflow

O Airflow se conecta ao Databricks usando um access token pessoal (PAT) do Databricks. Para criar um PAT:

  1. No workspace do Databricks, clique no nome de usuário do Databricks na barra superior e selecione Configurações do usuário na lista suspensa.

  2. Clique em Desenvolvedor.

  3. Ao lado do access token, clique em gerenciar.

  4. Clique em Gerar novos tokens.

  5. (Opcional) Insira um comentário que o ajude a identificar esse token no futuro e altere o tempo de vida padrão do token de 90 dias. Para criar um token sem vida útil (não recomendado), deixe a caixa Duração (dias) vazia (em branco).

  6. Clique em Gerar.

  7. Copie o token exibido em um local seguro e clique em Concluído.

Observação

Certifique-se de salvar os tokens copiados em um local seguro. Não compartilhe seus tokens copiados com outras pessoas. Se você perder os tokens copiados, não poderá regenerar exatamente os mesmos tokens. Em vez disso, você deve repetir este procedimento para criar novos tokens. Se você perder os tokens copiados ou acreditar que os tokens foram comprometidos, o Databricks recomenda fortemente que você exclua imediatamente esses tokens do seu workspace clicando no ícone da lixeira (Revogar) ao lado dos tokens na página access tokens .

Se não for possível criar ou usar tokens no seu workspace, isso pode ocorrer porque o administrador do workspace desativou os tokens ou não lhe deu permissão para criar ou usar tokens. Consulte o administrador do workspace ou o seguinte:

Observação

Como prática recomendada de segurança ao se autenticar com ferramentas, sistemas, scripts e aplicativos automatizados, a Databricks recomenda que você use tokens OAuth.

Se utilizar a autenticação access token pessoal, a Databricks recomenda a utilização access token pessoal pertencente à entidade de serviço em vez de utilizadores workspace . Para criar tokens para entidades de serviço, consulte gerenciar tokens para uma entidade de serviço.

Você também pode autenticar-se no Databricks usando o Databricks OAuth para entidade de serviço. Consulte Conexão do Databricks na documentação do Airflow.

Configurar uma Databricks Connect

Sua instalação do Airflow contém uma conexão default para Databricks. Para atualizar a conexão para se conectar ao seu workspace usando os access tokens pessoal que você criou acima:

  1. Em uma janela do navegador, abra http://localhost:8080/connection/list/. Se for solicitado a fazer login, digite seu nome de usuário e senha de administrador.

  2. Em Conn ID, localize databricks_default e clique no botão Editar registro .

  3. Substitua o valor no campo Host pelo nome da instância do workspace da implantação do Databricks, por exemplo, https://adb-123456789.cloud.databricks.com.

  4. No campo Senha , insira seu access token pessoal do Databricks.

  5. Clique em Salvar.

Exemplo: Criar um Airflow DAG para executar um Jobdo Databricks

O exemplo a seguir demonstra como criar uma implantação simples do Airflow que é executada em sua máquina local e implantou um DAG de exemplo para acionar a execução no Databricks. Neste exemplo, você irá:

  1. Crie um novo Notebook e adicione o código para imprimir uma saudação com base em um parâmetro configurado.

  2. Crie um Job do Databricks com uma única tarefa que executa o Notebook.

  3. Configure uma conexão Airflow para seu workspace Databricks.

  4. Crie um DAG do Airflow para acionar o Job Notebook. Você define o DAG em um script Python usando DatabricksRunNowOperator.

  5. Use a IU do Airflow para acionar o DAG e view o status da execução.

Criar um Notebook

Este exemplo usa um Notebook contendo duas células:

  • A primeira célula contém um widget de texto de utilidades do Databricks definindo uma variável chamada greeting definida com o valor default world.

  • A segunda célula imprime o valor da variável greeting prefixada por hello.

Para criar o Notebook:

  1. Vá para o seu workspace do Databricks, clique em Novo ícone Novo na barra lateral e selecione Notebook.

  2. Dê um nome ao seu Notebook , como Hello Airflow e certifique-se de que o idioma default esteja definido como Python.

  3. Copie o seguinte código Python e cole-o na primeira célula do Notebook.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Adicione uma nova célula abaixo da primeira célula e copie e cole o seguinte código Python na nova célula:

    print("hello {}".format(greeting))
    

Criar um Job

  1. Clique Ícone de trabalhos fluxo de trabalho na barra lateral.

  2. Clique Botão Criar Job.

    A aba Tarefas aparece com a caixa de diálogo de criação de tarefas.

    Criar caixa de diálogo da primeira tarefa
  3. Substitua Adicione um nome para o seu Job com o nome do seu Job .

  4. No campo Nome da tarefa , insira um nome para a tarefa, por exemplo, saudação-tarefa.

  5. No menu suspenso Tipo , selecione Notebook.

  6. No menu suspenso Fonte , selecione workspace.

  7. Clique na caixa de texto Caminho e use o navegador de arquivos para localizar o Notebook que você criou, clique no nome Notebook e clique em Confirmar.

  8. Clique em Adicionar em Parâmetros. No campo key , digite greeting. No campo Valor , insira Airflow user.

  9. Clique em Criar tarefa.

No painel DetalhesJob , copie o valor do IDJob . Este valor é necessário para acionar o Job do Airflow.

execução do Job

Para testar seu novo Job na interface de fluxo de trabalho do Databricks, clique em botão de execução Agora no canto superior direito. Quando a execução for concluída, você poderá verificar a saída visualizando os detalhes da execuçãoJob .

Criar um novo DAG do Airflow

Você define um Airflow DAG em um arquivo Python. Para criar um DAG para acionar o exemplo Notebook Job:

  1. Em um editor de texto ou IDE, crie um novo arquivo chamado databricks_dag.py com o seguinte conteúdo:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Substitua JOB_ID pelo valor do ID Job salvo anteriormente.

  2. Salve o arquivo no diretório airflow/dags . O Airflow lê e instala automaticamente arquivos DAG armazenados em airflow/dags/.

Instale e verifique o DAG no Airflow

Para acionar e verificar o DAG na IU do Airflow:

  1. Em uma janela do navegador, abra http://localhost:8080/home. A tela Airflow DAGs é exibida.

  2. Localize databricks_dag e clique no botão pausar/Retomar DAG para retomar o DAG.

  3. Acione o DAG clicando no botão Trigger DAG .

  4. Clique em uma execução na coluna Execuções para view o status e os detalhes da execução.