Pular para o conteúdo principal

Autenticação

Esta página mostra os métodos de autenticação mais comuns para o conector Kafka no Databricks.

A lista completa de métodos de autenticação compatíveis pode ser encontrada na documentação do Kafka.

Conecte-se ao serviço de nuvem do Google gerenciado para Apache Kafka

Para autenticar-se no Serviço Gerenciado do Google Cloud para Apache Kafka, use uma credencial de serviço do Unity Catalog. É possível permitir que o Databricks configure a autenticação automaticamente ou configurar manualmente as opções SASL e OAUTHBEARER.

Conecte-se com as credenciais do serviço Unity Catalog.

No Databricks Runtime 18.0 e acima, o Databricks dá suporte a credenciais de serviço do Unity Catalog para autenticar o acesso ao serviço gerenciado do Google Cloud para Apache Kafka.

Os clusters do Serviço Gerenciado do Google Cloud para Apache Kafka são executados em uma VPC. Seu compute do Databricks deve ser executado em uma VPC que possa alcançar o cluster Kafka gerenciado. Para habilitar a conectividade, configure o VPC peering ou o Private Service Connect. Consulte Pareamento de rede VPC ou Private Service Connect na documentação do Google Cloud. Serverless compute não é compatível porque é executado no plano de controle gerenciado pelo Databricks.

Para autenticação, conceda à account de serviço do Google Cloud, criada para sua credencial de serviço do Unity Catalog, a seguinte função no projeto do Google Cloud que contém seu cluster Kafka gerenciado:

  • roles/managedkafka.client

Para obter instruções, consulte a seção "Conceder, alterar e revogar acesso" na documentação do Google Cloud.

O exemplo a seguir configura o Kafka como uma fonte usando uma credencial de serviço:

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.read.format("kafka").options(**kafka_options).load()
nota

Quando você usa uma credencial de serviço do Unity Catalog para se conectar ao Kafka, não use as seguintes opções:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.login.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Conecte-se com as opções da SASL

É possível autenticar-se no Google Cloud Serviço Gerenciado para Apache Kafka sem usar a opção de origem databricks.serviceCredential, especificando as opções SASL/OAUTHBEARER. Use esta abordagem somente se for necessário definir as opções SASL do Kafka explicitamente. Por exemplo, para usar um manipulador de callback específico.

Para se conectar com SASL, você deve usar uma credencial de serviço do Unity Catalog:

  • A credencial do serviço da account do serviço do Google Cloud deve ter roles/managedkafka.client no projeto do Google Cloud que contém seu cluster de gerenciamento Kafka .
  • Use o manipulador de callback de login fornecido pelo Databricks org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler. Não use com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler, que não está incluído no Databricks Runtime.

A autenticação com accounts de serviço do Google sem Unity Catalog não é suportada.

O exemplo a seguir configura o Kafka como uma origem com SASL:

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()
nota

Não defina databricks.serviceCredential ao especificar as opções SASL. Se você definir databricks.serviceCredential, o Databricks configura a autenticação do Kafka automaticamente e impede a especificação das opções kafka.sasl.*.

Use SASL/PLAIN para autenticar

Para conectar-se ao Kafka usando autenticação SASL/PLAIN (nome de usuário e senha), configure as seguintes opções. Use o nome da classe sombreada PlainLoginModule :

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

O Databricks recomenda que você armazene sua senha como um segredo em vez de incluí-la diretamente em seu código. Para obter mais informações, consulte Gerenciamento de segredos.

Use SASL/SCRAM para autenticar.

Para conectar-se ao Kafka usando SASL/SCRAM (SCRAM-SHA-256 ou SCRAM-SHA-512), configure as seguintes opções. Use o nome da classe sombreada ScramLoginModule :

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}

df = spark.readStream.format("kafka").options(**kafka_options).load()
nota

Substitua SCRAM-SHA-512 por SCRAM-SHA-256 se o seu cluster Kafka estiver configurado para usar SCRAM-SHA-256.

O Databricks recomenda que você armazene sua senha como um segredo em vez de incluí-la diretamente em seu código. Para obter mais informações, consulte Gerenciamento de segredos.

