modo tempo real em transmissão estruturada
Visualização
Esse recurso está em Public Preview.
Esta página descreve o modo tempo real, um tipo de gatilho para transmissão estruturada que permite o processamento de dados com latência ultrabaixa, com latência de ponta a ponta de apenas 5 ms. Este modo foi projetado para cargas de trabalho operacionais que exigem resposta imediata aos dados de transmissão.
LTS O modo tempo real está disponível no Databricks Runtime, 16.4 e versões superiores.
Cargas de trabalho operacionais
As cargas de trabalho de transmissão podem ser amplamente divididas em cargas de trabalho analíticas e cargas de trabalho operacionais:
- As cargas de trabalho analíticas utilizam a ingestão de dados e transformações, seguindo normalmente a arquitetura medalhão (por exemplo, ingestão de dados nas tabelas bronze, prata e ouro).
- As cargas de trabalho operacionais consomem dados em tempo real, aplicam lógica de negócios e acionam ações ou decisões posteriores.
Alguns exemplos de cargas de trabalho operacionais são:
- Bloquear ou sinalizar uma transação com cartão de crédito em tempo real se uma pontuação de fraude exceder um limite, com base em fatores como localização incomum, grande valor da transação ou padrões de gastos rápidos.
- Entregar uma mensagem promocional quando os dados do clickstream mostram que um usuário está procurando jeans há cinco minutos, oferecendo um desconto de 25% se ele comprar nos próximos 15 minutos.
Em geral, as cargas de trabalho operacionais são caracterizadas pela necessidade de latência de ponta a ponta de menos de um segundo. Isso pode ser alcançado com o modo tempo real em uma transmissão estruturada Apache Spark.
Como o modo em tempo real alcança baixa latência
O modo em tempo real aprimora a arquitetura de execução ao:
- Executar lotes de longa duração (o “ default ” é de 5 minutos), nos quais os dados são processados à medida que ficam disponíveis na fonte.
- Todos os estágios da consulta são agendados simultaneamente. Isso requer que o número de slots de tarefa disponíveis seja igual ou superior ao número de tarefas de todas as etapas em um lote.
- Os dados são transmitidos entre as etapas assim que são produzidos, utilizando um shuffle de transmissão.
Ao final do processamento de um lote, e antes do próximo lote começar, a transmissão estruturada verifica o progresso dos pontos de controle e disponibiliza as métricas do último lote. Se os lotes forem mais longos, essas atividades podem ser menos frequentes, levando a repetições mais longas em caso de falha e atraso na disponibilidade das métricas. Por outro lado, se os lotes forem menores, essas atividades se tornam mais frequentes, podendo afetar a latência. A Databricks recomenda que você compare o modo em tempo real com sua carga de trabalho e seus requisitos para encontrar o intervalo de acionamento adequado.
configuração de agrupamento
Para utilizar o modo tempo real na transmissão estruturada, é necessário configurar um LakeFlow Job clássico:
-
No seu Databricks workspace, clique em Novo no canto superior esquerdo. Selecione Mais e clique em agrupamento .
-
Limpe a aceleração doPhoton .
-
Desative a opção “Ativar escala automática ”.
-
Em Desempenho avançado , desmarque a opção Usar instâncias spot .
-
Em Modo Avançado e de Acesso, clique em Manual e selecione Dedicado (anteriormente: Usuário único).
-
Em Spark , insira o seguinte em Spark config :
spark.databricks.streaming.realTimeMode.enabled true
-
Clique em Criar .
requisitos de tamanho do agrupamento
É possível executar uma tarefa em tempo real por agrupamento, caso o agrupamento possua slots de tarefas suficientes.
Para execução no modo de baixa latência, o número total de slots de tarefa disponíveis deve ser maior ou igual ao número de tarefas em todas as etapas da consulta.
Exemplos de cálculo de slots
pipeline e sem estado de estágio único (Kafka e fonte + coletor):
Se maxPartitions = 8, você precisará de pelo menos 8 slots. Se maxPartitions não estiver definido, utilize o número de partições do tópico Kafka.
pipeline e com estado em duas etapas (Kafka e fonte + shuffle):
Se maxPartitions = 8 e shuffle partitions = 20, você precisará de 8 + 20 = 28 slots.
pipeline e em três etapas (Kafka fonte + shuffle + repartição):
Com maxPartitions = 8 e dois estágios aleatórios de 20 cada, você precisa de 8 + 20 + 20 = 48 slots.
considerações importantes
Ao configurar o clustering, considere o seguinte:
-
Ao contrário do modo micro-lotes, a tarefa em tempo real pode permanecer parada enquanto aguarda dados, portanto, o dimensionamento correto é essencial para evitar desperdício de recursos.
-
Almeje um nível de utilização alvo (por exemplo, 50%) ajustando:
maxPartitions
(para Kafka)spark.sql.shuffle.partitions
(para fases aleatórias)
-
A Databricks recomenda definir maxPartitions para que cada tarefa lide com várias partições do Kafka, a fim de reduzir a sobrecarga.
-
Ajuste os slots de tarefas por worker para corresponder à carga de trabalho para tarefas simples de uma etapa.
-
Para tarefas com muitas reorganizações, experimente encontrar o número mínimo de partições de reorganização que evitem atrasos e ajuste a partir daí. A tarefa não será agendada se o agrupamento não tiver slots suficientes.
A partir de Databricks Runtime 16.4 LTS e acima, todos os pipelines tempo real utilizam o checkpoint v2, que permite a alternância perfeita entre os modos tempo real e micro-lotes.
Configuração de consulta
É necessário habilitar o gatilho em tempo real para especificar que uma consulta deve ser executada usando o modo de baixa latência. Além disso, os acionadores em tempo real são suportados apenas no modo de atualização. Por exemplo:
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode(“update”)
.trigger(RealTimeTrigger.apply())
.start()
O RealTimeTrigger também pode aceitar um argumento especificando o intervalo do ponto de verificação. Por exemplo, esse código indica um intervalo de ponto de verificação de 5 minutos:
.trigger(RealTimeTrigger.apply("5 minutes"))
Observabilidade
Anteriormente, a latência de consulta de ponta a ponta estava intimamente ligada à duração dos lotes, tornando a duração dos lotes um bom indicador da latência da consulta. No entanto, este método não se aplica mais no modo em tempo real, exigindo abordagens alternativas para medir a latência. A latência de ponta a ponta é específica da carga de trabalho e, às vezes, só pode ser medida com precisão com a lógica comercial. Por exemplo, se o carimbo de data/hora da fonte for emitido no Kafka, a latência pode ser calculada como a diferença entre o carimbo de data/hora de saída do Kafka e o carimbo de data/hora da fonte.
É possível estimar a latência de ponta a ponta de várias maneiras, com base em informações parciais coletadas durante o processo de transmissão.
Use o StreamingQueryProgress
As seguintes métricas estão incluídas no evento “ StreamingQueryProgress
”, que é automaticamente registrado no driver “ logs”. Você também pode acessá-los por meio da função de retorno de chamada onQueryProgress()
do StreamingQueryListener
. QueryProgressEvent.json()
ou toString()
incluem métricas adicionais do modo em tempo real.
- Latência de processamento (ProcessingLatencyMS ). O tempo decorrido entre o momento em que a consulta no modo em tempo real lê um registro e antes de ele ser gravado no próximo estágio ou downstream. Para consultas de estágio único, isso mede a mesma duração que a latência E2E. Essas métricas são relatadas por tarefa.
- Latência do enfileiramento de origem (SourceQueuingLatencyMS) . O tempo decorrido entre o momento em que um registro é gravado com sucesso em um barramento de mensagens, por exemplo, o tempo de acréscimo ao log no Kafka, e o momento em que o registro foi lido pela primeira vez pela consulta em modo em tempo real. Essas métricas são relatadas por tarefa.
- Latência E2E (latências E2E) . O tempo entre o momento em que o registro é gravado com sucesso em um barramento de mensagens e o momento em que o registro é gravado a jusante pela consulta em modo tempo real. Essas métricas são agregadas por lotes em todos os registros processados por todas as tarefas.
Por exemplo:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Utilizar “ API ” na função
O recurso "Observar" API auxilia na medição da latência sem iniciar outra tarefa. Se você tiver um carimbo de data/hora da fonte que se aproxime da hora de chegada dos dados da fonte e ele for passado antes de chegar ao destino, ou se você puder encontrar uma maneira de passar o carimbo de data/hora, você pode estimar a latência de cada lote usando o Observe API:
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
Neste exemplo, um timestamp atual é registrado antes da saída da entrada, e a latência é estimada calculando a diferença entre esse timestamp e o timestamp de origem do registro. Os resultados são incluídos nos relatórios de progresso e disponibilizados aos ouvintes. Aqui está um exemplo de saída:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
O que é suportado?
Ambientes
Tipo de cluster | Suportado |
---|---|
Dedicado (anteriormente: usuário único) | Sim |
Padrão (anteriormente: compartilhado) | Não |
LakeFlow Pipeline declarativo clássico | Não |
LakeFlow Servidor de pipeline declarativo sem servidor | Não |
Serverless | Não |
Idiomas
Idioma | Suportado |
---|---|
Scala | Sim |
Java | Sim |
Python | Sim |
Modos de execução
Execução Mode | Suportado |
---|---|
Modo de atualização | Sim |
Modo de anexação | Não |
Modo completo | Não |
Fontes
Fontes | Suportado |
---|---|
Apache Kafka | Sim |
AWS MSK | Sim |
Eventhub (utilizando o Conector Kafka) | Sim |
Kinesis | Sim (somente modo EFO) |
Google Pub/Sub | Não |
Apache Pulsar | Não |
Pias
Pias | Suportado |
---|---|
Apache Kafka | Sim |
Eventhub (utilizando o Conector Kafka) | Sim |
Kinesis | Não |
Google Pub/Sub | Não |
Apache Pulsar | Não |
Dissipadores arbitrários (usando ForEachWriter) | Sim |
Operadores
Operadores | Suportado |
---|---|
Operações sem estado | |
| Sim |
| Sim |
UDFs | |
| Sim |
| Sim (com algumas limitações) |
Agregação | |
| Sim |
| Sim |
| Sim |
| Sim |
| Sim |
Sim | |
Janelamento | |
| Sim |
| Sim |
| Não |
Desduplicação | |
| Sim (o estado é ilimitado) |
| Não |
transmissão - Junção de tabelas | |
| Sim |
transmissão - transmissão join | Não |
Grupos de mapas (planos) com estado | Não |
Transformar com estado | Sim (com algumas diferenças) |
união | Sim (com algumas limitações) |
Para cada | Sim |
Para cada lote | Não |
Partições do mapa | Sim |
Utilize transformWithState no modo em tempo real.
Para a criação de aplicativos personalizados com estado, o Databricks suporta transformWithState
, um API em Apache Spark transmissão estruturada. Consulte Criar um aplicativo com estado personalizado para obter mais informações sobre o API e trechos de código.
No entanto, existem algumas diferenças entre o comportamento do API no modo tempo real e as consultas de transmissão tradicionais que utilizam a arquitetura de micro-lotes.
-
O método no modo em tempo real
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)
é chamado para cada linha.- O iterador
inputRows
retorna um único valor. No modo micro lotes, ele é chamado uma vez para cada key, e o iteradorinputRows
retorna todos os valores para um key nos micro lotes. - Você deve estar ciente dessa diferença ao escrever seu código.
- O iterador
-
Os temporizadores de eventos não são suportados no modo em tempo real.
-
No modo em tempo real, os temporizadores são atrasados em sua ativação, dependendo da chegada dos dados. Caso contrário, se não houver dados, ele será disparado no final dos lotes de longa execução. Por exemplo, se um cronômetro deve disparar às 10:00:00 e não houver chegada de dados simultaneamente, ele não será acionado. Em vez disso, se os dados chegarem às 10:00:10, o cronômetro será acionado com um atraso de 10 segundos. Ou, se nenhum dado chegar e os lotes de longa execução estiverem sendo encerrados, execute o temporizador antes de encerrar os lotes de longa execução.
UDFs do Python
O Databricks suporta a maioria das funções definidas pelo usuário (UDFs) do Python no modo em tempo real:
Tipo de UDF | Suportado |
---|---|
UDF sem estado | |
| Sim |
| Sim |
| Sim |
| Sim |
| Sim |
UDF de agrupamento com estado (UDAF) | |
| Sim |
| Não |
UDF de agrupamento não fatídico (UDAF) | |
| Não |
| Não |
| Não |
Função de tabela | |
Não | |
UC UDF | Não |
Há vários pontos a serem considerados ao usar UDFs do Python no modo de tempo real:
-
Para minimizar a latência, configure o tamanho dos lotes do Arrow (spark.sql.execution.arrow.maxRecordsPerBatch) como 1.
- Compensação: essa configuração otimiza a latência em detrimento da taxa de transferência. Para a maioria das cargas de trabalho, essa configuração é recomendada.
- Aumente o tamanho dos lotes somente se for necessária uma taxa de transferência maior para acomodar o volume de entrada, aceitando o possível aumento na latência.
-
Pandas Os UDFs e as funções não funcionam bem com um tamanho de lote Arrow de 1.
- Se o senhor usar Pandas UDFs ou funções, defina o tamanho dos lotes Arrow para um valor mais alto (por exemplo, 100 ou mais).
- Observe que isso implica maior latência. A Databricks recomenda o uso do Arrow UDF ou da função, se possível.
-
Devido ao problema de desempenho com Pandas, transformWithState só é compatível com a interface
Row
.
Limitações
Limitação da fonte
Para o Kinesis, o modo de pesquisa não é compatível. Além disso, repartições frequentes podem afetar negativamente a latência.
Limitação sindical
Para o Union, existem algumas limitações:
-
A autounião não é suportada:
- Kafka : Não é possível utilizar o mesmo objeto de estrutura de dados de origem e estruturas de dados derivadas da união. Solução alternativa: use dataframes diferentes que sejam lidos da mesma fonte.
- Kinesis : Não é possível unir estruturas de dados derivadas da mesma fonte Kinesis com a mesma configuração. Solução alternativa: além de utilizar Dataframes diferentes, é possível atribuir uma opção “consumerName” diferente a cada DataFrame.
-
Operadores com estado (por exemplo,
aggregate
,deduplicate
,transformWithState
) definidos antes da União não são suportados. -
A união com fontes lotes não é suportada.
Exemplos
Os exemplos abaixo mostram consultas que são suportadas.
Consultas sem estado
Todas as consultas sem estado de um ou vários estágios são suportadas.
Fonte Kafka para Destino Kafka
Neste exemplo, você lê de uma fonte Kafka e grava em um destino Kafka.
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Repartição
Neste exemplo, você lê de uma fonte Kafka, reparticiona os dados em 20 partições e grava em um destino Kafka.
Devido a uma limitação atual da implementação, defina a configuração do Spark spark.sql.execution.sortBeforeRepartition
como false
antes de utilizar a repartição.
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
transmissão-Instantâneo join (apenas transmissão)
Neste exemplo, você lê de Kafka, join os dados com uma tabela estática e grava em um sink Kafka. Observe que apenas a junção transmissão-estática que transmite a tabela estática é suportada, o que significa que a tabela estática deve caber na memória.
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Fonte Kinesis para destino Kafka
Neste exemplo, você lê de uma fonte Kinesis e grava em um destino Kafka.
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option(REGION_KEY, regionName)
.option(AWS_ACCESS_ID_KEY, awsAccessKeyId)
.option(AWS_SECRET_KEY, awsSecretAccessKey)
.option(CONSUMER_MODE_KEY, CONSUMER_MODE_EFO)
.option(CONSUMER_NAME_KEY, kinesisSourceStream.consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
União
Neste exemplo, você une dois DataFrames do Kafka de dois tópicos diferentes e grava em um destino do Kafka.
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Consultas com estado
Desduplicação
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 40)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Agregação
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 20)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
União com agregação
Neste exemplo, primeiro você une dois DataFrames do Kafka de dois tópicos diferentes e, em seguida, realiza uma agregação. Por fim, você grava no sink Kafka.
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Transformar com estado
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 20)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Existe uma diferença entre a forma como o modo tempo real e outros modos de execução na transmissão estruturada executam o StatefulProcessor
em transformWithState
. Consulte Utilizar transformWithState no modo em tempo real.
TransformWithState (PySpark, interface Row)
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
spark.conf.set("spark.sql.shuffle.partitions", "20")
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", "/your/checkpoint/location")
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Há uma diferença entre o modo de tempo real e outros modos de execução na transmissão estruturada que executam o StatefulProcessor
em transformWithState
. Consulte Utilizar transformWithState no modo em tempo real.
Pias
Escrevendo para o Postgres via ForEachSink
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
Exibição
Este recurso está disponível no Databricks Runtime, versão 17.1 e superior.
Fonte da taxa de exibição
Neste exemplo, você lê de uma fonte de taxa e exibe a transmissão DataFrame em um Notebook.
- Scala
- Python
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime("30 seconds"), outputMode=OutputMode.Update())
inputDF = spark \
.readStream \
.format("rate") \
.option("numPartitions", 2) \
.option("rowsPerSecond", 1) \
.load()
display(inputDF, realTime="30 seconds", outputMode="update")