Pular para o conteúdo principal

Opções

Esta página descreve as opções de configuração para leitura e gravação no Apache Kafka usando transmissão estruturada no Databricks.

O conector Databricks Kafka é construído sobre o conector Apache Spark Kafka e suporta todas as opções de configuração padrão do Kafka. Qualquer opção com o prefixo kafka. é passada diretamente para o cliente Kafka subjacente. Por exemplo, .option("kafka.max.poll.records", "500") define a propriedade max.poll.records do consumidor Kafka. Consulte a documentação de configuração do Kafka para obter a lista completa das propriedades disponíveis do Kafka.

Para obter uma lista completa de opções de origem e destino de transmissão estruturada, consulte Kafka e o guia transmissão estruturada + Kafka Integration.

Opções obrigatórias

Para obter detalhes sobre as opções necessárias, consulte Kafka.

A seguinte opção é obrigatória tanto para leitura quanto para escrita:

Chave

Descrição

kafka.bootstrap.servers

Uma lista de hosts separados por vírgulas Endereços para brokers Kafka. Define a propriedade bootstrap.servers do cliente Kafka.

Se você não encontrar dados do Kafka, verifique esta lista de endereços de brokers em busca de endereços incorretos. Se a lista de endereços do corretor estiver incorreta, pode não haver erros. Os clientes do Kafka partem do princípio de que os brokers estarão disponíveis eventualmente e continuam tentando indefinidamente quando recebem erros de rede.

Para leituras do Kafka, você também deve especificar exatamente uma das seguintes opções para identificar quais tópicos consumir:

  • subscribe
  • subscribePattern
  • assign

Ao escrever para o Kafka, você pode opcionalmente definir a opção topic para especificar um tópico de destino para todas as linhas. Se não estiver definido, o DataFrame deve incluir uma coluna topic .

Opções comuns de leitura

As seguintes opções são comumente usadas ao ler dados do Kafka:

Chave

Descrição

minPartitions

O número mínimo de partições para leitura do Kafka.

maxRecordsPerPartition

O número máximo de registros por partição do Spark.

failOnDataLoss

Indica se a consulta deve ser rejeitada quando houver possibilidade de perda de dados.

maxOffsetsPerTrigger

Número máximo de deslocamentos processados por intervalo de disparo.

startingOffsets

O deslocamento a partir do qual a consulta inicia a leitura.

endingOffsets

Onde parar de ler para perguntas sobre lotes.

groupIdPrefix

Prefixo personalizado para o ID do grupo de consumidores gerado automaticamente.

kafka.group.id

O ID do grupo a ser usado durante a leitura do Kafka.

Use com cautela, pois pode causar comportamentos inesperados. Por default, cada consulta gera um ID de grupo exclusivo para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores, evitando interferências de outros consumidores e permitindo que cada consulta leia todas as partições dos tópicos aos quais está inscrita. Em alguns cenários, como a autorização baseada em grupos do Kafka, você pode usar IDs de grupos autorizados específicos para ler dados.

Consultas com o mesmo ID de grupo podem interferir umas com as outras e ler apenas dados parciais. A interferência pode ocorrer quando você executa lotes e transmite cargas de trabalho simultâneas ou quando você inicia e reinicia consultas em rápida sucessão.

Para minimizar problemas, defina a configuração do consumidor Kafka session.timeout.ms para ser muito pequena.

includeHeaders

Indica se os cabeçalhos das mensagens do Kafka devem ser incluídos na saída.

bytesEstimateWindowLength

O intervalo de tempo usado para estimar os bytes restantes através das métricas estimatedTotalBytesBehindLatest .

Para obter uma lista completa de opções de origem e destino de transmissão estruturada, consulte Kafka e o guia transmissão estruturada + Kafka Integration.

Opções comuns de escrita

As seguintes opções são comumente usadas ao escrever para o Kafka:

Chave

Descrição

topic

Define o tópico para todas as linhas. Isso tem precedência sobre qualquer coluna topic nos dados.

includeHeaders

Indica se os cabeçalhos do Kafka devem ser incluídos na linha.

importante

Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se o seu coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, as gravações falharão. Resolva isso atualizando para o Kafka 2.8.0 ou superior, ou definindo .option("kafka.enable.idempotence", "false").

Para obter uma lista completa de opções de origem e destino de transmissão estruturada, consulte Kafka e o guia transmissão estruturada + Kafka Integration.

Opções de autenticação

Databricks oferece suporte a vários métodos de autenticação para Kafka, incluindo credenciais de serviço Unity Catalog , SASL/SSL e opções específicas cloudpara AWS MSK, Azure Event Hubs e Google Cloud Kafka.

Databricks recomenda que você use as credenciais do serviço Unity Catalog para autenticação no serviço Kafka cloud :

Opção

Descrição

databricks.serviceCredential

O nome de uma credencial de serviço Unity Catalog para autenticação no serviço de gerenciamento cloud Kafka (AWS MSK, Azure Event Hubs ou Google Cloud Manager Kafka). Disponível no Databricks Runtime 16.1 e versões superiores.

databricks.serviceCredential.scope

O escopo OAuth para a credencial de serviço. Configure esta opção somente se o Databricks não conseguir inferir automaticamente o escopo do seu serviço Kafka.

Ao usar uma credencial de serviço do Unity Catalog, você não precisa especificar opções SASL/SSL como kafka.sasl.mechanism, kafka.sasl.jaas.config ou kafka.security.protocol.

As opções comuns de SASL/SSL incluem:

Opção

Descrição

kafka.security.protocol

O protocolo usado para se comunicar com os corretores (por exemplo, SASL_SSL, SSL, PLAINTEXT).

kafka.sasl.mechanism

O mecanismo SASL (por exemplo, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, AWS_MSK_IAM).

kafka.sasl.jaas.config

As strings de configuração de login do JAAS.

kafka.sasl.login.callback.handler.class

O nome de classe totalmente qualificado de um manipulador de retorno de chamada de login para autenticação SASL.

kafka.sasl.client.callback.handler.class

Nome de classe totalmente qualificado de um manipulador de retorno de chamada do cliente para autenticação SASL.

kafka.ssl.truststore.location

Localização do arquivo de armazenamento de certificados SSL.

kafka.ssl.truststore.password

A senha do arquivo de armazenamento de certificados SSL.

kafka.ssl.keystore.location

Localização do arquivo de armazenamento key SSL .

kafka.ssl.keystore.password

A senha do arquivo de armazenamento key SSL .

Para obter instruções completas sobre a configuração da autenticação, consulte Autenticação.

Recursos adicionais