Pular para o conteúdo principal

Perguntas frequentes

Perguntas frequentes sobre como usar o Kafka com o Databricks.

Por que recebo um erro informando que uma opção do Kafka não é suportada ou não é reconhecida?

Um erro comum é esquecer o prefixo kafka. ao configurar opções nativas do Kafka. Todas as opções passadas diretamente para o cliente Kafka devem ser prefixadas com kafka.:

Python
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")

# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")

Opções específicas do conector Spark Kafka (como subscribe, startingOffsets, maxOffsetsPerTrigger) não precisam do prefixo. Consulte a seção Opções para obter a lista completa.

Por que estou recebendo um erro relacionado a classes sombreadas do Kafka?

O Databricks requer o uso de classes Kafka sombreadas (prefixadas com kafkashaded. ou shadedmskiam.). Se você vir erros como RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED, você deve usar os nomes de classe sombreados:

  • org.apache.kafka.* As classes exigem o prefixo kafkashaded. . Por exemplo: kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule
  • software.amazon.msk.* As classes exigem o prefixo shadedmskiam. . Por exemplo: shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule

Por que estou recebendo um TimeoutException ao me conectar ao Kafka?

As causas comuns incluem:

  • Conectividade de rede : O cluster compute não consegue alcançar os brokers Kafka . Verifique as regras do firewall, os grupos de segurança e as configurações da VPC.
  • Servidores de inicialização incorretos : Verifique se o hostname e a porta kafka.bootstrap.servers estão corretos.
  • Resolução de DNS : Certifique-se de que o nome do host do broker Kafka possa ser resolvido a partir da rede Databricks .
  • Problemas com SSL/TLS : Se estiver usando SSL, verifique se os certificados estão configurados corretamente.

Para configurações de peering de Private Link ou VPC, certifique-se de que as rotas de rede corretas estejam configuradas.

Devo usar lotes ou modo de transmissão para Kafka?

Depende do seu caso de uso:

  • modo de transmissão (spark.readStream): Use quando precisar de processamento contínuo de dados ou ingestão de baixa latência.
  • modo lotes (spark.read): Use para carregamentos de dados únicos, preenchimentos retroativos ou. Requer ambos startingOffsets e endingOffsets.

Consulte Configurar intervalos de disparo de transmissão estruturada para obter detalhes sobre a configuração de intervalos de disparo como AvailableNow, ProcessingTime e tempo real mode.

Posso ler de vários tópicos Kafka em uma única transmissão?

Sim, você pode usar:

  • subscribe : Forneça uma lista de tópicos separados por vírgulas, por exemplo .option("subscribe", "topic1,topic2").
  • subscribePattern : Use um padrão regex Java para corresponder aos nomes dos tópicos, por exemplo .option("subscribePattern", "topic-.*").

Como faço para usar Kafka com o pipeline declarativo LakeFlow Spark ?

O pipeline declarativo LakeFlow Spark oferece suporte nativo para fontes Kafka . Você pode definir uma tabela de transmissão que lê do Kafka:

Python
import dlt

@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)

Consulte Carregar dados no pipeline para obter mais detalhes sobre as fontes de transmissão no pipeline declarativo do LakeFlow Spark .

Como faço para desserializar as colunas key e valor Kafka ?

As colunas key e value são retornadas como binárias (tipo BINARY ). Utilize operações de DataFrame para desserializá-los com base no formato dos seus dados:

Por que estou recebendo um erro de escrita idempotente?

Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se o seu cluster Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, a gravação falhará com: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Resolva esse erro atualizando para a versão 2.8.0 ou superior Kafka , ou definindo .option("kafka.enable.idempotence", "false") ao configurar seu gravador de transmissão estruturada.

O que é KAFKA_DATA_LOSS_ERROR e como posso resolvê-lo?

Esse erro ocorre quando a fonte Kafka detecta que os offsets armazenados no checkpoint não estão mais disponíveis no Kafka, normalmente porque:

  • A transmissão foi pausada por mais tempo que o período de retenção Kafka .
  • Os dados do tópico do Kafka foram excluídos ou o tópico foi recriado.
  • O broker Kafka sofreu perda de dados.

Para resolver:

  • Se a perda de dados for aceitável : Defina .option("failOnDataLoss", "false") para permitir que a transmissão continue a partir do primeiro ponto de interrupção disponível.
  • Se a perda de dados não for aceitável : Reset o ponto de verificação e reprocesse a partir dos deslocamentos earliest ou restaure os dados Kafka ausentes.

Consulte Condição de erro KAFKA_DATA_LOSS para obter mais informações.

Como faço para controlar a taxa de leitura de dados do Kafka?

Use a opção maxOffsetsPerTrigger para limitar o número de offsets (aproximadamente o número de registros) processados por microlote. Isso ajuda a evitar grandes lotes que poderiam sobrecarregar o processamento subsequente ou causar problemas de memória ao recuperar o atraso.

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)

