リアルタイムモード参照
このページでは、構造化ストリーミングのリアルタイムモードに関する参考情報を提供します。サポートされている環境、言語、ソース、シンク、およびオペレーターなどが含まれます。既知の制限事項については、 「リアルタイムモードの制限事項」を参照してください。
対応言語
リアルタイムモードは、Scala、Java、およびPythonをサポートしています。
コンピュートの種類
オンライン モードは、次のコンピュート タイプをサポートします。
クラスタータイプ | サポート対象 |
|---|---|
専用(旧:シングルユーザー) | ✓ |
標準(旧称:共有) | ✓(Pythonのみ) |
Lakeflow Spark宣言型パイプライン Classic | サポートされていない |
Lakeflow Spark宣言型パイプライン サーバレス | サポートされていない |
サーバーレス | サポートされていない |
実行モード
リアルタイムモードは更新モードのみをサポートします。
実行 モード | サポート対象 |
|---|---|
更新モード | ✓ |
追加モード | サポートされていない |
完全モード | サポートされていない |
ソースとシンク
リアルタイムモードでは、以下のソースとシンクがサポートされます。
ソースまたはシンク | ソースとして | シンクとして |
|---|---|---|
Apache Kafka | ✓ | ✓ |
イベントハブ(Kafkaコネクタを使用) | ✓ | ✓ |
Kinesis | ✓(EFOモードのみ) | サポートされていない |
AWS MSK | ✓ | サポートされていない |
Delta | サポートされていない | サポートされていない |
Google Pub/Sub | サポートされていない | サポートされていない |
Apache Pulsar | サポートされていない | サポートされていない |
任意のシンク( | 該当なし | ✓ |
オペレーター
リアルタイムモードは、ほとんどの構造化ストリーミング演算子をサポートしています。
ステートレスな操作
オペレーター | サポート対象 |
|---|---|
選択 | ✓ |
投影 | ✓ |
UDF
集約
オペレーター | サポート対象 |
|---|---|
合計 | ✓ |
count | ✓ |
max | ✓ |
min | ✓ |
avg | ✓ |
✓ |
ウィンドウ処理
オペレーター | サポート対象 |
|---|---|
転倒 | ✓ |
スライディング | ✓ |
セッション | サポートされていない |
重複排除
オペレーター | サポート対象 |
|---|---|
重複を削除 | ✓(状態は無制限です) |
ウォーターマーク内の重複を削除 | サポートされていない |
ストリームからテーブルへの結合
オペレーター | サポート対象 |
|---|---|
ブロードキャストテーブル結合(テーブルは小さいサイズである必要があります) | ✓ |
ストリームからストリームへの参加 | サポートされていない |
(flat)MapGroupsWithState | サポートされていない |
transformWithState | ✓(多少の違いはあるものの) |
union | ✓(一部制限あり) |
For each | ✓ |
forEachBatch | サポートされていない |
mapPartitions | サポートされていません(制限事項を参照) |
特別な考慮事項
リアルタイムモードで使用する場合、一部の演算子や機能には特別な考慮事項や違いがあります。
transformWithStateリアルタイムモード
カスタム ステートフル アプリケーションを構築するために、 Databricks Apache Spark構造化ストリーミングのAPIであるtransformWithStateサポートしています。 APIとコードスニペットの詳細については、 「カスタムステートフルアプリケーションの構築」を参照してください。
ただし、リアルタイムモードでのAPIの動作と、マイクロバッチアーキテクチャを活用する従来のストリーミングクエリの動作には、いくつかの違いがあります。
-
リアルタイムモードでは、各行に対して
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)メソッドが呼び出されます。inputRowsイテレータは単一の値を返します。マイクロバッチモードでは、キーごとに一度呼び出され、inputRowsイテレータはマイクロバッチ内のキーに対応するすべての値を返します。- コードを書くときにこの違いを考慮してください
-
リアルタイムモードでは、イベントタイマーはサポートされていません。
-
リアルタイムモードでは、タイマーはデータの到着に応じて作動が遅延します。
- タイマーが10:00:00に設定されている場合でも、データが到着しない場合は、タイマーはすぐに作動しません。
- データが10時00分10秒に到着した場合、タイマーは10秒遅れて作動します。
- データが到着せず、長時間実行中のバッチ処理が終了しようとしている場合、バッチ処理が終了する前にタイマーが作動します。
リアルタイムモードのPython UDF
Databricksは、リアルタイムモードでPythonのユーザー定義関数(UDF)の大部分をサポートしています。
ステートレス
UDF型 | サポート対象 |
|---|---|
PythonのスカラーUDF(ユーザー定義スカラー関数 - Python ) | ✓ |
矢印スカラーUDF | ✓ |
PandasスカラーUDF ( Pandasユーザー定義関数) | ✓ |
矢印関数 ( | ✓ |
Pandas関数( Map ) | ✓ |
ステートフルグルーピング(UDAF)
UDF型 | サポート対象 |
|---|---|
| ✓ |
| サポートされていない |
非ステートフルなグループ化(UDAF)
UDF型 | サポート対象 |
|---|---|
| サポートされていない |
| サポートされていない |
| サポートされていない |
テーブル関数
UDF型 | サポート対象 |
|---|---|
UDTF( Pythonユーザー定義テーブル関数(UDTF) ) | サポートされていない |
UC UDF | サポートされていない |
Python UDFをリアルタイムモードで使用する際には、考慮すべき点がいくつかあります。
-
遅延を最小限に抑えるには、Arrow のバッチサイズ (
spark.sql.execution.arrow.maxRecordsPerBatch) を 1 に設定してください。- トレードオフ:この構成は、スループットを犠牲にしてレイテンシを最適化します。ほとんどのワークロードにおいて、この設定が推奨されます。
- 入力量に対応するためにスループットの向上が必要な場合のみ、バッチサイズを増やしてください。その際、レイテンシが増加する可能性を許容してください。
-
PandasのUDFと関数は、Arrowのバッチサイズが1の場合、パフォーマンスが低下します。
- Pandas UDFまたは関数を使用する場合は、矢印 サイズをより大きな値(たとえば、100以上)に設定してください。
- これは、より高い遅延を意味する。Databricksは、可能であればArrow UDFまたは関数を使用することを推奨しています。
-
Pandasのパフォーマンス上の問題により、transformWithStateは
Rowインターフェースでのみサポートされています。