メインコンテンツまでスキップ

サーバレス コンピュートでのストリーミング

このページでは、Databricks上のサーバレスストリーミングワークロード(連続的なパイプライン、増分インジェスト、マネージドコネクタを含む)における適切な構成の選択方法について説明します。適切な構成は、ストリームのソース、形状、およびレイテンシーの要件によって異なります。

ストリーミングワークロードとしてカウントされるもの

ストリーミングワークロードは、ソース(クラウドオブジェクトストレージ、メッセージバス、または変更フィードなど)から無制限のデータを読み取り、シンクに段階的に書き込みます。Databricks はストリーミング ワークロードの2つのパターンをサポートしています。

  • 連続 :停止することなく実行され、到着する新しいデータを処理するパイプラインです。待ち時間は秒単位で測定されます。
  • 増分 (トリガーとも呼ばれる):スケジュールまたはトリガーに基づいて実行されるパイプラインで、前回の実行以降に到着したすべてのデータを処理し、停止します。レイテンシーは分単位で測定されます。

一部のワークロードはストリーミング パイプラインであるかのように見えますが、技術的にはパイプラインではありません。例としては、イベントをリッスンするためにWebSocketをオープン状態に保つサービス、ユーザーごとに永続的な接続を維持するチャットアプリケーション、あるいは受信HTTPリクエストを処理するWebhookレシーバーなどがあります。これらはストリーミングパイプラインではなく、アプリケーションです。それらのワークロードに最適なサーバレスオプションについては、ストリーミングパイプラインではないワークロードを参照してください。

適切なストリーミング設定を選択する

この表は、ユースケースを最適なサーバレス構成に関連付けます。このページの次のセクションでは、これらの推奨事項について詳しく説明します。

ユースケース

推奨構成

なぜ

継続的かつ低レイテンシなストリーミング ETL または変換

継続モードでのLakeflow Spark宣言型パイプライン

連続モードは、常時稼働のストリーム向けに設計されています。ストリーム パイプラインはマイクロバッチを並行して実行し、スループットとレイテンシーを改善します。管理された状態により、自動復旧が維持されます。

クラウドストレージからの増分取り込み

Auto Loaderは、低レイテンシの場合はLakeflow Spark宣言型パイプライン内で、または、より低いレイテンシが許容される場合はTrigger.AvailableNow()と組み合わせてサーバレスジョブ内で使用します。

Auto Loader は、新しいファイルを効率的に検出します。Trigger.AvailableNow()はバックログを処理して終了します。これはスケジュールまたはオンデマンドの周期に適しています。

SaaSソースまたはデータベースCDCからのマネージド取り込み

Lakeflowコネクトの標準コネクタ

サーバレスのインジェストパイプラインを備えたフルマネージドコネクタサポートされているソースでは、コードは不要です。

Delta テーブル上のストリーミング SQL

ストリーミングテーブル

マネージドパイプラインと更新を備えた、追加指向ソース向けのSQLネイティブなインクリメンタル処理。

ノートブックまたはジョブにおける定期的なマイクロバッチ処理

を使用したTrigger.AvailableNow()サーバーレスジョブ

分単位の最新性で十分な場合は、コスト効率が高いです。サーバレス コンピュートはすばやく起動し、バッチが完了すると終了します。

継続的ストリーミング

サーバレスコンピュートでの継続的なストリーミングには、Lakeflow Spark宣言型パイプラインを継続モードで使用します。パイプラインは稼働を継続し、レコードが到着すると処理を行い、障害から自動的に復旧します。

継続的なストリームを構成するには:

ヒント

サーバレスのLakeflow Spark宣言型パイプラインでは、ストリームパイプライン処理がデフォルトで有効になっています。マイクロバッチは順次ではなく並行して実行されるため、取り込み負荷の高いストリームのスループットが向上します。

時間ベースの 構造化ストリーミングトリガーTrigger.ProcessingTime(interval)Trigger.Continuous(interval) など)は、サーバレス ノートブックまたはジョブでは使用できません。常時稼働パターンには、連続モードでLakeFlow Spark宣言型パイプラインを使用します。「ストリーミングの制限事項」を参照してください。Trigger.Once() はサポートされていますが非推奨です。既存のクエリを Trigger.AvailableNow() に移行してください。

インクリメンタルおよびトリガー型ストリーミング

インクリメンタルストリーミングの場合、Trigger.AvailableNow() で構造化ストリーミングをサーバレスジョブで実行します。各実行では、最後のチェックポイント以降に到着したすべてのデータを処理し、その後終了します。

増分ストリーミングでサーバレス ジョブを設定するには:

