Pular para o conteúdo principal

O que é o modo de notificação de arquivo do Auto Loader?

No modo de notificação de arquivo, o Auto Loader configura automaticamente um serviço de notificação e um serviço de fila que se inscreve nos eventos de arquivo do diretório de entrada. O senhor pode usar notificações de arquivos para escalar o site Auto Loader para ingerir milhões de arquivos por hora. Quando comparado ao modo de listagem de diretórios, o modo de notificação de arquivos é mais eficiente e escalável para grandes diretórios de entrada ou um grande volume de arquivos, mas requer permissões adicionais na nuvem.

Você pode alternar entre notificações de arquivos e listagem de diretórios a qualquer momento e ainda manter garantias de processamento de dados exatamente uma vez.

atenção

A alteração do caminho de origem do Auto Loader não é compatível com o modo de notificação de arquivo. Se o modo de notificação de arquivo for usado e o caminho for alterado, talvez você não consiga ingerir arquivos que já estão presentes no novo diretório no momento da atualização do diretório.

Recurso de nuvem usado no modo de notificação de arquivo Auto Loader

important

Você precisa de permissões elevadas para configurar automaticamente a infraestrutura em nuvem para o modo de notificação de arquivos. Entre em contato com o administrador da nuvem ou com o administrador do site workspace. Veja:

Auto Loader pode configurar automaticamente as notificações de arquivos para o senhor quando definir a opção cloudFiles.useNotifications como true e fornecer as permissões necessárias para criar recursos na nuvem. Além disso, talvez o senhor precise fornecer opções adicionais para conceder autorização ao Auto Loader para criar esses recursos.

A tabela a seguir resume quais recursos são criados pelo site Auto Loader.

Armazenamento em nuvem

inscrição serviço

Fila de serviço

Prefixo *

Limite **

Amazon S3

SNS DA AWS

AWS SQS

ingestão automática de databricks

100 por bucket S3

ADLS

Grade de eventos do Azure

Armazenamento de filas do Azure

Databricks

500 por armazenamento account

GCS

Google Pub/Sub

Google Pub/Sub

ingestão automática de databricks

100 por balde de GCS

Armazenamento de Blobs do Azure

Grade de eventos do Azure

Armazenamento de filas do Azure

Databricks

500 por armazenamento account

  • Auto Loader nomeia o recurso com esse prefixo.

** Quantos pipelines de notificação de arquivos concorrente podem ser iniciados

Se o senhor precisar executar mais do que o número limitado de pipelines de notificação de arquivos para um determinado armazenamento account, poderá fazê-lo:

  • Utilize um serviço como o AWS Lambda, o Azure Functions ou o Google Cloud Functions para distribuir as notificações de uma única fila que escuta um contêiner ou bucket inteiro em filas específicas de diretório.

Eventos de notificação de arquivos

Amazon S3 Fornece um evento ObjectCreated quando um arquivo é carregado em um bucket S3, independentemente de ter sido carregado por um put ou por várias partes upload.

O ADLS fornece diferentes notificações de eventos para arquivos que aparecem no seu contêiner Gen2.

  • O Auto Loader escuta o evento FlushWithClose para processar um arquivo.
  • Auto Loader A transmissão suporta a ação RenameFile para descobrir arquivos. RenameFile exigem uma solicitação de API ao sistema de armazenamento para obter o tamanho do arquivo renomeado.
  • Auto Loader A transmissão criada com o site Databricks Runtime 9.0 e posterior suporta a ação RenameDirectory para descobrir arquivos. RenameDirectory exigem solicitações de API ao sistema de armazenamento para listar o conteúdo do diretório renomeado.

Google Cloud Storage fornece um evento OBJECT_FINALIZE quando um arquivo é carregado, o que inclui sobrescritos e cópias de arquivos. O upload com falha não gera esse evento.

nota

Os provedores de nuvem não garantem a entrega de 100% de todos os eventos de arquivo em condições muito raras e não fornecem SLAs rígidos sobre a latência dos eventos de arquivo. A Databricks recomenda que o senhor acione backfills regulares com o Auto Loader usando a opção cloudFiles.backfillInterval para garantir que todos os arquivos sejam descobertos dentro de um determinado SLA se a integridade dos dados for um requisito. Ativar preenchimentos regulares não causa duplicatas.

