メインコンテンツまでスキップ

Git フォルダまたはワークスペース ファイルから Python モジュールをインポートする

PythonコードをDatabricks Gitフォルダまたはワークスペースファイルに保存し、そのPythonコードをLakeflow宣言型パイプラインにインポートできます。Git フォルダーまたはワークスペース ファイル内のモジュールの操作の詳細については、「Python モジュールと R モジュールの操作」を参照してください。

Python ファイルをインポートするには、複数のオプションがあります。

  • Python モジュールをユーティリティ ファイルとしてパイプラインに含めます。これは、モジュールがパイプラインに固有のものである場合に最適に機能します。
  • 共有モジュールを、それを使用する必要があるすべてのパイプラインのパイプライン環境に追加します。
  • importステートメントを使用して、ワークスペース内のモジュールを Python ソースに直接インポートします。

パイプラインにPythonモジュールを含める

パイプラインの一部として Python モジュールを作成できます。パイプラインのルート フォルダーはsys.pathに自動的に追加されます。これにより、パイプラインの Python ソース コード内でモジュールを直接参照できるようになります。

次の例は、パイプラインのルート フォルダーの下に Python モジュールを作成し、パイプライン ソース内の Python ソース ファイルからそれを参照する方法を示しています。

  1. パイプライン エディターでパイプラインを開きます。

  2. 左側のパイプラインアセットブラウザから、プラスアイコン。 を追加し 、メニューから 「ユーティリティ」 を選択します。

  3. 名前my_utils.py入力します。

  4. デフォルトのパスをそのままにして、 「作成」を クリックします。

    これにより、パイプラインのutilitiesフォルダーにmy_utils.pyファイルが作成され、存在しない場合はutilitiesフォルダーが作成されます。このフォルダー内のファイルはデフォルトではパイプライン ソースに追加されませんが、ソース コードの一部である.pyファイルから呼び出すことができます。

    デフォルトでは、ユーティリティ ファイルには、距離をマイル単位で取得して変換するdistance_km()というサンプル関数が含まれています。

  5. 変換フォルダ内のPythonソースファイル(以下を選択して作成できます)プラスアイコン。 追加 をクリックし 、メニューから 変換 を選択して、次のコードを追加します。

    Python
    from utilities import my_utils

これで、その Python ファイルからmy_utils内の関数を呼び出すことができます。モジュール内の関数を呼び出す必要があるすべての Python ファイルからimportステートメントを追加する必要があります。

パイプライン環境にPythonモジュールを追加する

複数のパイプライン間でPythonモジュールを共有したい場合は、ワークスペース ファイル内の任意の場所にモジュールを保存し、それを使用する必要があるパイプラインの環境からそのモジュールを参照できます。 次のような Python モジュールを参照できます。

  • 個々の Python ( .py ) ファイル。
  • Pythonプロジェクトは、 Python wheel ( .whl ) ファイルとしてパッケージ化されています。
  • pyproject.tomlファイルを含むパッケージ化されていない Python プロジェクト (プロジェクト名とバージョンを定義するため)。

次の例は、パイプラインに依存関係を追加する方法を示しています。

  1. パイプライン エディターでパイプラインを開きます。

  2. クリック歯車アイコン。上部のバーから 設定します

  3. パイプライン設定 のスライドアウトの パイプライン環境 の下にある鉛筆アイコン。 編集環境

  4. 依存関係を追加します。たとえば、ワークスペースにファイルを追加するには、 /Volumes/libraries/path/to/python_files/file.pyを追加します。Gitフォルダーに保存されているPython wheelの場合、パスは/Workspace/libraries/path/to/wheel_files/file.whlのようになります。

    ファイルがパイプラインのルート フォルダーにある場合は、パスなしまたは相対パスでファイルを追加できます。

注記

依存関係を使用して共有フォルダーへのパスを追加し、コード内のimportステートメントでインポートするモジュールを見つけられるようにすることもできます。たとえば、 -e /Workspace/Users/<user_name>/path/to/add/

importステートメントを使用して Python モジュールをインポートします

