スタンドアロンのパイプラインでPythonを使用する
Pythonを使用してノートブックから、スタンドアロンのマテリアライズドビューとストリーミングテーブルを作成および更新できます。Python ノートブックでパイプラインを作成し、spark.sql() で実行します。これにより、他のPythonベースのノートブックワークフローと並行して、スタンドアロンのパイプラインを管理できます。
スタンドアロン パイプライン用の Python ソースには、 サーバレス汎用コンピュート にアタッチされたノートブックが必要です。Databricks SQLウェアハウスから、Pythonを使用してスタンドアロンのパイプラインを作成または更新することはできません。ウェアハウスはPythonノートブックではなくSQLステートメントを実行するためです。代わりにSQLウェアハウスを使用するには、「スタンドアロンのマテリアライズドビューを使用する」および「スタンドアロンのストリーミングテーブルを使用する」を参照してください。
要件
Pythonでスタンドアロンのパイプラインを作成および更新するには、Databricks Runtime 18.1以降のサーバレス汎用コンピュートにアタッチされたノートブックが必要です。地域ごとの提供状況と権限を含む、完全な要件のリストについては、「ノートブック」を参照してください。
仕組み
Pythonノートブックでは、Databricks SQLウェアハウスから実行するのと同じステートメントを、spark.sql()に渡します。スタンドアロンのマテリアライズドビューとストリーミングテーブルの構文は同一です;ステートメントの送信方法のみが異なります。ウェアハウスと同様に、各CREATEまたはREFRESHステートメントは、操作を処理するためにサーバレス パイプラインを実行します。
spark セッションは Databricks ノートブックでデフォルトで利用できるため、インポートは必要ありません。
マテリアライズドビューの作成
次の例では、ベーステーブル base_table1 から mv1 マテリアライズドビューを作成します:
spark.sql("""
CREATE OR REPLACE MATERIALIZED VIEW mv1
AS SELECT
date,
sum(sales) AS sum_of_sales
FROM base_table1
GROUP BY date
""")
スケジュールされた更新やトリガーされた更新など、完全なCREATE MATERIALIZED VIEW詳細については、「マテリアライズドビューの作成」を参照してください。
ストリーミングテーブルを作成
次の例では、raw_dataテーブルからsalesというストリーミングテーブルを作成します:
spark.sql("""
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT product, price FROM STREAM raw_data
""")
CREATE STREAMING TABLE の詳細 (Auto Loader を使用したファイルの読み込みやスケジューリングなど) については、「スタンドアロン ストリーミングテーブルの使用」を参照してください。
マテリアライズドビューまたはストリーミングテーブルを更新
スタンドアロンテーブルを、ソースからの最新データで更新するには、REFRESHステートメントを使用します。
spark.sql("REFRESH MATERIALIZED VIEW mv1")
spark.sql("REFRESH STREAMING TABLE sales")
サーバレス汎用コンピュートでは、更新は同期的に行われます。非同期更新 (ASYNC キーワード) はサポートされていません。サーバレスジェネラルコンピュートを参照してください。
ステートメントのパラメータ化
値をPythonコードからステートメントにハードコーディングする代わりに渡すには、SQLで名前付きパラメーターマーカーを使用し、spark.sql()のargs引数を通じてその値を供給します。リテラル値には、:min_salesなどのマーカーを直接使用してください。識別子はプレーンな文字列値として置換できないため、パラメーターがテーブル、ビュー、スキーマなどのオブジェクト名である場合にのみ、マーカーをIDENTIFIER()で囲んでください。
次の例では、マテリアライズドビュー名とフィルター値の両方をパラメーター化します。
mv_name = "main.sales.regional_sales"
min_sales = 1000
spark.sql("""
CREATE OR REPLACE MATERIALIZED VIEW IDENTIFIER(:mv)
AS SELECT
region,
sum(sales) AS sum_of_sales
FROM base_table1
WHERE sales > :min_sales
GROUP BY region
""", args={
"mv": mv_name,
"min_sales": min_sales,
})
詳細については、「パラメーター・マーカー」および「IDENTIFIER 句」を参照してください。
その他のステートメントを実行
Python ノートブックから、spark.sql() に渡すことで、更新のスケジュール設定、テーブルの変更、またはテーブルの削除などのステートメントを含む、任意のスタンドアロンのマテリアライズドビューまたはストリーミングテーブルのステートメントを実行できます。マテリアライズドビューとストリーミングテーブルの使用方法をSQL構文を含めて理解するには、「スタンドアロンのマテリアライズドビューを使用する」および「スタンドアロンのストリーミングテーブルを使用する」を参照してください。
制限事項:
サーバレス汎用コンピュートで作成されたスタンドアロンのマテリアライズドビューとストリーミングテーブルには、非同期更新のサポートがないこと、およびテーブルごとのコスト配分がないことなど、追加の制限があります。詳細については、「サーバレス全般コンピュート」を参照してください。