Permissões necessárias para configurar a notificação de arquivos para o ADLS e o Azure Blob Storage

Você deve ter permissões de leitura para o diretório de entrada. Consulte Azure Blob Storage.

Para usar o modo de notificação de arquivo, você deve fornecer credenciais de autenticação para configurar e acessar os serviços de notificação de eventos.

Você pode se autenticar usando um dos seguintes métodos:

Depois de obter as credenciais de autenticação, atribua as permissões necessárias ao conector de acesso do Databricks (para credenciais de serviço) ou ao aplicativo Microsoft Entra ID (para uma entidade de serviço).

  • Usando as funções integradas do Azure

    Atribua ao conector de acesso as seguintes funções ao storage account no qual reside o caminho de entrada:

    • Colaborador : Essa função serve para configurar recursos em seu armazenamento account, como filas e inscrição de eventos.
    • Colaborador de dados da fila de armazenamento : Essa função é para executar operações de fila, como recuperar e excluir mensagens das filas. Essa função é necessária somente quando o senhor fornece uma entidade de serviço sem uma string de conexão.

    Atribua a esse conector de acesso a seguinte função para o grupo de recurso relacionado:

    Para obter mais informações, consulte Atribuir funções em Azure usando o portal Azure.

  • Usando uma função personalizada

    Se o senhor estiver preocupado com as permissões excessivas exigidas para as funções anteriores, poderá criar uma função personalizada com pelo menos as seguintes permissões, listadas abaixo no formato JSON da função do Azure:

    JSON
    "permissions": [
    {
    "actions": [
    "Microsoft.EventGrid/eventSubscriptions/write",
    "Microsoft.EventGrid/eventSubscriptions/read",
    "Microsoft.EventGrid/eventSubscriptions/delete",
    "Microsoft.EventGrid/locations/eventSubscriptions/read",
    "Microsoft.Storage/storageAccounts/read",
    "Microsoft.Storage/storageAccounts/write",
    "Microsoft.Storage/storageAccounts/queueServices/read",
    "Microsoft.Storage/storageAccounts/queueServices/write",
    "Microsoft.Storage/storageAccounts/queueServices/queues/write",
    "Microsoft.Storage/storageAccounts/queueServices/queues/read",
    "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
    ],
    "notActions": [],
    "dataActions": [
    "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
    "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
    "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
    "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
    ],
    "notDataActions": []
    }
    ]

    Em seguida, você pode atribuir essa função personalizada ao seu conector de acesso.

    Para obter mais informações, consulte Atribuir funções em Azure usando o portal Azure.

Configurações de permissões do Auto Loader

Permissões necessárias para configurar a notificação de arquivos para o Amazon S3

Você deve ter permissões de leitura para o diretório de entrada. Consulte os detalhes da conexão S3 para obter mais detalhes.

Para usar o modo de notificação de arquivo, anexe o seguinte documento de política JSON ao seu usuário ou função IAM. Este IAM role é necessário para criar uma Databricks credencial de serviço para que o senhor Auto Loader possa se autenticar com o. O suporte a credenciais de serviço está disponível em Databricks Runtime 16.2 e acima.

JSON
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}

onde:

  • <bucket-name>: O nome do bucket S3 onde a transmissão lerá os arquivos, por exemplo, auto-logs. Você pode usar * como curinga, por exemplo, databricks-*-logs. Para descobrir o bucket S3 subjacente ao seu caminho DBFS, o senhor pode listar todos os pontos de montagem DBFS em um Notebook executando %fs mounts.
  • <region>: A região do AWS em que o bucket do S3 reside, por exemplo, us-west-2. Se você não quiser especificar a região, use *.
  • <account-number>: O número AWS account que possui o bucket S3, por exemplo, 123456789012. Se o senhor não quiser especificar o número account, use *.

A cadeia de caracteres databricks-auto-ingest-* na especificação do SQS e do SNS ARN é o prefixo do nome que a fonte cloudFiles usa ao criar o serviço SQS e SNS. Como o site Databricks configura o serviço de notificação na execução inicial da transmissão, o senhor pode usar uma política com permissões reduzidas após a execução inicial (por exemplo, interromper a transmissão e depois reiniciá-la).

