Autenticação
Esta página mostra os métodos de autenticação mais comuns para o conector Kafka no Databricks.
A lista completa de métodos de autenticação compatíveis pode ser encontrada na documentação do Kafka.
Conecte-se ao Amazon MSK com o IAM.
Você pode se conectar ao Amazon Gerenciamento de Transmissões para Kafka (MSK) a partir do Databricks usando autenticação baseada em IAM. Para obter instruções de configuração do MSK, consulte Configuração do Amazon MSK.
Se você usar IAM para se conectar ao MSK, deverá usar um dos métodos de conexão abaixo. Alternativamente, você pode configurar conexões com o MSK usando as opções fornecidas pelo conector Apache Spark Kafka.
Conecte-se com as credenciais do serviço Unity Catalog.
No Databricks Runtime 16.1 e superior, o Databricks oferece suporte a credenciais de serviço do Unity Catalog para autenticar o acesso ao AWS Managed Transmissão para Apache Kafka (MSK). O Databricks recomenda este método de autenticação se você usar clusters compartilhados ou compute serverless.
Para usar uma credencial de serviço do Unity Catalog para autenticação, faça o seguinte:
-
Criar uma nova credencial de serviço do Unity Catalog. Consulte Criar credenciais de serviço.
- Confirme que o IAM role anexado à sua credencial de serviço tenha as permissões corretas para se conectar ao seu cluster MSK.
-
Defina a opção de origem
databricks.serviceCredentialcomo o nome da sua credencial de serviço do Unity Catalog.
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Quando você usa uma credencial de serviço do Unity Catalog para se conectar ao Kafka, não use as seguintes opções:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.class
Conectar com o perfil da instância
Você pode usar um instance profile para se autenticar em clusters MSK da Amazon que têm a autenticação IAM habilitada. See instance profile.
Para conectar-se ao MSK usando um instance profile, configure as seguintes opções:
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Conectar com usuários do IAM e IAM roles
Opcionalmente, você pode configurar sua conexão com o MSK com um usuário do IAM ou uma função do IAM em vez de um instance profile. Você deve fornecer valores para sua chave de acesso da AWS e chave secreta utilizando as variáveis de ambiente AWS_ACCESS_KEY_ID e AWS_SECRET_ACCESS_KEY. Consulte Use um segredo em uma propriedade de configuração ou variável de ambiente do Spark.
Para configurar sua conexão usando uma função do IAM, você deve modificar o valor fornecido a kafka.sasl.jaas.config para incluir o ARN da função, como no exemplo a seguir:
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role'",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role'",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Use SASL/PLAIN para autenticar
Para conectar-se ao Kafka usando autenticação SASL/PLAIN (nome de usuário e senha), configure as seguintes opções. Use o nome da classe sombreada PlainLoginModule :
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
O Databricks recomenda que você armazene sua senha como um segredo em vez de incluí-la diretamente em seu código. Para obter mais informações, consulte Gerenciamento de segredos.
Use SASL/SCRAM para autenticar.
Para conectar-se ao Kafka usando SASL/SCRAM (SCRAM-SHA-256 ou SCRAM-SHA-512), configure as seguintes opções. Use o nome da classe sombreada ScramLoginModule :
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
Substitua SCRAM-SHA-512 por SCRAM-SHA-256 se o seu cluster Kafka estiver configurado para usar SCRAM-SHA-256.
O Databricks recomenda que você armazene sua senha como um segredo em vez de incluí-la diretamente em seu código. Para obter mais informações, consulte Gerenciamento de segredos.
Use SSL para conectar o Databricks ao Kafka.
Para ativar as conexões SSL/TLS com o Kafka, defina kafka.security.protocol como SSL e forneça as opções de configuração do armazenamento de confiança e do armazenamento de chaves prefixadas com kafka.. Para conexões SSL que exigem apenas autenticação de servidor (TLS unidirecional), é necessário usar um armazenamento de confiança. Para TLS mútuo (mTLS), onde o broker Kafka também autentica o cliente, é necessário usar tanto um repositório de confiança quanto um repositório de chaves.
As seguintes opções de SSL/TLS estão disponíveis. Para obter a lista completa de propriedades SSL, consulte a documentação de configuração SSL do Apache Kafka e a seção Criptografia e Autenticação com SSL na documentação da Confluent.
Opção | Descrição |
|---|---|
| Defina como |
| Caminho para o arquivo de armazenamento de certificados confiáveis que contém os certificados de CA confiáveis. |
| Senha para o arquivo de armazenamento de certificados confiáveis. |
| Formato de arquivo de armazenamento confiável (default: |
| Caminho para o arquivo de armazenamento key contendo o certificado do cliente e key privada (necessário para mTLS). |
| Senha para o arquivo de armazenamento key . |
| Senha da key privada no repositório key . |
| Algoritmo de verificação de nome de host. O valor padrão é |
Se você usa SSL, a Databricks recomenda que você:
- Armazene seus certificados em um volume Unity Catalog . Os usuários que têm permissão de leitura do volume podem usar seus certificados Kafka. Para obter mais informações, consulte O que são volumes Unity Catalog ?.
- Armazene as senhas dos seus certificados como segredos em um Escopo Secreto. Para mais informações, veja Gerenciando Escopo Secreto.
O exemplo a seguir usa locais de armazenamento de objetos e segredos de Databricks para habilitar uma conexão SSL:
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Use os nomes de classe sombreados do Databricks para Kafka.
Databricks inclui versões proprietárias e personalizadas da biblioteca de clientes Kafka . Todos os nomes de classe do cliente Kafka que você referencia nas opções de configuração de autenticação devem usar o prefixo de nome de classe sombreado em vez do nome de classe padrão de código aberto. Isso se aplica a qualquer classe referenciada em opções como kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class e kafka.sasl.client.callback.handler.class.
Se você usar nomes de classe não sombreados, seu código gerará um erro RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED. Consulte as Perguntas frequentes para obter mais detalhes.
Lidar com possíveis erros
-
falhas de autenticação do IAM
Se você vir
SaslException,Failed to construct kafka consumerou erros de autenticação, verifique:- O ARN IAM role em seu
kafka.sasl.jaas.configestá correto e formatado adequadamente. - A IAM role tem as permissões necessárias para acessar seu cluster MSK (por exemplo,
kafka-cluster:Connect,kafka-cluster:ReadData). - Para configurar o perfil da instance profile , certifique-se de que ele esteja anexado ao cluster e possua permissões MSK.
- Para acesso entreaccount , verifique se a relação de confiança permite que a account Databricks assuma a função.
- O ARN IAM role em seu
-
Problemas de conectividade de rede
Se você vir
TimeoutExceptionou falhas de conexão:- Verifique se o grupo de segurança do cluster MSK permite tráfego de entrada do grupo de segurança compute do Databricks nas portas Kafka (normalmente 9092 para PLAINTEXT, 9094 para SASL/SSL ou 9098 para IAM).
- Certifique-se de que o peering de VPC ou o PrivateLink esteja configurado corretamente entre a VPC do Databricks e a VPC do MSK.
- Confirme se o hostname e a porta
kafka.bootstrap.serversestão corretos.
-
Nenhum registro retornado
Se a autenticação for bem-sucedida, mas nenhum dado for retornado:
- Verifique se você está se inscrevendo no tópico correto.
- O default
startingOffsetsélatest, que apenas lê novos dados. DefinastartingOffsetsaearliestpara ler os dados existentes. - Verifique se sua IAM role tem permissão
kafka-cluster:ReadDatapara o tópico.