Python ソース コード内でワークスペース ファイルを直接参照することもできます。

  • ファイルがパイプラインのutilitiesフォルダ内にある場合は、パスなしで参照できます。

    Python
    from utilities import my_module
  • ファイルが他の場所にある場合は、まずモジュールのパスをsys.pathに追加してインポートできます。

    Python
    import sys, os
    sys.path.append(os.path.abspath('<module-path>'))

    from my_module import *
  • 前のセクションで説明したように、パイプライン環境へのパスを追加することで、すべてのパイプライン ソース ファイルのsys.pathに追加することもできます。

クエリをPythonモジュールとしてインポートする例

次の例は、ワークスペース ファイルからデータセット クエリを Python モジュールとしてインポートする方法を示しています。 この例では、ワークスペース ファイルを使用してパイプラインのソース コードを格納する方法について説明しますが、Git フォルダーに格納されたソース コードでも使用できます。

この例を実行するには、次の手順に従います。

  1. ワークスペースのアイコン。 ワークスペースのサイドバーにある ワークスペース Databricksをクリックして、ワークスペースブラウザを開きます。

  2. ワークスペース ブラウザーを使用して、Python モジュールのディレクトリを選択します。

  3. 選択したディレクトリの右端の列のケバブメニューのアイコン。をクリックし、 作成 > ファイル をクリックします。

  4. ファイルの名前を入力します (例: clickstream_raw_module.py)。 ファイルエディタが開きます。 ソース・データをテーブルに読み込むモジュールを作成するには、エディタ・ウィンドウで次のように入力します。

    Python
    from dlt import *

    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"

    def create_clickstream_raw_table(spark):
    @table
    def clickstream_raw():
    return (
    spark.read.json(json_path)
    )
  5. 準備済みデータを含む新しいテーブルを作成するモジュールを作成するには、同じディレクトリに新しいファイルを作成し、ファイルの名前 ( clickstream_prepared_module.pyなど) を入力して、新しいエディターウィンドウで次のように入力します。

    Python
    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    def create_clickstream_prepared_table(spark):
    create_clickstream_raw_table(spark)
    @table
    @expect("valid_current_page_title", "current_page_title IS NOT NULL")
    @expect_or_fail("valid_count", "click_count > 0")
    def clickstream_prepared():
    return (
    read("clickstream_raw")
    .withColumn("click_count", expr("CAST(n AS INT)"))
    .withColumnRenamed("curr_title", "current_page_title")
    .withColumnRenamed("prev_title", "previous_page_title")
    .select("current_page_title", "click_count", "previous_page_title")
    )
  6. 次に、パイプライン ソースに Python ファイルを作成します。パイプラインエディタから、プラスアイコン。 を追加して 、変換します。

  7. ファイルに名前を付け、 Python がデフォルトの言語であることを確認します。

  8. 作成 をクリックします。

  9. ノートブックに次のコード例を入力します。

注記

ノートブックがワークスペースのファイルパスまたはノートブックディレクトリとは異なる Git フォルダのパスからモジュールまたはパッケージをインポートする場合は、sys.path.append()を使用してファイルにパスを手動で追加する必要があります。

Git フォルダからファイルをインポートする場合は、パスの先頭に /Workspace/ を追加する必要があります。 たとえば、 sys.path.append('/Workspace/...'). パスから /Workspace/ を省略すると、エラーになります。

モジュールまたはパッケージがノートブックと同じディレクトリに保存されている場合は、パスを手動で追加する必要はありません。 また、Git フォルダーのルート ディレクトリからインポートする場合は、ルート ディレクトリがパスに自動的に追加されるため、パスを手動で追加する必要もありません。

Python
import sys, os
sys.path.append(os.path.abspath('<module-path>'))

import dlt
from clickstream_prepared_module import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

create_clickstream_prepared_table(spark)

@dlt.table(
comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
return (
spark.read.table("catalog_name.schema_name.clickstream_prepared")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.sort(desc("click_count"))
.select("referrer", "click_count")
.limit(10)
)

<module-path> を、インポートする Python モジュールを含むディレクトリへのパスに置き換えます。 10. パイプラインを実行するには、「 パイプライン実行」を クリックします。