Pular para o conteúdo principal

Exemplos de fluxos no pipeline declarativo LakeFlow Spark

Exemplo: Escrever em uma tabela de transmissão a partir de vários tópicos Kafka

Os exemplos a seguir criam uma tabela de transmissão chamada kafka_target e grava nessa tabela de transmissão a partir de dois tópicos de Kafka:

Python
from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)

@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)

Para saber mais sobre a função com valor de tabela read_kafka() usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.

Em Python, você pode criar programaticamente vários fluxos que têm como alvo uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos do Kafka.

nota

Este padrão tem os mesmos requisitos que usar um loop for para criar tabelas. Você deve passar explicitamente um valor Python para a função que define o fluxo. Veja Criar tabelas em um loop for.

Python
from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)

Exemplo: execução de um preenchimento único de dados

Se você quiser executar uma consulta para anexar dados a uma tabela de transmissão existente, use append_flow.

Depois de anexar um conjunto de dados existentes, você tem várias opções:

  • Se você quiser que a consulta anexe novos dados se eles chegarem ao diretório de preenchimento, deixe a consulta no local.
  • Se você quiser que isso seja um preenchimento único e nunca mais seja executado, remova a consulta após executar o pipeline uma vez.
  • Se você quiser que a consulta seja executada uma vez e somente executada novamente nos casos em que os dados estão sendo totalmente atualizados, defina o parâmetro once como True no fluxo de acréscimo. Em SQL, use INSERT INTO ONCE.

Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de transmissão:

Python
from pyspark import pipelines as dp

@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")

@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")

Para um exemplo mais detalhado, consulte Preenchimento de dados históricos com pipeline.

Exemplo: use o processamento de fluxo de acréscimo em vez de UNION

Em vez de usar uma consulta com uma cláusula UNION , você pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de transmissão. Usar consultas de fluxo de acréscimo em vez de UNION permite que você anexe dados a uma tabela de transmissão de várias fontes sem executar uma refreshcompleta.

O exemplo de Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION:

Python
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)

raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)

return raw_orders_us.union(raw_orders_eu)

Os exemplos a seguir substituem a consulta UNION por consultas de fluxo de acréscimo:

Python
dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")

Exemplo: Use transformWithState para monitorar os batimentos cardíacos do sensor

O exemplo a seguir mostra um processador com estado que lê do Kafka e verifica se os sensores estão emitindo sinais de pulsação periodicamente. Se um sinal de pulsação não for recebido em 5 minutos, o processador emite uma entrada para a tabela Delta de destino para análise.

Para obter mais informações sobre como criar aplicativos com estado personalizados, consulte Criar um aplicativo com estado personalizado.

nota

RocksDB é o provedor de estado default a partir do Databricks Runtime 17.2. Se a consulta falhar devido a uma exceção de provedor não suportado, adicione as seguintes configurações pipeline , execute uma refresh completa ou uma redefinição de ponto de verificação e, em seguida, execute o pipeline novamente:

JSON
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
Python
from typing import Iterator

import pandas as pd

from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

KAFKA_TOPIC = "<your-kafka-topic>"

output_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])

class SensorHeartbeatProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define state schema to store sensor information (sensor_id is the grouping key)
state_schema = StructType([
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
self.sensor_state = handle.getValueState("sensorState", state_schema)
# State variable to track the previously registered timer
timer_schema = StructType([StructField("timer_ts", LongType(), False)])
self.timer_state = handle.getValueState("timerState", timer_schema)
self.handle = handle

def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Process one row from input and update state
pdf = next(rows)
row = pdf.iloc[0]
# Store or update the sensor information in state using current timestamp
current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
self.sensor_state.update((
row["sensor_type"],
current_time
))

# Delete old timer if already registered
if self.timer_state.exists():
old_timer = self.timer_state.get()[0]
self.handle.deleteTimer(old_timer)

# Register a timer for 5 minutes from current processing time
expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
self.handle.registerTimer(expiry_time)
# Store the new timer timestamp in state
self.timer_state.update((expiry_time,))

# No output on input processing, output only on timer expiry
return iter([])

def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
# Emit output row based on state store
if self.sensor_state.exists():
state = self.sensor_state.get()
output = pd.DataFrame({
"sensor_id": [key[0]], # Use grouping key as sensor_id
"sensor_type": [state[0]],
"last_heartbeat_time": [state[1]]
})
# Remove the entry for the sensor from the state store
self.sensor_state.clear()
# Remove the timer state entry
self.timer_state.clear()
yield output

def close(self) -> None:
pass

dp.create_streaming_table("sensorAlerts")

# Define the schema for the Kafka message value
sensor_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("sensor_value", LongType(), False)])

@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
return (
spark.readStream
.format("kafka")
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
.select("data.*", "timestamp")
.withWatermark('timestamp', '1 hour')
.groupBy(col("sensor_id"))
.transformWithStateInPandas(
statefulProcessor = SensorHeartbeatProcessor(),
outputStructType = output_schema,
outputMode = 'update',
timeMode = 'ProcessingTime'))