メインコンテンツまでスキップ

リアルタイムモード参照

対応言語

リアルタイムモードは、Scala、Java、およびPythonをサポートしています。

コンピュートの種類

オンライン モードは、次のコンピュート タイプをサポートします。

クラスタータイプ

サポート対象

専用(旧:シングルユーザー)

標準(旧称:共有)

✓(Pythonのみ)

Lakeflow Spark宣言型パイプライン Classic

構造化ストリーミングとしてサポートされていません。パイプライン構成によってサポートされています。LakeFlow Spark宣言型パイプラインでリアルタイムモードを使用するを参照してください。

Lakeflow Spark宣言型パイプライン サーバレス

構造化ストリーミングとしてサポートされていません。パイプライン構成によってサポートされています。LakeFlow Spark宣言型パイプラインでリアルタイムモードを使用するを参照してください。

サーバーレス

サポートされていない

UDFを使用するレイテンシに敏感なワークロードの場合、Databricksは専用アクセスモードの使用を推奨します。テーブル関数を参照してください。

実行モード

リアルタイムモードは更新モードのみをサポートします。

実行 モード

サポート対象

更新モード

追加モード

サポートされていない

完全モード

サポートされていない

ソースとシンク

リアルタイムモードでは、以下のソースとシンクがサポートされます。

ソースまたはシンク

ソースとして

シンクとして

Apache Kafka

イベントハブ(Kafkaコネクタを使用)

Kinesis

✓(EFOモードのみ)

サポートされていない

AWS MSK

サポートされていない

Delta

サポートされていない

サポートされていない

Google Pub/Sub

サポートされていない

サポートされていない

Apache Pulsar

サポートされていない

サポートされていない

任意のシンク( forEachWriterを使用)

該当なし

オペレーター

リアルタイムモードは、ほとんどの構造化ストリーミング演算子をサポートしています。

ステートレスな操作

オペレーター

サポート対象

選択

投影

mapPartitions

