Usar SQL do Databricks em um Jobdo Databricks

É possível usar o tipo de tarefa SQL em um Databricks Job, permitindo que o senhor crie, programe, opere e monitore fluxos de trabalho que incluam objetos Databricks SQL como consultas, painéis legados e alertas. Por exemplo, seu fluxo de trabalho pode ingerir dados, preparar os dados, realizar análises usando consultas Databricks SQL e, em seguida, exibir os resultados em um painel legado.

Este artigo fornece um exemplo de fluxo de trabalho que cria um painel de controle legado exibindo métricas para as contribuições do GitHub. Neste exemplo, o senhor irá:

  • Ingest GitHub use de data a Python script and the GitHub REST API.

  • Transforme o uso de dados do GitHub em um pipeline Delta Live Tables.

  • Acione query Databricks SQL realizando análise nos dados preparados.

  • Exibir a análise em um dashboard legado.

Painel de análise do GitHub

Antes de começar

Você precisa do seguinte para concluir este passo a passo:

passo 1: armazene os tokens do GitHub em segredo

Em vez de codificar credenciais, como os access tokens pessoal do GitHub em um Job, o Databricks recomenda o uso de um Secret Scope para armazenar e gerenciar segredos com segurança. Os seguintes comandos da CLI do Databricks são um exemplo de criação de um Secret Scope e armazenamento dos tokens do GitHub em um segredo nesse escopo:

databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
  • Substitua <scope-name pelo nome de um Databricks Secret Scope para armazenar os tokens.

  • Substitua <token-key> pelo nome de uma key para atribuir aos tokens.

  • Substitua <token> pelo valor dos access tokens pessoal do GitHub.

passo 2: Criar um script para buscar dados do GitHub

O script Python a seguir usa a API REST do GitHub para buscar dados em commit e contribuições de um repositório GitHub. Os argumentos de entrada especificam o repositório do GitHub. Os registros são salvos em um local no DBFS especificado por outro argumento de entrada.

Este exemplo usa o DBFS para armazenar o script Python, mas o senhor também pode usar pastas Git do Databricks ou arquivos de espaço de trabalho para armazenar e gerenciar o script.

  • Salve este script em um local em seu disco local:

    import json
    import requests
    import sys
    
    api_url = "https://api.github.com"
    
    def get_commits(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/commits"
      more = True
    
      get_response(request_url, f"{path}/commits", token)
    
    
    def get_contributors(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/contributors"
      more = True
    
      get_response(request_url, f"{path}/contributors", token)
    
    
    def get_response(request_url, path, token):
      page = 1
      more = True
    
      while more:
        response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token})
        if response.text != "[]":
          write(path + "/records-" + str(page) + ".json", response.text)
          page += 1
        else:
          more = False
    
    
    def write(filename, contents):
      dbutils.fs.put(filename, contents)
    
    
    def main():
      args = sys.argv[1:]
      if len(args) < 6:
        print("Usage: github-api.py owner repo request output-dir secret-scope secret-key")
        sys.exit(1)
    
      owner = sys.argv[1]
      repo = sys.argv[2]
      request = sys.argv[3]
      output_path = sys.argv[4]
      secret_scope = sys.argv[5]
      secret_key = sys.argv[6]
    
      token = dbutils.secrets.get(scope=secret_scope, key=secret_key)
    
      if (request == "commits"):
        get_commits(owner, repo, token, output_path)
      elif (request == "contributors"):
        get_contributors(owner, repo, token, output_path)
    
    
    if __name__ == "__main__":
        main()
    
  • upload do script para o DBFS:

    1. Acesse as páginas de aterrissagem do Databricks e clique Ícone de catálogo Catálogo na barra lateral.

    2. Clique em Procurar DBFS.

    3. No navegador de arquivos DBFS, clique em upload. A caixa de diálogo upload dados para DBFS é exibida.

    4. Digite um caminho no DBFS para armazenar o script, clique em Drop files to upload ou clique para navegar e selecione o script Python.

    5. Clique em Concluído.

passo 3: criar um pipeline Delta Live Tables para processar os dados do GitHub