nota

A política anterior trata apenas das permissões necessárias para configurar o serviço de notificação de arquivos, ou seja, a notificação do bucket S3, o SNS e o serviço SQS, e pressupõe que o senhor já tenha acesso de leitura ao bucket S3. Se o senhor precisar adicionar permissões somente leitura S3, adicione o seguinte à lista Action na declaração DatabricksAutoLoaderSetup no documento JSON:

  • s3:ListBucket
  • s3:GetObject

Permissões reduzidas após a configuração inicial

As permissões de configuração do recurso descritas acima são necessárias apenas durante a execução inicial da transmissão. Após a primeira execução, o senhor pode mudar para a seguinte política de IAM com permissões reduzidas.

important

Com as permissões reduzidas, o senhor não pode começar novas consultas de transmissão ou recriar recursos em caso de falhas (por exemplo, a fila do SQS foi excluída acidentalmente); também não pode usar o gerenciamento de recursos na nuvem API para listar ou remover recursos.

JSON
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": ["s3:GetBucketLocation", "s3:ListBucket"],
"Resource": ["arn:aws:s3:::<bucket-name>"]
},
{
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
"Resource": ["arn:aws:s3:::<bucket-name>/*"]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}

Ingerir dados com segurança em um AWS diferente account

Auto Loader O senhor pode carregar dados na conta AWS assumindo uma conta IAM role. Depois de definir as credenciais de segurança temporárias criadas por AssumeRole, o senhor pode fazer com que Auto Loader carregue arquivos de nuvem entre contas. Para configurar a conta Auto Loader para cross-AWS, siga o documento: Acessar bucketsaccount S3 cruzados com uma política AssumeRole. Certifique-se de:

  • Verifique se o senhor tem a meta-função AssumeRole atribuída ao clustering.

  • Configure a configuração Spark do clustering para incluir as seguintes propriedades:

    ini
    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl

Permissões necessárias para configurar a notificação de arquivo para o GCS

O senhor deve ter as permissões list e get no seu bucket GCS e em todos os objetos. Para obter detalhes, consulte a documentação do Google sobre permissões de IAM.

Para usar o modo de notificação de arquivo, o senhor precisa adicionar permissões para o serviçoGCS accounte o serviço account usado para acessar o recurso Google Cloud Pub/Sub.

Adicione a função Pub/Sub Publisher ao serviço GCS account. Isso permite que o site account publique mensagens de notificação de eventos dos seus buckets GCS no Google Cloud Pub/Sub.

Quanto ao serviço account usado para o recurso Google Cloud Pub/Sub, o senhor precisa adicionar as seguintes permissões. Esse serviço account é criado automaticamente quando o Databricks senhor cria uma credencial de serviço. O suporte a credenciais de serviço está disponível em Databricks Runtime 16.2 e acima.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Para fazer isso, o senhor pode criar uma função personalizada do IAM com essas permissões ou atribuir funções pré-existentes do GCP para cobrir essas permissões.

Localizar a conta do serviço GCS

No Console do Google Cloud para o projeto correspondente, navegue até Cloud Storage > Settings. A seção "Conta do serviço de armazenamento em nuvem" contém o endereço email do serviço GCS account.

GCS conta de serviços

Criação de uma função personalizada do Google Cloud IAM para notificação de arquivos Mode

No console do Google Cloud para o projeto correspondente, navegue até IAM & Admin > Roles. Em seguida, crie uma função na parte superior ou atualize uma função existente. Na tela de criação ou edição da função, clique em Add Permissions. É exibido um menu no qual você pode adicionar as permissões desejadas à função.

Funções personalizadas do GCP IAM

Configurar ou gerenciar manualmente o recurso de notificação de arquivo

Os usuários privilegiados podem configurar manualmente ou gerenciar o recurso de notificação de arquivo.

  • Configure o serviço de notificação de arquivos manualmente por meio do provedor de nuvem e especifique manualmente o identificador da fila. Consulte Opções de notificação de arquivo para obter mais detalhes.
  • Use Scala APIs para criar ou gerenciar o serviço de notificações e enfileiramento, conforme mostrado no exemplo a seguir:
nota