サポートされていません(制限事項を参照

結合

✓(一部制限あり

UDF

オペレーター

サポート対象

Scala UDF

✓(一部制限あり

Python UDF

✓(一部制限あり

集約

関数

サポート対象

合計

count

max

min

avg

集計関数

ウィンドウ処理

オペレーター

サポート対象

転倒

スライディング

セッション

サポートされていない

重複排除

オペレーター

サポート対象

重複を削除

ウォーターマーク内の重複を削除

ストリームからテーブルへの結合

オペレーター

サポート対象

Inner join

アウター結合

ブロードキャストテーブル結合 (テーブルサイズが10MB以下)

テーブル結合(ブロードキャストなし)

サポートされていない

ストリーム-ストリームJOIN

オペレーター

サポート対象

Inner join

✓ (Databricks Runtime 18以降、一部構成あり)

外部結合

サポートされていない

注記

リアルタイムモードでストリーム-ストリーム結合を使用するには、追加の Spark 設定を行う必要があります。構成および複数のストリームを実行するための要件に関する詳細については、ストリーム-ストリームJOINを参照してください。

任意のステートフル演算子

オペレーター

サポート対象

(flat)MapGroupsWithState

サポートされていない

transformWithState

✓(多少の違いはあるものの

ユーザー定義シンク

シンク

サポート対象

For each

forEachBatch

サポートされていない

特別な考慮事項

リアルタイムモードで使用する場合、一部の演算子や機能には特別な考慮事項や違いがあります。

transformWithStateリアルタイムモード

カスタム ステートフル アプリケーションを構築するために、 Databricks Apache Spark構造化ストリーミングのAPIであるtransformWithStateサポートしています。 APIとコードスニペットの詳細については、 「カスタムステートフルアプリケーションの構築」を参照してください。

しかし、APIはリアルタイムモードではマイクロバッチクエリとは動作が異なります。

  • リアルタイムモードでは、各行に対してhandleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)メソッドが呼び出されます。

    • inputRowsイテレータは単一の値を返します。マイクロバッチモードでは、キーごとに一度呼び出され、 inputRowsイテレータはマイクロバッチ内のキーに対応するすべての値を返します。
    • コードを書くときにこの違いを考慮してください
  • リアルタイムモードでは、イベントタイマーはサポートされていません。

  • transformWithStateInPandas リアルタイムモードではサポートされていません。代わりに、 Pandas DataFramesではなくRowオブジェクトを使用する行ベースのtransformWithState API使用してください。

  • リアルタイムモードでは、タイマーはデータの到着に応じて作動が遅延します。

    • タイマーが10:00:00に設定されている場合でも、データが到着しない場合は、タイマーはすぐに作動しません。
    • データが10時00分10秒に到着した場合、タイマーは10秒遅れて作動します。
    • データが到着せず、長時間実行中のバッチ処理が終了しようとしている場合、バッチ処理が終了する前にタイマーが作動します。
注記

Databricks Runtime 18.1以前のバージョンでは、スループットが低い(1秒あたり5レコード未満)PythonでtransformWithStateとリアルタイムモードを使用すると、レイテンシが数百ミリ秒まで増加する可能性があります。Databricksは、この問題を解決するためにDatabricks Runtime 18.2以降へのアップグレードを推奨しています。

リアルタイムモードのPython UDF

Databricksは、リアルタイムモードでPythonのユーザー定義関数(UDF)の大部分をサポートしています。

ステートレス

UDF型

サポート対象

PythonのスカラーUDF(ユーザー定義スカラー関数 - Python

矢印スカラーUDF

PandasスカラーUDF ( Pandasユーザー定義関数

矢印関数 ( mapInArrow )

Pandas関数( Map

ステートフルグルーピング(UDAF)

UDF型

サポート対象

transformWithStateRowインターフェースのみ)

transformWithStateInPandas

サポートされていません。代わりに、 Pandas DataFramesではなくRowオブジェクトを使用する行ベースのtransformWithState API使用してください。 詳細については、 transformWithStateInPandasがサポートされていないことを参照してください。

applyInPandasWithState

サポートされていない

非ステートフルなグループ化(UDAF)

UDF型

サポート対象

apply

サポートされていない

applyInArrow

サポートされていない

applyInPandas

サポートされていない

テーブル関数

UDF型

サポート対象

UDTF( Pythonユーザー定義テーブル関数(UDTF)

サポートされていない

UC UDF

サポートされていない

Python UDFをリアルタイムモードで使用する際には、考慮すべき点がいくつかあります。

  • レイテンシーを最小限に抑えるには、Arrow バッチサイズ(spark.sql.execution.arrow.maxRecordsPerBatch)を 1 に設定してください。

    • トレードオフ:この構成は、スループットを犠牲にしてレイテンシを最適化します。ほとんどのワークロードにおいて、この設定が推奨されます。
    • 入力量に対応するためにスループットの向上が必要な場合のみ、バッチサイズを増やしてください。その際、レイテンシが増加する可能性を許容してください。
  • PandasのUDFと関数は、Arrowのバッチサイズが1の場合、パフォーマンスが低下します。

    • Pandas UDFまたは関数を使用する場合は、矢印 サイズをより大きな値(たとえば、100以上)に設定してください。
    • これは、より高い遅延を意味する。Databricksは、可能であればArrow UDFまたは関数を使用することを推奨しています。
  • transformWithStateInPandas リアルタイムモードではサポートされていません。代わりに、 Pandas DataFramesではなくRowオブジェクトを使用する行ベースのtransformWithState API使用してください。 行ベースの API を使用した動作する Python の例については、 transformWithStateInPandasサポートされていないものおよびリアルタイム モードの例を参照してください。

  • UDFを使用するレイテンシに敏感なワークロードの場合、Databricksは専用アクセスモードの使用を推奨します。標準アクセスモードでは、セキュリティ分離のオーバーヘッドによりUDFのパフォーマンスが低下する可能性があります。