Nesta seção, você cria um pipeline Delta Live Tables para converter os dados brutos do GitHub em tabelas que podem ser analisadas pela query Databricks SQL. Para criar o pipeline, execute as seguintes passos:

  1. Na barra lateral, clique em Novo ícone Novo e selecione Notebook no menu. A caixa de diálogo Criar Notebook é exibida.

  2. Em Idiomadefault , insira um nome e selecione Python. Você pode deixar clusters definidos com o valor default . O Delta Live Tables Runtime cria clusters antes de executar seu pipeline.

  3. Clique em Criar.

  4. Copie o exemplo de código Python e cole-o em seu novo Notebook. Você pode adicionar o código de exemplo a uma única célula do Notebook ou a várias células.

    import dlt
    from pyspark.sql.functions import *
    
    def parse(df):
       return (df
         .withColumn("author_date", to_timestamp(col("commit.author.date")))
         .withColumn("author_email", col("commit.author.email"))
         .withColumn("author_name", col("commit.author.name"))
         .withColumn("comment_count", col("commit.comment_count"))
         .withColumn("committer_date", to_timestamp(col("commit.committer.date")))
         .withColumn("committer_email", col("commit.committer.email"))
         .withColumn("committer_name", col("commit.committer.name"))
         .withColumn("message", col("commit.message"))
         .withColumn("sha", col("commit.tree.sha"))
         .withColumn("tree_url", col("commit.tree.url"))
         .withColumn("url", col("commit.url"))
         .withColumn("verification_payload", col("commit.verification.payload"))
         .withColumn("verification_reason", col("commit.verification.reason"))
         .withColumn("verification_signature", col("commit.verification.signature"))
         .withColumn("verification_verified", col("commit.verification.signature").cast("string"))
         .drop("commit")
       )
    
    @dlt.table(
       comment="Raw GitHub commits"
    )
    def github_commits_raw():
      df = spark.read.json(spark.conf.get("commits-path"))
      return parse(df.select("commit"))
    
    @dlt.table(
      comment="Info on the author of a commit"
    )
    def commits_by_author():
      return (
        dlt.read("github_commits_raw")
          .withColumnRenamed("author_date", "date")
          .withColumnRenamed("author_email", "email")
          .withColumnRenamed("author_name", "name")
          .select("sha", "date", "email", "name")
      )
    
    @dlt.table(
      comment="GitHub repository contributors"
    )
    def github_contributors_raw():
      return(
        spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .load(spark.conf.get("contribs-path"))
      )
    
  5. Na barra lateral, clique Ícone de trabalhos em fluxo de trabalho, clique na tab Delta Live Tables e clique em Criar pipeline.

  6. Dê um nome ao pipeline, por exemplo, Transform GitHub data.

  7. No campo Notebook library , insira o caminho para o seu Notebook ou clique em Ícone do Seletor de Arquivos para selecionar o Notebook.

  8. Clique em Adicionar configuração. Na caixa de texto Key , digite commits-path. Na caixa de texto Value , insira o caminho DBFS onde os registros do GitHub serão gravados. Pode ser qualquer caminho que você escolher e é o mesmo caminho que você usará ao configurar a primeira tarefa do Python ao criar o fluxo de trabalho.

  9. Clique em Adicionar configuração novamente. Na caixa de texto Key , digite contribs-path. Na caixa de texto Value , insira o caminho DBFS onde os registros do GitHub serão gravados. Pode ser qualquer caminho que você escolher e é o mesmo caminho que você usará ao configurar a segunda tarefa do Python ao criar o fluxo de trabalho.

  10. No campo Destino , insira um banco de dados de destino, por exemplo, github_tables. Definir um banco de dados de destino publica os dados de saída no metastore e é necessário para a query downstream que analisa os dados produzidos pelo pipeline.

  11. Clique em Salvar.

passo 4: Criar um fluxo de trabalho para ingerir e transformar dados do GitHub

Antes de analisar e visualizar os dados do GitHub com Databricks SQL, você precisa ingerir e preparar os dados. Para criar um fluxo de trabalho para concluir essas tarefas, execute as seguintes passos:

