processamento de transmissão com Apache Kafka e Databricks
Este artigo descreve como você pode usar o Apache Kafka como fonte ou coletor quando executar cargas de trabalho de Structured Streaming no Databricks.
Para mais informações sobre o Kafka, consulte a documentação do Kafka.
Ler dados do Kafka
Veja a seguir um exemplo de leitura de transmissão do Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
O Databricks também oferece suporte à semântica de leitura em lote para fontes de dados do Kafka, conforme mostrado no exemplo a seguir:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Para o carregamento incremental de lotes, o site Databricks recomenda o uso do Kafka com Trigger.AvailableNow
. Consulte Configuração do processamento de lotes incrementais.
Em Databricks Runtime 13.3 LTS e acima, Databricks fornece uma função SQL para leitura de dados Kafka. A transmissão com SQL é suportada somente em DLT ou com tabelas de transmissão em Databricks SQL. Veja read_kafka
função com valor de tabela.
Configurar Kafka leitor de transmissão estruturada
O Databricks fornece a palavra-chave kafka
como formato de dados para configurar conexões com o Kafka 0.10+.
A seguir estão as configurações mais comuns para o Kafka:
Há várias maneiras de especificar quais tópicos assinar. Você deve fornecer somente um destes parâmetros:
Opção | Valor | Descrição |
---|---|---|
assinar | Uma lista de tópicos separados por vírgula. | A lista de tópicos para se inscrever. |
subscribePattern | String Java regex. | O padrão usado para assinar tópicos. |
atribuir | Sequência JSON | Tópico específico Partições a serem consumidas. |
Outras configurações notáveis:
Opção | Valor | Valor padrão | Descrição |
---|---|---|---|
kafka.bootstrap.servers | Lista separada por vírgula do host. | vazio | [Obrigatório] A configuração do Kafka |
failOnDataLoss |
|
| [Opcional] Se a consulta deve falhar quando houver a possibilidade de os dados terem sido perdidos. As consultas podem falhar definitivamente na leitura de dados do Kafka devido a vários cenários, como tópicos excluídos, truncamento de tópicos antes do processamento e assim por diante. Tentamos estimar de forma conservadora se os dados foram possivelmente perdidos ou não. Às vezes, isso pode causar alarmes falsos. Defina essa opção como |
minPartitions | Inteiro >= 0, 0 = desabilitado. | 0 (desativado) | [Opcional] Número mínimo de partições para ler de Kafka. Você pode configurar o Spark para utilizar um mínimo arbitrário de partições para ler a partir do Kafka utilizando a opção |
kafka.group.id | Uma ID do grupo de consumidores Kafka. | não definido | [Opcional] ID do grupo a ser usado durante a leitura do Kafka. Use com cautela. Por padrão, cada consulta gera uma ID de grupo exclusiva para leitura de dados. Isso garante que cada consulta tenha seu próprio grupo de consumidores que não enfrente interferência de nenhum outro consumidor e, portanto, possa ler todas as partições de seus tópicos assinados. Em alguns cenários (por exemplo, autorização baseada em grupo Kafka), convém usar IDs de grupo autorizados específicos para ler dados. Opcionalmente, você pode definir a ID do grupo. No entanto, faça isso com extrema cautela, pois pode causar um comportamento inesperado. - É provável que as consultas em execução simultânea (lotes e transmissão) com o mesmo ID de grupo interfiram umas nas outras, fazendo com que cada consulta leia apenas parte dos dados. - Isso também pode ocorrer quando as consultas são iniciadas/reiniciadas em rápida sucessão. Para minimizar esses problemas, defina a configuração do consumidor Kafka |
startingOffsets | earliest , latest | latest | [Opcional] O ponto inicial quando uma consulta é iniciada, seja "earliest", que é a partir dos primeiros deslocamentos, ou uma cadeia de caracteres json especificando um deslocamento inicial para cada TopicPartition. No json, -2 como deslocamento pode ser usado para fazer referência ao mais antigo, -1 ao mais recente. Observação: para consultas em lote, a opção latest (implicitamente ou com -1 em json) não é permitida. Para consultas de transmissão, isso só se aplica quando uma nova consulta for iniciada, e essa retomada sempre continuará de onde a consulta parou. As partições recém-descobertas durante uma consulta começarão o mais cedo possível. |
Consulte o Guia de integração de transmissão estruturada do Kafka para obter outras configurações opcionais.
Esquema para registros do Kafka
O esquema dos registros do Kafka é:
Coluna | Tipo |
---|---|
chave | binário |
valor | binário |
tópico | string |
partição | int |
deslocamento | long |
carimbo de data/hora | long |
timestampType | int |
O key
e o value
são sempre desserializados como matrizes de bytes com o ByteArrayDeserializer
. Utilize operações do DataFrame (como cast("string")
) para desserializar explicitamente as chaves e valores.
Gravar dados no Kafka
Veja a seguir um exemplo de uma gravação em transmissão para o Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
O Databricks também oferece suporte à semântica de gravação em lote nos coletores de dados do Kafka, conforme mostrado no exemplo a seguir:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Configurar o gravador Kafka transmissão estructurada
Databricks Runtime 13.3 LTS e acima incluem uma versão mais recente da biblioteca kafka-clients
que permite gravações idempotentes por default. Se um sink Kafka usar a versão 2.8.0 ou abaixo com ACLs configuradas, mas sem IDEMPOTENT_WRITE
ativado, a gravação falhará com a mensagem de erro org.apache.kafka.common.KafkaException:
. Cannot execute transactional method because we are in an error state
.
Para solucionar esse erro, atualize para a versão 2.8.0 ou superior do site Kafka ou defina .option(“kafka.enable.idempotence”, “false”)
ao configurar o gravador de transmissão estruturada.
O esquema fornecido ao DataStreamWriter interage com o sink do Kafka. Você pode usar os seguintes campos:
Nome da coluna | Obrigatório ou opcional | Tipo |
---|---|---|
| opcional |
|
| obrigatório |
|
| opcional |
|
| Opcional (ignorado se |
|
| opcional |
|
Veja a seguir as opções comuns definidas ao gravar no Kafka:
Opção | Valor | Valor padrão | Descrição |
---|---|---|---|
| Uma lista delimitada por vírgulas de | nenhum | [Obrigatório] A configuração do Kafka |
|
| não definido | [Opcional] Define o tópico para todas as linhas a serem gravadas. Essa opção substitui todas as colunas de tópico existentes nos dados. |
|
|
| [Opcional] Se os cabeçalhos do Kafka devem ser incluídos na linha. |
Consulte o Guia de integração de transmissão estruturada do Kafka para obter outras configurações opcionais.
Recuperar métricas do Kafka
Você pode obter a média, o mínimo e o máximo do número de deslocamentos em que a consulta de transmissão estiver atrás do último deslocamento disponível entre todos os tópicos inscritos com as métricas avgOffsetsBehindLatest
, maxOffsetsBehindLatest
e minOffsetsBehindLatest
. Consulte Leitura interativa de métricas.
Disponível no Databricks Runtime 9.1e acima.
Obtenha o número total estimado de bytes que o processo de consulta não consumiu dos tópicos inscritos examinando o valor de estimatedTotalBytesBehindLatest
. Essa estimativa baseia-se nos lotes que foram processados nos últimos 300 segundos. O período de tempo em que a estimativa se baseia pode ser alterado definindo-se a opção bytesEstimateWindowLength
com um valor diferente. Por exemplo, para defini-lo com 10 minutos:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Se você estiver executando a transmissão em um notebook, poderá ver essas métricas na aba Dados brutos no painel de progresso da consulta de transmissão:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Use SSL para conectar o Databricks ao Kafka
Para ativar as conexões SSL com o Kafka, siga as instruções na documentação do Confluent Encryption and Authentication with SSL. Você pode fornecer as configurações descritas lá, prefixadas com kafka.
, como opções. Por exemplo, você especifica a localização do repositório confiável na propriedade kafka.ssl.truststore.location
.
A Databricks recomenda que você:
- Armazene seus certificados no armazenamento de objetos na nuvem. O senhor pode restringir o acesso aos certificados apenas aos clusters que podem acessar Kafka. Veja governança de dados com Unity Catalog.
- Armazene as senhas dos certificados como segredos em um escopo secreto.
O exemplo a seguir usa locais de armazenamento de objetos e segredos de Databricks para habilitar uma conexão SSL:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Use Amazon gerenciar transmissão para Kafka com IAM
Visualização
Esse recurso está em Public Preview em Databricks Runtime 13.3 LTS e acima.
O senhor pode usar Databricks para se conectar a Amazon gerenciar transmissão para Kafka (MSK) usando IAM. Para obter instruções de configuração do MSK, consulte Configuração do Amazon MSK.
As configurações a seguir só são necessárias se o senhor estiver usando o IAM para se conectar ao MSK. O senhor também pode configurar as conexões com o MSK usando as opções fornecidas pelo conector do Apache Spark Kafka.
Databricks recomenda que o senhor gerencie sua conexão com a MSK usando o site instance profile. Veja o perfil da instância.
Você deve configurar as seguintes opções para conectar ao MSK com um perfil de instância:
- Scala
- Python
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Opcionalmente, o senhor pode configurar sua conexão com o MSK com um usuário IAM ou IAM role em vez de um instance profile. O senhor deve fornecer valores para seu acesso AWS key e chave secreta usando as variáveis ambientais AWS_ACCESS_KEY_ID
e AWS_SECRET_ACCESS_KEY
. Consulte Usar um segredo em uma propriedade de configuração ou variável de ambiente do Spark.
Além disso, se você optar por configurar sua conexão usando uma função do IAM, deverá modificar o valor fornecido a kafka.sasl.jaas.config
para incluir o ARN da função, como no exemplo a seguir: shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::123456789012:role/msk_client_role"
.
autenticação de entidade de serviço com Microsoft Entra ID e Azure Event Hubs
Databricks suporta a autenticação do Spark Job com o serviço Event Hubs. Essa autenticação é feita via OAuth com o Microsoft Entra ID.
Databricks suporta a autenticação Microsoft Entra ID com uma ID de cliente e um segredo nos seguintes ambientes compute:
- Databricks Runtime 12.2 LTS e acima em compute configurado com modo de acesso dedicado (anteriormente, modo de acesso de usuário único).
- Databricks Runtime 14.3 LTS e acima em compute configurado com o modo de acesso padrão (antigo modo de acesso compartilhado).
- Pipeline DLT configurado sem Unity Catalog.
Databricks não é compatível com a autenticação Microsoft Entra ID com um certificado em qualquer ambiente compute ou no pipeline DLT configurado com Unity Catalog.
Essa autenticação não funciona em compute com modo de acesso padrão ou em Unity Catalog DLT.
Configuração da transmissão estruturada Kafka Connector
Para realizar a autenticação com o Microsoft Entra ID, o senhor precisará dos seguintes valores:
-
A tenant ID. O senhor pode encontrar isso no site Microsoft Entra ID serviço tab.
-
Um ClientID (também conhecido como ID do aplicativo).
-
Um segredo de cliente. Quando tiver isso, o senhor deve adicioná-lo como um segredo ao seu Databricks Workspace. Para adicionar esse segredo, consulte Gerenciamento de segredos.
-
Um tópico do EventHubs. Você pode encontrar uma lista de tópicos na seção Hubs de Eventos , na seção Entidades , em uma página específica de namespace de hubs de eventos . Para trabalhar com vários tópicos, o senhor pode definir o endereço IAM role no nível dos Event Hubs.
-
Um servidor EventHubs. Você pode encontrar isso na página de visão geral do seu namespace específico dos Hubs de Eventos :
Além disso, para usar o Entra ID, precisamos dizer ao Kafka para usar o mecanismo OAuth SASL (SASL é um protocolo genérico, e OAuth é um tipo de "mecanismo" SASL):
kafka.security.protocol
deveria serSASL_SSL
kafka.sasl.mechanism
deveria serOAUTHBEARER
kafka.sasl.login.callback.handler.class
deve ser um nome totalmente qualificado da classe Java com um valor dekafkashaded
para o manipulador de retorno de chamada de login da nossa classe Kafka sombreada. Veja o exemplo a seguir para ver a classe exata.
Exemplo
A seguir, vamos ver um exemplo em execução:
- Python
- Scala
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Lidando com possíveis erros
-
não há suporte para opções de transmissão.
Se o senhor tentar usar esse mecanismo de autenticação em um pipeline DLT configurado com o Unity Catalog, poderá receber o seguinte erro:
Para resolver esse erro, use uma configuração compatível do site compute. Veja a autenticação de entidade de serviço com Microsoft Entra ID e Azure Event Hubs.
-
Falha ao criar um novo
KafkaAdminClient
.Esse é um erro interno que o Kafka gera se alguma das opções de autenticação a seguir estiver incorreta:
- ID do cliente (também conhecida como ID do aplicativo)
- ID do locatário
- Servidor EventHubs
Para resolver o erro, verifique se os valores estão corretos para essas opções.
Além disso, esse erro poderá ocorrer se o senhor modificar as opções de configuração fornecidas por default no exemplo (que foram solicitadas a não modificar), como
kafka.security.protocol
. -
Não há registros sendo devolvidos
Se o senhor estiver tentando exibir ou processar o DataFrame, mas não estiver obtendo resultados, verá o seguinte na interface do usuário.
Essa mensagem significa que a autenticação foi bem-sucedida, mas o EventHubs não retornou nenhum dado. Algumas razões possíveis (embora não exaustivas) são:
- Você especificou o tópico errado do EventHubs.
- A opção de configuração default Kafka para
startingOffsets
élatest
e o senhor ainda não está recebendo nenhum dado por meio do tópico. O senhor pode definirstartingOffsetstoearliest
para começar a ler os dados a partir dos primeiros offsets do Kafka.