Git フォルダまたはワークスペース ファイルから Python モジュールをインポートする
PythonコードをDatabricks Gitフォルダまたはワークスペースファイルに保存し、そのPythonコードをLakeflow宣言型パイプラインにインポートできます。Git フォルダーまたはワークスペース ファイル内のモジュールの操作の詳細については、「Python モジュールと R モジュールの操作」を参照してください。
Python ファイルをインポートするには、複数のオプションがあります。
- Python モジュールをユーティリティ ファイルとしてパイプラインに含めます。これは、モジュールがパイプラインに固有のものである場合に最適に機能します。
- 共有モジュールを、それを使用する必要があるすべてのパイプラインのパイプライン環境に追加します。
- importステートメントを使用して、ワークスペース内のモジュールを Python ソースに直接インポートします。
パイプラインにPythonモジュールを含める
パイプラインの一部として Python モジュールを作成できます。パイプラインのルート フォルダーはsys.pathに自動的に追加されます。これにより、パイプラインの Python ソース コード内でモジュールを直接参照できるようになります。
次の例は、パイプラインのルート フォルダーの下に Python モジュールを作成し、パイプライン ソース内の Python ソース ファイルからそれを参照する方法を示しています。
- 
パイプライン エディターでパイプラインを開きます。 
- 
左側のパイプラインアセットブラウザから、 を追加し 、メニューから 「ユーティリティ」 を選択します。 
- 
名前 に my_utils.py入力します。
- 
デフォルトのパスをそのままにして、 「作成」を クリックします。 これにより、パイプラインの utilitiesフォルダーにmy_utils.pyファイルが作成され、存在しない場合はutilitiesフォルダーが作成されます。このフォルダー内のファイルはデフォルトではパイプライン ソースに追加されませんが、ソース コードの一部である.pyファイルから呼び出すことができます。デフォルトでは、ユーティリティ ファイルには、距離をマイル単位で取得して変換する distance_km()というサンプル関数が含まれています。
- 
変換フォルダ内のPythonソースファイル(以下を選択して作成できます) 追加 をクリックし 、メニューから 変換 を選択して、次のコードを追加します。 Pythonfrom utilities import my_utils
これで、その Python ファイルからmy_utils内の関数を呼び出すことができます。モジュール内の関数を呼び出す必要があるすべての Python ファイルからimportステートメントを追加する必要があります。
パイプライン環境にPythonモジュールを追加する
複数のパイプライン間でPythonモジュールを共有したい場合は、ワークスペース ファイル内の任意の場所にモジュールを保存し、それを使用する必要があるパイプラインの環境からそのモジュールを参照できます。 次のような Python モジュールを参照できます。
- 個々の Python ( .py) ファイル。
- Pythonプロジェクトは、 Python wheel ( .whl) ファイルとしてパッケージ化されています。
- pyproject.tomlファイルを含むパッケージ化されていない Python プロジェクト (プロジェクト名とバージョンを定義するため)。
次の例は、パイプラインに依存関係を追加する方法を示しています。
- 
パイプライン エディターでパイプラインを開きます。 
- 
クリック 上部のバーから 設定します 。 
- 
パイプライン設定 のスライドアウトの パイプライン環境 の下にある 編集環境 。 
- 
依存関係を追加します。たとえば、ワークスペースにファイルを追加するには、 /Volumes/libraries/path/to/python_files/file.pyを追加します。Gitフォルダーに保存されているPython wheelの場合、パスは/Workspace/libraries/path/to/wheel_files/file.whlのようになります。ファイルがパイプラインのルート フォルダーにある場合は、パスなしまたは相対パスでファイルを追加できます。 
依存関係を使用して共有フォルダーへのパスを追加し、コード内のimportステートメントでインポートするモジュールを見つけられるようにすることもできます。たとえば、 -e /Workspace/Users/<user_name>/path/to/add/ 。
importステートメントを使用して Python モジュールをインポートします
Python ソース コード内でワークスペース ファイルを直接参照することもできます。
- 
ファイルがパイプラインの utilitiesフォルダ内にある場合は、パスなしで参照できます。Pythonfrom utilities import my_module
- 
ファイルが他の場所にある場合は、まずモジュールのパスを sys.pathに追加してインポートできます。Pythonimport sys, os
 sys.path.append(os.path.abspath('<module-path>'))
 from my_module import *
