Databricksジョブでの Databricks SQL の使用
Databricks ジョブで SQL タスク タイプを使用すると、クエリ、レガシ ダッシュボード、アラートなどの Databricks SQL オブジェクトを含むワークフローを作成、スケジュール、操作、監視できます。 たとえば、ワークフローでは、データを取り込み、データを準備し、Databricks SQL クエリを使用して分析を実行し、結果を従来のダッシュボードに表示できます。
この記事では、GitHub への貢献のメトリクスを表示するレガシー ダッシュボードを作成するワークフローの例を示します。 この例では、次のことを行います。
Python スクリプトと GitHub REST API を使用して GitHub データを取り込みます。
Delta Live Tables パイプラインを使用して GitHub データを変換します。
準備されたデータの分析を実行する Databricks SQL クエリーをトリガーします。
レガシーダッシュボードに分析を表示します。
始める前に
このチュートリアルを完了するには、次のものが必要です。
A GitHub personal accesstokenこのトークンには、 リポジトリ のアクセス許可が必要です。
サーバーレス SQL ウェアハウスまたはプロ SQL ウェアハウス。 SQLウェアハウス型を参照してください。
A Databricks シークレットスコープ. シークレットスコープは、GitHub トークンを安全に格納するために使用されます。 「 ステップ 1: GitHub トークンをシークレットに保存する」を参照してください。
ステップ 1: GitHub トークンをシークレットに保存する
Databricks では、GitHub 個人用アクセストークンなどの資格情報をジョブにハードコーディングする代わりに、シークレットスコープを使用してシークレットを安全に格納および管理することをお勧めします。 次の Databricks CLI コマンドは、シークレット スコープを作成し、そのスコープ内のシークレットに GitHub トークンを格納する例です。
databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
<scope-name
を Databricks シークレット スコープの名前に置き換えて、トークンを格納します。<token-key>
をトークンに割り当てるキーの名前に置き換えます。<token>
を GitHub 個人用アクセストークンの値に置き換えます。
ステップ 2: GitHub データを取得するスクリプトを作成する
次の Python スクリプトは、GitHub REST API を使用して、GitHub リポジトリからコミットとコントリビューションに関するデータを取得します。 入力引数は GitHub リポジトリを指定します。 レコードは、別の入力引数で指定された DBFS 内の場所に保存されます。
この例では DBFS を使用して Python スクリプトを保存していますが、 Databricks Git フォルダーまたはワークスペース ファイルを使用してスクリプトを保存および管理することもできます。
このスクリプトをローカルディスク上の場所に保存します。
import json import requests import sys api_url = "https://api.github.com" def get_commits(owner, repo, token, path): page = 1 request_url = f"{api_url}/repos/{owner}/{repo}/commits" more = True get_response(request_url, f"{path}/commits", token) def get_contributors(owner, repo, token, path): page = 1 request_url = f"{api_url}/repos/{owner}/{repo}/contributors" more = True get_response(request_url, f"{path}/contributors", token) def get_response(request_url, path, token): page = 1 more = True while more: response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token}) if response.text != "[]": write(path + "/records-" + str(page) + ".json", response.text) page += 1 else: more = False def write(filename, contents): dbutils.fs.put(filename, contents) def main(): args = sys.argv[1:] if len(args) < 6: print("Usage: github-api.py owner repo request output-dir secret-scope secret-key") sys.exit(1) owner = sys.argv[1] repo = sys.argv[2] request = sys.argv[3] output_path = sys.argv[4] secret_scope = sys.argv[5] secret_key = sys.argv[6] token = dbutils.secrets.get(scope=secret_scope, key=secret_key) if (request == "commits"): get_commits(owner, repo, token, output_path) elif (request == "contributors"): get_contributors(owner, repo, token, output_path) if __name__ == "__main__": main()
スクリプトを DBFS にアップロードします。
Databricks ランディングページに移動し、サイドバーの [ カタログ] をクリックします 。
[ DBFS の参照] をクリックします。
DBFS ファイル ブラウザで、[ アップロード] をクリックします。 [ DBFS へのデータのアップロード] ダイアログが表示されます。
スクリプトを保存するパスを DBFS に入力し、[ アップロードするファイルのドロップ] をクリックするか、クリックして参照し、Python スクリプトを選択します。
[ 完了] をクリックします。
ステップ 3: GitHub データを 処理するための Delta Live Tables パイプラインを作成する
このセクションでは、生の GitHub データを Databricks SQL クエリーで分析できるテーブルに変換する Delta Live Tables パイプラインを作成します。 パイプラインを作成するには、次のステップを実行します。
サイドバーで [ 新規 ] をクリックし、メニューから [ノートブック ] を選択します。[ ノートブックの作成] ダイアログが表示されます。
[既定の言語] に名前を入力し、[Python] を選択します。[クラスター] は既定値に設定したままにしておくことができます。Delta Live Tables ランタイムは、パイプラインを実行する前にクラスターを作成します。
[作成]をクリックします。
Python コード例をコピーして、新しいノートブックに貼り付けます。 コード例は、ノートブックの 1 つのセルまたは複数のセルに追加できます。
import dlt from pyspark.sql.functions import * def parse(df): return (df .withColumn("author_date", to_timestamp(col("commit.author.date"))) .withColumn("author_email", col("commit.author.email")) .withColumn("author_name", col("commit.author.name")) .withColumn("comment_count", col("commit.comment_count")) .withColumn("committer_date", to_timestamp(col("commit.committer.date"))) .withColumn("committer_email", col("commit.committer.email")) .withColumn("committer_name", col("commit.committer.name")) .withColumn("message", col("commit.message")) .withColumn("sha", col("commit.tree.sha")) .withColumn("tree_url", col("commit.tree.url")) .withColumn("url", col("commit.url")) .withColumn("verification_payload", col("commit.verification.payload")) .withColumn("verification_reason", col("commit.verification.reason")) .withColumn("verification_signature", col("commit.verification.signature")) .withColumn("verification_verified", col("commit.verification.signature").cast("string")) .drop("commit") ) @dlt.table( comment="Raw GitHub commits" ) def github_commits_raw(): df = spark.read.json(spark.conf.get("commits-path")) return parse(df.select("commit")) @dlt.table( comment="Info on the author of a commit" ) def commits_by_author(): return ( dlt.read("github_commits_raw") .withColumnRenamed("author_date", "date") .withColumnRenamed("author_email", "email") .withColumnRenamed("author_name", "name") .select("sha", "date", "email", "name") ) @dlt.table( comment="GitHub repository contributors" ) def github_contributors_raw(): return( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .load(spark.conf.get("contribs-path")) )
サイドバーで、[ ワークフロー ] をクリックし、[Delta Live Tables ] タブをクリックして、[ パイプラインの作成] をクリックします。
パイプラインに
Transform GitHub data
などの名前を付けます。[ ノートブック ライブラリ ] フィールドに、ノートブックへのパスを入力するか、ノートブックをクリックして 選択します。
[ 構成の追加] をクリックします。
Key
テキスト ボックスにcommits-path
と入力します。[Value
] テキスト ボックスに、GitHub レコードが書き込まれる DBFS パスを入力します。 これは任意のパスを選択でき、 ワークフローの作成時に最初の Python タスクを構成するときに使用するパスと同じです。[ 構成の追加 ] をもう一度クリックします。
Key
テキスト ボックスにcontribs-path
と入力します。[Value
] テキスト ボックスに、GitHub レコードが書き込まれる DBFS パスを入力します。 これは任意のパスを選択でき、 ワークフローの作成時に 2 番目の Python タスクを構成するときに使用するパスと同じです。[ ターゲット ] フィールドに、ターゲット データベースを入力します (例:
github_tables
)。 ターゲット データベースを設定すると、出力データがメタストアに発行され、パイプラインによって生成されたデータをダウンストリーム クエリーで分析するために必要です。[保存]をクリックします。
ステップ 4: GitHub データを取り込んで変換するためのワークフローを作成する
Databricks SQL を使用して GitHub データを分析および視覚化する前に、データを取り込んで準備する必要があります。 これらのタスクを実行するワークフローを作成するには、次のステップを実行します。
Databricks ジョブを作成し、最初のタスクを追加する
Databricks ランディングページに移動し、次のいずれかの操作を行います。
サイドバーで「 ワークフロー」 をクリックし 、 をクリックします。
サイドバーで「 新規 」をクリックし、メニューから「 ジョブ 」を選択します。
[タスク] タブに表示されるタスク ダイアログ ボックスで、[ジョブの名前の追加] をジョブ名 (
GitHub analysis workflow
など) に置き換えます。[ タスク名] に、タスクの名前を入力します (例:
get_commits
)。[ 種類] で [ Python スクリプト] を選択します。
[ソース] で [DBFS/S3] を選択します。
[パス] に、DBFS のスクリプトへのパスを入力します。
[パラメーター] に、Python スクリプトの次の引数を入力します。
["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]
<owner>
をリポジトリ所有者の名前に置き換えます。たとえば、github.com/databrickslabs/overwatch
リポジトリからレコードを取得するには、databrickslabs
と入力します。<repo>
をリポジトリ名 (overwatch
など) に置き換えます。<DBFS-output-dir>
を DBFS のパスに置き換えて、GitHub から取得したレコードを保存します。<scope-name>
を、GitHub トークンを格納するために作成したシークレットスコープの名前に置き換えます。<github-token-key>
を GitHub トークンに割り当てたキーの名前に置き換えます。
[ タスクの保存] をクリックします。
別のタスクを追加する
作成したタスクの下をクリックします 。
[ タスク名] に、タスクの名前を入力します (例:
get_contributors
)。[ 種類] で、タスクの種類として [Python スクリプト ] を選択します。
[ソース] で [DBFS/S3] を選択します。
[パス] に、DBFS のスクリプトへのパスを入力します。
[パラメーター] に、Python スクリプトの次の引数を入力します。
["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]
<owner>
をリポジトリ所有者の名前に置き換えます。たとえば、github.com/databrickslabs/overwatch
リポジトリからレコードを取得するには、databrickslabs
と入力します。<repo>
をリポジトリ名 (overwatch
など) に置き換えます。<DBFS-output-dir>
を DBFS のパスに置き換えて、GitHub から取得したレコードを保存します。<scope-name>
を、GitHub トークンを格納するために作成したシークレットスコープの名前に置き換えます。<github-token-key>
を GitHub トークンに割り当てたキーの名前に置き換えます。
[ タスクの保存] をクリックします。
データを変換するタスクを追加する
作成したタスクの下をクリックします 。
[ タスク名] に、タスクの名前を入力します (例:
transform_github_data
)。[種類] で [パイプラインDelta Live Tables ] を選択し、タスクの名前を入力します。
[パイプライン] で、「 ステップ 3: Delta Live Tables パイプラインを作成する」で作成したパイプラインを選択して、GitHub データを処理します。
[作成]をクリックします。
ステップ 5: データ変換ワークフローを実行する
クリックして ワークフローを実行します。 実行の詳細を表示するには、 「 ジョブの実行 」ビュー で 実 行 の「 開始時 刻 」列のリンクをクリックします。各タスクをクリックすると、タスク実行の詳細が表示されます。
ステップ 6: (オプション) ワークフロー実行の完了後に出力データを表示するには、次の手順を実行します。
実行の詳細ビューで、 Delta Live Tables タスクをクリックします。
[ タスク実行の詳細 ] パネルで、[パイプライン] の下にある パイプライン名をクリックします。 [パイプラインの詳細] ページが表示されます。
パイプライン DAG で
commits_by_author
テーブルを選択します。コミット パネルの [メタストア] の横にあるテーブル名をクリックします。[カタログ エクスプローラ]ページが開きます。
カタログ エクスプローラーでは、テーブル スキーマ、サンプル データ、およびデータのその他の詳細を表示できます。 同じ手順に従って、 github_contributors_raw
テーブルのデータを表示します。
ステップ 7: GitHub データを削除する
実際のアプリケーションでは、継続的にデータを取り込んで処理している可能性があります。 この例ではデータセット全体をダウンロードして処理するため、ワークフローを再実行するときにエラーが発生しないように、ダウンロード済みの GitHub データを削除する必要があります。 ダウンロードしたデータを削除するには、次の手順に従います。
新しいノートブックを作成し、最初のセルに次のコマンドを入力します。
dbutils.fs.rm("<commits-path", True) dbutils.fs.rm("<contributors-path", True)
<commits-path>
と<contributors-path>
を、Python タスクの作成時に構成した DBFS パスに置き換えます。をクリックして [ セルの実行] を選択します。
このノートブックをワークフローのタスクとして追加することもできます。
ステップ 8: Databricks SQL クエリーを作成する
ワークフローを実行し、必要なテーブルを作成したら、準備したデータを分析するためのクエリーを作成します。 クエリーとビジュアライゼーションの例を作成するには、次のステップを実行します。
月別の上位 10 人の貢献者を表示する
サイドバーの Databricks ロゴ の下にあるアイコンをクリックし、[ SQL] を選択します。
[ クエリの作成 ] をクリックして、Databricks SQL クエリ エディターを開きます。
カタログが hive_metastore に設定されていることを確認します。 [hive_metastore ] の横にある [デフォルト ] をクリックし、 Delta Live Tables パイプラインで設定した Target 値にデータベースを設定します。
[ 新しいクエリー ]タブで、次のように入力します。
SELECT date_part('YEAR', date) AS year, date_part('MONTH', date) AS month, name, count(1) FROM commits_by_author WHERE name IN ( SELECT name FROM commits_by_author GROUP BY name ORDER BY count(name) DESC LIMIT 10 ) AND date_part('YEAR', date) >= 2022 GROUP BY name, year, month ORDER BY year, month, name
[ 新しいクエリー ] タブをクリックし、クエリーの名前を
Commits by month top 10 contributors
に変更します。デフォルトにより、結果は表として表示されます。 棒グラフなどを使用してデータの視覚化方法を変更するには、[ 結果 ] パネルで をクリックし 、[ 編集] をクリックします。
[ビジュアライゼーションの種類] で [バー] を選択します。
[X] 列で [月] を選択します。
Y 列で count(1) を選択します。
[ グループ化] で [ 名前] を選択します。
[保存]をクリックします。
上位 20 人の貢献者を表示する
[ + > [新しいクエリー の作成] をクリックし、カタログが [hive_metastore] に設定されていることを確認します。 [hive_metastore ] の横にある [デフォルト ] をクリックし、 Delta Live Tables パイプラインで設定した Target 値にデータベースを設定します。
クエリーを次のように入力します。
SELECT login, cast(contributions AS INTEGER) FROM github_contributors_raw ORDER BY contributions DESC LIMIT 20
[ 新しいクエリー ] タブをクリックし、クエリーの名前を
Top 20 contributors
に変更します。ビジュアライゼーションをデフォルト テーブルから変更するには、[ 結果 ] パネルで をクリックし 、[ 編集] をクリックします。
[ビジュアライゼーションの種類] で [バー] を選択します。
[X] 列で [ログイン] を選択します。
Y 列で、 貢献度を選択します。
[保存]をクリックします。
作成者別のコミットの合計を表示する
[ + > [新しいクエリー の作成] をクリックし、カタログが [hive_metastore] に設定されていることを確認します。 [hive_metastore ] の横にある [デフォルト ] をクリックし、 Delta Live Tables パイプラインで設定した Target 値にデータベースを設定します。
クエリーを次のように入力します。
SELECT name, count(1) commits FROM commits_by_author GROUP BY name ORDER BY commits DESC LIMIT 10
[ 新しいクエリー ] タブをクリックし、クエリーの名前を
Total commits by author
に変更します。ビジュアライゼーションをデフォルト テーブルから変更するには、[ 結果 ] パネルで をクリックし 、[ 編集] をクリックします。
[ビジュアライゼーションの種類] で [バー] を選択します。
[X] 列で [名前] を選択します。
[Y] 列で [コミット] を選択します。
[保存]をクリックします。
ステップ 9: ダッシュボードを作成する
サイドバーで、[ ダッシュボード] をクリックします。
[ ダッシュボードの作成] をクリックします。
ダッシュボードの名前を入力します (例:
GitHub analysis
)。「ステップ 8: Databricks SQL を作成する」で作成したクエリーとビジュアライゼーションごとに、[> ビジュアライゼーションの追加] をクリックし、各 ビジュアライゼーション を選択します。
ステップ 10: ワークフローに SQL タスクを追加する
「Databricks ジョブの作成」で作成したワークフローに新しいクエリー タスクを追加し、「 ステップ 8: Databricks SQL クエリーを作成する」で作成したクエリーごとに最初のタスクを追加するには、次のようにします。
サイドバー の [ワークフロー] をクリックします 。
[ 名前 ] 列で、ジョブ名をクリックします。
[タスク] タブをクリックします。
最後のタスクの下をクリックします 。
タスクの名前を入力し、[ 種類 ] で [SQL] を選択し、[ SQL タスク ] で [クエリー] を選択します。
SQL クエリーでクエリーを選択します。
SQLウェアハウスで、タスクを実行するサーバレス SQLウェアハウスまたはプロのSQL ウェアハウスを選択します。
[作成]をクリックします。
ステップ 11: ダッシュボードタスクを追加する
最後のタスクの下をクリックします 。
タスクの名前を入力し、 [タイプ]で[SQL]を選択し、 [SQL タスク]で[レガシー ダッシュボード]を選択します。
「ステップ 9: ダッシュボードを作成する」で作成したダッシュボードを選択します。
SQLウェアハウスで、タスクを実行するサーバレス SQLウェアハウスまたはプロのSQL ウェアハウスを選択します。
[作成]をクリックします。