次の例では、Auto Loader を使用してクラウドストレージ(source_path)から新しいファイルを読み込み、実行時に利用可能なすべてのデータを処理し、Delta テーブルに書き込みます。

Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.maxFilesPerTrigger", 1000)
.load(source_path)
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.toTable("catalog.schema.target_table"))

スケジュールされたTrigger.AvailableNow()ジョブは、分レベルのレイテンシーが許容できる場合、サーバレス コンピュートでの最もコスト効率の高いストリーミングパターンです。コンピュートは数秒で起動し、バッチを実行し、シャットダウンします。

「管理対象の取り込み」

ソースがSaaSアプリケーションまたは運用データベースである場合は、構造化ストリーミングコードを記述する代わりに、LakeFlow Connectをご利用ください。LakeFlow Connect は、Salesforce、Workday、SQL Server CDC、PostgreSQL CDC などのコネクタ向けにサーバレスインジェスチョンパイプラインを実行します。LakeFlow Connectのマネージド コネクタを参照してください。

このパスが正しい答えとなるのは、以下のとおりです。

  • ソースにコネクタがあります。
  • カスタムコードではなく、マネージドパイプラインを選択できます。
  • スキーマ進化、リネージ、モニタリングは標準で提供されています。

SQL管理のインクリメンタルデータ処理

SQLを重視するチームの場合、SQLネイティブなストリーミングワークロードにはストリーミングテーブルを使用してください。ストリーミングテーブルは、Lakeflow Spark宣言型パイプライン内で定義することも、スタンドアロンストリーミングテーブルとして定義することもできます。

CREATE OR REFRESH STREAMING TABLE SQL ステートメントで作成されたスタンドアロンのストリーミングテーブルの場合、初期データの更新とデータ作成がすぐに開始されます。専用のサーバレス パイプラインは、ストリーミングテーブルごとにシステムによって自動的に作成および管理されます。

マネージド更新でのバッチセマンティクスのクエリ結果が必要な場合は、代わりにマテリアライズドビューを使用してください。See マテリアライズドビュー.

ストリーミングパイプラインではないワークロード

永続的な接続を維持したり、ポートをリッスンしたり、着信HTTPリクエストに応答したりする必要があるワークロードは、ストリーミングパイプラインではなく、アプリケーションです。これらのワークロードをサーバレスジョブで実行しないでください。適切な Databricks オプションは次のとおりです:

  • 永続的な接続またはHTTPエンドポイントを必要とする長時間実行サービス : Databricks Appsでサービスを構築してください。Databricks Apps は、FastAPI、Flask、Streamlit、Dash、Gradio、Node.js、Shiny などのカスタムアプリケーションを Databricks 上でホストできる、サーバレスプラットフォームです。See Databricks Apps.
  • 受信Webhookまたはイベントリスナー : Databricks AppsでHTTPエンドポイントを公開するか、外部サービスでWebhookを終端し、イベントをクラウドストレージまたはメッセージバスに書き込み、その後、サーバレスストリーミングパイプラインで処理します。
  • カスタムトークンまたは資格情報の交換サービスプリンシパルをOAuthで使用するか、アプリからDatabricks REST APIsを呼び出します。ストリーミング パイプラインは、ユーザーごとのセッションまたはカスタム トークンの状態を保持しません。

ワークロードがストリーミングパイプラインに適しているかどうかを評価している場合は、問いかけてください。

  • ワークロードは、アンバウンドなデータソースから読み込み、シンクに書き込みますか?はい、ストリーミングパイプラインです。
  • ワークロードはクライアントへの接続を開いたままにする必要がありますか?はい、その場合はアプリケーションです;Databricks Apps を使用してください。

制限事項:

サーバレスコンピュートでは、以下のストリーミング制約が適用されます。適切な製品と組み合わせた場合でも、いずれも上記のワークロードを防ぐことはできません。

  • 時間ベースの構造化ストリーミング トリガー(Trigger.ProcessingTime(interval)およびTrigger.Continuous(interval))は、サーバレス ノートブックまたはジョブではサポートされていません。Lakeflow Spark宣言型パイプラインを、常時稼働のストリームには連続モードで、またはトリガー実行にはTrigger.AvailableNow()を使用します。ストリーミングの制限事項を参照してください。
  • 明示的なトリガーなしのストリーミング クエリーはINFINITE_STREAMING_TRIGGER_NOT_SUPPORTEDで失敗します。Apache Spark のデフォルトは Trigger.ProcessingTime("0 seconds") ですが、これはサーバレス コンピュートではサポートされていません。Trigger.AvailableNow()をすべてのストリーミングクエリに常に設定してください、またはLakeflow Spark宣言型パイプラインを連続モードで使用してください。
  • 標準アクセスモードでのストリーミングに関するすべての制限事項も、サーバレスコンピュートに適用されます。「ストリーミングの制限事項」を参照してください。

次のステップ