Perguntas frequentes
Perguntas frequentes sobre como usar o Kafka com o Databricks.
Por que recebo um erro informando que uma opção do Kafka não é suportada ou não é reconhecida?
Este erro ocorre caso você se esqueça de usar o prefixo kafka. ao definir as opções de configuração do cliente Kafka. Todas as opções passadas diretamente para o cliente Kafka devem ser prefixadas com kafka.:
O código a seguir mostra opções incorretas sem o prefixo kafka.:
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
O código a seguir mostra as opções corretas.
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Opções para o conector Spark Kafka (como subscribe, startingOffsets, maxOffsetsPerTrigger) não exigem o prefixo. Para a lista completa de opções, consulte Kafka.
Por que estou recebendo um erro relacionado a classes sombreadas do Kafka?
A Databricks requer o uso de classes Kafka sombreadas (prefixadas com kafkashaded. ou shadedmskiam.). Se você encontrar erros, como RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED, deve utilizar os nomes de classe sombreados:
org.apache.kafka.*As classes exigem o prefixokafkashaded.. Por exemplo:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModulesoftware.amazon.msk.*As classes exigem o prefixoshadedmskiam.. Por exemplo:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
Por que estou recebendo um TimeoutException ao me conectar ao Kafka?
As causas comuns incluem:
- Conectividade de rede : O cluster compute não consegue alcançar os brokers Kafka . Verifique as regras do firewall, os grupos de segurança e as configurações da VPC.
- Servidores de inicialização incorretos : Verifique se o hostname e a porta
kafka.bootstrap.serversestão corretos. - Resolução de DNS: Verifique se os hostnames do broker Kafka podem ser resolvidos a partir da rede Databricks.
- Problemas com SSL/TLS : Se estiver usando SSL, verifique se os certificados estão configurados corretamente.
Para configurações de Private Link ou emparelhamento de VPC, verifique se as rotas de rede corretas estão em vigor.
Devo usar lotes ou modo de transmissão para Kafka?
Depende do seu caso de uso:
- modo de transmissão (
spark.readStream): Use quando precisar de processamento contínuo de dados ou ingestão de baixa latência. - modo lotes (
spark.read): Use para carregamentos de dados únicos, preenchimentos retroativos ou. Requer ambosstartingOffsetseendingOffsets.
Consulte Configurar intervalos de disparo de transmissão estruturada para obter detalhes sobre a configuração de intervalos de disparo como AvailableNow, ProcessingTime e tempo real mode.
Posso ler de vários tópicos Kafka em uma única transmissão?
Sim, você pode usar:
subscribe: Forneça uma lista de tópicos separados por vírgulas, por exemplo.option("subscribe", "topic1,topic2").subscribePattern: Use um padrão regex Java para corresponder aos nomes dos tópicos, por exemplo.option("subscribePattern", "topic-.*").
Como faço para usar Kafka com o pipeline declarativo LakeFlow Spark ?
Lakeflow Spark Declarative Pipelines oferece suporte integrado para fontes Kafka.
É possível definir uma tabela de transmissão que lê do Kafka, como no seguinte código:
- Python
- SQL
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
Consulte Carregar dados no pipeline para obter mais detalhes sobre as fontes de transmissão no pipeline declarativo do LakeFlow Spark .
Como faço para desserializar as colunas key e valor Kafka ?
As colunas key e value são do tipo BINARY. Utilize operações do DataFrame para desserializá-los com base no seu formato de dados:
- dados de strings : Use
cast("string")para converter binário em strings. - DadosJSON : Use
from_json()após a conversão para strings. Veja a funçãofrom_json. - Dados Avro : Use
from_avro()para desserializar dados codificados em Avro. Consulte Ler e gravar dados de transmissão Avro. - Protocol buffers : Use
from_protobuf()para desserializar dados protobuf. Consulte a seção "Ler e escrever buffers de protocolo".
Por que estou recebendo um erro de escrita idempotente?
Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se o seu cluster Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Resolva esse erro atualizando para a versão 2.8.0 ou superior Kafka , ou definindo .option("kafka.enable.idempotence", "false") ao configurar seu gravador de transmissão estruturada.
O que é KAFKA_DATA_LOSS_ERROR e como posso resolvê-lo?
Esse erro ocorre quando a fonte Kafka detecta que os offsets armazenados no checkpoint não estão mais disponíveis no Kafka, normalmente porque:
- A transmissão foi pausada por mais tempo que o período de retenção Kafka .
- Os dados do tópico do Kafka foram excluídos ou o tópico foi recriado.
- O broker Kafka sofreu perda de dados.
Para resolver:
- Se a perda de dados for aceitável : Defina
.option("failOnDataLoss", "false")para permitir que a transmissão continue a partir do primeiro ponto de interrupção disponível. - Se a perda de dados não for aceitável : Reset o ponto de verificação e reprocesse a partir dos deslocamentos
earliestou restaure os dados Kafka ausentes.
Consulte Condição de erro KAFKA_DATA_LOSS para obter mais informações.
Como faço para controlar a taxa de leitura de dados do Kafka?
Use a opção maxOffsetsPerTrigger para limitar o número de offsets (aproximadamente o número de registros) processados por microlote. Isso ajuda a evitar grandes lotes que poderiam sobrecarregar o processamento subsequente ou causar problemas de memória ao recuperar o atraso.
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
Alternativamente, use opções como minPartitions ou maxRecordsPerPartition para controlar quantas partições Spark são criadas para cada lote.
Como posso monitorar o quão defasada minha transmissão está em relação aos offsets mais recentes Kafka ?
Utilize as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest disponíveis no progresso da consulta de transmissão. Esses relatórios indicam quantas defasagens (offsets) sua transmissão está atrasada em relação à última defasagem disponível, considerando todas as partições de tópico inscritas. Veja consultas de monitoramento transmissão estruturada no Databricks.
Você também pode usar estimatedTotalBytesBehindLatest para estimar o total de bytes de dados que ainda não foram processados.
Por que minhas métricas de atraso de offset do Kafka continuam mostrando valores diferentes de zero após a atualização para o Databricks Runtime 17.1?
No Databricks Runtime 17.1 e versões superiores, os offsets mais recentes Kafka são obtidos após a conclusão de cada microlote. Em tópicos que recebem dados continuamente, as métricas de backlog podem apresentar valores pequenos, porém persistentes, diferentes de zero. Esse comportamento é esperado e não indica que a transmissão esteja atrasada.
No Databricks Runtime 17.0 e versões anteriores, os offsets mais recentes Kafka são obtidos no instante de início dos microlotes. As métricas do backlog poderão retornar 0 quando as consultas de transmissão consumirem consistentemente todos os registros disponíveis no início dos microlotes.
Se os valores forem altos ou estiverem crescendo continuamente, a transmissão pode não estar conseguindo acompanhar os dados recebidos. Veja consultas de monitoramento transmissão estruturada no Databricks.
Por que a inicialização do meu sistema Kafka está lenta?
A transmissão Kafka requer tempo para:
- Conecte-se ao cluster Kafka e busque os metadados.
- Descubra as partições de tópicos.
- Obter deslocamentos iniciais.
Para clusters Kafka on-premises ou remotos, a latência da rede pode impactar significativamente o tempo de inicialização. Se você estiver executando um pipeline acionado/agendado com reinicializações frequentes, considere usar o modo de transmissão contínua para evitar a sobrecarga de inicialização repetida.
Por que a adição de mais executores do Spark não aumenta a taxa de transferência do meu Kafka?
Quando os brokers Kafka ficam saturados, adicionar mais executores Spark aumenta o custo sem aumentar a taxa de transferência.
Sinais de que o Kafka é o gargalo:
- A taxa de transferência estabiliza apesar da adição de mais núcleos.
- O uso da CPU ou da rede pelo broker Kafka está elevado.
- A tarefa Spark é concluída rapidamente, mas é preciso aguardar novos dados.
Para resolver isso, dimensione seu cluster Kafka adicionando brokers ou aumentando o número de partições para distribuir a carga.
Como posso otimizar o custo e a utilização de recursos compute para transmissões Kafka ?
Para os modos microlotes e AvailableNow:
- Dimensionar corretamente o cluster : Monitore as métricas e defina um tamanho fixo adequado para o cluster, considerando a carga máxima.
- Use
maxOffsetsPerTrigger: Limite o tamanho dos lotes para controlar o uso de recursos durante picos de carga. - Evite o escalonamento automático : a transmissão da execução do trabalho continuamente e a adição ou remoção de nós causa sobrecarga de reequilíbrio da tarefa.
- Reduzir a distorção de dados : Partições distorcidas fazem com que algumas tarefas processem significativamente mais dados do que outras, levando a tarefas atrasadas que retardam a conclusão geral dos lotes e desperdiçam recursos compute em tarefas paralelas. Use a opção
minPartitionspara dividir partições grandes do Kafka em partições menores do Spark para um processamento mais equilibrado.
Para o modo em tempo real, dimensionar o compute é especialmente importante porque as tarefas podem permanecer paradas enquanto aguardam dados. As principais considerações:
- Defina
maxPartitionspara que cada tarefa lide com várias partições do Kafka para reduzir a sobrecarga. - Ajuste
spark.sql.shuffle.partitionspara Job com muita variação aleatória.
Consulte a seção de dimensionamento de computação para obter orientações sobre como dimensionar clusters para o modo tempo real.
Por que minha transmissão não está retornando nenhum registro, mesmo que existam dados no tópico?
As causas comuns incluem:
- Configuração
startingOffsetsincorreta : O valor default élatest, que lê apenas novos dados que chegam após o início da transmissão. DefinastartingOffsetsaearliestpara ler os dados existentes. - Nome do tópico incorreto : verifique se você está se inscrevendo no tópico correto.
- Problemas de autenticação : Sua transmissão pode ter se conectado com sucesso, mas não possui permissões para ler o tópico. Verifique suas ACLs do Kafka.
- Expiração do offset : Se sua transmissão foi interrompida por um longo tempo e os offsets no checkpoint expiraram (foram excluídos pela retenção Kafka ), você pode precisar redefinir o checkpoint ou ajustar
failOnDataLoss.