Git フォルダまたはワークスペース ファイルから Python モジュールをインポートする

Python コードをDatabricks Git フォルダーまたはワークスペース ファイルに保存し、その Python コードを Delta Live Tables パイプラインにインポートできます。 Gitフォルダーまたはワークスペース ファイル内のモジュールの操作の詳細については、 Pythonおよび R モジュールの操作」を参照してください。

Databricks Git フォルダーまたはワークスペース ファイルに保存されているノートブックからソース コードをインポートすることはできません。 代わりに、パイプラインを作成または編集するときにノートブックを直接追加します。 「パイプラインを作成する」を参照してください。

Python モジュールを Delta Live Tables パイプラインにインポートする

次の例は、ワークスペース ファイルからデータ セット クエリを Python モジュールとしてインポートする方法を示しています。 この例では、ワークスペース ファイルを使用してパイプラインのソース コードを保存する方法について説明していますが、Git フォルダーに保存されているソース コードでも使用できます。

この例を実行するには、次の手順を使用します。

  1. ワークスペースアイコンクリック のサイドバーにある ワークスペース Databricksをクリックして、 ワークスペース ブラウザーを開きます。

  2. ワークスペース ブラウザを使用して、Python モジュールのディレクトリを選択します。

  3. 選択したディレクトリの右端の列をクリックしケバブメニュー、「>ファイルの作成」をクリックします。

  4. ファイルの名前を入力します (例: clickstream_raw_module.py)。 ファイルエディタが開きます。 ソース データをテーブルに読み込むモジュールを作成するには、エディター ウィンドウに次のように入力します。

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. 準備されたデータを含む新しいテーブルを作成するモジュールを作成するには、同じディレクトリに新しいファイルを作成し、ファイルの名前( clickstream_prepared_module.pyなど)を入力して、新しいエディタウィンドウに次のように入力します。

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. 次に、パイプライン ノートブックを作成します。 Databricksページに移動し、データベースの作成を選択するか、新しいアイコンサイドバーで「New」を選択し、 「書籍」を選択します。 ワークスペースブラウザで「ノートブック」を作成することもできます。ケバブメニュー [作成] > [ユーザー名]をクリックします。

  7. ノートブックに名前を付け、 Python がデフォルトの言語であることを確認します。

  8. [作成]をクリックします。

  9. ノートブックにサンプル コードを入力します。

    ノートブックが、ノートブック ディレクトリとは異なるワークスペース ファイル パスまたは Git フォルダー パスからモジュールまたはパッケージをインポートする場合は、 sys.path.append()を使用してファイルにパスを手動で追加する必要があります。

    Git フォルダーからファイルをインポートする場合は、パスの先頭に/Workspace/を追加する必要があります。 たとえば、 sys.path.append('/Workspace/...')です。 パスから /Workspace/ を省略すると、エラーが発生します。

    モジュールまたはパッケージがノートブックと同じディレクトリに保存されている場合は、パスを手動で追加する必要はありません。 また、Git フォルダーのルート ディレクトリからインポートする場合は、ルート ディレクトリがパスに自動的に追加されるため、手動でパスを追加する必要はありません。

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        dlt.read("clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    <module-path>を、インポートする Python モジュールを含むディレクトリへのパスに置き換えます。

  10. 新しいノートブックを使用してパイプラインを作成します。

  11. パイプラインを実行するには、[ パイプラインの詳細 ] ページで [ 開始] をクリックします。

Python コードをパッケージとしてインポートすることもできます。 Delta Live Tables ノートブックからの次のコード スニペットは、ノートブックと同じディレクトリ内のdlt_packagesディレクトリからtest_utilsパッケージをインポートします。 dlt_packages ディレクトリには、ファイル test_utils.py__init__.pyが含まれており、test_utils.py は関数create_test_table()を定義します。

import dlt

@dlt.table
def my_table():
  return dlt.read(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)