Pular para o conteúdo principal

processamento de transmissão com Apache Kafka e Databricks

Este artigo descreve como você pode usar o Apache Kafka como fonte ou coletor quando executar cargas de trabalho de Structured Streaming no Databricks.

Para mais informações sobre o Kafka, consulte a documentação do Kafka.

Ler dados do Kafka

Veja a seguir um exemplo de leitura de transmissão do Kafka:

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)

O Databricks também oferece suporte à semântica de leitura em lote para fontes de dados do Kafka, conforme mostrado no exemplo a seguir:

Python
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)

Para o carregamento incremental de lotes, o site Databricks recomenda o uso do Kafka com Trigger.AvailableNow. Consulte Configuração do processamento de lotes incrementais.

Em Databricks Runtime 13.3 LTS e acima, Databricks fornece uma função SQL para leitura de dados Kafka. A transmissão com SQL é suportada somente em DLT ou com tabelas de transmissão em Databricks SQL. Veja read_kafka função com valor de tabela.

Configurar Kafka leitor de transmissão estruturada

O Databricks fornece a palavra-chave kafka como formato de dados para configurar conexões com o Kafka 0.10+.

A seguir estão as configurações mais comuns para o Kafka:

Há várias maneiras de especificar quais tópicos assinar. Você deve fornecer somente um destes parâmetros:

Opção

Valor

Descrição

assinar

Uma lista de tópicos separados por vírgula.

A lista de tópicos para se inscrever.

subscribePattern

String Java regex.

O padrão usado para assinar tópicos.

atribuir

Sequência JSON {"topicA":[0,1],"topic":[2,4]}.

Tópico específico Partições a serem consumidas.

Outras configurações notáveis:

Opção

Valor

Valor padrão

Descrição

kafka.bootstrap.servers

Lista separada por vírgula do host.

vazio

[Obrigatório] A configuração do Kafka bootstrap.servers. Se você descobrir que não há dados do Kafka, verifique primeiro a lista de endereços do broker. Se a lista de endereços do despachante estiver incorreta, poderá não haver erros. Isso ocorre porque o cliente Kafka presume que os corretores acabarão ficando disponíveis em algum momento e, no caso de erros de rede, ficará tentando indefinidamente.

failOnDataLoss

true ou false.

true

[Opcional] Se a consulta deve falhar quando houver a possibilidade de os dados terem sido perdidos. As consultas podem falhar definitivamente na leitura de dados do Kafka devido a vários cenários, como tópicos excluídos, truncamento de tópicos antes do processamento e assim por diante. Tentamos estimar de forma conservadora se os dados foram possivelmente perdidos ou não. Às vezes, isso pode causar alarmes falsos. Defina essa opção como false se não funcionar como esperado ou se você quiser que a consulta continue sendo processada apesar da perda de dados.

minPartitions

Inteiro >= 0, 0 = desabilitado.

0 (desativado)

[Opcional] Número mínimo de partições para ler de Kafka. Você pode configurar o Spark para utilizar um mínimo arbitrário de partições para ler a partir do Kafka utilizando a opção minPartitions. Normalmente, o Spark tem um mapeamento 1-1 de Kafka topicPartitions para partições Spark consumindo do Kafka. Se você definir a opção minPartitions com um valor maior do que seu tipicPartitions do Kafka, o Spark dividirá partições grandes do Kafka em partes menores. Essa opção pode ser definida em momentos de pico de carga, distorção de dados e à medida que a transmissão ficar atrasada para aumentar a taxa de processamento. Há o custo de inicializar os consumidores do Kafka em cada gatilho, o que pode afetar o desempenho se você usar SSL na conexão com o Kafka.

kafka.group.id

Uma ID do grupo de consumidores Kafka.

não definido

[Opcional] ID do grupo a ser usado durante a leitura do Kafka. Use com cautela. Por padrão, cada consulta gera uma ID de grupo exclusiva para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores que não enfrente interferência de nenhum outro consumidor e, portanto, possa ler todas as partições de seus tópicos assinados. Em alguns cenários (por exemplo, autorização baseada em grupo Kafka), convém usar IDs de grupo autorizados específicos para ler dados. Opcionalmente, você pode definir a ID do grupo. No entanto, faça isso com extrema cautela, pois pode causar um comportamento inesperado. - É provável que as consultas em execução simultânea (lotes e transmissão) com o mesmo ID de grupo interfiram umas nas outras, fazendo com que cada consulta leia apenas parte dos dados. - Isso também pode ocorrer quando as consultas são iniciadas/reiniciadas em rápida sucessão. Para minimizar esses problemas, defina a configuração do consumidor Kafka session.timeout.ms para ser muito pequena.

startingOffsets

earliest , latest

latest

