Databricksジョブでの Databricks SQL の使用

Databricks ジョブで SQL タスク タイプを使用すると、クエリ、レガシ ダッシュボード、アラートなどの Databricks SQL オブジェクトを含むワークフローを作成、スケジュール、操作、監視できます。 たとえば、ワークフローでは、データを取り込み、データを準備し、Databricks SQL クエリを使用して分析を実行し、結果を従来のダッシュボードに表示できます。

この記事では、GitHub への貢献のメトリクスを表示するレガシー ダッシュボードを作成するワークフローの例を示します。 この例では、次のことを行います。

  • Python スクリプトと GitHub REST API を使用して GitHub データを取り込みます。

  • Delta Live Tables パイプラインを使用して GitHub データを変換します。

  • 準備されたデータの分析を実行する Databricks SQL クエリーをトリガーします。

  • レガシーダッシュボードに分析を表示します。

GitHub 分析ダッシュボード

始める前に

このチュートリアルを完了するには、次のものが必要です。

ステップ 1: GitHub トークンをシークレットに保存する

Databricks では、GitHub 個人用アクセストークンなどの資格情報をジョブにハードコーディングする代わりに、シークレットスコープを使用してシークレットを安全に格納および管理することをお勧めします。 次の Databricks CLI コマンドは、シークレット スコープを作成し、そのスコープ内のシークレットに GitHub トークンを格納する例です。

databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
  • <scope-name を Databricks シークレット スコープの名前に置き換えて、トークンを格納します。

  • <token-key> をトークンに割り当てるキーの名前に置き換えます。

  • <token> を GitHub 個人用アクセストークンの値に置き換えます。

ステップ 2: GitHub データを取得するスクリプトを作成する

次の Python スクリプトは、GitHub REST API を使用して、GitHub リポジトリからコミットとコントリビューションに関するデータを取得します。 入力引数は GitHub リポジトリを指定します。 レコードは、別の入力引数で指定された DBFS 内の場所に保存されます。

この例では DBFS を使用して Python スクリプトを保存していますが、 Databricks Git フォルダーまたはワークスペース ファイルを使用してスクリプトを保存および管理することもできます。

  • このスクリプトをローカルディスク上の場所に保存します。

    import json
    import requests
    import sys
    
    api_url = "https://api.github.com"
    
    def get_commits(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/commits"
      more = True
    
      get_response(request_url, f"{path}/commits", token)
    
    
    def get_contributors(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/contributors"
      more = True
    
      get_response(request_url, f"{path}/contributors", token)
    
    
    def get_response(request_url, path, token):
      page = 1
      more = True
    
      while more:
        response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token})
        if response.text != "[]":
          write(path + "/records-" + str(page) + ".json", response.text)
          page += 1
        else:
          more = False
    
    
    def write(filename, contents):
      dbutils.fs.put(filename, contents)
    
    
    def main():
      args = sys.argv[1:]
      if len(args) < 6:
        print("Usage: github-api.py owner repo request output-dir secret-scope secret-key")
        sys.exit(1)
    
      owner = sys.argv[1]
      repo = sys.argv[2]
      request = sys.argv[3]
      output_path = sys.argv[4]
      secret_scope = sys.argv[5]
      secret_key = sys.argv[6]
    
      token = dbutils.secrets.get(scope=secret_scope, key=secret_key)
    
      if (request == "commits"):
        get_commits(owner, repo, token, output_path)
      elif (request == "contributors"):
        get_contributors(owner, repo, token, output_path)
    
    
    if __name__ == "__main__":
        main()
    
  • スクリプトを DBFS にアップロードします。

    1. Databricks ランディングページに移動し、サイドバーのカタログ アイコン [ カタログ] をクリックします 。

    2. [ DBFS の参照] をクリックします。

    3. DBFS ファイル ブラウザで、[ アップロード] をクリックします。 [ DBFS へのデータのアップロード] ダイアログが表示されます。

    4. スクリプトを保存するパスを DBFS に入力し、[ アップロードするファイルのドロップ] をクリックするか、クリックして参照し、Python スクリプトを選択します。

    5. [ 完了] をクリックします。

