Pular para o conteúdo principal

Autenticação

O conector Databricks Kafka suporta múltiplos métodos de autenticação para conexão com o Kafka. Este artigo aborda alguns dos métodos de autenticação mais comuns no Databricks. A lista completa dos métodos de autenticação suportados pode ser encontrada na documentação do Kafka.

Serviço de nuvem Google gerenciado para Apache Kafka

Conecte-se com as credenciais do serviço Unity Catalog.

Databricks suporta credenciais de serviço Unity Catalog para autenticar o acesso ao serviço de nuvem gerenciado do Google para Apache Kafka no Databricks Runtime 18.0 e superiores. Google cloud serviço gerenciado para execução clusters Apache Kafka em uma VPC, portanto sua compute Databricks deve ser executada em uma VPC que possa alcançar o gerenciamento cluster Kafka . compute sem servidor não é suportada porque sua execução ocorre no plano de controle Databricks . Para habilitar a conectividade, configure o peering VPC ou o serviço privado Connect. Consulte VPC Network Peering" ou "Private Service Connect" na documentação do Google Cloud.

Para autenticação, conceda à account do serviço do Google Cloud criada para suas credenciais Unity Catalog a seguinte função no projeto do Google Cloud que contém seu cluster Kafka de gerenciamento:

  • roles/managedkafka.client

Para obter instruções, consulte a seção "Conceder, alterar e revogar acesso" na documentação do Google Cloud.

O exemplo a seguir configura o Kafka como uma fonte usando uma credencial de serviço:

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.read.format("kafka").options(**kafka_options).load()

Observação : ao fornecer uma credencial de serviço do Unity Catalog para o Kafka, não especifique as seguintes opções:

  • kafka.sasl.mechanism
  • kafka.sasl.jaas.config
  • kafka.security.protocol
  • kafka.sasl.client.callback.handler.class
  • kafka.sasl.login.callback.handler.class
  • kafka.sasl.oauthbearer.token.endpoint.url

Conecte-se com as opções da SASL

Você pode autenticar no serviço Google Cloud para Apache Kafka sem usar a opção de origem databricks.serviceCredential especificando as opções SASL/OAUTHBEARER diretamente. Utilize essa abordagem somente se precisar definir explicitamente as opções SASL do Kafka (por exemplo, para usar um manipulador de retorno de chamada específico).

Essa abordagem requer uma credencial de serviço do Unity Catalog:

  • A credencial do serviço da account do serviço do Google Cloud deve ter roles/managedkafka.client no projeto do Google Cloud que contém seu cluster de gerenciamento Kafka .
  • Use o manipulador de retorno de chamada de login fornecido pelo Databricks org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler (não com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler, que não está incluído no Databricks Runtime).

A autenticação com uma conta do Google fora do Unity Catalog não é suportada.

O exemplo a seguir configura o Kafka como uma fonte, especificando diretamente as opções SASL:

Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"startingOffsets": "earliest",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.jaas.config":
"kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"kafka.sasl.login.callback.handler.class":
"org.apache.spark.sql.kafka010.CustomGCPOAuthBearerLoginCallbackHandler",
# Required by the login callback handler:
"kafka.databricks.serviceCredential": "<service-credential-name>",
# Optional:
# "kafka.databricks.serviceCredential.scope": "https://www.googleapis.com/auth/cloud-platform",
}

df = spark.readStream.format("kafka").options(**kafka_options).load()

Nota : Não defina databricks.serviceCredential ao especificar essas opções SASL. Se databricks.serviceCredential estiver definido, o Databricks configura a autenticação Kafka automaticamente e não permite especificar as opções kafka.sasl.* .

Use SSL para conectar o Databricks ao Kafka.

Para habilitar conexões SSL com o Kafka, siga as instruções na documentação da Confluent sobre Criptografia e Autenticação com SSL. Você pode fornecer as configurações descritas lá, prefixadas com kafka., como opções. Por exemplo, a localização do armazenamento de confiança seria especificada com a propriedade kafka.ssl.truststore.location.

Se você for usar SSL, a Databricks recomenda que você:

  • Armazene seus certificados em um volumeUnity Catalog. Os usuários que tiverem permissão de leitura do volume poderão usar seus certificados Kafka.
  • Armazene as senhas dos seus certificados como segredos em um Escopo Secreto.

O exemplo a seguir usa locais de armazenamento de objetos e segredos de Databricks para habilitar uma conexão SSL:

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Lidar com possíveis erros

  • Exceção missing gcp_options ao autenticar

    Essa exceção é lançada se o manipulador de retorno de chamada não conseguir inferir o escopo a partir da URL de inicialização. Defina databricks.serviceCredential.scope manualmente. Para a maioria dos casos de uso, defina-o como https://www.googleapis.com/auth/cloud-platform.

  • Não foram encontradas URLs de inicialização que pudessem ser resolvidas.

    Isso significa que o cluster compute não consegue resolver o hostname de inicialização. Verifique a configuração da sua VPC para garantir que o cluster compute consiga alcançar o cluster Kafka gerenciado. Configure o peering VPC ou o serviço privado Connect conforme necessário.

  • Problemas de permissão

    • Certifique-se de que a account de serviço tenha a vinculação IAM role roles/managedkafka.client no projeto ao qual o cluster Kafka pertence.
    • Certifique-se de que o cluster Kafka tenha as ACLs corretas definidas para o tópico e a conta de serviço. Consulte a seção "Controle de acesso com ACLs Kafka na documentação do Google Cloud.
  • Nenhum registro retornado

    Se a autenticação for bem-sucedida, mas nenhum dado for retornado:

    • Verifique se você está se inscrevendo no tópico correto.
    • O default startingOffsets é latest, que apenas lê novos dados. Defina startingOffsets a earliest para ler os dados existentes.