[Opcional] O ponto inicial quando uma consulta é iniciada, seja "earliest", que é a partir dos primeiros deslocamentos, ou uma cadeia de caracteres json especificando um deslocamento inicial para cada TopicPartition. No json, -2 como deslocamento pode ser usado para fazer referência ao mais antigo, -1 ao mais recente. Observação: para consultas em lote, a opção latest (implicitamente ou com -1 em json) não é permitida. Para consultas de transmissão, isso só se aplica quando uma nova consulta for iniciada, e essa retomada sempre continuará de onde a consulta parou. As partições recém-descobertas durante uma consulta começarão o mais cedo possível.

Consulte o Guia de integração de transmissão estruturada do Kafka para obter outras configurações opcionais.

Esquema para registros do Kafka

O esquema dos registros do Kafka é:

Coluna

Tipo

chave

binário

valor

binário

tópico

string

partição

int

deslocamento

long

carimbo de data/hora

long

timestampType

int

O key e o value são sempre desserializados como matrizes de bytes com o ByteArrayDeserializer. Utilize operações do DataFrame (como cast("string")) para desserializar explicitamente as chaves e valores.

Gravar dados no Kafka

Veja a seguir um exemplo de uma gravação em transmissão para o Kafka:

Python
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)

O Databricks também oferece suporte à semântica de gravação em lote nos coletores de dados do Kafka, conforme mostrado no exemplo a seguir:

Python
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)

Configurar o gravador Kafka transmissão estructurada

important

Databricks Runtime 13.3 LTS e acima incluem uma versão mais recente da biblioteca kafka-clients que permite gravações idempotentes por default. Se um sink Kafka usar a versão 2.8.0 ou abaixo com ACLs configuradas, mas sem IDEMPOTENT_WRITE ativado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException:. Cannot execute transactional method because we are in an error state.

Para solucionar esse erro, atualize para a versão 2.8.0 ou superior do site Kafka ou defina .option(“kafka.enable.idempotence”, “false”) ao configurar o gravador de transmissão estruturada.

O esquema fornecido ao DataStreamWriter interage com o sink do Kafka. Você pode usar os seguintes campos:

Nome da coluna

Obrigatório ou opcional

Tipo

key

opcional

STRING ou BINARY

value

obrigatório

STRING ou BINARY

headers

opcional

ARRAY

topic

Opcional (ignorado se topic estiver definido como opção de writer)

STRING

partition

opcional

INT

Veja a seguir as opções comuns definidas ao gravar no Kafka:

Opção

Valor

Valor padrão

Descrição

kafka.boostrap.servers

Uma lista delimitada por vírgulas de <host:port>

nenhum

[Obrigatório] A configuração do Kafka bootstrap.servers.

topic

STRING

não definido

[Opcional] Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico existentes nos dados.

includeHeaders

BOOLEAN

false

[Opcional] Se os cabeçalhos do Kafka devem ser incluídos na linha.

Consulte o Guia de integração de transmissão estruturada do Kafka para obter outras configurações opcionais.

Recuperar métricas do Kafka

Você pode obter a média, o mínimo e o máximo do número de deslocamentos em que a consulta de transmissão estiver atrás do último deslocamento disponível entre todos os tópicos inscritos com as métricas avgOffsetsBehindLatest, maxOffsetsBehindLatest e minOffsetsBehindLatest. Consulte Leitura interativa de métricas.

nota

Disponível no Databricks Runtime 9.1e acima.

Obtenha o número total estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos examinando o valor de estimatedTotalBytesBehindLatest. Essa estimativa baseia-se nos lotes que foram processados nos últimos 300 segundos. O período de tempo em que a estimativa se baseia pode ser alterado definindo-se a opção bytesEstimateWindowLength com um valor diferente. Por exemplo, para defini-lo com 10 minutos:

Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:

JSON
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}

Use SSL para conectar o Databricks ao Kafka

Para ativar as conexões SSL com o Kafka, siga as instruções na documentação do Confluent Encryption and Authentication with SSL. Você pode fornecer as configurações descritas lá, prefixadas com kafka., como opções. Por exemplo, você especifica a localização do repositório confiável na propriedade kafka.ssl.truststore.location.

A Databricks recomenda que você:

O exemplo a seguir usa locais de armazenamento de objetos e segredos de Databricks para habilitar uma conexão SSL:

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Use Amazon gerenciar transmissão para Kafka com IAM

info

Visualização

Esse recurso está em Public Preview em Databricks Runtime 13.3 LTS e acima.

O senhor pode usar Databricks para se conectar a Amazon gerenciar transmissão para Kafka (MSK) usando IAM. Para obter instruções de configuração do MSK, consulte Configuração do Amazon MSK.

nota

As configurações a seguir só são necessárias se o senhor estiver usando o IAM para se conectar ao MSK. O senhor também pode configurar as conexões com o MSK usando as opções fornecidas pelo conector do Apache Spark Kafka.

Databricks recomenda que o senhor gerencie sua conexão com a MSK usando o site instance profile. Veja o perfil da instância.

Você deve configurar as seguintes opções para conectar ao MSK com um perfil de instância:

Scala
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

