Perguntas frequentes
Perguntas frequentes sobre como usar o Kafka com o Databricks.
Por que recebo um erro informando que uma opção do Kafka não é suportada ou não é reconhecida?
Um erro comum é esquecer o prefixo kafka. ao configurar opções nativas do Kafka. Todas as opções passadas diretamente para o cliente Kafka devem ser prefixadas com kafka.:
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Opções específicas do conector Spark Kafka (como subscribe, startingOffsets, maxOffsetsPerTrigger) não precisam do prefixo. Consulte a seção Opções para obter a lista completa.
Por que estou recebendo um erro relacionado a classes sombreadas do Kafka?
O Databricks requer o uso de classes Kafka sombreadas (prefixadas com kafkashaded. ou shadedmskiam.). Se você vir erros como RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED, você deve usar os nomes de classe sombreados:
org.apache.kafka.*As classes exigem o prefixokafkashaded.. Por exemplo:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModulesoftware.amazon.msk.*As classes exigem o prefixoshadedmskiam.. Por exemplo:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
Por que estou recebendo um TimeoutException ao me conectar ao Kafka?
As causas comuns incluem:
- Conectividade de rede : O cluster compute não consegue alcançar os brokers Kafka . Verifique as regras do firewall, os grupos de segurança e as configurações da VPC.
- Servidores de inicialização incorretos : Verifique se o hostname e a porta
kafka.bootstrap.serversestão corretos. - Resolução de DNS : Certifique-se de que o nome do host do broker Kafka possa ser resolvido a partir da rede Databricks .
- Problemas com SSL/TLS : Se estiver usando SSL, verifique se os certificados estão configurados corretamente.
Para configurações de peering de Private Link ou VPC, certifique-se de que as rotas de rede corretas estejam configuradas.
Devo usar lotes ou modo de transmissão para Kafka?
Depende do seu caso de uso:
- modo de transmissão (
spark.readStream): Use quando precisar de processamento contínuo de dados ou ingestão de baixa latência. - modo lotes (
spark.read): Use para carregamentos de dados únicos, preenchimentos retroativos ou. Requer ambosstartingOffsetseendingOffsets.
Consulte Configurar intervalos de disparo de transmissão estruturada para obter detalhes sobre a configuração de intervalos de disparo como AvailableNow, ProcessingTime e tempo real mode.
Posso ler de vários tópicos Kafka em uma única transmissão?
Sim, você pode usar:
subscribe: Forneça uma lista de tópicos separados por vírgulas, por exemplo.option("subscribe", "topic1,topic2").subscribePattern: Use um padrão regex Java para corresponder aos nomes dos tópicos, por exemplo.option("subscribePattern", "topic-.*").
Como faço para usar Kafka com o pipeline declarativo LakeFlow Spark ?
O pipeline declarativo LakeFlow Spark oferece suporte nativo para fontes Kafka . Você pode definir uma tabela de transmissão que lê do Kafka:
- Python
- SQL
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
Consulte Carregar dados no pipeline para obter mais detalhes sobre as fontes de transmissão no pipeline declarativo do LakeFlow Spark .
Como faço para desserializar as colunas key e valor Kafka ?
As colunas key e value são retornadas como binárias (tipo BINARY ). Utilize operações de DataFrame para desserializá-los com base no formato dos seus dados:
- dados de strings : Use
cast("string")para converter binário em strings. - DadosJSON : Use
from_json()após a conversão para strings. Veja a funçãofrom_json. - Dados Avro : Use
from_avro()para desserializar dados codificados em Avro. Consulte Ler e gravar dados de transmissão Avro. - Protocol buffers : Use
from_protobuf()para desserializar dados protobuf. Consulte a seção "Ler e escrever buffers de protocolo".
Por que estou recebendo um erro de escrita idempotente?
Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se o seu cluster Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Resolva esse erro atualizando para a versão 2.8.0 ou superior Kafka , ou definindo .option("kafka.enable.idempotence", "false") ao configurar seu gravador de transmissão estruturada.
O que é KAFKA_DATA_LOSS_ERROR e como posso resolvê-lo?
Esse erro ocorre quando a fonte Kafka detecta que os offsets armazenados no checkpoint não estão mais disponíveis no Kafka, normalmente porque:
- A transmissão foi pausada por mais tempo que o período de retenção Kafka .
- Os dados do tópico do Kafka foram excluídos ou o tópico foi recriado.
- O broker Kafka sofreu perda de dados.
Para resolver:
- Se a perda de dados for aceitável : Defina
.option("failOnDataLoss", "false")para permitir que a transmissão continue a partir do primeiro ponto de interrupção disponível. - Se a perda de dados não for aceitável : Reset o ponto de verificação e reprocesse a partir dos deslocamentos
earliestou restaure os dados Kafka ausentes.
Consulte Condição de erro KAFKA_DATA_LOSS para obter mais informações.
Como faço para controlar a taxa de leitura de dados do Kafka?
Use a opção maxOffsetsPerTrigger para limitar o número de offsets (aproximadamente o número de registros) processados por microlote. Isso ajuda a evitar grandes lotes que poderiam sobrecarregar o processamento subsequente ou causar problemas de memória ao recuperar o atraso.
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
Alternativamente, use opções como minPartitions ou maxRecordsPerPartition para controlar quantas partições Spark são criadas para cada lote.
Como posso monitorar o quão defasada minha transmissão está em relação aos offsets mais recentes Kafka ?
Utilize as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest disponíveis no progresso da consulta de transmissão. Esses relatórios indicam quantas defasagens (offsets) sua transmissão está atrasada em relação à última defasagem disponível, considerando todas as partições de tópico inscritas. Veja consultas de monitoramento transmissão estruturada no Databricks.
Você também pode usar estimatedTotalBytesBehindLatest para estimar o total de bytes de dados que ainda não foram processados.
Por que minhas métricas de atraso de offset do Kafka continuam mostrando valores diferentes de zero após a atualização para o Databricks Runtime 17.1?
No Databricks Runtime 17.1 e versões superiores, os offsets mais recentes Kafka são obtidos após a conclusão de cada microlote. Em tópicos que recebem dados continuamente, as métricas de backlog podem apresentar valores pequenos, porém persistentes, diferentes de zero. Esse comportamento é esperado e não indica que a transmissão esteja atrasada.
No Databricks Runtime 17.0 e versões anteriores, os offsets mais recentes Kafka são obtidos no instante de início dos microlotes. As métricas do backlog poderão retornar 0 quando as consultas de transmissão consumirem consistentemente todos os registros disponíveis no início dos microlotes.
Se os valores forem altos ou estiverem crescendo continuamente, a transmissão pode não estar conseguindo acompanhar os dados recebidos. Veja consultas de monitoramento transmissão estruturada no Databricks.
Por que a inicialização do meu sistema Kafka está lenta?
A transmissão Kafka requer tempo para:
- Conecte-se ao cluster Kafka e busque os metadados.
- Descubra as partições de tópicos.
- Obter deslocamentos iniciais.
Para clusters Kafka on-premises ou remotos, a latência da rede pode impactar significativamente o tempo de inicialização. Se você estiver executando um pipeline acionado/agendado com reinicializações frequentes, considere usar o modo de transmissão contínua para evitar a sobrecarga de inicialização repetida.
Por que adicionar mais executores Spark não está aumentando minha taxa de transferência Kafka ?
Quando os brokers Kafka ficam saturados, adicionar mais executores Spark aumenta o custo sem aumentar a taxa de transferência.
Sinais de que o Kafka é o gargalo:
- A taxa de transferência estabiliza apesar da adição de mais núcleos.
- O uso da CPU ou da rede pelo broker Kafka está elevado.
- A tarefa Spark é concluída rapidamente, mas é preciso aguardar novos dados.
Para resolver isso, dimensione seu cluster Kafka adicionando brokers ou aumentando o número de partições para distribuir a carga.
Como posso otimizar o custo e a utilização de recursos compute para transmissões Kafka ?
Para os modos microlotes e AvailableNow:
- Dimensionar corretamente o cluster : Monitore as métricas e defina um tamanho fixo adequado para o cluster, considerando a carga máxima.
- Use
maxOffsetsPerTrigger: Limite o tamanho dos lotes para controlar o uso de recursos durante picos de carga. - Evite o escalonamento automático : a transmissão da execução do trabalho continuamente e a adição ou remoção de nós causa sobrecarga de reequilíbrio da tarefa.
- Reduzir a distorção de dados : Partições distorcidas fazem com que algumas tarefas processem significativamente mais dados do que outras, levando a tarefas atrasadas que retardam a conclusão geral dos lotes e desperdiçam recursos compute em tarefas paralelas. Use a opção
minPartitionspara dividir partições grandes do Kafka em partições menores do Spark para um processamento mais equilibrado.
Para o modo tempo real, o dimensionamento correto é especialmente importante porque a tarefa pode ficar parado enquanto aguarda dados. Principais considerações:
- Defina
maxPartitionspara que cada tarefa lide com várias partições do Kafka para reduzir a sobrecarga. - Ajuste
spark.sql.shuffle.partitionspara Job com muita variação aleatória.
Consulte a seção de dimensionamento de computação para obter orientações sobre como dimensionar clusters para o modo tempo real.
Por que minha transmissão não está retornando nenhum registro, mesmo que existam dados no tópico?
As causas comuns incluem:
- Configuração
startingOffsetsincorreta : O valor default élatest, que lê apenas novos dados que chegam após o início da transmissão. DefinastartingOffsetsaearliestpara ler os dados existentes. - Nome do tópico incorreto : verifique se você está se inscrevendo no tópico correto.
- Problemas de autenticação : Sua transmissão pode ter se conectado com sucesso, mas não possui permissões para ler o tópico. Verifique suas ACLs do Kafka.
- Expiração do offset : Se sua transmissão foi interrompida por um longo tempo e os offsets no checkpoint expiraram (foram excluídos pela retenção Kafka ), você pode precisar redefinir o checkpoint ou ajustar
failOnDataLoss.