Conecte-se ao Apache Kafka
Esta página descreve como você pode usar o Apache Kafka como fonte ou coletor quando executar cargas de trabalho de transmissão estructurada no Databricks.
Para obter mais informações sobre Kafka, consulte a documentaçãoApache Kafka.
Leia os dados do Kafka
Use o formato kafka para configurar conexões com o Kafka. Veja a seguir um exemplo de leitura de transmissão:
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
O Databricks também oferece suporte a leitura em lote de Kafka, conforme no exemplo a seguir:
- Python
- Scala
- SQL
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Para carregamento incremental de lotes, Databricks recomenda o uso Kafka com Trigger.AvailableNow. Ver AvailableNow: Processamento incremental de lotes.
No Databricks Runtime 13.3 LTS e versões superiores, Databricks também fornece uma função SQL para leitura de dados Kafka . A transmissão com SQL é suportada apenas no pipeline declarativo LakeFlow Spark ou com tabelas de transmissão no Databricks SQL. Veja read_kafka função com valor de tabela.
Configurar leitor Kafka transmissão estruturada
Para consultas de lotes e de transmissão, é necessário definir os servidores bootstrap para a fonte Kafka com a seguinte opção:
Chave | Valor | Descrição |
|---|---|---|
| Uma lista de hosts separados por vírgulas | Servidores de inicialização do cluster Kafka |
Para configurar tópicos de inscrição, você deve especificar uma das seguintes opções:
Opção | Valor | Descrição |
|---|---|---|
| Uma lista de tópicos separados por vírgula. | A lista de tópicos para se inscrever. |
| String Java regex. | O padrão usado para assinar tópicos. |
| Sequência JSON | Específico |
Consulte Kafka para a lista completa de opções disponíveis.
Esquema de registros do Kafka
O leitor de transmissão estructurada do Kafka retorna linhas com o seguinte esquema:
Coluna | Tipo |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Os key e os value são sempre desserializados como arrays de bytes com os ByteArrayDeserializer. Use operações DataFrame (como cast("string") ou from_avro) para desserializar explicitamente a chave e os valores.
Escrever dados para o Kafka
Veja a seguir um exemplo de uma gravação em transmissão para o Kafka:
- Python
- Scala
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
O Databricks também oferece suporte à semântica de gravação em lote nos coletores de dados do Kafka, conforme mostrado no exemplo a seguir:
- Python
- Scala
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Configurar o escritor de transmissão estruturada Kafka
Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se um coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Resolva esse erro atualizando para a versão 2.8.0 ou superior Kafka , ou definindo .option(“kafka.enable.idempotence”, “false”) ao configurar seu gravador de transmissão estruturada.
Veja a seguir as opções comuns para gravações no Kafka:
Chave | Valor | Valor padrão | Descrição |
|---|---|---|---|
| Uma lista delimitada por vírgulas de | Nenhuma | Obrigatório. A configuração do Kafka |
|
| não definido | Opcional. Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico existentes nos dados. |
|
|
| Opcional. Se os cabeçalhos do Kafka devem ser incluídos na linha. |
Consulte sink do Kafka para obter a lista completa de opções disponíveis.
Esquema para o gravador Kafka
Ao gravar dados no Kafka, o DataFrame fornecido pode incluir os seguintes campos:
Nome da coluna | Obrigatório ou opcional | Tipo |
|---|---|---|
| opcional |
|
| Obrigatório |
|
| opcional |
|
| Opcional (ignorado se |
|
| opcional |
|
Autenticação
Databricks oferece suporte a vários métodos de autenticação para Kafka, incluindo credenciais de serviço Unity Catalog , SASL/SSL e opções específicas cloudpara AWS MSK, Azure Event Hubs e Google Cloud Kafka. Consulte Autenticação.
Recuperar métricas do Kafka
Para monitorar o atraso da consulta de transmissão em relação ao Kafka, use as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest. Essas métricas relatam o atraso de compensação médio, máximo e mínimo em todas as partições de tópico inscritas, em relação aos offsets mais recentes no Kafka. Consulte Leitura interativa de métricas.
No Databricks Runtime 17.1 e versões superiores, os offsets mais recentes Kafka são obtidos após a conclusão de cada microlote. Em tópicos que recebem dados continuamente, as métricas de backlog podem apresentar valores pequenos, porém persistentes, diferentes de zero. Esse comportamento é esperado e não indica que a transmissão esteja atrasada.
No Databricks Runtime 17.0 e versões anteriores, os offsets mais recentes Kafka são obtidos no instante de início dos microlotes. As métricas do backlog poderão retornar 0 quando as consultas de transmissão consumirem consistentemente todos os registros disponíveis no início dos microlotes.
Para estimar os dados restantes que uma consulta precisa ler, use a métrica estimatedTotalBytesBehindLatest. Essa métrica estima o número total de bytes restantes em todas as partições inscritas com base nos lotes processados nos últimos 300 segundos. Você pode modificar o período de tempo usado para esta estimativa definindo a opção bytesEstimateWindowLength.
Por exemplo, para definir o tamanho da janela para 10 minutos:
- Python
- Scala
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Consulte consultas sobre monitoramento transmissão estruturada no Databricks para mais informações.
Exemplo para Kafka para Delta Lake
O exemplo a seguir mostra um fluxo de trabalho completo para a transmissão contínua de dados do Kafka para uma tabela Delta Lake. Você pode usar essa abordagem para cargas de trabalho de ingestão de dados quase em tempo real.
Este exemplo utiliza um esquema JSON fixo. Para outros formatos como Avro ou Protobuf, use from_avro ou from_protobuf. Você também pode integrar com um registro de esquemas. Veja o exemplo com o Registro de Esquemas.
- Python
- Scala
- SQL
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);