リアルタイムモードを設定する
このページでは、構造化ストリーミングでリアルタイムモードのクエリを実行するために必要な前提条件と構成について説明します。段階的なチュートリアルについては、「チュートリアル: 連続ストリーミング ワークロードの実行」を参照してください。 ラケット モードに関する概念的な情報については、 「構造化ストリーミング」の「ラケット モード」を参照してください。
前提条件
ログイン モードを使用するには、次の要件を満たすようにコンピュートを構成する必要があります。
- クラシックコンピュートでは専用アクセスモードを使用してください。 標準アクセスモード、 LakeFlow Spark宣言型パイプライン、サーバレスクラスターには対応しておりません。
- Databricks Runtime 16.4 LTS以降を使用してください。
- オートスケールをオフにします。
- Photonをオフにしてください。
spark.databricks.streaming.realTimeMode.enabledtrueに設定します。- 中断を避けるため、スポットインスタンスをオフにしてください。
クラシック コンピュートの作成および構成の手順については、 「コンピュート構成リファレンス」を参照してください。
クエリ構成
クエリをリアルタイムモードで実行するには、リアルタイムトリガーを有効にする必要があります。リアルタイムトリガーは更新モードでのみサポートされます。
- Python
- Scala
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
コンピュートのサイジング
コンピュートに十分なタスク スロットがある場合は、コンピュート リソースごとに 1 つの保留ジョブを実行できます。
低遅延モードで実行するには、使用可能なタスク スロットの合計数が、すべてのクエリ ステージにわたるタスクの数以上である必要があります。
スロット計算例
パイプラインタイプ | 構成 | 必要なスロット数 |
|---|---|---|
シングルステージステートレス ( Kafkaソース + シンク) |
| 8スロット |
2 段階ステートフル ( Kafkaソース + シャッフル) |
| 28スロット(8+20) |
3 段階 ( Kafkaソース + シャッフル + 再分割) |
| 48スロット(8 + 20 + 20) |
maxPartitionsを設定しない場合は、Kafkaトピックのパーティション数を使用します。