Conecte-se ao Apache Kafka
Esta página descreve como você pode usar o Apache Kafka como fonte ou coletor quando executar cargas de trabalho de transmissão estructurada no Databricks.
Para obter mais informações sobre Kafka, consulte a documentaçãoApache Kafka.
Leia os dados do Kafka
Use o formato kafka para configurar conexões com o Kafka. Veja a seguir um exemplo de leitura de transmissão:
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
O Databricks também oferece suporte a leitura em lote de Kafka, conforme no exemplo a seguir:
- Python
- Scala
- SQL
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Para carregamento incremental de lotes, Databricks recomenda o uso Kafka com Trigger.AvailableNow. Ver AvailableNow: Processamento incremental de lotes.
No Databricks Runtime 13.3 LTS e versões superiores, Databricks também fornece uma função SQL para leitura de dados Kafka . A transmissão com SQL é suportada apenas no pipeline declarativo LakeFlow Spark ou com tabelas de transmissão no Databricks SQL. Veja read_kafka função com valor de tabela.
Configurar leitor Kafka transmissão estruturada
Para consultas de lotes e de transmissão, é necessário definir os servidores bootstrap para a fonte Kafka com a seguinte opção:
Chave | Valor | Descrição |
|---|---|---|
| Uma lista de hosts separados por vírgulas | Servidores de inicialização do cluster Kafka |
Para configurar tópicos de inscrição, você deve especificar uma das seguintes opções:
Opção | Valor | Descrição |
|---|---|---|
| Uma lista de tópicos separados por vírgula. | A lista de tópicos para se inscrever. |
| String Java regex. | O padrão usado para assinar tópicos. |
| Sequência JSON | Específico |
Consulte Kafka para a lista completa de opções disponíveis.
Esquema de registros do Kafka
O leitor de transmissão estructurada do Kafka retorna linhas com o seguinte esquema:
Coluna | Tipo |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Os key e os value são sempre desserializados como arrays de bytes com os ByteArrayDeserializer. Use operações DataFrame (como cast("string") ou from_avro) para desserializar explicitamente a chave e os valores.
Escrever dados para o Kafka
Veja a seguir um exemplo de uma gravação em transmissão para o Kafka:
- Python
- Scala
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
O Databricks também oferece suporte à semântica de gravação em lote nos coletores de dados do Kafka, conforme mostrado no exemplo a seguir:
- Python
- Scala
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Configurar o escritor de transmissão estruturada Kafka
Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Resolva esse erro atualizando para a versão 2.8.0 ou superior Kafka , ou definindo .option(“kafka.enable.idempotence”, “false”) ao configurar seu gravador de transmissão estruturada.
Veja a seguir as opções comuns para gravações no Kafka:
Chave | Valor | Valor padrão | Descrição |
|---|---|---|---|
| Uma lista delimitada por vírgulas de | Nenhuma | Obrigatório. A configuração do Kafka |
|
| não definido | Opcional. Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico existentes nos dados. |
|
|
| Opcional. Se os cabeçalhos do Kafka devem ser incluídos na linha. |
Consulte sink do Kafka para obter a lista completa de opções disponíveis.
Esquema para o gravador Kafka
Ao gravar dados no Kafka, o DataFrame fornecido pode incluir os seguintes campos:
Nome da coluna | Obrigatório ou opcional | Tipo |
|---|---|---|
| opcional |
|
| Obrigatório |
|
| opcional |
|
| Opcional (ignorado se |
|
| opcional |
|
Autenticação
Databricks oferece suporte a vários métodos de autenticação para Kafka, incluindo credenciais de serviço Unity Catalog , SASL/SSL e opções específicas cloudpara AWS MSK, Azure Event Hubs e Google Cloud Kafka. Consulte Autenticação.
Recuperar métricas do Kafka
Para monitorar o atraso da consulta de transmissão em relação ao Kafka, use as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest. Essas métricas relatam o atraso de compensação médio, máximo e mínimo em todas as partições de tópico inscritas, em relação aos offsets mais recentes no Kafka. Consulte Leitura interativa de métricas.
No Databricks Runtime 17.1 e versões superiores, os offsets mais recentes Kafka são obtidos após a conclusão de cada microlote. Em tópicos que recebem dados continuamente, as métricas de backlog podem apresentar valores pequenos, porém persistentes, diferentes de zero. Esse comportamento é esperado e não indica que a transmissão esteja atrasada.
No Databricks Runtime 17.0 e versões anteriores, os offsets mais recentes Kafka são obtidos no instante de início dos microlotes. As métricas do backlog poderão retornar 0 quando as consultas de transmissão consumirem consistentemente todos os registros disponíveis no início dos microlotes.
Para estimar os dados restantes que uma consulta precisa ler, use a métrica estimatedTotalBytesBehindLatest. Essa métrica estima o número total de bytes restantes em todas as partições inscritas com base nos lotes processados nos últimos 300 segundos. Você pode modificar o período de tempo usado para esta estimativa definindo a opção bytesEstimateWindowLength.
Por exemplo, para definir o tamanho da janela para 10 minutos:
- Python
- Scala
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Consulte consultas sobre monitoramento transmissão estruturada no Databricks para mais informações.
Exemplo para Kafka para Delta Lake
O exemplo a seguir mostra um fluxo de trabalho completo para uma gravação de transmissão incremental do Kafka para uma tabela Delta Lake usando o gatilho availableNow. É possível usar esta abordagem para cargas de trabalho de ingestão de dados incrementais.
Este exemplo utiliza um esquema JSON fixo. Para outros formatos como Avro ou Protobuf, use from_avro ou from_protobuf. Você também pode integrar com um registro de esquemas. Veja o exemplo com o Registro de Esquemas.
- Python
- Scala
- SQL
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(availableNow=True)
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);
Na compute serverless do Databricks, o gatilho availableNow é recomendado para transmissão incremental. Para transmissão contínua de baixa latência, use o modo contínuo dos Pipelines Declarativos do LakeFlow. Consulte os gatilhos de transmissão estructurada para a lista completa de opções suportadas.