メインコンテンツまでスキップ

リアルタイムモードを設定する

このページでは、構造化ストリーミングでリアルタイムモードのクエリを実行するために必要な前提条件と構成について説明します。段階的なチュートリアルについては、「チュートリアル: 連続ストリーミング ワークロードの実行」を参照してください。 ラケット モードに関する概念的な情報については、 「構造化ストリーミング」の「ラケット モード」を参照してください。

前提条件

ログイン モードを使用するには、次の要件を満たすようにコンピュートを構成する必要があります。

  • クラシックなコンピュートを使用します。 専用アクセスモードと標準アクセスモードの両方がサポートされています。標準アクセスモードはPythonのみでサポートされています。LakeFlow Spark 宣言型パイプラインおよびSparkはサポートされていません。
  • Databricks Runtime 16.4 LTS以降を使用してください。
  • オートスケールをオフにします。
  • Photonをオフにしてください。
  • spark.databricks.streaming.realTimeMode.enabled trueに設定します。
  • 中断を避けるため、スポットインスタンスをオフにしてください。

UDFを使用するレイテンシに敏感なワークロードの場合、Databricksは専用アクセスモードの使用を推奨します。テーブル関数を参照してください。

クラシック コンピュートの作成および構成の手順については、 「コンピュート構成リファレンス」を参照してください。

クエリ構成

クエリをリアルタイムモードで実行するには、リアルタイムトリガーを有効にする必要があります。リアルタイムトリガーは更新モードでのみサポートされます。

Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)

コンピュートのサイジング

コンピュートに十分なタスク スロットがある場合は、コンピュート リソースごとに 1 つの保留ジョブを実行できます。

低遅延モードで実行するには、使用可能なタスク スロットの合計数が、すべてのクエリ ステージにわたるタスクの数以上である必要があります。

スロット計算例

パイプラインタイプ

構成

必要なスロット数

シングルステージステートレス ( Kafkaソース + シンク)

maxPartitions = 8

8スロット

2 段階ステートフル ( Kafkaソース + シャッフル)

maxPartitions = 8、シャッフルパーティション = 20

28スロット(8+20)

3 段階 ( Kafkaソース + シャッフル + 再分割)

maxPartitions = 8、20回のシャッフルステージが2回

48スロット(8 + 20 + 20)

maxPartitionsを設定しない場合は、Kafkaトピックのパーティション数を使用します。

その他のリソース