Opcionalmente, o senhor pode configurar sua conexão com o MSK com um usuário IAM ou IAM role em vez de um instance profile. O senhor deve fornecer valores para seu acesso AWS key e chave secreta usando as variáveis ambientais AWS_ACCESS_KEY_ID e AWS_SECRET_ACCESS_KEY. Consulte Usar um segredo em uma propriedade de configuração ou variável de ambiente do Spark.

Além disso, se você optar por configurar sua conexão usando uma função do IAM, deverá modificar o valor fornecido a kafka.sasl.jaas.config para incluir o ARN da função, como no exemplo a seguir: shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::123456789012:role/msk_client_role".

autenticação de entidade de serviço com Microsoft Entra ID e Azure Event Hubs

Databricks suporta a autenticação do Spark Job com o serviço Event Hubs. Essa autenticação é feita via OAuth com o Microsoft Entra ID.

Diagrama de autenticação AAD

Databricks suporta a autenticação Microsoft Entra ID com uma ID de cliente e um segredo nos seguintes ambientes compute:

  • Databricks Runtime 12.2 LTS e acima em compute configurado com modo de acesso dedicado (anteriormente, modo de acesso de usuário único).
  • Databricks Runtime 14.3 LTS e acima em compute configurado com o modo de acesso padrão (antigo modo de acesso compartilhado).
  • Pipeline DLT configurado sem Unity Catalog.

Databricks não é compatível com a autenticação Microsoft Entra ID com um certificado em qualquer ambiente compute ou no pipeline DLT configurado com Unity Catalog.

Essa autenticação não funciona em compute com modo de acesso padrão ou em Unity Catalog DLT.

Configuração da transmissão estruturada Kafka Connector

Para realizar a autenticação com o Microsoft Entra ID, o senhor precisará dos seguintes valores:

  • A tenant ID. O senhor pode encontrar isso no site Microsoft Entra ID serviço tab.

  • Um ClientID (também conhecido como ID do aplicativo).

  • Um segredo de cliente. Quando tiver isso, o senhor deve adicioná-lo como um segredo ao seu Databricks Workspace. Para adicionar esse segredo, consulte Gerenciamento de segredos.

  • Um tópico do EventHubs. Você pode encontrar uma lista de tópicos na seção Hubs de Eventos , na seção Entidades , em uma página específica de namespace de hubs de eventos . Para trabalhar com vários tópicos, o senhor pode definir o endereço IAM role no nível dos Event Hubs.

  • Um servidor EventHubs. Você pode encontrar isso na página de visão geral do seu namespace específico dos Hubs de Eventos :

    Namespace dos Hubs de Eventos

Além disso, para usar o Entra ID, precisamos dizer ao Kafka para usar o mecanismo OAuth SASL (SASL é um protocolo genérico, e OAuth é um tipo de "mecanismo" SASL):

  • kafka.security.protocol deveria ser SASL_SSL
  • kafka.sasl.mechanism deveria ser OAUTHBEARER
  • kafka.sasl.login.callback.handler.class deve ser um nome totalmente qualificado da classe Java com um valor de kafkashaded para o manipulador de retorno de chamada de login da nossa classe Kafka sombreada. Veja o exemplo a seguir para ver a classe exata.

Exemplo

A seguir, vamos ver um exemplo em execução:

Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Lidando com possíveis erros

  • não há suporte para opções de transmissão.

    Se o senhor tentar usar esse mecanismo de autenticação em um pipeline DLT configurado com o Unity Catalog, poderá receber o seguinte erro:

    Erro de transmissão não suportado

    Para resolver esse erro, use uma configuração compatível do site compute. Veja a autenticação de entidade de serviço com Microsoft Entra ID e Azure Event Hubs.

  • Falha ao criar um novo KafkaAdminClient.

    Esse é um erro interno que o Kafka gera se alguma das opções de autenticação a seguir estiver incorreta:

    • ID do cliente (também conhecida como ID do aplicativo)
    • ID do locatário
    • Servidor EventHubs

    Para resolver o erro, verifique se os valores estão corretos para essas opções.

    Além disso, esse erro poderá ocorrer se o senhor modificar as opções de configuração fornecidas por default no exemplo (que foram solicitadas a não modificar), como kafka.security.protocol.

  • Não há registros sendo devolvidos

    Se o senhor estiver tentando exibir ou processar o DataFrame, mas não estiver obtendo resultados, verá o seguinte na interface do usuário.

    Nenhuma mensagem de resultados

    Essa mensagem significa que a autenticação foi bem-sucedida, mas o EventHubs não retornou nenhum dado. Algumas razões possíveis (embora não exaustivas) são:

    • Você especificou o tópico errado do EventHubs.
    • A opção de configuração default Kafka para startingOffsets é latest e o senhor ainda não está recebendo nenhum dado por meio do tópico. O senhor pode definir startingOffsetstoearliest para começar a ler os dados a partir dos primeiros offsets do Kafka.