Usar SQL do Databricks em um Jobdo Databricks
É possível usar o tipo de tarefa SQL em um Databricks Job, permitindo que o senhor crie, programe, opere e monitore fluxos de trabalho que incluam objetos Databricks SQL como consultas, painéis legados e alertas. Por exemplo, seu fluxo de trabalho pode ingerir dados, preparar os dados, realizar análises usando consultas Databricks SQL e, em seguida, exibir os resultados em um painel legado.
Este artigo fornece um exemplo de fluxo de trabalho que cria um painel de controle legado exibindo métricas para as contribuições do GitHub. Neste exemplo, o senhor irá:
Ingest GitHub use de data a Python script and the GitHub REST API.
Transforme o uso de dados do GitHub em um pipeline Delta Live Tables.
Acione query Databricks SQL realizando análise nos dados preparados.
Exibir a análise em um dashboard legado.
Antes de começar
Você precisa do seguinte para concluir este passo a passo:
access tokenspessoal do GitHub. Esses tokens devem ter a permissão de repo .
Um serverless SQL warehouse ou um pro SQL warehouse. Consulte SQL warehouse types.
Um Secret Scope do Databricks. O Secret Scope é usado para armazenar com segurança os tokens do GitHub. Consulte a passo 1: armazene os tokens do GitHub em um segredo.
passo 1: armazene os tokens do GitHub em segredo
Em vez de codificar credenciais, como os access tokens pessoal do GitHub em um Job, o Databricks recomenda o uso de um Secret Scope para armazenar e gerenciar segredos com segurança. Os seguintes comandos da CLI do Databricks são um exemplo de criação de um Secret Scope e armazenamento dos tokens do GitHub em um segredo nesse escopo:
databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
Substitua
<scope-name
pelo nome de um Databricks Secret Scope para armazenar os tokens.Substitua
<token-key>
pelo nome de uma key para atribuir aos tokens.Substitua
<token>
pelo valor dos access tokens pessoal do GitHub.
passo 2: Criar um script para buscar dados do GitHub
O script Python a seguir usa a API REST do GitHub para buscar dados em commit e contribuições de um repositório GitHub. Os argumentos de entrada especificam o repositório do GitHub. Os registros são salvos em um local no DBFS especificado por outro argumento de entrada.
Este exemplo usa o DBFS para armazenar o script Python, mas o senhor também pode usar pastas Git do Databricks ou arquivos de espaço de trabalho para armazenar e gerenciar o script.
Salve este script em um local em seu disco local:
import json import requests import sys api_url = "https://api.github.com" def get_commits(owner, repo, token, path): page = 1 request_url = f"{api_url}/repos/{owner}/{repo}/commits" more = True get_response(request_url, f"{path}/commits", token) def get_contributors(owner, repo, token, path): page = 1 request_url = f"{api_url}/repos/{owner}/{repo}/contributors" more = True get_response(request_url, f"{path}/contributors", token) def get_response(request_url, path, token): page = 1 more = True while more: response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token}) if response.text != "[]": write(path + "/records-" + str(page) + ".json", response.text) page += 1 else: more = False def write(filename, contents): dbutils.fs.put(filename, contents) def main(): args = sys.argv[1:] if len(args) < 6: print("Usage: github-api.py owner repo request output-dir secret-scope secret-key") sys.exit(1) owner = sys.argv[1] repo = sys.argv[2] request = sys.argv[3] output_path = sys.argv[4] secret_scope = sys.argv[5] secret_key = sys.argv[6] token = dbutils.secrets.get(scope=secret_scope, key=secret_key) if (request == "commits"): get_commits(owner, repo, token, output_path) elif (request == "contributors"): get_contributors(owner, repo, token, output_path) if __name__ == "__main__": main()
upload do script para o DBFS:
Acesse as páginas de aterrissagem do Databricks e clique Catálogo na barra lateral.
Clique em Procurar DBFS.
No navegador de arquivos DBFS, clique em upload. A caixa de diálogo upload dados para DBFS é exibida.
Digite um caminho no DBFS para armazenar o script, clique em Drop files to upload ou clique para navegar e selecione o script Python.
Clique em Concluído.
passo 3: criar um pipeline Delta Live Tables para processar os dados do GitHub
Nesta seção, você cria um pipeline Delta Live Tables para converter os dados brutos do GitHub em tabelas que podem ser analisadas pela query Databricks SQL. Para criar o pipeline, execute as seguintes passos:
Na barra lateral, clique em Novo e selecione Notebook no menu. A caixa de diálogo Criar Notebook é exibida.
Em Idiomadefault , insira um nome e selecione Python. Você pode deixar clusters definidos com o valor default . O Delta Live Tables Runtime cria clusters antes de executar seu pipeline.
Clique em Criar.
Copie o exemplo de código Python e cole-o em seu novo Notebook. Você pode adicionar o código de exemplo a uma única célula do Notebook ou a várias células.
import dlt from pyspark.sql.functions import * def parse(df): return (df .withColumn("author_date", to_timestamp(col("commit.author.date"))) .withColumn("author_email", col("commit.author.email")) .withColumn("author_name", col("commit.author.name")) .withColumn("comment_count", col("commit.comment_count")) .withColumn("committer_date", to_timestamp(col("commit.committer.date"))) .withColumn("committer_email", col("commit.committer.email")) .withColumn("committer_name", col("commit.committer.name")) .withColumn("message", col("commit.message")) .withColumn("sha", col("commit.tree.sha")) .withColumn("tree_url", col("commit.tree.url")) .withColumn("url", col("commit.url")) .withColumn("verification_payload", col("commit.verification.payload")) .withColumn("verification_reason", col("commit.verification.reason")) .withColumn("verification_signature", col("commit.verification.signature")) .withColumn("verification_verified", col("commit.verification.signature").cast("string")) .drop("commit") ) @dlt.table( comment="Raw GitHub commits" ) def github_commits_raw(): df = spark.read.json(spark.conf.get("commits-path")) return parse(df.select("commit")) @dlt.table( comment="Info on the author of a commit" ) def commits_by_author(): return ( dlt.read("github_commits_raw") .withColumnRenamed("author_date", "date") .withColumnRenamed("author_email", "email") .withColumnRenamed("author_name", "name") .select("sha", "date", "email", "name") ) @dlt.table( comment="GitHub repository contributors" ) def github_contributors_raw(): return( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .load(spark.conf.get("contribs-path")) )
Na barra lateral, clique em fluxo de trabalho, clique na tab Delta Live Tables e clique em Criar pipeline.
Dê um nome ao pipeline, por exemplo,
Transform GitHub data
.No campo Notebook library , insira o caminho para o seu Notebook ou clique em para selecionar o Notebook.
Clique em Adicionar configuração. Na caixa de texto
Key
, digitecommits-path
. Na caixa de textoValue
, insira o caminho DBFS onde os registros do GitHub serão gravados. Pode ser qualquer caminho que você escolher e é o mesmo caminho que você usará ao configurar a primeira tarefa do Python ao criar o fluxo de trabalho.Clique em Adicionar configuração novamente. Na caixa de texto
Key
, digitecontribs-path
. Na caixa de textoValue
, insira o caminho DBFS onde os registros do GitHub serão gravados. Pode ser qualquer caminho que você escolher e é o mesmo caminho que você usará ao configurar a segunda tarefa do Python ao criar o fluxo de trabalho.No campo Destino , insira um banco de dados de destino, por exemplo,
github_tables
. Definir um banco de dados de destino publica os dados de saída no metastore e é necessário para a query downstream que analisa os dados produzidos pelo pipeline.Clique em Salvar.
passo 4: Criar um fluxo de trabalho para ingerir e transformar dados do GitHub
Antes de analisar e visualizar os dados do GitHub com Databricks SQL, você precisa ingerir e preparar os dados. Para criar um fluxo de trabalho para concluir essas tarefas, execute as seguintes passos:
Crie um Job do Databricks e adicione a primeira tarefa
Vá para suas páginas de aterrissagem do Databricks e faça um dos seguintes:
Na barra lateral, clique em fluxo de trabalho e clique .
Na barra lateral, clique em Novo e selecione Job no menu.
Na caixa de diálogo da tarefa que aparece na tab Tarefas , substitua Adicionar um nome para o Job pelo nome Job , por exemplo,
GitHub analysis workflow
.Em Nome da tarefa, insira um nome para a tarefa, por exemplo,
get_commits
.Em Tipo, selecione script Python.
Em Origem, selecione DBFS/S3.
Em Path, insira o caminho para o script em DBFS.
Em Parâmetros, insira os seguintes argumentos para o script Python:
["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]
Substitua
<owner>
pelo nome do proprietário do repositório. Por exemplo, para buscar registros do repositóriogithub.com/databrickslabs/overwatch
, digitedatabrickslabs
.Substitua
<repo>
pelo nome do repositório, por exemplo,overwatch
.Substitua
<DBFS-output-dir>
por um caminho no DBFS para armazenar os registros obtidos do GitHub.Substitua
<scope-name>
pelo nome do Secret Scope que você criou para armazenar os tokens do GitHub.Substitua
<github-token-key>
pelo nome da key que você atribuiu aos tokens do GitHub.
Clique em Salvar tarefa.
Adicionar outra tarefa
Clique abaixo da tarefa que você acabou de criar.
Em Nome da tarefa, insira um nome para a tarefa, por exemplo,
get_contributors
.Em Tipo, selecione o tipo de tarefa de script Python .
Em Origem, selecione DBFS/S3.
Em Path, insira o caminho para o script em DBFS.
Em Parâmetros, insira os seguintes argumentos para o script Python:
["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]
Substitua
<owner>
pelo nome do proprietário do repositório. Por exemplo, para buscar registros do repositóriogithub.com/databrickslabs/overwatch
, digitedatabrickslabs
.Substitua
<repo>
pelo nome do repositório, por exemplo,overwatch
.Substitua
<DBFS-output-dir>
por um caminho no DBFS para armazenar os registros obtidos do GitHub.Substitua
<scope-name>
pelo nome do Secret Scope que você criou para armazenar os tokens do GitHub.Substitua
<github-token-key>
pelo nome da key que você atribuiu aos tokens do GitHub.
Clique em Salvar tarefa.
Adicione uma tarefa para transformar os dados
Clique abaixo da tarefa que você acabou de criar.
Em Nome da tarefa, insira um nome para a tarefa, por exemplo,
transform_github_data
.Em Type, selecione o pipeline Delta Live Tables e insira um nome para a tarefa.
Em Pipeline, selecione o pipeline criado na passo 3: Crie um pipeline Delta Live Tables para processar os dados do GitHub.
Clique em Criar.
passo 5: a execução dos dados mudou o fluxo de trabalho
Clique para executar o fluxo de trabalho. Para ver os detalhes da execução, clique no link na coluna de horário de início da execução na visualização Job runs . Clique em cada tarefa para view os detalhes da execução da tarefa.
passo 6: (opcional) para visualizar os dados de saída após a conclusão da execução do fluxo de trabalho, execute as seguintes passos:
Na view de detalhes da execução, clique na tarefa Delta Live Tables.
No painel Detalhes da execução da tarefa , clique no nome do pipeline em Pipeline. A página de detalhes do pipeline é exibida.
Selecione a tabela
commits_by_author
no pipeline DAG.Clique no nome da tabela próximo a Metastore no painel commits_by_author . A página Explorador de Catálogo é aberta.
No Catalog Explorer, você pode view o esquema da tabela, dados de amostra e outros detalhes dos dados. Siga os mesmos passos para view os dados da tabela github_contributors_raw
.
passo 7: Remova os dados do GitHub
Em um aplicativo do mundo real, você pode ingerir e processar dados continuamente. Como este exemplo downloads e processa todo o conjunto de dados, você deve remover os dados do GitHub já downloads para evitar um erro ao executar novamente o fluxo de trabalho. Para remover os dados downloads , execute os seguintes passos:
Crie um novo Notebook e insira os seguintes comandos na primeira célula:
dbutils.fs.rm("<commits-path", True) dbutils.fs.rm("<contributors-path", True)
Substitua
<commits-path>
e<contributors-path>
pelos caminhos DBFS que você configurou ao criar as tarefas Python.Clique e selecione a célula de execução.
Você também pode adicionar este Notebook como uma tarefa no fluxo de trabalho.
passo 8: criar a query Databricks SQL
Depois de executar o fluxo de trabalho e criar as tabelas necessárias, crie query para analisar os dados preparados. Para criar a query de exemplo e as visualizações, execute as seguintes passos:
Exibir os 10 principais colaboradores por mês
Clique no ícone abaixo do logotipo do Databricks na barra lateral e selecione SQL.
Clique em Criar uma query para abrir o editor query Databricks SQL .
Certifique-se de que o catálogo esteja definido como hive_metastore. Clique em default ao lado de hive_metastore e defina o banco de dados com o valor Target definido no pipeline Delta Live Tables.
Na tab Novaquery , insira a seguinte query:
SELECT date_part('YEAR', date) AS year, date_part('MONTH', date) AS month, name, count(1) FROM commits_by_author WHERE name IN ( SELECT name FROM commits_by_author GROUP BY name ORDER BY count(name) DESC LIMIT 10 ) AND date_part('YEAR', date) >= 2022 GROUP BY name, year, month ORDER BY year, month, name
Clique na tab Nova query e renomeie a query , por exemplo,
Commits by month top 10 contributors
.Por default, os resultados são exibidos como uma tabela. Para alterar como os dados são visualizados, por exemplo, usando um gráfico de barras, no painel Resultados , clique em e clique em Editar.
Em Tipo de visualização, selecione Barra.
Na coluna X, selecione o mês.
Em Y colunas, selecione count(1).
Em Agrupar por, selecione o nome.
Clique em Salvar.
Exibir os 20 principais contribuidores
Clique em + > Criar nova query e verifique se o catálogo está definido como hive_metastore. Clique em default ao lado de hive_metastore e defina o banco de dados com o valor Target definido no pipeline Delta Live Tables.
Digite a seguinte query:
SELECT login, cast(contributions AS INTEGER) FROM github_contributors_raw ORDER BY contributions DESC LIMIT 20
Clique na tab Nova query e renomeie a query , por exemplo,
Top 20 contributors
.Para alterar a visualização da tabela default , no painel Resultados , clique em e clique em Editar.
Em Tipo de visualização, selecione Barra.
Na coluna X, selecione login.
Nas colunas Y, selecione as contribuições.
Clique em Salvar.
Exibir o total de commits por autor
Clique em + > Criar nova query e verifique se o catálogo está definido como hive_metastore. Clique em default ao lado de hive_metastore e defina o banco de dados com o valor Target definido no pipeline Delta Live Tables.
Digite a seguinte query:
SELECT name, count(1) commits FROM commits_by_author GROUP BY name ORDER BY commits DESC LIMIT 10
Clique na tab Nova query e renomeie a query , por exemplo,
Total commits by author
.Para alterar a visualização da tabela default , no painel Resultados , clique em e clique em Editar.
Em Tipo de visualização, selecione Barra.
Na coluna X, selecione o nome.
Nas colunas Y, selecione commit.
Clique em Salvar.
passo 9: criar um painel
Na barra lateral, clique em Painéis
Clique em Criar painel.
Insira um nome para o painel, por exemplo,
GitHub analysis
.Para cada query e visualização criada na passo 8: Crie a query Databricks SQL, clique em Adicionar > Visualização e selecione cada visualização.
passo 10: Adicionar as tarefas SQL ao fluxo de trabalho
Para adicionar as novas tarefas query ao fluxo de trabalho que você criou em Criar um Job do Databricks e adicionar a primeira tarefa, para cada query criada na passo 8: Crie a query Databricks SQL:
Clique fluxo de trabalho na barra lateral.
Na coluna Nome , clique no nome Job .
Clique na tab tarefa .
Clique abaixo da última tarefa.
Insira um nome para a tarefa, em Tipo selecione SQL e em Tarefa SQL selecione query.
Selecione a query na querySQL.
No SQL warehouse, selecione um armazém SQL serverless ou um armazém pro SQL para executar a tarefa.
Clique em Criar.
passo 11: Adicionar uma tarefa de painel
Clique abaixo da última tarefa.
Digite um nome para a tarefa, em Type (Tipo), selecione SQLe, em SQL tarefa, selecione Legacy dashboard (Painel legado).
Selecione o painel criado na passo 9: Criar um painel.
No SQL warehouse, selecione um armazém SQL serverless ou um armazém pro SQL para executar a tarefa.
Clique em Criar.
passo 12: execução do fluxo de trabalho completo
Para executar o fluxo de trabalho, clique em . Para ver os detalhes da execução, clique no link na coluna de horário de início da execução na visualização Job runs .