Crie um Job do Databricks e adicione a primeira tarefa

  1. Vá para suas páginas de aterrissagem do Databricks e faça um dos seguintes:

    • Na barra lateral, clique em Ícone de trabalhos fluxo de trabalho e clique Botão Criar Job.

    • Na barra lateral, clique em Novo ícone Novo e selecione Job no menu.

  2. Na caixa de diálogo da tarefa que aparece na tab Tarefas , substitua Adicionar um nome para o Job pelo nome Job , por exemplo, GitHub analysis workflow.

  3. Em Nome da tarefa, insira um nome para a tarefa, por exemplo, get_commits.

  4. Em Tipo, selecione script Python.

  5. Em Origem, selecione DBFS/S3.

  6. Em Path, insira o caminho para o script em DBFS.

  7. Em Parâmetros, insira os seguintes argumentos para o script Python:

    ["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • Substitua <owner> pelo nome do proprietário do repositório. Por exemplo, para buscar registros do repositório github.com/databrickslabs/overwatch , digite databrickslabs.

    • Substitua <repo> pelo nome do repositório, por exemplo, overwatch.

    • Substitua <DBFS-output-dir> por um caminho no DBFS para armazenar os registros obtidos do GitHub.

    • Substitua <scope-name> pelo nome do Secret Scope que você criou para armazenar os tokens do GitHub.

    • Substitua <github-token-key> pelo nome da key que você atribuiu aos tokens do GitHub.

  8. Clique em Salvar tarefa.

Adicionar outra tarefa

  1. Clique Botão Adicionar tarefa abaixo da tarefa que você acabou de criar.

  2. Em Nome da tarefa, insira um nome para a tarefa, por exemplo, get_contributors.

  3. Em Tipo, selecione o tipo de tarefa de script Python .

  4. Em Origem, selecione DBFS/S3.

  5. Em Path, insira o caminho para o script em DBFS.

  6. Em Parâmetros, insira os seguintes argumentos para o script Python:

    ["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • Substitua <owner> pelo nome do proprietário do repositório. Por exemplo, para buscar registros do repositório github.com/databrickslabs/overwatch , digite databrickslabs.

    • Substitua <repo> pelo nome do repositório, por exemplo, overwatch.

    • Substitua <DBFS-output-dir> por um caminho no DBFS para armazenar os registros obtidos do GitHub.

    • Substitua <scope-name> pelo nome do Secret Scope que você criou para armazenar os tokens do GitHub.

    • Substitua <github-token-key> pelo nome da key que você atribuiu aos tokens do GitHub.

  7. Clique em Salvar tarefa.

Adicione uma tarefa para transformar os dados

  1. Clique Botão Adicionar tarefa abaixo da tarefa que você acabou de criar.

  2. Em Nome da tarefa, insira um nome para a tarefa, por exemplo, transform_github_data.

  3. Em Type, selecione o pipeline Delta Live Tables e insira um nome para a tarefa.

  4. Em Pipeline, selecione o pipeline criado na passo 3: Crie um pipeline Delta Live Tables para processar os dados do GitHub.

  5. Clique em Criar.

passo 5: a execução dos dados mudou o fluxo de trabalho

Clique botão de execução Agora para executar o fluxo de trabalho. Para ver os detalhes da execução, clique no link na coluna de horário de início da execução na visualização Job runs . Clique em cada tarefa para view os detalhes da execução da tarefa.

passo 6: (opcional) para visualizar os dados de saída após a conclusão da execução do fluxo de trabalho, execute as seguintes passos:

  1. Na view de detalhes da execução, clique na tarefa Delta Live Tables.

  2. No painel Detalhes da execução da tarefa , clique no nome do pipeline em Pipeline. A página de detalhes do pipeline é exibida.

  3. Selecione a tabela commits_by_author no pipeline DAG.

  4. Clique no nome da tabela próximo a Metastore no painel commits_by_author . A página Explorador de Catálogo é aberta.

No Catalog Explorer, você pode view o esquema da tabela, dados de amostra e outros detalhes dos dados. Siga os mesmos passos para view os dados da tabela github_contributors_raw.

passo 7: Remova os dados do GitHub

Em um aplicativo do mundo real, você pode ingerir e processar dados continuamente. Como este exemplo downloads e processa todo o conjunto de dados, você deve remover os dados do GitHub já downloads para evitar um erro ao executar novamente o fluxo de trabalho. Para remover os dados downloads , execute os seguintes passos:

  1. Crie um novo Notebook e insira os seguintes comandos na primeira célula:

    dbutils.fs.rm("<commits-path", True)
    dbutils.fs.rm("<contributors-path", True)
    

    Substitua <commits-path> e <contributors-path> pelos caminhos DBFS que você configurou ao criar as tarefas Python.

  2. Clique menu de execução e selecione a célula de execução.

Você também pode adicionar este Notebook como uma tarefa no fluxo de trabalho.

passo 8: criar a query Databricks SQL

Depois de executar o fluxo de trabalho e criar as tabelas necessárias, crie query para analisar os dados preparados. Para criar a query de exemplo e as visualizações, execute as seguintes passos:

Exibir os 10 principais colaboradores por mês

  1. Clique no ícone abaixo do logotipo do Databricks Logotipo Databricks na barra lateral e selecione SQL.

  2. Clique em Criar uma query para abrir o editor query Databricks SQL .

  3. Certifique-se de que o catálogo esteja definido como hive_metastore. Clique em default ao lado de hive_metastore e defina o banco de dados com o valor Target definido no pipeline Delta Live Tables.

  4. Na tab Novaquery , insira a seguinte query:

    SELECT
      date_part('YEAR', date) AS year,
      date_part('MONTH', date) AS month,
      name,
      count(1)
    FROM
      commits_by_author
    WHERE
      name IN (
        SELECT
          name
        FROM
          commits_by_author
        GROUP BY
          name
        ORDER BY
          count(name) DESC
        LIMIT 10
      )
      AND
        date_part('YEAR', date) >= 2022
    GROUP BY
      name, year, month
    ORDER BY
      year, month, name
    
  5. Clique na tab Nova query e renomeie a query , por exemplo, Commits by month top 10 contributors.

  6. Por default, os resultados são exibidos como uma tabela. Para alterar como os dados são visualizados, por exemplo, usando um gráfico de barras, no painel Resultados , clique em Trabalhos Reticências Verticais e clique em Editar.

  7. Em Tipo de visualização, selecione Barra.

  8. Na coluna X, selecione o mês.

  9. Em Y colunas, selecione count(1).

  10. Em Agrupar por, selecione o nome.

  11. Clique em Salvar.

Exibir os 20 principais contribuidores

  1. Clique em + > Criar nova query e verifique se o catálogo está definido como hive_metastore. Clique em default ao lado de hive_metastore e defina o banco de dados com o valor Target definido no pipeline Delta Live Tables.

  2. Digite a seguinte query:

    SELECT
      login,
      cast(contributions AS INTEGER)
    FROM
      github_contributors_raw
    ORDER BY
      contributions DESC
    LIMIT 20
    
  3. Clique na tab Nova query e renomeie a query , por exemplo, Top 20 contributors.

  4. Para alterar a visualização da tabela default , no painel Resultados , clique em Trabalhos Reticências Verticais e clique em Editar.

  5. Em Tipo de visualização, selecione Barra.

  6. Na coluna X, selecione login.

  7. Nas colunas Y, selecione as contribuições.

  8. Clique em Salvar.

Exibir o total de commits por autor

  1. Clique em + > Criar nova query e verifique se o catálogo está definido como hive_metastore. Clique em default ao lado de hive_metastore e defina o banco de dados com o valor Target definido no pipeline Delta Live Tables.

  2. Digite a seguinte query:

    SELECT
      name,
      count(1) commits
    FROM
      commits_by_author
    GROUP BY
      name
    ORDER BY
      commits DESC
    LIMIT 10
    
  3. Clique na tab Nova query e renomeie a query , por exemplo, Total commits by author.

  4. Para alterar a visualização da tabela default , no painel Resultados , clique em Trabalhos Reticências Verticais e clique em Editar.

  5. Em Tipo de visualização, selecione Barra.

  6. Na coluna X, selecione o nome.

  7. Nas colunas Y, selecione commit.

  8. Clique em Salvar.

passo 9: criar um painel

  1. Na barra lateral, clique em Ícone de painéis Painéis

  2. Clique em Criar painel.

  3. Insira um nome para o painel, por exemplo, GitHub analysis.

  4. Para cada query e visualização criada na passo 8: Crie a query Databricks SQL, clique em Adicionar > Visualização e selecione cada visualização.

passo 10: Adicionar as tarefas SQL ao fluxo de trabalho

Para adicionar as novas tarefas query ao fluxo de trabalho que você criou em Criar um Job do Databricks e adicionar a primeira tarefa, para cada query criada na passo 8: Crie a query Databricks SQL:

  1. Clique Ícone de trabalhos fluxo de trabalho na barra lateral.

  2. Na coluna Nome , clique no nome Job .

  3. Clique na tab tarefa .

  4. Clique Botão Adicionar tarefa abaixo da última tarefa.

  5. Insira um nome para a tarefa, em Tipo selecione SQL e em Tarefa SQL selecione query.

  6. Selecione a query na querySQL.

  7. No SQL warehouse, selecione um armazém SQL serverless ou um armazém pro SQL para executar a tarefa.

  8. Clique em Criar.

passo 11: Adicionar uma tarefa de painel

  1. Clique Botão Adicionar tarefa abaixo da última tarefa.

  2. Digite um nome para a tarefa, em Type (Tipo), selecione SQLe, em SQL tarefa, selecione Legacy dashboard (Painel legado).

  3. Selecione o painel criado na passo 9: Criar um painel.

  4. No SQL warehouse, selecione um armazém SQL serverless ou um armazém pro SQL para executar a tarefa.

  5. Clique em Criar.

passo 12: execução do fluxo de trabalho completo

Para executar o fluxo de trabalho, clique em botão de execução Agora. Para ver os detalhes da execução, clique no link na coluna de horário de início da execução na visualização Job runs .

passo 13: Veja os resultados

Para view os resultados quando a execução for concluída, clique na tarefa final do painel e clique no nome do painel em Painel SQL no painel direito.