Alternativamente, use opções como minPartitions ou maxRecordsPerPartition para controlar quantas partições Spark são criadas para cada lote.

Como posso monitorar o quão defasada minha transmissão está em relação aos offsets mais recentes Kafka ?

Utilize as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest disponíveis no progresso da consulta de transmissão. Esses relatórios indicam quantas defasagens (offsets) sua transmissão está atrasada em relação à última defasagem disponível, considerando todas as partições de tópico inscritas. Veja consultas de monitoramento transmissão estruturada no Databricks.

Você também pode usar estimatedTotalBytesBehindLatest para estimar o total de bytes de dados que ainda não foram processados.

Por que minhas métricas de atraso de offset do Kafka continuam mostrando valores diferentes de zero após a atualização para o Databricks Runtime 17.1?

No Databricks Runtime 17.1 e versões superiores, os offsets mais recentes Kafka são obtidos após a conclusão de cada microlote. Em tópicos que recebem dados continuamente, as métricas de backlog podem apresentar valores pequenos, porém persistentes, diferentes de zero. Esse comportamento é esperado e não indica que a transmissão esteja atrasada.

No Databricks Runtime 17.0 e versões anteriores, os offsets mais recentes Kafka são obtidos no instante de início dos microlotes. As métricas do backlog poderão retornar 0 quando as consultas de transmissão consumirem consistentemente todos os registros disponíveis no início dos microlotes.

Se os valores forem altos ou estiverem crescendo continuamente, a transmissão pode não estar conseguindo acompanhar os dados recebidos. Veja consultas de monitoramento transmissão estruturada no Databricks.

Por que a inicialização do meu sistema Kafka está lenta?

A transmissão Kafka requer tempo para:

  1. Conecte-se ao cluster Kafka e busque os metadados.
  2. Descubra as partições de tópicos.
  3. Obter deslocamentos iniciais.

Para clusters Kafka on-premises ou remotos, a latência da rede pode impactar significativamente o tempo de inicialização. Se você estiver executando um pipeline acionado/agendado com reinicializações frequentes, considere usar o modo de transmissão contínua para evitar a sobrecarga de inicialização repetida.

Por que adicionar mais executores Spark não está aumentando minha taxa de transferência Kafka ?

Quando os brokers Kafka ficam saturados, adicionar mais executores Spark aumenta o custo sem aumentar a taxa de transferência.

Sinais de que o Kafka é o gargalo:

  • A taxa de transferência estabiliza apesar da adição de mais núcleos.
  • O uso da CPU ou da rede pelo broker Kafka está elevado.
  • A tarefa Spark é concluída rapidamente, mas é preciso aguardar novos dados.

Para resolver isso, dimensione seu cluster Kafka adicionando brokers ou aumentando o número de partições para distribuir a carga.

Como posso otimizar o custo e a utilização de recursos compute para transmissões Kafka ?

Para os modos microlotes e AvailableNow:

  • Dimensionar corretamente o cluster : Monitore as métricas e defina um tamanho fixo adequado para o cluster, considerando a carga máxima.
  • Use maxOffsetsPerTrigger : Limite o tamanho dos lotes para controlar o uso de recursos durante picos de carga.
  • Evite o escalonamento automático : a transmissão da execução do trabalho continuamente e a adição ou remoção de nós causa sobrecarga de reequilíbrio da tarefa.
  • Reduzir a distorção de dados : Partições distorcidas fazem com que algumas tarefas processem significativamente mais dados do que outras, levando a tarefas atrasadas que retardam a conclusão geral dos lotes e desperdiçam recursos compute em tarefas paralelas. Use a opção minPartitions para dividir partições grandes do Kafka em partições menores do Spark para um processamento mais equilibrado.

Para o modo tempo real, o dimensionamento correto é especialmente importante porque a tarefa pode ficar parado enquanto aguarda dados. Principais considerações:

  • Defina maxPartitions para que cada tarefa lide com várias partições do Kafka para reduzir a sobrecarga.
  • Ajuste spark.sql.shuffle.partitions para Job com muita variação aleatória.

Consulte a seção de dimensionamento de computação para obter orientações sobre como dimensionar clusters para o modo tempo real.

Por que minha transmissão não está retornando nenhum registro, mesmo que existam dados no tópico?

As causas comuns incluem:

  • Configuração startingOffsets incorreta : O valor default é latest, que lê apenas novos dados que chegam após o início da transmissão. Defina startingOffsets a earliest para ler os dados existentes.
  • Nome do tópico incorreto : verifique se você está se inscrevendo no tópico correto.
  • Problemas de autenticação : Sua transmissão pode ter se conectado com sucesso, mas não possui permissões para ler o tópico. Verifique suas ACLs do Kafka.
  • Expiração do offset : Se sua transmissão foi interrompida por um longo tempo e os offsets no checkpoint expiraram (foram excluídos pela retenção Kafka ), você pode precisar redefinir o checkpoint ou ajustar failOnDataLoss.