サーバレス コンピュートでのストリーミング
このページでは、Databricks上のサーバレスストリーミングワークロード(連続的なパイプライン、増分インジェスト、マネージドコネクタを含む)における適切な構成の選択方法について説明します。適切な構成は、ストリームのソース、形状、およびレイテンシーの要件によって異なります。
ストリーミングワークロードとしてカウントされるもの
ストリーミングワークロードは、ソース(クラウドオブジェクトストレージ、メッセージバス、または変更フィードなど)から無制限のデータを読み取り、シンクに段階的に書き込みます。Databricks はストリーミング ワークロードの2つのパターンをサポートしています。
- 連続 :停止することなく実行され、到着する新しいデータを処理するパイプラインです。待ち時間は秒単位で測定されます。
- 増分 (トリガーとも呼ばれる):スケジュールまたはトリガーに基づいて実行されるパイプラインで、前回の実行以降に到着したすべてのデータを処理し、停止します。レイテンシーは分単位で測定されます。
一部のワークロードはストリーミング パイプラインであるかのように見えますが、技術的にはパイプラインではありません。例としては、イベントをリッスンするためにWebSocketをオープン状態に保つサービス、ユーザーごとに永続的な接続を維持するチャットアプリケーション、あるいは受信HTTPリクエストを処理するWebhookレシーバーなどがあります。これらはストリーミングパイプラインではなく、アプリケーションです。それらのワークロードに最適なサーバレスオプションについては、ストリーミングパイプラインではないワークロードを参照してください。
適切なストリーミング設定を選択する
この表は、ユースケースを最適なサーバレス構成に関連付けます。このページの次のセクションでは、これらの推奨事項について詳しく説明します。
ユースケース | 推奨構成 | なぜ |
|---|---|---|
継続的かつ低レイテンシなストリーミング ETL または変換 | 連続モードは、常時稼働のストリーム向けに設計されています。ストリーム パイプラインはマイクロバッチを並行して実行し、スループットとレイテンシーを改善します。管理された状態により、自動復旧が維持されます。 | |
クラウドストレージからの増分取り込み | Auto Loaderは、低レイテンシの場合はLakeflow Spark宣言型パイプライン内で、または、より低いレイテンシが許容される場合は | Auto Loader は、新しいファイルを効率的に検出します。 |
SaaSソースまたはデータベースCDCからのマネージド取り込み | サーバレスのインジェストパイプラインを備えたフルマネージドコネクタサポートされているソースでは、コードは不要です。 | |
Delta テーブル上のストリーミング SQL | マネージドパイプラインと更新を備えた、追加指向ソース向けのSQLネイティブなインクリメンタル処理。 | |
ノートブックまたはジョブにおける定期的なマイクロバッチ処理 | 分単位の最新性で十分な場合は、コスト効率が高いです。サーバレス コンピュートはすばやく起動し、バッチが完了すると終了します。 |
継続的ストリーミング
サーバレスコンピュートでの継続的なストリーミングには、Lakeflow Spark宣言型パイプラインを継続モードで使用します。パイプラインは稼働を継続し、レコードが到着すると処理を行い、障害から自動的に復旧します。
継続的なストリームを構成するには:
- パイプラインをサーバレスとして設定します。「サーバレス パイプラインの設定」を参照してください。
- パイプラインモードを連続に設定してください。トリガー パイプライン モードと継続的パイプライン モードを参照してください。
- 増分的に維持される出力には、ストリーミングテーブルを使用してください。
サーバレスのLakeflow Spark宣言型パイプラインでは、ストリームパイプライン処理がデフォルトで有効になっています。マイクロバッチは順次ではなく並行して実行されるため、取り込み負荷の高いストリームのスループットが向上します。
時間ベースの 構造化ストリーミングトリガー(Trigger.ProcessingTime(interval) や Trigger.Continuous(interval) など)は、サーバレス ノートブックまたはジョブでは使用できません。常時稼働パターンには、連続モードでLakeFlow Spark宣言型パイプラインを使用します。「ストリーミングの制限事項」を参照してください。Trigger.Once() はサポートされていますが非推奨です。既存のクエリを Trigger.AvailableNow() に移行してください。
インクリメンタルおよびトリガー型ストリーミング
インクリメンタルストリーミングの場合、Trigger.AvailableNow() で構造化ストリーミングをサーバレスジョブで実行します。各実行では、最後のチェックポイント以降に到着したすべてのデータを処理し、その後終了します。
増分ストリーミングでサーバレス ジョブを設定するには:
- 必要な頻度でジョブをスケジュールします。スケジュールに従ってジョブを実行するを参照してください。
- ジョブ内のすべてのストリーミングクエリで
Trigger.AvailableNow()を使用してください。構造化ストリーミングのトリガー間隔の設定を参照してください。 - メモリ使用量を予測可能にするには、バッチ サイズを
maxFilesPerTriggerまたはmaxBytesPerTriggerで調整します。サーバレス コンピュートのベスト プラクティスを参照してください。
次の例では、Auto Loader を使用してクラウドストレージ(source_path)から新しいファイルを読み込み、実行時に利用可能なすべてのデータを処理し、Delta テーブルに書き込みます。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.maxFilesPerTrigger", 1000)
.load(source_path)
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.toTable("catalog.schema.target_table"))
スケジュールされたTrigger.AvailableNow()ジョブは、分レベルのレイテンシーが許容できる場合、サーバレス コンピュートでの最もコスト効率の高いストリーミングパターンです。コンピュートは数秒で起動し、バッチを実行し、シャットダウンします。
「管理対象の取り込み」
ソースがSaaSアプリケーションまたは運用データベースである場合は、構造化ストリーミングコードを記述する代わりに、LakeFlow Connectをご利用ください。LakeFlow Connect は、Salesforce、Workday、SQL Server CDC、PostgreSQL CDC などのコネクタ向けにサーバレスインジェスチョンパイプラインを実行します。LakeFlow Connectのマネージド コネクタを参照してください。
このパスが正しい答えとなるのは、以下のとおりです。
- ソースにコネクタがあります。
- カスタムコードではなく、マネージドパイプラインを選択できます。
- スキーマ進化、リネージ、モニタリングは標準で提供されています。
SQL管理のインクリメンタルデータ処理
SQLを重視するチームの場合、SQLネイティブなストリーミングワークロードにはストリーミングテーブルを使用してください。ストリーミングテーブルは、Lakeflow Spark宣言型パイプライン内で定義することも、スタンドアロンストリーミングテーブルとして定義することもできます。
CREATE OR REFRESH STREAMING TABLE SQL ステートメントで作成されたスタンドアロンのストリーミングテーブルの場合、初期データの更新とデータ作成がすぐに開始されます。専用のサーバレス パイプラインは、ストリーミングテーブルごとにシステムによって自動的に作成および管理されます。
マネージド更新でのバッチセマンティクスのクエリ結果が必要な場合は、代わりにマテリアライズドビューを使用してください。See マテリアライズドビュー.
ストリーミングパイプラインではないワークロード
永続的な接続を維持したり、ポートをリッスンしたり、着信HTTPリクエストに応答したりする必要があるワークロードは、ストリーミングパイプラインではなく、アプリケーションです。これらのワークロードをサーバレスジョブで実行しないでください。適切な Databricks オプションは次のとおりです:
- 永続的な接続またはHTTPエンドポイントを必要とする長時間実行サービス : Databricks Appsでサービスを構築してください。Databricks Apps は、FastAPI、Flask、Streamlit、Dash、Gradio、Node.js、Shiny などのカスタムアプリケーションを Databricks 上でホストできる、サーバレスプラットフォームです。See Databricks Apps.
- 受信Webhookまたはイベントリスナー : Databricks AppsでHTTPエンドポイントを公開するか、外部サービスでWebhookを終端し、イベントをクラウドストレージまたはメッセージバスに書き込み、その後、サーバレスストリーミングパイプラインで処理します。
- カスタムトークンまたは資格情報の交換 :サービスプリンシパルをOAuthで使用するか、アプリからDatabricks REST APIsを呼び出します。ストリーミング パイプラインは、ユーザーごとのセッションまたはカスタム トークンの状態を保持しません。
ワークロードがストリーミングパイプラインに適しているかどうかを評価している場合は、問いかけてください。
- ワークロードは、アンバウンドなデータソースから読み込み、シンクに書き込みますか?はい、ストリーミングパイプラインです。
- ワークロードはクライアントへの接続を開いたままにする必要がありますか?はい、その場合はアプリケーションです;Databricks Apps を使用してください。
制限事項:
サーバレスコンピュートでは、以下のストリーミング制約が適用されます。適切な製品と組み合わせた場合でも、いずれも上記のワークロードを防ぐことはできません。
- 時間ベースの構造化ストリーミング トリガー(
Trigger.ProcessingTime(interval)およびTrigger.Continuous(interval))は、サーバレス ノートブックまたはジョブではサポートされていません。Lakeflow Spark宣言型パイプラインを、常時稼働のストリームには連続モードで、またはトリガー実行にはTrigger.AvailableNow()を使用します。ストリーミングの制限事項を参照してください。 - 明示的なトリガーなしのストリーミング クエリーは
INFINITE_STREAMING_TRIGGER_NOT_SUPPORTEDで失敗します。Apache Spark のデフォルトはTrigger.ProcessingTime("0 seconds")ですが、これはサーバレス コンピュートではサポートされていません。Trigger.AvailableNow()をすべてのストリーミングクエリに常に設定してください、またはLakeflow Spark宣言型パイプラインを連続モードで使用してください。 - 標準アクセスモードでのストリーミングに関するすべての制限事項も、サーバレスコンピュートに適用されます。「ストリーミングの制限事項」を参照してください。