Pular para o conteúdo principal

Opções

Esta página descreve as opções de configuração para leitura e gravação no Apache Kafka usando transmissão estruturada no Databricks.

O conector Databricks Kafka é construído sobre o conector Apache Spark Kafka e suporta todas as opções de configuração padrão do Kafka. Qualquer opção com o prefixo kafka. é passada diretamente para o cliente Kafka subjacente. Por exemplo, .option("kafka.max.poll.records", "500") define a propriedade max.poll.records do consumidor Kafka. Consulte a documentação de configuração do Kafka para obter a lista completa das propriedades disponíveis do Kafka.

Para opções adicionais de origem e destino de transmissão estruturada não listadas nesta página, consulte o guia transmissão estruturada + Integração Kafka.

Opções obrigatórias

A seguinte opção é obrigatória tanto para leitura quanto para escrita:

Opção

Valor

Descrição

kafka.bootstrap.servers

Uma lista de hosts separados por vírgulas

A configuração Kafka bootstrap.servers . Se você não encontrar dados do Kafka, verifique primeiro a lista de endereços do broker. Se a lista de endereços do corretor estiver incorreta, pode não haver erros. Isso ocorre porque o cliente Kafka pressupõe que os brokers ficarão disponíveis eventualmente e, em caso de erros de rede, tentará novamente indefinidamente.

Ao ler dados do Kafka, você também deve especificar uma das seguintes opções para identificar quais tópicos consumir:

Opção

Valor

Descrição

subscribe

Uma lista de tópicos separados por vírgulas

A lista de tópicos para se inscrever.

subscribePattern

Strings de expressões regulares Java

O padrão usado para assinar tópicos.

assign

Cadeias JSON {"topicA":[0,1],"topicB":[2,4]}

Tópico específico Partições a serem consumidas.

Ao escrever para o Kafka, você pode opcionalmente definir a opção topic para especificar um tópico de destino para todas as linhas. Se não estiver definido, o DataFrame deve incluir uma coluna topic .

Opções comuns de leitura

As seguintes opções são comumente usadas ao ler dados do Kafka:

Opção

Valor

Padrão

Descrição

minPartitions

INT

Nenhuma

Número mínimo de partições para leitura do Kafka. Normalmente, o Spark cria uma partição por tópico do Kafka. Aumentar esse valor divide as grandes partições do Kafka em partições menores do Spark, aumentando o paralelismo. Útil para lidar com distorção de dados ou picos de carga. Observação: Habilitar essa opção reinicializa os consumidores do Kafka a cada gatilho, o que pode afetar o desempenho ao usar SSL.

maxRecordsPerPartition

LONG

Nenhuma

Número máximo de registros por partição do Spark. Quando configurado, o Spark divide as partições do Kafka de forma que cada partição do Spark tenha, no máximo, essa quantidade de registros. Pode ser usado com minPartitions; quando ambos estão definidos, o Spark usa aquele que resultar em mais partições.

failOnDataLoss

BOOLEAN

true

Indica se a consulta deve ser rejeitada quando houver possibilidade de perda de dados. As consultas podem falhar permanentemente na leitura de dados do Kafka devido a vários cenários, como tópicos excluídos, truncamento de tópicos antes do processamento e assim por diante. Tentamos estimar, de forma conservadora, se houve ou não perda de dados. Às vezes, isso pode causar alarmes falsos. Defina esta opção como false se não funcionar como esperado ou se desejar que a consulta continue a ser processada apesar da perda de dados.

maxOffsetsPerTrigger

LONG

Nenhuma

[somente transmissão] Limite de taxa para o número máximo de offsets processados por intervalo de gatilho. O número total de deslocamentos é dividido proporcionalmente entre as partições do tópico. Para um controle de fluxo mais avançado, você também pode usar minOffsetsPerTrigger (deslocamentos mínimos antes do acionamento) e maxTriggerDelay (tempo máximo de espera, default 15m). Consulte o guia de integração do Spark com o Kafka para obter detalhes.

startingOffsets

earliest, latest ou strings JSON

latest

Determina onde iniciar a leitura quando uma consulta é iniciada. Use earliest para ler a partir dos primeiros deslocamentos disponíveis, latest para ler apenas novos dados após o início da transmissão ou uma string JSON para especificar um deslocamento inicial para cada partição do tópico (por exemplo, {"topicA":{"0":23,"1":-2},"topicB":{"0":-2}}). No JSON, -2 refere-se ao mais antigo e -1 ao mais recente. Para consultas de transmissão, isso só se aplica quando uma nova consulta é iniciada; a retomada sempre continua de onde a consulta parou. As partições recém-descobertas começam em earliest. Nota: Para consultas de lotes, latest (seja implicitamente ou usando -1 em JSON) não é permitido. Para começar a partir de um carimbo de data/hora específico, use startingTimestamp ou startingOffsetsByTimestamp.

endingOffsets

latest ou strings JSON

latest

[somente lotes] O ponto final quando uma consulta lotes é encerrada. Use latest para ler até os deslocamentos mais recentes ou uma string JSON para especificar um deslocamento final para cada partição do tópico (por exemplo, {"topicA":{"0":50,"1":-1},"topicB":{"0":-1}}). No JSON, -1 refere-se ao mais recente; -2 (mais antigo) não é permitido. Para terminar em um carimbo de data/hora específico, use endingTimestamp ou endingOffsetsByTimestamp.

