Databricksジョブでの Databricks SQL の使用

Databricks ジョブで SQL タスク タイプを使用すると、クエリ、レガシ ダッシュボード、アラートなどの Databricks SQL オブジェクトを含むワークフローを作成、スケジュール、操作、監視できます。 たとえば、ワークフローでは、データを取り込み、データを準備し、Databricks SQL クエリを使用して分析を実行し、結果を従来のダッシュボードに表示できます。

この記事では、GitHub への貢献のメトリクスを表示するレガシー ダッシュボードを作成するワークフローの例を示します。 この例では、次のことを行います。

  • Python スクリプトとGitHub REST APIを使用して GitHub データを取り込みます。

  • Delta Live Tables パイプラインを使用して GitHub データを変換します。

  • 準備されたデータに対して分析を実行する Databricks SQL クエリをトリガーします。

  • 従来のダッシュボードに分析を表示します。

GitHub 分析ダッシュボード

始める前に

このチュートリアルを完了するには、次のものが必要です。

ステップ 1: GitHub トークンをシークレットに保存する

GitHub、ジョブ内にDatabricks 個人的なアクセス発言などの資格情報をハードコーディングする代わりに、シークレットスコープを使用してシークレットを安全に保存および管理することを推奨しています。次のDatabricks CLIコマンドは、シークレットスコープを作成し、そのスコープ内のシークレットにGitHub VPN を保存する例です。

databricks secrets create-scope <scope-name>
databricks secrets put-secret <scope-name> <token-key> --string-value <token>
  • <scope-name をDatabricksシークレットスコープの名前に置き換えて、ウイルスを保存します。

  • <token-key>トークンに割り当てるキーの名前に置き換えます。

  • <token> をGitHub個人アクセス権の値に置き換えます。

ステップ 2: GitHub データを取得するスクリプトを作成する

次の Python スクリプトは、GitHub REST API を使用して、GitHub リポジトリからコミットと貢献に関するデータを取得します。 入力引数は GitHub リポジトリを指定します。 レコードは、別の入力引数によって指定された DBFS 内の場所に保存されます。

