メインコンテンツまでスキップ

Git フォルダまたはワークスペース ファイルから Python モジュールをインポートする

Python コードを Databricks Git フォルダー または ワークスペース ファイルに 格納し、その Python コードを DLT パイプラインにインポートできます。Git フォルダーまたはワークスペース ファイル内のモジュールの操作の詳細については、「Python モジュールと R モジュールの操作」を参照してください。

注記

Databricks Git フォルダーまたはワークスペース ファイルに格納されているノートブックからソース コードをインポートすることはできません。 代わりに、パイプラインを作成または編集するときにノートブックを直接追加します。 DLT パイプラインの設定を参照してください。

Python モジュールを DLT パイプラインにインポートする

次の例は、ワークスペース ファイルからデータセット クエリを Python モジュールとしてインポートする方法を示しています。 この例では、ワークスペース ファイルを使用してパイプラインのソース コードを格納する方法について説明しますが、Git フォルダーに格納されたソース コードでも使用できます。

この例を実行するには、次の手順に従います。

  1. ワークスペースアイコン ワークスペースのサイドバーにある ワークスペース Databricksをクリックして、ワークスペースブラウザを開きます。

  2. ワークスペース ブラウザーを使用して、Python モジュールのディレクトリを選択します。

  3. 選択したディレクトリの右端の列のケバブメニューをクリックし、 作成 > ファイル をクリックします。

  4. ファイルの名前を入力します (例: clickstream_raw_module.py)。 ファイルエディタが開きます。 ソース・データをテーブルに読み込むモジュールを作成するには、エディタ・ウィンドウで次のように入力します。

    Python
    from dlt import *

    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"

    def create_clickstream_raw_table(spark):
    @table
    def clickstream_raw():
    return (
    spark.read.json(json_path)
    )
  5. 準備済みデータを含む新しいテーブルを作成するモジュールを作成するには、同じディレクトリに新しいファイルを作成し、ファイルの名前 ( clickstream_prepared_module.pyなど) を入力して、新しいエディターウィンドウで次のように入力します。

    Python
    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    def create_clickstream_prepared_table(spark):
    create_clickstream_raw_table(spark)
    @table
    @expect("valid_current_page_title", "current_page_title IS NOT NULL")
    @expect_or_fail("valid_count", "click_count > 0")
    def clickstream_prepared():
    return (
    read("clickstream_raw")
    .withColumn("click_count", expr("CAST(n AS INT)"))
    .withColumnRenamed("curr_title", "current_page_title")
    .withColumnRenamed("prev_title", "previous_page_title")
    .select("current_page_title", "click_count", "previous_page_title")
    )
  6. 次に、パイプライン ノートブックを作成します。 の [ワークスペース] で [Databricks ノートブックを作成] を選択するか、サイドバーで [新しいアイコン 新規] をクリックして[ノートブック] 選択します。 ワークスペース ブラウザーで [ ケバブメニュー ] をクリックし、[ノートブックの作成] をクリックしてノートブック を作成することもできます>

  7. ノートブックに名前を付け、 Python がデフォルトの言語であることを確認します。

  8. 作成 をクリックします。

  9. ノートブックにサンプルコードを入力します。

注記

ノートブックがワークスペースのファイルパスまたはノートブックディレクトリとは異なる Git フォルダのパスからモジュールまたはパッケージをインポートする場合は、sys.path.append()を使用してファイルにパスを手動で追加する必要があります。

Git フォルダからファイルをインポートする場合は、パスの先頭に /Workspace/ を追加する必要があります。 たとえば、 sys.path.append('/Workspace/...'). パスから /Workspace/ を省略すると、エラーになります。

モジュールまたはパッケージがノートブックと同じディレクトリに保存されている場合は、パスを手動で追加する必要はありません。 また、Git フォルダーのルート ディレクトリからインポートする場合は、ルート ディレクトリがパスに自動的に追加されるため、パスを手動で追加する必要もありません。

Python
import sys, os
# You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
sys.path.append(os.path.abspath('<module-path>'))

import dlt
from clickstream_prepared_module import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

create_clickstream_prepared_table(spark)

@dlt.table(
comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
return (
spark.read.table("catalog_name.schema_name.clickstream_prepared")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.sort(desc("click_count"))
.select("referrer", "click_count")
.limit(10)
)

<module-path> を、インポートする Python モジュールを含むディレクトリへのパスに置き換えます。 10. 新しいノートブックを使用してパイプラインを作成します。

  1. パイプラインを実行するには、 パイプラインの詳細 ページで 開始 をクリックします。

Pythonコードをパッケージとしてインポートすることもできます。 DLT ノートブックの次のコード スニペットは、ノートブックと同じディレクトリ内の dlt_packages ディレクトリから test_utils パッケージをインポートします。dlt_packages ディレクトリには test_utils.py ファイルと __init__.pyファイルが含まれており、 test_utils.py で関数 ID が定義されていますcreate_test_table()

Python
import dlt

@dlt.table
def my_table():
return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)