Perguntas frequentes
Perguntas frequentes sobre como usar o Kafka com o Databricks.
Por que recebo um erro informando que uma opção do Kafka não é suportada ou não é reconhecida?
Um erro comum é esquecer o prefixo kafka. ao configurar opções nativas do Kafka. Todas as opções passadas diretamente para o cliente Kafka devem ser prefixadas com kafka.:
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Opções específicas do conector Spark Kafka (como subscribe, startingOffsets, maxOffsetsPerTrigger) não precisam do prefixo. Consulte a seção Opções para obter a lista completa.
Por que estou recebendo um erro relacionado a classes sombreadas do Kafka?
O Databricks requer o uso de classes Kafka sombreadas (prefixadas com kafkashaded. ou shadedmskiam.). Se você vir erros como RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED, você deve usar os nomes de classe sombreados:
org.apache.kafka.*As classes exigem o prefixokafkashaded.. Por exemplo:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModulesoftware.amazon.msk.*As classes exigem o prefixoshadedmskiam.. Por exemplo:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
Por que estou recebendo um TimeoutException ao me conectar ao Kafka?
As causas comuns incluem:
- Conectividade de rede : O cluster compute não consegue alcançar os brokers Kafka . Verifique as regras do firewall, os grupos de segurança e as configurações da VPC.
- Servidores de inicialização incorretos : Verifique se o hostname e a porta
kafka.bootstrap.serversestão corretos. - Resolução de DNS : Certifique-se de que o nome do host do broker Kafka possa ser resolvido a partir da rede Databricks .
- Problemas com SSL/TLS : Se estiver usando SSL, verifique se os certificados estão configurados corretamente.
Para configurações de peering de Private Link ou VPC, certifique-se de que as rotas de rede corretas estejam configuradas.
Devo usar lotes ou modo de transmissão para Kafka?
Depende do seu caso de uso:
- modo de transmissão (
spark.readStream): Use quando precisar de processamento contínuo de dados ou ingestão de baixa latência. - modo lotes (
spark.read): Use para carregamentos de dados únicos, preenchimentos retroativos ou. Requer ambosstartingOffsetseendingOffsets.
Consulte Configurar intervalos de disparo de transmissão estruturada para obter detalhes sobre a configuração de intervalos de disparo como AvailableNow, ProcessingTime e tempo real mode.
Posso ler de vários tópicos Kafka em uma única transmissão?
Sim, você pode usar:
subscribe: Forneça uma lista de tópicos separados por vírgulas, por exemplo.option("subscribe", "topic1,topic2").subscribePattern: Use um padrão regex Java para corresponder aos nomes dos tópicos, por exemplo.option("subscribePattern", "topic-.*").
Como faço para usar Kafka com o pipeline declarativo LakeFlow Spark ?
O pipeline declarativo LakeFlow Spark oferece suporte nativo para fontes Kafka . Você pode definir uma tabela de transmissão que lê do Kafka:
- Python
- SQL
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
Consulte Carregar dados no pipeline para obter mais detalhes sobre as fontes de transmissão no pipeline declarativo do LakeFlow Spark .
Como faço para desserializar as colunas key e valor Kafka ?
As colunas key e value são retornadas como binárias (tipo BINARY ). Utilize operações de DataFrame para desserializá-los com base no formato dos seus dados:
- dados de strings : Use
cast("string")para converter binário em strings. - DadosJSON : Use
from_json()após a conversão para strings. Veja a funçãofrom_json. - Dados Avro : Use
from_avro()para desserializar dados codificados em Avro. Consulte Ler e gravar dados de transmissão Avro. - Protocol buffers : Use
from_protobuf()para desserializar dados protobuf. Consulte a seção "Ler e escrever buffers de protocolo".
Por que estou recebendo um erro de escrita idempotente?
Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se o seu cluster Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Resolva esse erro atualizando para a versão 2.8.0 ou superior Kafka , ou definindo .option("kafka.enable.idempotence", "false") ao configurar seu gravador de transmissão estruturada.
O que é KAFKA_DATA_LOSS_ERROR e como posso resolvê-lo?
Esse erro ocorre quando a fonte Kafka detecta que os offsets armazenados no checkpoint não estão mais disponíveis no Kafka, normalmente porque:
- A transmissão foi pausada por mais tempo que o período de retenção Kafka .
- Os dados do tópico do Kafka foram excluídos ou o tópico foi recriado.
- O broker Kafka sofreu perda de dados.
Para resolver:
- Se a perda de dados for aceitável : Defina
.option("failOnDataLoss", "false")para permitir que a transmissão continue a partir do primeiro ponto de interrupção disponível. - Se a perda de dados não for aceitável : Reset o ponto de verificação e reprocesse a partir dos deslocamentos
earliestou restaure os dados Kafka ausentes.
Consulte Condição de erro KAFKA_DATA_LOSS para obter mais informações.
Como faço para controlar a taxa de leitura de dados do Kafka?
Use a opção maxOffsetsPerTrigger para limitar o número de offsets (aproximadamente o número de registros) processados por microlote. Isso ajuda a evitar grandes lotes que poderiam sobrecarregar o processamento subsequente ou causar problemas de memória ao recuperar o atraso.
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
Alternativamente, use opções como minPartitions ou maxRecordsPerPartition para controlar quantas partições Spark são criadas para cada lote.
Como posso monitorar o quão defasada minha transmissão está em relação aos offsets mais recentes Kafka ?
Utilize as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest disponíveis no progresso da consulta de transmissão. Esses relatórios indicam quantas defasagens (offsets) sua transmissão está atrasada em relação à última defasagem disponível, considerando todas as partições de tópico inscritas. Veja consultas de monitoramento transmissão estruturada no Databricks.
Você também pode usar estimatedTotalBytesBehindLatest para estimar o total de bytes de dados que ainda não foram processados.
Por que a inicialização do meu sistema Kafka está lenta?
A transmissão Kafka requer tempo para:
- Conecte-se ao cluster Kafka e busque os metadados.
- Descubra as partições de tópicos.
- Obter deslocamentos iniciais.
Para clusters Kafka on-premises ou remotos, a latência da rede pode impactar significativamente o tempo de inicialização. Se você estiver executando um pipeline acionado/agendado com reinicializações frequentes, considere usar o modo de transmissão contínua para evitar a sobrecarga de inicialização repetida.
Por que adicionar mais executores Spark não está aumentando minha taxa de transferência Kafka ?
Quando os brokers Kafka ficam saturados, adicionar mais executores Spark aumenta o custo sem aumentar a taxa de transferência.
Sinais de que o Kafka é o gargalo:
- A taxa de transferência estabiliza apesar da adição de mais núcleos.
- O uso da CPU ou da rede pelo broker Kafka está elevado.
- A tarefa Spark é concluída rapidamente, mas é preciso aguardar novos dados.
Para resolver isso, dimensione seu cluster Kafka adicionando brokers ou aumentando o número de partições para distribuir a carga.
Como posso otimizar o custo e a utilização de recursos compute para transmissões Kafka ?
Para os modos microlotes e AvailableNow:
- Dimensionar corretamente o cluster : Monitore as métricas e defina um tamanho fixo adequado para o cluster, considerando a carga máxima.
- Use
maxOffsetsPerTrigger: Limite o tamanho dos lotes para controlar o uso de recursos durante picos de carga. - Evite o escalonamento automático : a transmissão da execução do trabalho continuamente e a adição ou remoção de nós causa sobrecarga de reequilíbrio da tarefa.
- Reduzir a distorção de dados : Partições distorcidas fazem com que algumas tarefas processem significativamente mais dados do que outras, levando a tarefas atrasadas que retardam a conclusão geral dos lotes e desperdiçam recursos compute em tarefas paralelas. Use a opção
minPartitionspara dividir partições grandes do Kafka em partições menores do Spark para um processamento mais equilibrado.
Para o modo tempo real, o dimensionamento correto é especialmente importante porque a tarefa pode ficar parado enquanto aguarda dados. Principais considerações:
- Defina
maxPartitionspara que cada tarefa lide com várias partições do Kafka para reduzir a sobrecarga. - Ajuste
spark.sql.shuffle.partitionspara Job com muita variação aleatória.
Consulte modo tempo real em transmissão estruturada para obter orientação sobre dimensionamento clusters para modo tempo real.
Por que minha transmissão não está retornando nenhum registro, mesmo que existam dados no tópico?
As causas comuns incluem:
- Configuração
startingOffsetsincorreta : O valor default élatest, que lê apenas novos dados que chegam após o início da transmissão. DefinastartingOffsetsaearliestpara ler os dados existentes. - Nome do tópico incorreto : verifique se você está se inscrevendo no tópico correto.
- Problemas de autenticação : Sua transmissão pode ter se conectado com sucesso, mas não possui permissões para ler o tópico. Verifique suas ACLs do Kafka.
- Expiração do offset : Se sua transmissão foi interrompida por um longo tempo e os offsets no checkpoint expiraram (foram excluídos pela retenção Kafka ), você pode precisar redefinir o checkpoint ou ajustar
failOnDataLoss.