groupIdPrefix

STRING

spark-kafka-source (transmissão) ou spark-kafka-relation (lotes)

Prefixo para o ID do grupo de consumidores gerado automaticamente. O conector gera automaticamente um group.id exclusivo para cada consulta; esta opção personaliza o prefixo desse ID gerado. Ignorado se kafka.group.id estiver definido.

kafka.group.id

STRING

Nenhuma

ID do grupo a ser usado durante a leitura do Kafka. Use com cautela. Por default, cada consulta gera um ID de grupo exclusivo para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores, que não sofre interferência de nenhum outro consumidor e, portanto, pode ler todas as partições dos tópicos aos quais está inscrito. Em alguns cenários (por exemplo, autorização baseada em grupos do Kafka), você pode querer usar IDs de grupos autorizados específicos para ler dados. Você pode definir o ID do grupo, se desejar. No entanto, faça isso com extrema cautela, pois pode causar comportamentos inesperados. - Consultas executadas simultaneamente (tanto lotes quanto transmissões) com o mesmo ID de grupo provavelmente irão interferir umas com as outras, fazendo com que cada consulta leia apenas parte dos dados. Isso também pode ocorrer quando as consultas são iniciadas/reiniciadas em rápida sucessão. Para minimizar esses problemas, defina a configuração do consumidor Kafka session.timeout.ms para ser muito pequena.

includeHeaders

BOOLEAN

false

Indica se os cabeçalhos das mensagens do Kafka devem ser incluídos na saída.

bytesEstimateWindowLength

STRING

300s

[somente transmissão] Janela de tempo usada para estimar os bytes restantes através das estimatedTotalBytesBehindLatest métricas. Aceita strings de duração como 10m (10 minutos) ou 600s (600 segundos). Consulte Recuperar métricas do Kafka.

Opções comuns de escrita

As seguintes opções são comumente usadas ao escrever para o Kafka:

Opção

Valor

Padrão

Descrição

topic

STRING

Nenhuma

Define o tópico para todas as linhas. Isso substitui qualquer coluna topic nos dados.

includeHeaders

BOOLEAN

false

Indica se os cabeçalhos do Kafka devem ser incluídos na linha.

importante

Databricks Runtime 13.3 LTS e versões superiores incluem uma versão mais recente da biblioteca kafka-clients que habilita gravações idempotentes por default. Se o seu coletor Kafka usar a versão 2.8.0 ou inferior com ACLs configuradas, mas sem IDEMPOTENT_WRITE habilitado, as gravações falharão. Resolva isso atualizando para o Kafka 2.8.0 ou superior, ou definindo .option("kafka.enable.idempotence", "false").

Opções de autenticação

Databricks oferece suporte a vários métodos de autenticação para Kafka, incluindo credenciais de serviço Unity Catalog , SASL/SSL e opções específicas cloudpara AWS MSK, Azure Event Hubs e Google Cloud Kafka.

Databricks recomenda usar credenciais do serviço Unity Catalog para autenticação na cloud-gerenciar serviço Kafka :

Opção

Valor

Descrição

databricks.serviceCredential

STRING

O nome de uma credencial de serviço Unity Catalog para autenticação no serviço de gerenciamento cloud Kafka (AWS MSK, Azure Event Hubs ou Google Cloud Manager Kafka). Disponível no Databricks Runtime 16.1 e versões superiores.

databricks.serviceCredential.scope

STRING

O escopo OAuth para a credencial de serviço. Configure esta opção somente se o Databricks não conseguir inferir automaticamente o escopo do seu serviço Kafka.

Ao usar uma credencial de serviço do Unity Catalog, você não precisa especificar opções SASL/SSL como kafka.sasl.mechanism, kafka.sasl.jaas.config ou kafka.security.protocol.

As opções comuns de SASL/SSL incluem:

Opção

Valor

Descrição

kafka.security.protocol

STRING

Protocolo usado para comunicação com corretores (por exemplo, SASL_SSL, SSL, PLAINTEXT).

kafka.sasl.mechanism

STRING

Mecanismo SASL (por exemplo, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, AWS_MSK_IAM).

kafka.sasl.jaas.config

STRING

Cadeias de configuração de login JAAS.

kafka.sasl.login.callback.handler.class

STRING

Nome de classe totalmente qualificado de um manipulador de retorno de chamada de login para autenticação SASL.

kafka.sasl.client.callback.handler.class

STRING

Nome de classe totalmente qualificado de um manipulador de retorno de chamada do cliente para autenticação SASL.

kafka.ssl.truststore.location

STRING

Localização do arquivo de armazenamento de certificados SSL.

kafka.ssl.truststore.password

STRING

Senha para o arquivo de armazenamento de certificados SSL.

kafka.ssl.keystore.location

STRING

Localização do arquivo de armazenamento key SSL .

kafka.ssl.keystore.password

STRING

Senha para o arquivo de armazenamento key SSL .

Para obter instruções completas sobre a configuração da autenticação, consulte Autenticação.

Recursos adicionais