Você deve ter as permissões apropriadas para configurar ou modificar a infraestrutura em nuvem. Consulte a documentação de permissões do Azure, S3 ou GCS.

Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.create()

# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("cloudFiles.awsAccessKey", <aws-access-key>) \
.option("cloudFiles.awsSecretKey", <aws-secret-key>) \
.option("cloudFiles.roleArn", <role-arn>) \
.option("cloudFiles.roleExternalId", <role-external-id>) \
.option("cloudFiles.roleSessionName", <role-session-name>) \
.option("cloudFiles.stsEndpoint", <sts-endpoint>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()

#######################################
## Creating a ResourceManager in Azure
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()

#######################################
## Creating a ResourceManager in GCP
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("cloudFiles.projectId", <project-id>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()

# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("cloudFiles.projectId", <project-id>) \
.option("cloudFiles.client", <client-id>) \
.option("cloudFiles.clientEmail", <client-email>) \
.option("cloudFiles.privateKey", <private-key>) \
.option("cloudFiles.privateKeyId", <private-key-id>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Use setUpNotificationServices(<resource-suffix>) para criar uma fila e uma inscrição com o nome <prefix>-<resource-suffix> (o prefixo depende do sistema de armazenamento resumido no recurso Cloud usado no modo de notificação de arquivo Auto Loader). Se houver um recurso existente com o mesmo nome, o Databricks reutilizará o recurso existente em vez de criar um novo. Essa função retorna um identificador de fila que você pode passar para a fonte cloudFiles usando o identificador nas opções de notificação de arquivo. Isso permite que o usuário de origem do cloudFiles tenha menos permissões do que o usuário que cria o recurso.

Forneça a opção "path" para newManager somente se ligar para setUpNotificationServices; ela não é necessária para listNotificationServices ou tearDownNotificationServices. Esse é o mesmo path que o senhor usa ao executar uma consulta de transmissão.

A matriz a seguir indica quais métodos de API são suportados em qual Databricks Runtime para cada tipo de armazenamento:

Armazenamento em nuvem

API de configuração

Lista API

Desmontar a API

Amazon S3

Todas as versões

Todas as versões

Todas as versões

ADLS

Todas as versões

Todas as versões

Todas as versões

GCS

Databricks Runtime 9.1e acima

Databricks Runtime 9.1e acima

Databricks Runtime 9.1e acima

Armazenamento de Blobs do Azure

Todas as versões

Todas as versões

Todas as versões

Limpar o recurso de notificação de eventos criado pelo Auto Loader

Auto Loader não elimina automaticamente o recurso de notificação de arquivo. Para desativar o recurso de notificação de arquivo, o senhor deve usar o gerenciador de recurso na nuvem, conforme mostrado na seção anterior. O senhor também pode excluir esses recursos manualmente usando a UI do provedor de nuvem ou APIs.

Solucionar erros comuns

Esta seção descreve erros comuns ao usar o Auto Loader com o modo de notificação de arquivo e como resolvê-los.

Falha ao criar a inscrição na grade de eventos

Se o senhor vir a seguinte mensagem de erro ao executar o site Auto Loader pela primeira vez, o Event Grid não está registrado como um provedor de recurso no site Azure inscrição.

java.lang.RuntimeException: Failed to create event grid subscription.

Para registrar o Event Grid como um provedor de recursos, faça o seguinte:

  1. No portal Azure, acesse sua inscrição.
  2. Clique em recurso Providers na seção Settings (Configurações).
  3. registro do provedor Microsoft.EventGrid.

Autorização necessária para realizar operações de inscrição no Event Grid

Se o senhor vir a seguinte mensagem de erro ao executar Auto Loader pela primeira vez, confirme se a função Contributor está atribuída à entidade de serviço do Event Grid e ao storage account.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

O cliente Event Grid ignora o proxy

Em Databricks Runtime 15.2 e acima, as conexões do Event Grid em Auto Loader usam as configurações de proxy das propriedades do sistema em default. No Databricks Runtime 13.3 LTS, 14.3 LTS e 15.0 a 15.2, o senhor pode configurar manualmente as conexões do Event Grid para usar um proxy, definindo a propriedade Spark Config spark.databricks.cloudFiles.eventGridClient.useSystemProperties true. Consulte Definir propriedades de configuração do Spark em Databricks.