Use SSL para conectar o Databricks ao Kafka.

Para ativar as conexões SSL/TLS com o Kafka, defina kafka.security.protocol como SSL e forneça as opções de configuração do armazenamento de confiança e do armazenamento de chaves prefixadas com kafka.. Para conexões SSL que exigem apenas autenticação de servidor (TLS unidirecional), é necessário usar um armazenamento de confiança. Para TLS mútuo (mTLS), onde o broker Kafka também autentica o cliente, é necessário usar tanto um repositório de confiança quanto um repositório de chaves.

As seguintes opções de SSL/TLS estão disponíveis. Para obter a lista completa de propriedades SSL, consulte a documentação de configuração SSL do Apache Kafka e a seção Criptografia e Autenticação com SSL na documentação da Confluent.

Opção

Descrição

kafka.security.protocol

Defina como SSL para ativar a criptografia TLS.

kafka.ssl.truststore.location

Caminho para o arquivo de armazenamento de certificados confiáveis que contém os certificados de CA confiáveis.

kafka.ssl.truststore.password

Senha para o arquivo de armazenamento de certificados confiáveis.

kafka.ssl.truststore.type

Formato de arquivo de armazenamento confiável (default: JKS).

kafka.ssl.keystore.location

Caminho para o arquivo de armazenamento key contendo o certificado do cliente e key privada (necessário para mTLS).

kafka.ssl.keystore.password

Senha para o arquivo de armazenamento key .

kafka.ssl.key.password

Senha da key privada no repositório key .

kafka.ssl.endpoint.identification.algorithm

Algoritmo de verificação de nome de host. O valor padrão é https. Defina como uma string vazia para desativar.

Se você usa SSL, a Databricks recomenda que você:

  • Armazene seus certificados em um volume Unity Catalog . Os usuários que têm permissão de leitura do volume podem usar seus certificados Kafka. Para obter mais informações, consulte O que são volumes Unity Catalog ?.
  • Armazene as senhas dos seus certificados como segredos em um Escopo Secreto. Para mais informações, veja Gerenciando Escopo Secreto.

O exemplo a seguir usa locais de armazenamento de objetos e segredos de Databricks para habilitar uma conexão SSL:

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Use os nomes de classe sombreados do Databricks para Kafka.

Databricks inclui versões proprietárias e personalizadas da biblioteca de clientes Kafka . Todos os nomes de classe do cliente Kafka que você referencia nas opções de configuração de autenticação devem usar o prefixo de nome de classe sombreado em vez do nome de classe padrão de código aberto. Isso se aplica a qualquer classe referenciada em opções como kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class e kafka.sasl.client.callback.handler.class.

Se você usar nomes de classe não sombreados, seu código gerará um erro RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED. Consulte as Perguntas frequentes para obter mais detalhes.

Lidar com possíveis erros

  • Exceção missing gcp_options ao autenticar

    Essa exceção é lançada se o manipulador de retorno de chamada não conseguir inferir o escopo a partir da URL de inicialização. Defina databricks.serviceCredential.scope manualmente. Para a maioria dos casos de uso, defina-o como https://www.googleapis.com/auth/cloud-platform.

  • Não foram encontradas URLs de inicialização que pudessem ser resolvidas.

    Isso significa que o cluster compute não consegue resolver o hostname de inicialização. Verifique a configuração da sua VPC para garantir que o cluster compute consiga alcançar o cluster Kafka gerenciado. Configure o peering VPC ou o serviço privado Connect conforme necessário.

  • Problemas de permissão

    • Certifique-se de que a account de serviço tenha a vinculação IAM role roles/managedkafka.client no projeto ao qual o cluster Kafka pertence.
    • Certifique-se de que o cluster Kafka tenha as ACLs corretas definidas para o tópico e a conta de serviço. Consulte a seção "Controle de acesso com ACLs Kafka na documentação do Google Cloud.
  • Nenhum registro retornado

    Se a autenticação for bem-sucedida, mas nenhum dado for retornado:

    • Verifique se você está se inscrevendo no tópico correto.
    • O default startingOffsets é latest, que apenas lê novos dados. Defina startingOffsets a earliest para ler os dados existentes.