この例では、DBFS を使用して Python スクリプトを保存していますが、 Databricks Git フォルダーまたはワークスペース ファイルを使用してスクリプトを保存および管理することもできます。

  • このスクリプトをローカル ディスク上の場所に保存します。

    import json
    import requests
    import sys
    
    api_url = "https://api.github.com"
    
    def get_commits(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/commits"
      more = True
    
      get_response(request_url, f"{path}/commits", token)
    
    
    def get_contributors(owner, repo, token, path):
      page = 1
      request_url =  f"{api_url}/repos/{owner}/{repo}/contributors"
      more = True
    
      get_response(request_url, f"{path}/contributors", token)
    
    
    def get_response(request_url, path, token):
      page = 1
      more = True
    
      while more:
        response = requests.get(request_url, params={'page': page}, headers={'Authorization': "token " + token})
        if response.text != "[]":
          write(path + "/records-" + str(page) + ".json", response.text)
          page += 1
        else:
          more = False
    
    
    def write(filename, contents):
      dbutils.fs.put(filename, contents)
    
    
    def main():
      args = sys.argv[1:]
      if len(args) < 6:
        print("Usage: github-api.py owner repo request output-dir secret-scope secret-key")
        sys.exit(1)
    
      owner = sys.argv[1]
      repo = sys.argv[2]
      request = sys.argv[3]
      output_path = sys.argv[4]
      secret_scope = sys.argv[5]
      secret_key = sys.argv[6]
    
      token = dbutils.secrets.get(scope=secret_scope, key=secret_key)
    
      if (request == "commits"):
        get_commits(owner, repo, token, output_path)
      elif (request == "contributors"):
        get_contributors(owner, repo, token, output_path)
    
    
    if __name__ == "__main__":
        main()
    
  • スクリプトを DBFS にアップロードします。

    1. Databricksのランディングページにアクセスし、カタログアイコンサイドバーのカタログ

    2. DBFSの参照をクリックします。

    3. DBFS ファイル ブラウザーで、 [アップロード] をクリックします。 「DBFS にデータをアップロード」ダイアログが表示されます。

    4. スクリプトを保存する DBFS のパスを入力し、アップロードするファイルをドロップするか、参照をクリックして、Python スクリプトを選択します。

    5. [完了] をクリックします。

ステップ 3:Delta Live Tables GitHubデータを処理するための パイプラインを作成する

このセクションでは、Delta Live Tables パイプラインを作成して、生の GitHub データを Databricks SQL クエリで分析できるテーブルに変換します。 パイプラインを作成するには、次のステップを実行します。

  1. サイドバーで新しいアイコン [新規] をクリックし、メニューから [ノートブック] を選択します。[ノートブックの作成] ダイアログが表示されます。

  2. デフォルトの言語で名前を入力し、 Pythonを選択します。 クラスターはデフォルト値のままにしておくことができます。 Delta Live Tables ランタイムは、パイプラインを実行する前にクラスターを作成します。

  3. [作成]をクリックします。

  4. Python コードの例をコピーして、新しいノートブックに貼り付けます。 サンプル コードをノートブックの 1 つのセルまたは複数のセルに追加できます。

    import dlt
    from pyspark.sql.functions import *
    
    def parse(df):
       return (df
         .withColumn("author_date", to_timestamp(col("commit.author.date")))
         .withColumn("author_email", col("commit.author.email"))
         .withColumn("author_name", col("commit.author.name"))
         .withColumn("comment_count", col("commit.comment_count"))
         .withColumn("committer_date", to_timestamp(col("commit.committer.date")))
         .withColumn("committer_email", col("commit.committer.email"))
         .withColumn("committer_name", col("commit.committer.name"))
         .withColumn("message", col("commit.message"))
         .withColumn("sha", col("commit.tree.sha"))
         .withColumn("tree_url", col("commit.tree.url"))
         .withColumn("url", col("commit.url"))
         .withColumn("verification_payload", col("commit.verification.payload"))
         .withColumn("verification_reason", col("commit.verification.reason"))
         .withColumn("verification_signature", col("commit.verification.signature"))
         .withColumn("verification_verified", col("commit.verification.signature").cast("string"))
         .drop("commit")
       )
    
    @dlt.table(
       comment="Raw GitHub commits"
    )
    def github_commits_raw():
      df = spark.read.json(spark.conf.get("commits-path"))
      return parse(df.select("commit"))
    
    @dlt.table(
      comment="Info on the author of a commit"
    )
    def commits_by_author():
      return (
        dlt.read("github_commits_raw")
          .withColumnRenamed("author_date", "date")
          .withColumnRenamed("author_email", "email")
          .withColumnRenamed("author_name", "name")
          .select("sha", "date", "email", "name")
      )
    
    @dlt.table(
      comment="GitHub repository contributors"
    )
    def github_contributors_raw():
      return(
        spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .load(spark.conf.get("contribs-path"))
      )
    
  5. サイドバーで、ワークフローアイコンワークフローでDelta Live Tablesタブをクリックし、 [パイプラインの作成] をクリックします。

  6. パイプラインに名前を付けます (例: Transform GitHub data

  7. ノートブック ライブラリフィールドにノートブックへのパスを入力するか、ファイルピッカーアイコンを押してノートブックを選択します。

  8. [ 設定の追加] をクリックします。 Key テキスト ボックスに「commits-path」と入力します。Valueテキスト ボックスに、GitHub レコードが書き込まれる DBFS パスを入力します。 これは任意のパスを選択でき、ワークフローを作成するときに最初の Python タスクを構成するときに使用するパスと同じです。

  9. 構成の追加 」を再度クリックします。 Key テキスト ボックスに「contribs-path」と入力します。Valueテキスト ボックスに、GitHub レコードが書き込まれる DBFS パスを入力します。 これは任意のパスを選択でき、ワークフローを作成するときに 2 番目の Python タスクを構成するときに使用するパスと同じです。

  10. ターゲット 」フィールドに、ターゲット・データベース ( github_tablesなど) を入力します。 ターゲット データベースを設定すると、出力データがメタストアに公開され、パイプラインによって生成されたデータを分析する下流のクエリに必要になります。

  11. [保存]をクリックします。

ステップ 4: GitHub データを取り込んで変換するためのワークフローを作成する

Databricks SQL を使用して GitHub データを分析および視覚化する前に、データを取り込んで準備する必要があります。 これらのタスクを完了するためのワークフローを作成するには、次のステップを実行します。

Databricks ジョブを作成し、最初のタスクを追加する

  1. Databricksページに移動し、次のいずれかを実行します。

    • サイドバーで、ワークフローアイコンワークフローとクリック「ジョブを作成」ボタン

    • サイドバーで、新しいアイコン新規をクリックし、メニューからジョブを選択します。

  2. [タスク] タブに表示される [タスク] ダイアログ ボックスで、 [ジョブの名前を追加]をジョブ名 (例: GitHub analysis workflowに置き換えます。

  3. [タスク名] にタスクの名前を入力します(例:get_commits)。

  4. タイプで、 Python スクリプトを選択します。

  5. ソースで、 DBFS / S3を選択します。

  6. パスに、DBFS 内のスクリプトへのパスを入力します。

  7. そこで、 Pythonスクリプトに次の引数を入力します。

    ["<owner>","<repo>","commits","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • <owner>リポジトリ所有者の名前に置き換えます。 たとえば、 github.com/databrickslabs/overwatchリポジトリからレコードを取得するには、 databrickslabsと入力します。

    • <repo>をリポジトリ名に置き換えます(例: overwatch

    • GitHub から取得したレコードを保存するには、 <DBFS-output-dir> DBFS 内のパスに置き換えます。

    • <scope-name> を、 GitHubウイルスを保存するために作成したシークレットスコープの名前に置き換えます。

    • <github-token-key>を、GitHub トークンに割り当てたキーの名前に置き換えます。

  8. タスクを保存をクリックします。

別のタスクを追加する

  1. クリック「タスクを追加」ボタン作成したタスクの下にあります。

  2. [タスク名] にタスクの名前を入力します(例:get_contributors)。

  3. 「タイプ」で、 Python スクリプトタスク タイプを選択します。

  4. ソースで、 DBFS / S3を選択します。

  5. パスに、DBFS 内のスクリプトへのパスを入力します。

  6. そこで、 Pythonスクリプトに次の引数を入力します。

    ["<owner>","<repo>","contributors","<DBFS-output-dir>","<scope-name>","<github-token-key>"]

    • <owner>リポジトリ所有者の名前に置き換えます。 たとえば、 github.com/databrickslabs/overwatchリポジトリからレコードを取得するには、 databrickslabsと入力します。

    • <repo>をリポジトリ名に置き換えます(例: overwatch

    • GitHub から取得したレコードを保存するには、 <DBFS-output-dir> DBFS 内のパスに置き換えます。

    • <scope-name> を、 GitHubウイルスを保存するために作成したシークレットスコープの名前に置き換えます。

    • <github-token-key>を、GitHub トークンに割り当てたキーの名前に置き換えます。

  7. タスクを保存をクリックします。

データを変換するタスクを追加する

  1. クリック「タスクを追加」ボタン作成したタスクの下にあります。

  2. [タスク名] にタスクの名前を入力します(例:transform_github_data)。

  3. [タイプ][Delta Live Tables パイプライン]を選択し、タスクの名前を入力します。

  4. [パイプライン] で、 「ステップ 3:Delta Live TablesGitHub データを処理するための パイプラインの作成」 で作成したパイプラインを選択します。

  5. [作成]をクリックします。

ステップ 5: データ変換ワークフローを実行する

クリック「今すぐ実行」ボタンワークフローを実行します。 実行の詳細 を表示するには、 ジョブ実行 ビューで実行の 開始時刻 列のリンクをクリックします。各タスクをクリックすると、タスク実行の詳細が表示されます。

ステップ 6: (オプション) ワークフロー実行の完了後に出力データを表示するには、次のステップを実行します。

  1. 実行の詳細ビューで、Delta Live Tables タスクをクリックします。

  2. [タスク実行の詳細]パネルで、 [パイプライン]の下にあるパイプライン名をクリックします。 パイプラインの詳細ページが表示されます。

  3. パイプライン DAG 内のcommits_by_authorテーブルを選択します。

  4. コミットパネルのMetastoreの横にあるテーブル名をクリックします。 「カタログエクスプローラ」(Catalog Explorer) ページが開きます。

カタログ エクスプローラーでは、テーブル スキーマ、サンプル データ、およびデータのその他の詳細を表示できます。 同じステップに従って、 github_contributors_rawテーブルのデータを表示します。

ステップ 7: GitHub データを削除する

実際のアプリケーションでは、データを継続的に取り込み、処理している可能性があります。 この例ではデータ セット全体をダウンロードして処理するため、ワークフローを再実行するときにエラーが発生しないように、すでにダウンロードした GitHub データを削除する必要があります。 ダウンロードしたデータを削除するには、次のステップを実行します。

  1. 新しいノートブックを作成し、最初のセルに次のコマンドを入力します。

    dbutils.fs.rm("<commits-path", True)
    dbutils.fs.rm("<contributors-path", True)
    

    <commits-path><contributors-path>を、Python タスクの作成時に構成した DBFS パスに置き換えます。

  2. クリック実行メニューセルの実行を選択します。

このノートブックをワークフローのタスクとして追加することもできます。

ステップ 8: Databricks SQL クエリーを作成する

ワークフローを実行して必要なテーブルを作成したら、準備したデータを分析するためのクエリを作成します。 サンプルクエリと視覚化を作成するには、次の手順を実行します。

月別の上位 10 人の貢献者を表示する

  1. Databricksロゴの下のアイコンをクリックしますDatabricksロゴサイドバーでSQLを選択します。

  2. [クエリの作成] をクリックして、Databricks SQL クエリ エディターを開きます。

  3. カタログが hive_metastore に設定されていることを確認します。 hive_metastoreの横にあるデフォルトをクリックし、データベースを Delta Live Tables パイプラインで設定したターゲット値に設定します。

  4. [新しいクエリ]タブで、次のクエリを入力します。

    SELECT
      date_part('YEAR', date) AS year,
      date_part('MONTH', date) AS month,
      name,
      count(1)
    FROM
      commits_by_author
    WHERE
      name IN (
        SELECT
          name
        FROM
          commits_by_author
        GROUP BY
          name
        ORDER BY
          count(name) DESC
        LIMIT 10
      )
      AND
        date_part('YEAR', date) >= 2022
    GROUP BY
      name, year, month
    ORDER BY
      year, month, name
    
  5. [新しいクエリ]タブをクリックし、クエリの名前を変更します (例: Commits by month top 10 contributors

  6. デフォルトでは、結果は表として表示されます。 データの視覚化方法を変更するには (たとえば、棒グラフを使用するなどして) 、[ 結果 ] パネルで [ ケバブメニュー ] をクリックし、[ 編集] をクリックします。

  7. [視覚化の種類] で、 [棒] を選択します。

  8. [X] 列[month] を選択します。

  9. [Y 列] で [count(1)] を選択します。

  10. 「グループ化」で、名前を選択します。

  11. [保存]をクリックします。

上位 20 人の貢献者を表示する

  1. [+ > [新しいクエリの作成] をクリックし、カタログが hive_metastore に設定されていることを確認します。hive_metastoreの横にあるデフォルトをクリックし、データベースを Delta Live Tables パイプラインで設定したターゲット値に設定します。

  2. 次のクエリを入力します。

    SELECT
      login,
      cast(contributions AS INTEGER)
    FROM
      github_contributors_raw
    ORDER BY
      contributions DESC
    LIMIT 20
    
  3. [新しいクエリ]タブをクリックし、クエリの名前を変更します (例: Top 20 contributors

  4. デフォルトのテーブルから視覚化を変更するには、結果パネルで、ケバブメニュー編集をクリックします。

  5. [視覚化の種類] で、 [棒] を選択します。

  6. [X] 列[login] を選択します。

  7. [Y 列] で、投稿を選択します

  8. [保存]をクリックします。

作成者別のコミットの合計を表示する

  1. [+ > [新しいクエリの作成] をクリックし、カタログが hive_metastore に設定されていることを確認します。hive_metastoreの横にあるデフォルトをクリックし、データベースを Delta Live Tables パイプラインで設定したターゲット値に設定します。

  2. 次のクエリを入力します。

    SELECT
      name,
      count(1) commits
    FROM
      commits_by_author
    GROUP BY
      name
    ORDER BY
      commits DESC
    LIMIT 10
    
  3. [新しいクエリ]タブをクリックし、クエリの名前を変更します (例: Total commits by author

  4. デフォルトのテーブルから視覚化を変更するには、結果パネルで、ケバブメニュー編集をクリックします。

  5. [視覚化の種類] で、 [棒] を選択します。

  6. [X] 列[名前] を選択します。

  7. Y 列で、コミットを選択します。

  8. [保存]をクリックします。

ステップ 9: ダッシュボードを作成する

  1. サイドバーで、「ダッシュボードアイコン ダッシュボード 」をクリックします

  2. [ ダッシュボードの作成] をクリックします。

  3. ダッシュボードの名前を入力します (例: GitHub analysis)。

  4. 手順 8: Databricks SQLクエリを作成する で作成したクエリと視覚化ごとに、 [追加] > [視覚化]をクリックし、各視覚化を選択します。

ステップ 10: ワークフローに SQL タスクを追加する

「Databricks ジョブの作成」で作成したワークフローに新しいクエリ タスクを追加し、 「ステップ 8:Databricks SQL を作成する」 Databricksで作成したクエリごとに のタスクを追加する には、次の手順を実行します。

  1. サイドバーのワークフローアイコンワークフロー]をクリックします。

  2. 「名前」列で、ジョブ名をクリックします。

  3. 「タスク」タブをクリックします。

  4. クリック「タスクを追加」ボタン最後のタスクの下にあります。

  5. タスクの名前を入力し、タイプSQLを選択し、 SQL タスククエリを選択します。

  6. SQL クエリでクエリを選択します。

  7. SQLウェアハウスで、タスクを実行するサーバレスSQLウェアハウスまたはプロSQLウェアハウスを選択します。

  8. [作成]をクリックします。

ステップ 11: ダッシュボードタスクを追加する

  1. クリック「タスクを追加」ボタン最後のタスクの下にあります。

  2. タスクの名前を入力し、 [タイプ][SQL]を選択し、 [SQL タスク][レガシー ダッシュボード]を選択します。

  3. 「ステップ 9: ダッシュボードを作成する」で作成したダッシュボードを選択します。

  4. SQLウェアハウスで、タスクを実行するサーバレスSQLウェアハウスまたはプロSQLウェアハウスを選択します。

  5. [作成]をクリックします。

ステップ 12: 完全なワークフローを実行する

ワークフローを実行するには、「今すぐ実行」ボタン実行の詳細 を表示するには、 ジョブ実行 ビューで実行の 開始時刻 列のリンクをクリックします。

ステップ 13: 結果を表示する

実行が完了したときに結果を表示するには、最後のダッシュボード タスクをクリックし、右側のパネルのSQL ダッシュボードの下のダッシュボード名をクリックします。