ステップ 3: GitHub データを 処理するための Delta Live Tables パイプラインを作成する

このセクションでは、生の GitHub データを Databricks SQL クエリーで分析できるテーブルに変換する Delta Live Tables パイプラインを作成します。 パイプラインを作成するには、次のステップを実行します。

  1. サイドバーで 新しいアイコン [ 新規 ] をクリックし、メニューから [ノートブック ] を選択します。[ ノートブックの作成] ダイアログが表示されます。

  2. [既定の言語] に名前を入力し、[Python] を選択します。[クラスター] は既定値に設定したままにしておくことができます。Delta Live Tables ランタイムは、パイプラインを実行する前にクラスターを作成します。

  3. [作成]をクリックします。

  4. Python コード例をコピーして、新しいノートブックに貼り付けます。 コード例は、ノートブックの 1 つのセルまたは複数のセルに追加できます。

    import dlt
    from pyspark.sql.functions import *
    
    def parse(df):
       return (df
         .withColumn("author_date", to_timestamp(col("commit.author.date")))
         .withColumn("author_email", col("commit.author.email"))
         .withColumn("author_name", col("commit.author.name"))
         .withColumn("comment_count", col("commit.comment_count"))
         .withColumn("committer_date", to_timestamp(col("commit.committer.date")))
         .withColumn("committer_email", col("commit.committer.email"))
         .withColumn("committer_name", col("commit.committer.name"))
         .withColumn("message", col("commit.message"))
         .withColumn("sha", col("commit.tree.sha"))
         .withColumn("tree_url", col("commit.tree.url"))
         .withColumn("url", col("commit.url"))
         .withColumn("verification_payload", col("commit.verification.payload"))
         .withColumn("verification_reason", col("commit.verification.reason"))
         .withColumn("verification_signature", col("commit.verification.signature"))
         .withColumn("verification_verified", col("commit.verification.signature").cast("string"))
         .drop("commit")
       )
    
    @dlt.table(
       comment="Raw GitHub commits"
    )
    def github_commits_raw():
      df = spark.read.json(spark.conf.get("commits-path"))
      return parse(df.select("commit"))
    
    @dlt.table(
      comment="Info on the author of a commit"
    )
    def commits_by_author():
      return (
        dlt.read("github_commits_raw")
          .withColumnRenamed("author_date", "date")
          .withColumnRenamed("author_email", "email")
          .withColumnRenamed("author_name", "name")
          .select("sha", "date", "email", "name")
      )
    
    @dlt.table(
      comment="GitHub repository contributors"
    )
    def github_contributors_raw():
      return(
        spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .load(spark.conf.get("contribs-path"))
      )
    
  5. サイドバーで、[ジョブアイコン ワークフロー ] をクリックし、[Delta Live Tables ] タブをクリックして、[ パイプラインの作成] をクリックします。

  6. パイプラインに Transform GitHub dataなどの名前を付けます。

  7. [ ノートブック ライブラリ ] フィールドに、ノートブックへのパスを入力するか、ノートブックをクリックして ファイル ピッカー アイコン 選択します。

  8. [ 構成の追加] をクリックします。 Key テキスト ボックスに commits-pathと入力します。[ Value ] テキスト ボックスに、GitHub レコードが書き込まれる DBFS パスを入力します。 これは任意のパスを選択でき、 ワークフローの作成時に最初の Python タスクを構成するときに使用するパスと同じです。

  9. [ 構成の追加 ] をもう一度クリックします。 Key テキスト ボックスに contribs-pathと入力します。[ Value ] テキスト ボックスに、GitHub レコードが書き込まれる DBFS パスを入力します。 これは任意のパスを選択でき、 ワークフローの作成時に 2 番目の Python タスクを構成するときに使用するパスと同じです。

  10. [ ターゲット ] フィールドに、ターゲット データベースを入力します (例: github_tables)。 ターゲット データベースを設定すると、出力データがメタストアに発行され、パイプラインによって生成されたデータをダウンストリーム クエリーで分析するために必要です。

  11. [保存]をクリックします。