- 
前のセクションで説明したように、パイプライン環境へのパスを追加することで、すべてのパイプライン ソース ファイルの sys.pathに追加することもできます。
クエリをPythonモジュールとしてインポートする例
次の例は、ワークスペース ファイルからデータセット クエリを Python モジュールとしてインポートする方法を示しています。この例では、ワークスペース ファイルを使用してパイプラインのソース コードを保存する方法について説明していますが、Git フォルダーに保存されているソース コードでも使用できます。
この例を実行するには、次のステップを使用します。
- 
クリック Databricksワークスペースのサイドバーにある 「ワークスペース」をクリックして、 ワークスペース ブラウザーを開きます。 
- 
ワークスペース ブラウザを使用して、Python モジュールのディレクトリを選択します。 
- 
選択したディレクトリの右端の列の をクリックし、 作成 > ファイル をクリックします。 
- 
ファイルの名前を入力します(例: clickstream_raw_module.py)。ファイルエディターが開きます。ソース データをテーブルに読み込むモジュールを作成するには、エディター ウィンドウに次のように入力します。Pythonfrom pyspark import pipelines as dp
 json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
 def create_clickstream_raw_table(spark):
 @dp.table
 def clickstream_raw():
 return (
 spark.read.json(json_path)
 )
- 
準備されたデータを含む新しいテーブルを作成するモジュールを作成するには、同じディレクトリに新しいファイルを作成し、ファイルの名前 (例: clickstream_prepared_module.py) を入力して、新しいエディター ウィンドウに次のように入力します。Pythonfrom clickstream_raw_module import *
 from pyspark import pipelines as dp
 from pyspark.sql.functions import *
 from pyspark.sql.types import *
 def create_clickstream_prepared_table(spark):
 create_clickstream_raw_table(spark)
 @dp.table
 @dp.expect("valid_current_page_title", "current_page_title IS NOT NULL")
 @dp.expect_or_fail("valid_count", "click_count > 0")
 def clickstream_prepared():
 return (
 spark.read("clickstream_raw")
 .withColumn("click_count", expr("CAST(n AS INT)"))
 .withColumnRenamed("curr_title", "current_page_title")
 .withColumnRenamed("prev_title", "previous_page_title")
 .select("current_page_title", "click_count", "previous_page_title")
 )
- 
次に、パイプライン ソースに Python ファイルを作成します。パイプラインエディタから、 を追加して 、変換します。 
- 
ファイルに名前を付け、 Python がデフォルトの言語であることを確認します。 
- 
作成 をクリックします。 
- 
ノートブックに次のサンプル コードを入力します。 
ノートブックが、ノートブック ディレクトリとは異なるワークスペース ファイル パスまたは Git フォルダー パスからモジュールまたはパッケージをインポートする場合は、 sys.path.append()を使用してファイルにパスを手動で追加する必要があります。
Git フォルダーからファイルをインポートする場合は、パスの先頭に/Workspace/追加する必要があります。たとえば、 sys.path.append('/Workspace/...') 。パスから/Workspace/を省略するとエラーが発生します。
モジュールまたはパッケージがノートブックと同じディレクトリに保存されている場合は、パスを手動で追加する必要はありません。また、Git フォルダーのルート ディレクトリからインポートする場合は、ルート ディレクトリがパスに自動的に追加されるため、手動でパスを追加する必要はありません。
import sys, os
sys.path.append(os.path.abspath('<module-path>'))
from pyspark import pipelines as dp
from clickstream_prepared_module import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
create_clickstream_prepared_table(spark)
@dp.table(
  comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
  return (
    spark.read.table("catalog_name.schema_name.clickstream_prepared")
      .filter(expr("current_page_title == 'Apache_Spark'"))
      .withColumnRenamed("previous_page_title", "referrer")
      .sort(desc("click_count"))
      .select("referrer", "click_count")
      .limit(10)
  )
<module-path> 、インポートする Python モジュールを含むディレクトリへのパスに置き換えます。
10. パイプラインを実行するには、「 パイプライン実行」を クリックします。