Pular para o conteúdo principal

Reparticionamento de estado sob demanda para consultas de transmissão com estado

info

Visualização

Esse recurso está em Prévia Pública.

O reparticionamento de estado sob demanda permite redimensionar o número de partições para uma consulta de transmissão estruturada com estado sem perder o estado do ponto de verificação.

Sem o reparticionamento de estado sob demanda, você define o número de partições de embaralhamento durante a criação do ponto de verificação. Se você alterar spark.sql.shuffle.partitions, as consultas com pontos de verificação existentes ignorarão o novo valor. Aplicar uma nova contagem de partições exige que você reinicie a consulta com um novo ponto de verificação.

O reparticionamento de estado sob demanda oferece os seguintes benefícios:

  • Ajuste as consultas redimensionando o número de partições sem reconstruir o ponto de verificação.
  • Aumente ou diminua a escala das consultas para corresponder às alterações na carga de trabalho.

Requisitos

Alterar o número de partições

Use a configuração Spark spark.sql.streaming.stateStore.partitions e reinicie a consulta para alterar o número de partições de estado de embaralhamento e transmissão:

Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Para consultas com estado, spark.sql.streaming.stateStore.partitions tem precedência sobre spark.sql.shuffle.partitions. Após a consulta ser reiniciada e o último microlote planejado ser concluído, a consulta executa uma operação de repartição para redistribuir os dados de estado no novo número de partições. Após a conclusão das operações de repartição, a consulta retoma o processamento.

Estado de repartição do monitor

Após a conclusão do próximo microlote, StreamingQueryProgress eventos incluem a duração das operações de repartição. Nas métricas durationMs de um evento, controlBatch.REPARTITION mostra o valor da duração em milissegundos. Estados maiores podem aumentar o tempo necessário para o reparticionamento. Veja consultas de monitoramento transmissão estruturada no Databricks.

Exemplo de transmissão estructurada

O exemplo a seguir reduz o número de partições de uma consulta de 200 (o default para 100, embaralhando as partições. Interrompa a consulta, defina a nova contagem de partições e reinicie:

Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)

Exemplo de Pipelines Declarativos Lakeflow Spark

Em Lakeflow Spark Declarative Pipelines, defina spark.sql.streaming.stateStore.partitions usando o parâmetro spark_conf no decorador @dp.table ou @dp.append_flow.

Definir partições em um fluxo:

Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

source_path = "/databricks-datasets/iot-stream/data-device/"

dp.create_streaming_table("target_table")

@dp.append_flow(
target="target_table",
name="my_flow_1",
spark_conf={&quot;spark.sql.streaming.stateStore.partitions&quot;: &quot;100&quot;}
)
def my_flow_1():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(source_path)
.withColumn("timestamp", F.to_timestamp("timestamp"))
.withWatermark("timestamp", "10 minutes")
.groupBy(F.window("timestamp", "5 minutes"), "id")
.count())

Defina as partições no nível da tabela para o fluxo default:

Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

source_path = "/databricks-datasets/iot-stream/data-device/"

@dp.table(
name="table_1",
spark_conf={&quot;spark.sql.streaming.stateStore.partitions&quot;: &quot;100&quot;}
)
def table_1():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(source_path)
.withColumn("timestamp", F.to_timestamp("timestamp"))
.withWatermark("timestamp", "10 minutes")
.groupBy(F.window("timestamp", "5 minutes"), "id")
.count())