メインコンテンツまでスキップ

Git フォルダまたはワークスペース ファイルから Python モジュールをインポートする

PythonコードをDatabricks Gitフォルダまたはワークスペースファイルに保存し、そのPythonコードをLakeFlow宣言型パイプラインにインポートできます。Git フォルダーまたはワークスペース ファイル内のモジュールの操作の詳細については、「Python モジュールと R モジュールの操作」を参照してください。

注記

Databricks Git フォルダーまたはワークスペース ファイルに格納されているノートブックからソース コードをインポートすることはできません。代わりに、パイプラインを作成または編集するときにノートブックを直接追加します。「LakeFlow宣言型パイプラインの構成」を参照してください。

Python モジュールを LakeFlow 宣言型パイプライン ノートブックにインポートする

次の例は、ワークスペース ファイルからデータセット クエリを Python モジュールとしてインポートする方法を示しています。 この例では、ワークスペース ファイルを使用してパイプラインのソース コードを格納する方法について説明しますが、Git フォルダーに格納されたソース コードでも使用できます。

この例を実行するには、次の手順に従います。

  1. ワークスペースのアイコン。 ワークスペースのサイドバーにある ワークスペース Databricksをクリックして、ワークスペースブラウザを開きます。

  2. ワークスペース ブラウザーを使用して、Python モジュールのディレクトリを選択します。

  3. 選択したディレクトリの右端の列のケバブメニューのアイコン。をクリックし、 作成 > ファイル をクリックします。

  4. ファイルの名前を入力します (例: clickstream_raw_module.py)。 ファイルエディタが開きます。 ソース・データをテーブルに読み込むモジュールを作成するには、エディタ・ウィンドウで次のように入力します。

    Python
    from dlt import *

    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"

    def create_clickstream_raw_table(spark):
    @table
    def clickstream_raw():
    return (
    spark.read.json(json_path)
    )
  5. 準備済みデータを含む新しいテーブルを作成するモジュールを作成するには、同じディレクトリに新しいファイルを作成し、ファイルの名前 ( clickstream_prepared_module.pyなど) を入力して、新しいエディターウィンドウで次のように入力します。

    Python
    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    def create_clickstream_prepared_table(spark):
    create_clickstream_raw_table(spark)
    @table
    @expect("valid_current_page_title", "current_page_title IS NOT NULL")
    @expect_or_fail("valid_count", "click_count > 0")
    def clickstream_prepared():
    return (
    read("clickstream_raw")
    .withColumn("click_count", expr("CAST(n AS INT)"))
    .withColumnRenamed("curr_title", "current_page_title")
    .withColumnRenamed("prev_title", "previous_page_title")
    .select("current_page_title", "click_count", "previous_page_title")
    )
  6. 次に、パイプライン ノートブックを作成します。 の [ワークスペース] で [Databricks ノートブックを作成] を選択するか、サイドバーで [新しいアイコン 新規] をクリックして[ノートブック] 選択します。 ワークスペース ブラウザーで [ ケバブメニューのアイコン。 ] をクリックし、[ノートブックの作成] をクリックしてノートブック を作成することもできます>

  7. ノートブックに名前を付け、 Python がデフォルトの言語であることを確認します。

  8. 作成 をクリックします。

  9. ノートブックに次のコード例を入力します。

注記

ノートブックがワークスペースのファイルパスまたはノートブックディレクトリとは異なる Git フォルダのパスからモジュールまたはパッケージをインポートする場合は、sys.path.append()を使用してファイルにパスを手動で追加する必要があります。

Git フォルダからファイルをインポートする場合は、パスの先頭に /Workspace/ を追加する必要があります。 たとえば、 sys.path.append('/Workspace/...'). パスから /Workspace/ を省略すると、エラーになります。

モジュールまたはパッケージがノートブックと同じディレクトリに保存されている場合は、パスを手動で追加する必要はありません。 また、Git フォルダーのルート ディレクトリからインポートする場合は、ルート ディレクトリがパスに自動的に追加されるため、パスを手動で追加する必要もありません。

Python
import sys, os
# You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
sys.path.append(os.path.abspath('<module-path>'))

import dlt
from clickstream_prepared_module import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

create_clickstream_prepared_table(spark)

@dlt.table(
comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
return (
spark.read.table("catalog_name.schema_name.clickstream_prepared")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.sort(desc("click_count"))
.select("referrer", "click_count")
.limit(10)
)

<module-path> を、インポートする Python モジュールを含むディレクトリへのパスに置き換えます。 10. 新しいノートブックを使用してパイプラインを作成します。

  1. パイプラインを実行するには、 パイプラインの詳細 ページで 開始 をクリックします。

Pythonコードをパッケージとしてインポートすることもできます。次のコード スニペットは、LakeFlow 宣言型パイプライン ノートブックから、ノートブックと同じディレクトリ内の dlt_packages ディレクトリから test_utils パッケージをインポートします。dlt_packages ディレクトリには test_utils.py ファイルと __init__.pyファイルが含まれており、 test_utils.py で関数 ID が定義されていますcreate_test_table()

Python
import dlt

@dlt.table
def my_table():
return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)