Pular para o conteúdo principal

Conecte-se ao Apache Kafka

Esta página descreve como você pode usar o Apache Kafka como fonte ou coletor quando executar cargas de trabalho de transmissão estructurada no Databricks.

Para obter mais informações sobre Kafka, consulte a documentaçãoApache Kafka.

Leia os dados do Kafka

Use o formato kafka para configurar conexões com o Kafka. Veja a seguir um exemplo de leitura de transmissão:

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)

O Databricks também oferece suporte a leitura em lote de Kafka, conforme no exemplo a seguir:

Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)

Para carregamento incremental de lotes, Databricks recomenda o uso Kafka com Trigger.AvailableNow. Ver AvailableNow: Processamento incremental de lotes.

No Databricks Runtime 13.3 LTS e versões superiores, Databricks também fornece uma função SQL para leitura de dados Kafka . A transmissão com SQL é suportada apenas no pipeline declarativo LakeFlow Spark ou com tabelas de transmissão no Databricks SQL. Veja read_kafka função com valor de tabela.

Configurar leitor Kafka transmissão estruturada

Para consultas de lotes e de transmissão, é necessário definir os servidores bootstrap para a fonte Kafka com a seguinte opção:

Chave

Valor

Descrição

kafka.bootstrap.servers

Uma lista de hosts separados por vírgulas

Servidores de inicialização do cluster Kafka

Para configurar tópicos de inscrição, você deve especificar uma das seguintes opções:

Opção

Valor

Descrição

subscribe

Uma lista de tópicos separados por vírgula.

A lista de tópicos para se inscrever.

subscribePattern

String Java regex.

O padrão usado para assinar tópicos.

assign

Sequência JSON {"topicA":[0,1],"topic":[2,4]}.

Específico topicPartitions para consumir.

Consulte Kafka para a lista completa de opções disponíveis.

Esquema de registros do Kafka

O leitor de transmissão estructurada do Kafka retorna linhas com o seguinte esquema:

Coluna

Tipo

key

binary

value

binary

topic

string

partition

int

offset

long

timestamp

long

timestampType

int

Os key e os value são sempre desserializados como arrays de bytes com os ByteArrayDeserializer. Use operações DataFrame (como cast("string") ou from_avro) para desserializar explicitamente a chave e os valores.

Escrever dados para o Kafka

Veja a seguir um exemplo de uma gravação em transmissão para o Kafka:

Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)

O Databricks também oferece suporte à semântica de gravação em lote nos coletores de dados do Kafka, conforme mostrado no exemplo a seguir:

Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)

Configurar o escritor de transmissão estruturada Kafka

importante

Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Resolva esse erro atualizando para a versão 2.8.0 ou superior Kafka , ou definindo .option(“kafka.enable.idempotence”, “false”) ao configurar seu gravador de transmissão estruturada.

Veja a seguir as opções comuns para gravações no Kafka:

Chave

Valor

Valor padrão

Descrição

kafka.boostrap.servers

Uma lista delimitada por vírgulas de <host:port>

Nenhuma

Obrigatório. A configuração do Kafka bootstrap.servers.

topic

STRING

não definido

Opcional. Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico existentes nos dados.

includeHeaders

BOOLEAN

false

Opcional. Se os cabeçalhos do Kafka devem ser incluídos na linha.

Consulte sink do Kafka para obter a lista completa de opções disponíveis.

Esquema para o gravador Kafka

Ao gravar dados no Kafka, o DataFrame fornecido pode incluir os seguintes campos:

Nome da coluna

Obrigatório ou opcional

Tipo

key

opcional

STRING ou BINARY

value

Obrigatório

STRING ou BINARY

headers

opcional

ARRAY

topic

Opcional (ignorado se topic estiver definido como opção de writer)

STRING

partition

opcional

INT

Autenticação

Databricks oferece suporte a vários métodos de autenticação para Kafka, incluindo credenciais de serviço Unity Catalog , SASL/SSL e opções específicas cloudpara AWS MSK, Azure Event Hubs e Google Cloud Kafka. Consulte Autenticação.

Recuperar métricas do Kafka

Para monitorar o atraso da consulta de transmissão em relação ao Kafka, use as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest. Essas métricas relatam o atraso de compensação médio, máximo e mínimo em todas as partições de tópico inscritas, em relação aos offsets mais recentes no Kafka. Consulte Leitura interativa de métricas.

nota

No Databricks Runtime 17.1 e versões superiores, os offsets mais recentes Kafka são obtidos após a conclusão de cada microlote. Em tópicos que recebem dados continuamente, as métricas de backlog podem apresentar valores pequenos, porém persistentes, diferentes de zero. Esse comportamento é esperado e não indica que a transmissão esteja atrasada.

No Databricks Runtime 17.0 e versões anteriores, os offsets mais recentes Kafka são obtidos no instante de início dos microlotes. As métricas do backlog poderão retornar 0 quando as consultas de transmissão consumirem consistentemente todos os registros disponíveis no início dos microlotes.

Para estimar os dados restantes que uma consulta precisa ler, use a métrica estimatedTotalBytesBehindLatest. Essa métrica estima o número total de bytes restantes em todas as partições inscritas com base nos lotes processados nos últimos 300 segundos. Você pode modificar o período de tempo usado para esta estimativa definindo a opção bytesEstimateWindowLength.

Por exemplo, para definir o tamanho da janela para 10 minutos:

Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:

JSON
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}

Consulte consultas sobre monitoramento transmissão estruturada no Databricks para mais informações.

Exemplo para Kafka para Delta Lake

O exemplo a seguir mostra um fluxo de trabalho completo para a transmissão contínua de dados do Kafka para uma tabela Delta Lake. Você pode usar essa abordagem para cargas de trabalho de ingestão de dados quase em tempo real.

Este exemplo utiliza um esquema JSON fixo. Para outros formatos como Avro ou Protobuf, use from_avro ou from_protobuf. Você também pode integrar com um registro de esquemas. Veja o exemplo com o Registro de Esquemas.

Python
from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)

query.awaitTermination()