Pular para o conteúdo principal

Conecte-se ao Apache Kafka

Este artigo descreve como você pode usar o Apache Kafka como fonte ou coletor quando executar cargas de trabalho de Structured Streaming no Databricks.

Para obter mais informações sobre Kafka, consulte a documentaçãoApache Kafka.

Leia os dados do Kafka

O Databricks fornece a palavra-chave kafka como um formato de dados para configurar conexões com o Kafka. Segue abaixo um exemplo de leitura de um jogo:

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)

Databricks também suporta semântica de leitura de lotes, como mostrado no exemplo a seguir:

Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)

Para carregamento incremental de lotes, Databricks recomenda o uso Kafka com Trigger.AvailableNow. Ver AvailableNow: Processamento incremental de lotes.

No Databricks Runtime 13.3 LTS e versões superiores, Databricks também fornece uma função SQL para leitura de dados Kafka . A transmissão com SQL é suportada apenas no pipeline declarativo LakeFlow Spark ou com tabelas de transmissão no Databricks SQL. Veja read_kafka função com valor de tabela.

Configurar leitor Kafka transmissão estruturada

A seguinte opção deve ser configurada para a fonte Kafka , tanto para consultas de lotes quanto de transmissões:

Opção

Valor

Descrição

kafka.bootstrap.servers

Uma lista de hosts separados por vírgulas

Servidores de inicialização do cluster Kafka

Além disso, é necessário selecionar uma das seguintes opções para especificar a quais tópicos se inscrever:

Opção

Valor

Descrição

subscribe

Uma lista de tópicos separados por vírgula.

A lista de tópicos para se inscrever.

subscribePattern

String Java regex.

O padrão usado para assinar tópicos.

assign

Sequência JSON {"topicA":[0,1],"topic":[2,4]}.

Tópico específico Partições a serem consumidas.

Consulte a página Opções para obter a lista completa de opções disponíveis.

Esquema para registros do Kafka

Os registros retornados pelo leitor de transmissão estruturada Kafka terão o seguinte esquema:

Coluna

Tipo

key

binary

value

binary

topic

string

partition

int

offset

long

timestamp

long

timestampType

int

Os key e os value são sempre desserializados como arrays de bytes com os ByteArrayDeserializer. Use operações DataFrame (como cast("string") ou from_avro) para desserializar explicitamente a chave e os valores.

Escrever dados para o Kafka

Veja a seguir um exemplo de uma gravação em transmissão para o Kafka:

Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)

O Databricks também oferece suporte à semântica de gravação em lote nos coletores de dados do Kafka, conforme mostrado no exemplo a seguir:

Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)

Configurar o escritor de transmissão estruturada Kafka

importante

Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Resolva esse erro atualizando para a versão 2.8.0 ou superior Kafka , ou definindo .option(“kafka.enable.idempotence”, “false”) ao configurar seu gravador de transmissão estruturada.

Veja a seguir as opções comuns definidas ao gravar no Kafka:

Opção

Valor

Valor padrão

Descrição

kafka.boostrap.servers

Uma lista delimitada por vírgulas de <host:port>

Nenhuma

[Obrigatório] A configuração do Kafka bootstrap.servers.

topic

STRING

não definido

[Opcional] Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico existentes nos dados.

includeHeaders

BOOLEAN

false

[Opcional] Se os cabeçalhos do Kafka devem ser incluídos na linha.

Consulte a página Opções para obter a lista completa de opções disponíveis.

Esquema para o gravador Kafka

Ao gravar dados no Kafka, o DataFrame fornecido pode incluir os seguintes campos:

Nome da coluna

Obrigatório ou opcional

Tipo

key

opcional

STRING ou BINARY

value

Obrigatório

STRING ou BINARY

headers

opcional

ARRAY

topic

Opcional (ignorado se topic estiver definido como opção de writer)

STRING

partition

opcional

INT

Autenticação

Databricks oferece suporte a vários métodos de autenticação para Kafka, incluindo credenciais de serviço Unity Catalog , SASL/SSL e opções específicas cloudpara AWS MSK, Azure Event Hubs e Google Cloud Kafka. Consulte Autenticação.

Recuperar métricas do Kafka

Você pode monitorar o quanto uma consulta de transmissão está atrasada em relação ao Kafka usando as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest . Esses relatórios mostram o atraso médio, máximo e mínimo de deslocamento em todas as partições de tópicos inscritas, em relação aos deslocamentos mais recentes no Kafka. Veja a leitura de métricas interativamente.

Para estimar a quantidade de dados que a consulta ainda não consumiu, use as métricas estimatedTotalBytesBehindLatest . Esta métrica estima o número total de bytes restantes em todas as partições inscritas, com base nos lotes processados nos últimos 300 segundos. Você pode modificar o intervalo de tempo usado para esta estimativa definindo a opção bytesEstimateWindowLength . Por exemplo, para definir para 10 minutos:

Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:

JSON
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}

Consulte consultas sobre monitoramento transmissão estruturada no Databricks para mais informações.

Exemplo de código: Kafka para Delta

O exemplo a seguir demonstra um fluxo de trabalho completo para transmissão contínua de dados do Kafka para uma tabela Delta. Esse padrão é ideal para cargas de trabalho de ingestão quase em tempo real.

Este exemplo utiliza um esquema JSON fixo. Para outros formatos como Avro ou Protobuf, use from_avro ou from_protobuf. Você também pode integrar com um registro de esquemas. Veja o exemplo com o Registro de Esquemas.

Python
from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)

query.awaitTermination()