メインコンテンツまでスキップ

リアルタイムモード参照

このページでは、構造化ストリーミングのリアルタイムモードに関する参考情報を提供します。サポートされている環境、言語、ソース、シンク、およびオペレーターなどが含まれます。既知の制限事項については、 「リアルタイムモードの制限事項」を参照してください。

対応言語

リアルタイムモードは、Scala、Java、およびPythonをサポートしています。

コンピュートの種類

オンライン モードは、次のコンピュート タイプをサポートします。

クラスタータイプ

サポート対象

専用(旧:シングルユーザー)

標準(旧称:共有)

✓(Pythonのみ)

Lakeflow Spark宣言型パイプライン Classic

サポートされていない

Lakeflow Spark宣言型パイプライン サーバレス

サポートされていない

サーバーレス

サポートされていない

実行モード

リアルタイムモードは更新モードのみをサポートします。

実行 モード

サポート対象

更新モード

追加モード

サポートされていない

完全モード

サポートされていない

ソースとシンク

リアルタイムモードでは、以下のソースとシンクがサポートされます。

ソースまたはシンク

ソースとして

シンクとして

Apache Kafka

イベントハブ(Kafkaコネクタを使用)

Kinesis

✓(EFOモードのみ)

サポートされていない

AWS MSK

サポートされていない

Delta

サポートされていない

サポートされていない

Google Pub/Sub

サポートされていない

サポートされていない

Apache Pulsar

サポートされていない

サポートされていない

任意のシンク( forEachWriterを使用)

該当なし

オペレーター

リアルタイムモードは、ほとんどの構造化ストリーミング演算子をサポートしています。

ステートレスな操作

オペレーター

サポート対象

選択

投影

UDF

オペレーター

サポート対象

Scala UDF

✓(一部制限あり

Python UDF

✓(一部制限あり

集約

オペレーター

サポート対象

合計

count

max

min

avg

集計関数

ウィンドウ処理

オペレーター

サポート対象

転倒

スライディング

セッション

サポートされていない

重複排除

オペレーター

サポート対象

重複を削除

✓(状態は無制限です)

ウォーターマーク内の重複を削除

サポートされていない

ストリームからテーブルへの結合

オペレーター

サポート対象

ブロードキャストテーブル結合(テーブルは小さいサイズである必要があります)

ストリームからストリームへの参加

サポートされていない

(flat)MapGroupsWithState

サポートされていない

transformWithState

✓(多少の違いはあるものの

union

✓(一部制限あり

For each

forEachBatch

サポートされていない

mapPartitions

サポートされていません(制限事項を参照

特別な考慮事項

リアルタイムモードで使用する場合、一部の演算子や機能には特別な考慮事項や違いがあります。

transformWithStateリアルタイムモード

カスタム ステートフル アプリケーションを構築するために、 Databricks Apache Spark構造化ストリーミングのAPIであるtransformWithStateサポートしています。 APIとコードスニペットの詳細については、 「カスタムステートフルアプリケーションの構築」を参照してください。

ただし、リアルタイムモードでのAPIの動作と、マイクロバッチアーキテクチャを活用する従来のストリーミングクエリの動作には、いくつかの違いがあります。

  • リアルタイムモードでは、各行に対してhandleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)メソッドが呼び出されます。

    • inputRowsイテレータは単一の値を返します。マイクロバッチモードでは、キーごとに一度呼び出され、 inputRowsイテレータはマイクロバッチ内のキーに対応するすべての値を返します。
    • コードを書くときにこの違いを考慮してください
  • リアルタイムモードでは、イベントタイマーはサポートされていません。

  • リアルタイムモードでは、タイマーはデータの到着に応じて作動が遅延します。

    • タイマーが10:00:00に設定されている場合でも、データが到着しない場合は、タイマーはすぐに作動しません。
    • データが10時00分10秒に到着した場合、タイマーは10秒遅れて作動します。
    • データが到着せず、長時間実行中のバッチ処理が終了しようとしている場合、バッチ処理が終了する前にタイマーが作動します。

リアルタイムモードのPython UDF

Databricksは、リアルタイムモードでPythonのユーザー定義関数(UDF)の大部分をサポートしています。

ステートレス

UDF型

サポート対象

PythonのスカラーUDF(ユーザー定義スカラー関数 - Python

矢印スカラーUDF

PandasスカラーUDF ( Pandasユーザー定義関数

矢印関数 ( mapInArrow )

Pandas関数( Map

ステートフルグルーピング(UDAF)

UDF型

サポート対象

transformWithStateRowインターフェースのみ)

applyInPandasWithState

サポートされていない

非ステートフルなグループ化(UDAF)

UDF型

サポート対象

apply

サポートされていない

applyInArrow

サポートされていない

applyInPandas

サポートされていない

テーブル関数

UDF型

サポート対象

UDTF( Pythonユーザー定義テーブル関数(UDTF)

サポートされていない

UC UDF

サポートされていない

Python UDFをリアルタイムモードで使用する際には、考慮すべき点がいくつかあります。

  • 遅延を最小限に抑えるには、Arrow のバッチサイズ ( spark.sql.execution.arrow.maxRecordsPerBatch ) を 1 に設定してください。

    • トレードオフ:この構成は、スループットを犠牲にしてレイテンシを最適化します。ほとんどのワークロードにおいて、この設定が推奨されます。
    • 入力量に対応するためにスループットの向上が必要な場合のみ、バッチサイズを増やしてください。その際、レイテンシが増加する可能性を許容してください。
  • PandasのUDFと関数は、Arrowのバッチサイズが1の場合、パフォーマンスが低下します。

    • Pandas UDFまたは関数を使用する場合は、矢印 サイズをより大きな値(たとえば、100以上)に設定してください。
    • これは、より高い遅延を意味する。Databricksは、可能であればArrow UDFまたは関数を使用することを推奨しています。
  • Pandasのパフォーマンス上の問題により、transformWithStateはRowインターフェースでのみサポートされています。