Conecte-se ao Apache Kafka
Este artigo descreve como você pode usar o Apache Kafka como fonte ou coletor quando executar cargas de trabalho de Structured Streaming no Databricks.
Para obter mais informações sobre Kafka, consulte a documentaçãoApache Kafka.
Leia os dados do Kafka
O Databricks fornece a palavra-chave kafka como um formato de dados para configurar conexões com o Kafka. Segue abaixo um exemplo de leitura de um jogo:
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
Databricks também suporta semântica de leitura de lotes, como mostrado no exemplo a seguir:
- Python
- Scala
- SQL
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Para carregamento incremental de lotes, Databricks recomenda o uso Kafka com Trigger.AvailableNow. Ver AvailableNow: Processamento incremental de lotes.
No Databricks Runtime 13.3 LTS e versões superiores, Databricks também fornece uma função SQL para leitura de dados Kafka . A transmissão com SQL é suportada apenas no pipeline declarativo LakeFlow Spark ou com tabelas de transmissão no Databricks SQL. Veja read_kafka função com valor de tabela.
Configurar leitor Kafka transmissão estruturada
A seguinte opção deve ser configurada para a fonte Kafka , tanto para consultas de lotes quanto de transmissões:
Opção | Valor | Descrição |
|---|---|---|
| Uma lista de hosts separados por vírgulas | Servidores de inicialização do cluster Kafka |
Além disso, é necessário selecionar uma das seguintes opções para especificar a quais tópicos se inscrever:
Opção | Valor | Descrição |
|---|---|---|
| Uma lista de tópicos separados por vírgula. | A lista de tópicos para se inscrever. |
| String Java regex. | O padrão usado para assinar tópicos. |
| Sequência JSON | Tópico específico Partições a serem consumidas. |
Consulte a página Opções para obter a lista completa de opções disponíveis.
Esquema para registros do Kafka
Os registros retornados pelo leitor de transmissão estruturada Kafka terão o seguinte esquema:
Coluna | Tipo |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Os key e os value são sempre desserializados como arrays de bytes com os ByteArrayDeserializer. Use operações DataFrame (como cast("string") ou from_avro) para desserializar explicitamente a chave e os valores.
Escrever dados para o Kafka
Veja a seguir um exemplo de uma gravação em transmissão para o Kafka:
- Python
- Scala
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
O Databricks também oferece suporte à semântica de gravação em lote nos coletores de dados do Kafka, conforme mostrado no exemplo a seguir:
- Python
- Scala
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Configurar o escritor de transmissão estruturada Kafka
Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Resolva esse erro atualizando para a versão 2.8.0 ou superior Kafka , ou definindo .option(“kafka.enable.idempotence”, “false”) ao configurar seu gravador de transmissão estruturada.
Veja a seguir as opções comuns definidas ao gravar no Kafka:
Opção | Valor | Valor padrão | Descrição |
|---|---|---|---|
| Uma lista delimitada por vírgulas de | Nenhuma | [Obrigatório] A configuração do Kafka |
|
| não definido | [Opcional] Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico existentes nos dados. |
|
|
| [Opcional] Se os cabeçalhos do Kafka devem ser incluídos na linha. |
Consulte a página Opções para obter a lista completa de opções disponíveis.
Esquema para o gravador Kafka
Ao gravar dados no Kafka, o DataFrame fornecido pode incluir os seguintes campos:
Nome da coluna | Obrigatório ou opcional | Tipo |
|---|---|---|
| opcional |
|
| Obrigatório |
|
| opcional |
|
| Opcional (ignorado se |
|
| opcional |
|
Autenticação
Databricks oferece suporte a vários métodos de autenticação para Kafka, incluindo credenciais de serviço Unity Catalog , SASL/SSL e opções específicas cloudpara AWS MSK, Azure Event Hubs e Google Cloud Kafka. Consulte Autenticação.
Recuperar métricas do Kafka
Você pode monitorar o quanto uma consulta de transmissão está atrasada em relação ao Kafka usando as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest . Esses relatórios mostram o atraso médio, máximo e mínimo de deslocamento em todas as partições de tópicos inscritas, em relação aos deslocamentos mais recentes no Kafka. Veja a leitura de métricas interativamente.
Para estimar a quantidade de dados que a consulta ainda não consumiu, use as métricas estimatedTotalBytesBehindLatest . Esta métrica estima o número total de bytes restantes em todas as partições inscritas, com base nos lotes processados nos últimos 300 segundos. Você pode modificar o intervalo de tempo usado para esta estimativa definindo a opção bytesEstimateWindowLength . Por exemplo, para definir para 10 minutos:
- Python
- Scala
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Consulte consultas sobre monitoramento transmissão estruturada no Databricks para mais informações.
Exemplo de código: Kafka para Delta
O exemplo a seguir demonstra um fluxo de trabalho completo para transmissão contínua de dados do Kafka para uma tabela Delta. Esse padrão é ideal para cargas de trabalho de ingestão quase em tempo real.
Este exemplo utiliza um esquema JSON fixo. Para outros formatos como Avro ou Protobuf, use from_avro ou from_protobuf. Você também pode integrar com um registro de esquemas. Veja o exemplo com o Registro de Esquemas.
- Python
- Scala
- SQL
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);