ステップ 4: GitHub データを取り込んで変換するためのワークフローを作成する

Databricks SQL を使用して GitHub データを分析および視覚化する前に、データを取り込んで準備する必要があります。 これらのタスクを実行するワークフローを作成するには、次のステップを実行します。

Databricks ジョブを作成し、最初のタスクを追加する

  1. Databricks ランディングページに移動し、次のいずれかの操作を行います。

    • サイドバーで「 ジョブアイコン ワークフロー」 をクリックし[ジョブの作成] ボタン をクリックします。

    • サイドバーで「 新しいアイコン 新規 」をクリックし、メニューから「 ジョブ 」を選択します。

  2. [タスク] タブに表示されるタスク ダイアログ ボックスで、[ジョブの名前の追加] をジョブ名 ( GitHub analysis workflowなど) に置き換えます。

  3. [ タスク名] に、タスクの名前を入力します (例: get_commits)。

  4. [ 種類] で [ Python スクリプト] を選択します。

  5. [ソース][DBFS/S3] を選択します。

  6. [パス] に、DBFS のスクリプトへのパスを入力します。

  7. [パラメーター] に、Python スクリプトの次の引数を入力します。

    ["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • <owner> をリポジトリ所有者の名前に置き換えます。たとえば、 github.com/databrickslabs/overwatch リポジトリからレコードを取得するには、 databrickslabsと入力します。

    • <repo> をリポジトリ名 ( overwatchなど) に置き換えます。

    • <DBFS-output-dir> を DBFS のパスに置き換えて、GitHub から取得したレコードを保存します。

    • <scope-name> を、GitHub トークンを格納するために作成したシークレットスコープの名前に置き換えます。

    • <github-token-key> を GitHub トークンに割り当てたキーの名前に置き換えます。

  8. [ タスクの保存] をクリックします。

別のタスクを追加する

  1. 作成したタスクの下をクリックします [タスクの追加] ボタン

  2. [ タスク名] に、タスクの名前を入力します (例: get_contributors)。

  3. [ 種類] で、タスクの種類として [Python スクリプト ] を選択します。

  4. [ソース][DBFS/S3] を選択します。

  5. [パス] に、DBFS のスクリプトへのパスを入力します。

  6. [パラメーター] に、Python スクリプトの次の引数を入力します。

    ["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • <owner> をリポジトリ所有者の名前に置き換えます。たとえば、 github.com/databrickslabs/overwatch リポジトリからレコードを取得するには、 databrickslabsと入力します。

    • <repo> をリポジトリ名 ( overwatchなど) に置き換えます。

    • <DBFS-output-dir> を DBFS のパスに置き換えて、GitHub から取得したレコードを保存します。

    • <scope-name> を、GitHub トークンを格納するために作成したシークレットスコープの名前に置き換えます。

    • <github-token-key> を GitHub トークンに割り当てたキーの名前に置き換えます。

  7. [ タスクの保存] をクリックします。

データを変換するタスクを追加する

  1. 作成したタスクの下をクリックします [タスクの追加] ボタン

  2. [ タスク名] に、タスクの名前を入力します (例: transform_github_data)。

  3. [種類] で [パイプラインDelta Live Tables ] を選択し、タスクの名前を入力します。

  4. [パイプライン] で、「 ステップ 3: Delta Live Tables パイプラインを作成する」で作成したパイプラインを選択して、GitHub データを処理します

  5. [作成]をクリックします。

ステップ 5: データ変換ワークフローを実行する

クリックして [今すぐ実行] ボタン ワークフローを実行します。 実行の詳細を表示するには、 「 ジョブの実行 」ビュー で 実 行 の「 開始時 刻 」列のリンクをクリックします。各タスクをクリックすると、タスク実行の詳細が表示されます。

ステップ 6: (オプション) ワークフロー実行の完了後に出力データを表示するには、次の手順を実行します。

  1. 実行の詳細ビューで、 Delta Live Tables タスクをクリックします。

  2. [ タスク実行の詳細 ] パネルで、[パイプライン] の下にある パイプライン名をクリックします。 [パイプラインの詳細] ページが表示されます。

  3. パイプライン DAG で commits_by_author テーブルを選択します。

  4. コミット パネルの [メタストア] の横にあるテーブル名をクリックします。[カタログ エクスプローラ]ページが開きます。

カタログ エクスプローラーでは、テーブル スキーマ、サンプル データ、およびデータのその他の詳細を表示できます。 同じ手順に従って、 github_contributors_raw テーブルのデータを表示します。

ステップ 7: GitHub データを削除する

実際のアプリケーションでは、継続的にデータを取り込んで処理している可能性があります。 この例ではデータセット全体をダウンロードして処理するため、ワークフローを再実行するときにエラーが発生しないように、ダウンロード済みの GitHub データを削除する必要があります。 ダウンロードしたデータを削除するには、次の手順に従います。

  1. 新しいノートブックを作成し、最初のセルに次のコマンドを入力します。

    dbutils.fs.rm("<commits-path", True)
    dbutils.fs.rm("<contributors-path", True)
    

    <commits-path><contributors-path> を、Python タスクの作成時に構成した DBFS パスに置き換えます。

  2. をクリックして 実行メニュー [ セルの実行] を選択します。

このノートブックをワークフローのタスクとして追加することもできます。

ステップ 8: Databricks SQL クエリーを作成する

ワークフローを実行し、必要なテーブルを作成したら、準備したデータを分析するためのクエリーを作成します。 クエリーとビジュアライゼーションの例を作成するには、次のステップを実行します。

月別の上位 10 人の貢献者を表示する

  1. サイドバーの Databricks ロゴ Databricks ロゴ の下にあるアイコンをクリックし、[ SQL] を選択します。

  2. [ クエリの作成 ] をクリックして、Databricks SQL クエリ エディターを開きます。

  3. カタログが hive_metastore に設定されていることを確認します。 [hive_metastore ] の横にある [デフォルト ] をクリックし、 Delta Live Tables パイプラインで設定した Target 値にデータベースを設定します。

  4. [ 新しいクエリー ]タブで、次のように入力します。

    SELECT
      date_part('YEAR', date) AS year,
      date_part('MONTH', date) AS month,
      name,
      count(1)
    FROM
      commits_by_author
    WHERE
      name IN (
        SELECT
          name
        FROM
          commits_by_author
        GROUP BY
          name
        ORDER BY
          count(name) DESC
        LIMIT 10
      )
      AND
        date_part('YEAR', date) >= 2022
    GROUP BY
      name, year, month
    ORDER BY
      year, month, name
    
  5. [ 新しいクエリー ] タブをクリックし、クエリーの名前を Commits by month top 10 contributorsに変更します。

  6. デフォルトにより、結果は表として表示されます。 棒グラフなどを使用してデータの視覚化方法を変更するには、[ 結果 ] パネルで をクリックし ジョブの垂直省略記号 、[ 編集] をクリックします。

  7. [ビジュアライゼーションの種類] で [バー] を選択します。

  8. [X] 列[月] を選択します。

  9. Y 列count(1) を選択します。

  10. [ グループ化] で [ 名前] を選択します。

  11. [保存]をクリックします。

上位 20 人の貢献者を表示する

  1. [ + > [新しいクエリー の作成] をクリックし、カタログが [hive_metastore] に設定されていることを確認します。 [hive_metastore ] の横にある [デフォルト ] をクリックし、 Delta Live Tables パイプラインで設定した Target 値にデータベースを設定します。

  2. クエリーを次のように入力します。

    SELECT
      login,
      cast(contributions AS INTEGER)
    FROM
      github_contributors_raw
    ORDER BY
      contributions DESC
    LIMIT 20
    
  3. [ 新しいクエリー ] タブをクリックし、クエリーの名前を Top 20 contributorsに変更します。

  4. ビジュアライゼーションをデフォルト テーブルから変更するには、[ 結果 ] パネルで をクリックし ジョブの垂直省略記号 、[ 編集] をクリックします。

  5. [ビジュアライゼーションの種類] で [バー] を選択します。

  6. [X] 列で [ログイン] を選択します。

  7. Y 列で、 貢献度を選択します。

  8. [保存]をクリックします。

作成者別のコミットの合計を表示する

  1. [ + > [新しいクエリー の作成] をクリックし、カタログが [hive_metastore] に設定されていることを確認します。 [hive_metastore ] の横にある [デフォルト ] をクリックし、 Delta Live Tables パイプラインで設定した Target 値にデータベースを設定します。

  2. クエリーを次のように入力します。

    SELECT
      name,
      count(1) commits
    FROM
      commits_by_author
    GROUP BY
      name
    ORDER BY
      commits DESC
    LIMIT 10
    
  3. [ 新しいクエリー ] タブをクリックし、クエリーの名前を Total commits by authorに変更します。

  4. ビジュアライゼーションをデフォルト テーブルから変更するには、[ 結果 ] パネルで をクリックし ジョブの垂直省略記号 、[ 編集] をクリックします。

  5. [ビジュアライゼーションの種類] で [バー] を選択します。

  6. [X] 列で [名前] を選択します。

  7. [Y] 列で [コミット] を選択します。

  8. [保存]をクリックします。

ステップ 9: ダッシュボードを作成する

  1. サイドバーで、[ダッシュボードアイコン ダッシュボード] をクリックします。

  2. [ ダッシュボードの作成] をクリックします。

  3. ダッシュボードの名前を入力します (例: GitHub analysis)。

  4. 「ステップ 8: Databricks SQL を作成する」で作成したクエリーとビジュアライゼーションごとに、[> ビジュアライゼーションの追加] をクリックし、各 ビジュアライゼーション を選択します。

ステップ 10: ワークフローに SQL タスクを追加する

「Databricks ジョブの作成」で作成したワークフローに新しいクエリー タスクを追加し、「 ステップ 8: Databricks SQL クエリーを作成する」で作成したクエリーごとに最初のタスクを追加するには、次のようにします。

  1. ジョブアイコン サイドバー の [ワークフロー] をクリックします

  2. [ 名前 ] 列で、ジョブ名をクリックします。

  3. [タスク] タブをクリックします。

  4. 最後のタスクの下をクリックします [タスクの追加] ボタン

  5. タスクの名前を入力し、[ 種類 ] で [SQL] を選択し、[ SQL タスク ] で [クエリー] を選択します。

  6. SQL クエリーでクエリーを選択します。

  7. SQLウェアハウスで、タスクを実行するサーバレス SQLウェアハウスまたはプロのSQL ウェアハウスを選択します。

  8. [作成]をクリックします。

ステップ 11: ダッシュボードタスクを追加する

  1. 最後のタスクの下をクリックします [タスクの追加] ボタン

  2. タスクの名前を入力し、 [タイプ][SQL]を選択し、 [SQL タスク][レガシー ダッシュボード]を選択します。

  3. 「ステップ 9: ダッシュボードを作成する」で作成したダッシュボードを選択します。

  4. SQLウェアハウスで、タスクを実行するサーバレス SQLウェアハウスまたはプロのSQL ウェアハウスを選択します。

  5. [作成]をクリックします。

ステップ 12: 完全なワークフローを実行する

ワークフローを実行するには、 [今すぐ実行] ボタンをクリックします。 実行の詳細を表示するには、 「 ジョブの実行 」ビュー で 実 行 の「 開始時 刻 」列のリンクをクリックします。

ステップ 13: 結果を表示する

実行の完了時に結果を表示するには、最後のダッシュボード タスクをクリックし、右側のパネルの [SQL ダッシュボード] の下にあるダッシュボード名をクリックします。