Databricksジョブでの Databricks SQL の使用
Databricks ジョブで SQL タスク タイプを使用すると、クエリ、レガシ ダッシュボード、アラートなどの Databricks SQL オブジェクトを含むワークフローを作成、スケジュール、操作、監視できます。 たとえば、ワークフローでは、データを取り込み、データを準備し、Databricks SQL クエリを使用して分析を実行し、結果を従来のダッシュボードに表示できます。
この記事では、GitHub への貢献のメトリクスを表示するレガシー ダッシュボードを作成するワークフローの例を示します。 この例では、次のことを行います。
Python スクリプトとGitHub REST APIを使用して GitHub データを取り込みます。
Delta Live Tables パイプラインを使用して GitHub データを変換します。
準備されたデータに対して分析を実行する Databricks SQL クエリをトリガーします。
従来のダッシュボードに分析を表示します。
始める前に
このチュートリアルを完了するには、次のものが必要です。
GitHub個人的なアクセス権。 このトークンにはリポジトリ権限が必要です。
サーバレスSQLウェアハウスまたはプロSQLウェアハウス。 SQLウェアハウスのタイプ を参照してください。
Databricksシークレットスコープ。 シークレットスコープは、 GitHubを安全に保存するために使用されます。 「ステップ 1: GitHub トークンをシークレットに保存する」を参照してください。
ステップ 1: GitHub トークンをシークレットに保存する
GitHub、ジョブ内にDatabricks 個人的なアクセス発言などの資格情報をハードコーディングする代わりに、シークレットスコープを使用してシークレットを安全に保存および管理することを推奨しています。次のDatabricks CLIコマンドは、シークレットスコープを作成し、そのスコープ内のシークレットにGitHub VPN を保存する例です。
databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
<scope-name
をDatabricksシークレットスコープの名前に置き換えて、ウイルスを保存します。<token-key>
トークンに割り当てるキーの名前に置き換えます。<token>
をGitHub個人アクセス権の値に置き換えます。
ステップ 2: GitHub データを取得するスクリプトを作成する
次の Python スクリプトは、GitHub REST API を使用して、GitHub リポジトリからコミットと貢献に関するデータを取得します。 入力引数は GitHub リポジトリを指定します。 レコードは、別の入力引数によって指定された DBFS 内の場所に保存されます。
この例では、DBFS を使用して Python スクリプトを保存していますが、 Databricks Git フォルダーまたはワークスペース ファイルを使用してスクリプトを保存および管理することもできます。
このスクリプトをローカル ディスク上の場所に保存します。
import json import requests import sys api_url = "https://api.github.com" def get_commits(owner, repo, token, path): page = 1 request_url = f"{api_url}/repos/{owner}/{repo}/commits" more = True get_response(request_url, f"{path}/commits", token) def get_contributors(owner, repo, token, path): page = 1 request_url = f"{api_url}/repos/{owner}/{repo}/contributors" more = True get_response(request_url, f"{path}/contributors", token) def get_response(request_url, path, token): page = 1 more = True while more: response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token}) if response.text != "[]": write(path + "/records-" + str(page) + ".json", response.text) page += 1 else: more = False def write(filename, contents): dbutils.fs.put(filename, contents) def main(): args = sys.argv[1:] if len(args) < 6: print("Usage: github-api.py owner repo request output-dir secret-scope secret-key") sys.exit(1) owner = sys.argv[1] repo = sys.argv[2] request = sys.argv[3] output_path = sys.argv[4] secret_scope = sys.argv[5] secret_key = sys.argv[6] token = dbutils.secrets.get(scope=secret_scope, key=secret_key) if (request == "commits"): get_commits(owner, repo, token, output_path) elif (request == "contributors"): get_contributors(owner, repo, token, output_path) if __name__ == "__main__": main()
スクリプトを DBFS にアップロードします。
Databricksのランディングページにアクセスし、サイドバーのカタログ。
DBFSの参照をクリックします。
DBFS ファイル ブラウザーで、 [アップロード] をクリックします。 「DBFS にデータをアップロード」ダイアログが表示されます。
スクリプトを保存する DBFS のパスを入力し、アップロードするファイルをドロップするか、参照をクリックして、Python スクリプトを選択します。
[完了] をクリックします。
ステップ 3:Delta Live Tables GitHubデータを処理するための パイプラインを作成する
このセクションでは、Delta Live Tables パイプラインを作成して、生の GitHub データを Databricks SQL クエリで分析できるテーブルに変換します。 パイプラインを作成するには、次のステップを実行します。
サイドバーで [新規] をクリックし、メニューから [ノートブック] を選択します。[ノートブックの作成] ダイアログが表示されます。
デフォルトの言語で名前を入力し、 Pythonを選択します。 クラスターはデフォルト値のままにしておくことができます。 Delta Live Tables ランタイムは、パイプラインを実行する前にクラスターを作成します。
[作成]をクリックします。
Python コードの例をコピーして、新しいノートブックに貼り付けます。 サンプル コードをノートブックの 1 つのセルまたは複数のセルに追加できます。
import dlt from pyspark.sql.functions import * def parse(df): return (df .withColumn("author_date", to_timestamp(col("commit.author.date"))) .withColumn("author_email", col("commit.author.email")) .withColumn("author_name", col("commit.author.name")) .withColumn("comment_count", col("commit.comment_count")) .withColumn("committer_date", to_timestamp(col("commit.committer.date"))) .withColumn("committer_email", col("commit.committer.email")) .withColumn("committer_name", col("commit.committer.name")) .withColumn("message", col("commit.message")) .withColumn("sha", col("commit.tree.sha")) .withColumn("tree_url", col("commit.tree.url")) .withColumn("url", col("commit.url")) .withColumn("verification_payload", col("commit.verification.payload")) .withColumn("verification_reason", col("commit.verification.reason")) .withColumn("verification_signature", col("commit.verification.signature")) .withColumn("verification_verified", col("commit.verification.signature").cast("string")) .drop("commit") ) @dlt.table( comment="Raw GitHub commits" ) def github_commits_raw(): df = spark.read.json(spark.conf.get("commits-path")) return parse(df.select("commit")) @dlt.table( comment="Info on the author of a commit" ) def commits_by_author(): return ( dlt.read("github_commits_raw") .withColumnRenamed("author_date", "date") .withColumnRenamed("author_email", "email") .withColumnRenamed("author_name", "name") .select("sha", "date", "email", "name") ) @dlt.table( comment="GitHub repository contributors" ) def github_contributors_raw(): return( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .load(spark.conf.get("contribs-path")) )
サイドバーで、ワークフローで、 Delta Live Tablesタブをクリックし、 [パイプラインの作成] をクリックします。
パイプラインに名前を付けます (例:
Transform GitHub data
。ノートブック ライブラリフィールドにノートブックへのパスを入力するか、を押してノートブックを選択します。
[ 設定の追加] をクリックします。
Key
テキスト ボックスに「commits-path
」と入力します。Value
テキスト ボックスに、GitHub レコードが書き込まれる DBFS パスを入力します。 これは任意のパスを選択でき、ワークフローを作成するときに最初の Python タスクを構成するときに使用するパスと同じです。「 構成の追加 」を再度クリックします。
Key
テキスト ボックスに「contribs-path
」と入力します。Value
テキスト ボックスに、GitHub レコードが書き込まれる DBFS パスを入力します。 これは任意のパスを選択でき、ワークフローを作成するときに 2 番目の Python タスクを構成するときに使用するパスと同じです。「 ターゲット 」フィールドに、ターゲット・データベース (
github_tables
など) を入力します。 ターゲット データベースを設定すると、出力データがメタストアに公開され、パイプラインによって生成されたデータを分析する下流のクエリに必要になります。[保存]をクリックします。
ステップ 4: GitHub データを取り込んで変換するためのワークフローを作成する
Databricks SQL を使用して GitHub データを分析および視覚化する前に、データを取り込んで準備する必要があります。 これらのタスクを完了するためのワークフローを作成するには、次のステップを実行します。
Databricks ジョブを作成し、最初のタスクを追加する
Databricksページに移動し、次のいずれかを実行します。
サイドバーで、ワークフローとクリック。
サイドバーで、新規をクリックし、メニューからジョブを選択します。
[タスク] タブに表示される [タスク] ダイアログ ボックスで、 [ジョブの名前を追加]をジョブ名 (例:
GitHub analysis workflow
に置き換えます。[タスク名] にタスクの名前を入力します(例:
get_commits
)。タイプで、 Python スクリプトを選択します。
ソースで、 DBFS / S3を選択します。
パスに、DBFS 内のスクリプトへのパスを入力します。
そこで、 Pythonスクリプトに次の引数を入力します。
["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]
<owner>
リポジトリ所有者の名前に置き換えます。 たとえば、github.com/databrickslabs/overwatch
リポジトリからレコードを取得するには、databrickslabs
と入力します。<repo>
をリポジトリ名に置き換えます(例:overwatch
。GitHub から取得したレコードを保存するには、
<DBFS-output-dir>
DBFS 内のパスに置き換えます。<scope-name>
を、 GitHubウイルスを保存するために作成したシークレットスコープの名前に置き換えます。<github-token-key>
を、GitHub トークンに割り当てたキーの名前に置き換えます。
タスクを保存をクリックします。
別のタスクを追加する
クリック作成したタスクの下にあります。
[タスク名] にタスクの名前を入力します(例:
get_contributors
)。「タイプ」で、 Python スクリプトタスク タイプを選択します。
ソースで、 DBFS / S3を選択します。
パスに、DBFS 内のスクリプトへのパスを入力します。
そこで、 Pythonスクリプトに次の引数を入力します。
["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]
<owner>
リポジトリ所有者の名前に置き換えます。 たとえば、github.com/databrickslabs/overwatch
リポジトリからレコードを取得するには、databrickslabs
と入力します。<repo>
をリポジトリ名に置き換えます(例:overwatch
。GitHub から取得したレコードを保存するには、
<DBFS-output-dir>
DBFS 内のパスに置き換えます。<scope-name>
を、 GitHubウイルスを保存するために作成したシークレットスコープの名前に置き換えます。<github-token-key>
を、GitHub トークンに割り当てたキーの名前に置き換えます。
タスクを保存をクリックします。
データを変換するタスクを追加する
クリック作成したタスクの下にあります。
[タスク名] にタスクの名前を入力します(例:
transform_github_data
)。[タイプ]で[Delta Live Tables パイプライン]を選択し、タスクの名前を入力します。
[パイプライン] で、 「ステップ 3:Delta Live TablesGitHub データを処理するための パイプラインの作成」 で作成したパイプラインを選択します。
[作成]をクリックします。
ステップ 5: データ変換ワークフローを実行する
クリックワークフローを実行します。 実行の詳細 を表示するには、 ジョブ実行 ビューで実行の 開始時刻 列のリンクをクリックします。各タスクをクリックすると、タスク実行の詳細が表示されます。
ステップ 6: (オプション) ワークフロー実行の完了後に出力データを表示するには、次のステップを実行します。
実行の詳細ビューで、Delta Live Tables タスクをクリックします。
[タスク実行の詳細]パネルで、 [パイプライン]の下にあるパイプライン名をクリックします。 パイプラインの詳細ページが表示されます。
パイプライン DAG 内の
commits_by_author
テーブルを選択します。コミットパネルのMetastoreの横にあるテーブル名をクリックします。 「カタログエクスプローラ」(Catalog Explorer) ページが開きます。
カタログ エクスプローラーでは、テーブル スキーマ、サンプル データ、およびデータのその他の詳細を表示できます。 同じステップに従って、 github_contributors_raw
テーブルのデータを表示します。
ステップ 7: GitHub データを削除する
実際のアプリケーションでは、データを継続的に取り込み、処理している可能性があります。 この例ではデータ セット全体をダウンロードして処理するため、ワークフローを再実行するときにエラーが発生しないように、すでにダウンロードした GitHub データを削除する必要があります。 ダウンロードしたデータを削除するには、次のステップを実行します。
新しいノートブックを作成し、最初のセルに次のコマンドを入力します。
dbutils.fs.rm("<commits-path", True) dbutils.fs.rm("<contributors-path", True)
<commits-path>
と<contributors-path>
を、Python タスクの作成時に構成した DBFS パスに置き換えます。クリックセルの実行を選択します。
このノートブックをワークフローのタスクとして追加することもできます。
ステップ 8: Databricks SQL クエリーを作成する
ワークフローを実行して必要なテーブルを作成したら、準備したデータを分析するためのクエリを作成します。 サンプルクエリと視覚化を作成するには、次の手順を実行します。
月別の上位 10 人の貢献者を表示する
Databricksロゴの下のアイコンをクリックしますサイドバーでSQLを選択します。
[クエリの作成] をクリックして、Databricks SQL クエリ エディターを開きます。
カタログが hive_metastore に設定されていることを確認します。 hive_metastoreの横にあるデフォルトをクリックし、データベースを Delta Live Tables パイプラインで設定したターゲット値に設定します。
[新しいクエリ]タブで、次のクエリを入力します。
SELECT date_part('YEAR', date) AS year, date_part('MONTH', date) AS month, name, count(1) FROM commits_by_author WHERE name IN ( SELECT name FROM commits_by_author GROUP BY name ORDER BY count(name) DESC LIMIT 10 ) AND date_part('YEAR', date) >= 2022 GROUP BY name, year, month ORDER BY year, month, name
[新しいクエリ]タブをクリックし、クエリの名前を変更します (例:
Commits by month top 10 contributors
。デフォルトでは、結果は表として表示されます。 データの視覚化方法を変更するには (たとえば、棒グラフを使用するなどして) 、[ 結果 ] パネルで [ ] をクリックし、[ 編集] をクリックします。
[視覚化の種類] で、 [棒] を選択します。
[X] 列で [month] を選択します。
[Y 列] で [count(1)] を選択します。
「グループ化」で、名前を選択します。
[保存]をクリックします。
上位 20 人の貢献者を表示する
[+ > [新しいクエリの作成] をクリックし、カタログが hive_metastore に設定されていることを確認します。hive_metastoreの横にあるデフォルトをクリックし、データベースを Delta Live Tables パイプラインで設定したターゲット値に設定します。
次のクエリを入力します。
SELECT login, cast(contributions AS INTEGER) FROM github_contributors_raw ORDER BY contributions DESC LIMIT 20
[新しいクエリ]タブをクリックし、クエリの名前を変更します (例:
Top 20 contributors
。デフォルトのテーブルから視覚化を変更するには、結果パネルで、編集をクリックします。
[視覚化の種類] で、 [棒] を選択します。
[X] 列で [login] を選択します。
[Y 列] で、投稿を選択します。
[保存]をクリックします。
作成者別のコミットの合計を表示する
[+ > [新しいクエリの作成] をクリックし、カタログが hive_metastore に設定されていることを確認します。hive_metastoreの横にあるデフォルトをクリックし、データベースを Delta Live Tables パイプラインで設定したターゲット値に設定します。
次のクエリを入力します。
SELECT name, count(1) commits FROM commits_by_author GROUP BY name ORDER BY commits DESC LIMIT 10
[新しいクエリ]タブをクリックし、クエリの名前を変更します (例:
Total commits by author
。デフォルトのテーブルから視覚化を変更するには、結果パネルで、編集をクリックします。
[視覚化の種類] で、 [棒] を選択します。
[X] 列で [名前] を選択します。
Y 列で、コミットを選択します。
[保存]をクリックします。
ステップ 9: ダッシュボードを作成する
サイドバーで、「 ダッシュボード 」をクリックします
[ ダッシュボードの作成] をクリックします。
ダッシュボードの名前を入力します (例:
GitHub analysis
)。手順 8: Databricks SQLクエリを作成する で作成したクエリと視覚化ごとに、 [追加] > [視覚化]をクリックし、各視覚化を選択します。
ステップ 10: ワークフローに SQL タスクを追加する
「Databricks ジョブの作成」で作成したワークフローに新しいクエリ タスクを追加し、 「ステップ 8:Databricks SQL を作成する」 Databricksで作成したクエリごとに のタスクを追加する には、次の手順を実行します。
サイドバーの[ワークフロー]をクリックします。
「名前」列で、ジョブ名をクリックします。
「タスク」タブをクリックします。
クリック最後のタスクの下にあります。
タスクの名前を入力し、タイプでSQLを選択し、 SQL タスクでクエリを選択します。
SQL クエリでクエリを選択します。
SQLウェアハウスで、タスクを実行するサーバレスSQLウェアハウスまたはプロSQLウェアハウスを選択します。
[作成]をクリックします。
ステップ 11: ダッシュボードタスクを追加する
クリック最後のタスクの下にあります。
タスクの名前を入力し、 [タイプ]で[SQL]を選択し、 [SQL タスク]で[レガシー ダッシュボード]を選択します。
「ステップ 9: ダッシュボードを作成する」で作成したダッシュボードを選択します。
SQLウェアハウスで、タスクを実行するサーバレスSQLウェアハウスまたはプロSQLウェアハウスを選択します。
[作成]をクリックします。