Autenticação
O conector Databricks Kafka suporta múltiplos métodos de autenticação para conexão com o Kafka. Este artigo aborda alguns dos métodos de autenticação mais comuns no Databricks. A lista completa dos métodos de autenticação suportados pode ser encontrada na documentação do Kafka.
Amazon gerenciando transmissão para Kafka com IAM
Você pode se conectar ao Amazon Gerenciamento de Transmissões para Kafka (MSK) a partir do Databricks usando autenticação baseada em IAM. Para obter instruções de configuração do MSK, consulte Configuração do Amazon MSK.
As configurações a seguir são necessárias apenas se você estiver usando o IAM para se conectar ao MSK. Você também pode configurar conexões com o MSK usando as opções fornecidas pelo conector Apache Spark Kafka.
Conecte-se com as credenciais do serviço Unity Catalog.
Desde o lançamento do Databricks Runtime 16.1, Databricks oferece suporte a credenciais de serviço Unity Catalog para autenticar o acesso ao AWS Gerenciar Transmissões para Apache Kafka (MSK). Databricks recomenda essa abordagem, especialmente ao executar transmissões Kafka em clusters compartilhados ou compute serverless .
Para usar uma credencial de serviço Unity Catalog para autenticação, execute os seguintes passos:
-
Crie uma nova credencial de serviço do Unity Catalog. Se você não estiver familiarizado com esse processo, consulte Criar credenciais de serviço para obter instruções.
- Certifique-se de que a IAM role associada às suas credenciais de serviço tenha as permissões necessárias para se conectar ao seu cluster MSK.
-
Forneça o nome da sua credencial de serviço do Unity Catalog como uma opção de origem na sua configuração do Kafka. Defina a opção
databricks.serviceCredentialcom o nome da sua credencial de serviço.
- Python
- Scala
- SQL
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
Observação : ao usar uma credencial de serviço do Unity Catalog para se conectar ao Kafka, as seguintes opções não serão mais necessárias:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.class
Conectar com o perfil da instância
Você pode usar um instance profile para autenticar em clusters Amazon MSK que tenham a autenticação IAM habilitada. Para obter informações detalhadas sobre como configurar o perfil da instância, consulte Perfil da instância.
Para conectar-se ao MSK usando um instance profile, configure as seguintes opções:
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Conectar-se com usuários/funções do IAM
Opcionalmente, você pode configurar sua conexão com o MSK usando um usuário ou IAM role IAM vez de um instance profile. Para fazer isso, você deve fornecer valores para sua key de acesso AWS e chave secreta usando as variáveis de ambiente AWS_ACCESS_KEY_ID e AWS_SECRET_ACCESS_KEY. Consulte Usar um segredo em uma propriedade de configuração ou variável de ambiente do Spark.
Além disso, se você optar por configurar sua conexão usando uma IAM role, você deve modificar o valor fornecido para kafka.sasl.jaas.config para incluir o ARN da função, como no exemplo a seguir.
- Python
- Scala
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role'",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn='arn:aws:iam::123456789012:role/msk_client_role'",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
Use SSL para conectar o Databricks ao Kafka.
Para habilitar conexões SSL com o Kafka, siga as instruções na documentação da Confluent sobre Criptografia e Autenticação com SSL. Você pode fornecer as configurações descritas lá, prefixadas com kafka., como opções. Por exemplo, a localização do armazenamento de confiança seria especificada com a propriedade kafka.ssl.truststore.location.
Se você for usar SSL, a Databricks recomenda que você:
- Armazene seus certificados em um volumeUnity Catalog. Os usuários que tiverem permissão de leitura do volume poderão usar seus certificados Kafka.
- Armazene as senhas dos seus certificados como segredos em um Escopo Secreto.
O exemplo a seguir usa locais de armazenamento de objetos e segredos de Databricks para habilitar uma conexão SSL:
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
Lidar com possíveis erros
-
falhas de autenticação do IAM
Se você vir
SaslException,Failed to construct kafka consumerou erros de autenticação, verifique:- O ARN IAM role em seu
kafka.sasl.jaas.configestá correto e formatado adequadamente. - A IAM role tem as permissões necessárias para acessar seu cluster MSK (por exemplo,
kafka-cluster:Connect,kafka-cluster:ReadData). - Para configurar o perfil da instance profile , certifique-se de que ele esteja anexado ao cluster e possua permissões MSK.
- Para acesso entreaccount , verifique se a relação de confiança permite que a account Databricks assuma a função.
- O ARN IAM role em seu
-
Problemas de conectividade de rede
Se você vir
TimeoutExceptionou falhas de conexão:- Verifique se o grupo de segurança do cluster MSK permite tráfego de entrada do grupo de segurança compute do Databricks nas portas Kafka (normalmente 9092 para PLAINTEXT, 9094 para SASL/SSL ou 9098 para IAM).
- Certifique-se de que o peering de VPC ou o PrivateLink esteja configurado corretamente entre a VPC do Databricks e a VPC do MSK.
- Confirme se o hostname e a porta
kafka.bootstrap.serversestão corretos.
-
Nenhum registro retornado
Se a autenticação for bem-sucedida, mas nenhum dado for retornado:
- Verifique se você está se inscrevendo no tópico correto.
- O default
startingOffsetsélatest, que apenas lê novos dados. DefinastartingOffsetsaearliestpara ler os dados existentes. - Verifique se sua IAM role tem permissão
kafka-cluster:ReadDatapara o tópico.