ステートフルなストリーミングクエリのためのオンデマンド状態再分割
オンデマンドの状態再パーティショニングを使用すると、チェックポイントの状態を失うことなく、ステートフルな構造化ストリーミングクエリのパーティション数を調整できます。
オンデマンドの状態再パーティショニングを使用しない場合、シャッフルパーティションの数はチェックポイント作成時に設定します。spark.sql.shuffle.partitionsを変更すると、既存のチェックポイントを持つクエリは新しい値を無視します。新しいパーティション数を適用するには、新しいチェックポイントを使用してクエリを再開する必要があります。
オンデマンドの状態再分割には、次のような利点があります。
- チェックポイントを再構築せずにパーティション数を変更することで、クエリを最適化します。
- ワークロードの変化に合わせてクエリの規模を拡大または縮小します。
要件
- Databricks Runtime 18.3以降。
- クエリではRocksDB状態ストア プロバイダーを使用する必要があります。 DBR 17.3以降では、RocksDBがデフォルトの状態ストアプロバイダです。「 DatabricksでRocksDB状態ストアを構成する」を参照してください。
パーティション数を変更する
Spark構成spark.sql.streaming.stateStore.partitionsを使用してクエリを再起動し、シャッフル状態とストリーミング状態のパーティション数を変更します。
- Python
- Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
ステートフルクエリの場合、 spark.sql.streaming.stateStore.partitions spark.sql.shuffle.partitionsよりも優先されます。クエリが再起動され、最後に計画されたマイクロバッチが完了すると、クエリは再パーティション操作を実行して、状態データを新しい数のパーティションに再分配します。再パーティション操作が完了すると、クエリの処理が再開されます。
再パーティションの状態を監視する
次のマイクロバッチが完了すると、 StreamingQueryProgressのイベントに再パーティション操作の所要時間が含まれます。イベントのdurationMsメトリクスでは、 controlBatch.REPARTITIONミリ秒単位の継続時間値を示します。州の規模が大きくなると、再分割にかかる時間が長くなる可能性がある。Databricksのモニタリング構造化ストリーミング クエリ」を参照してください。
例
次の例では、クエリのシャッフルパーティション数をデフォルト値の200から100に縮小しています。クエリを停止し、新しいパーティション数を設定して、再起動します。
- Python
- Scala
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()