Pular para o conteúdo principal

Utilize o conector Zerobus Ingest.

info

Visualização

O conector Zerobus Ingest está em versão prévia pública. Para experimentar, entre em contato com seu representante account Databricks .

Esta página descreve como ingerir o uso de dados do conector de gravação direta Zerobus no LakeFlow Connect.

Obtenha o URL do seu workspace .

Ao visualizar seu workspace Databricks após fazer login, observe o URL em seu navegador com o seguinte formato: https://<databricks-instance>.com/o=XXXXX. A URL consiste em tudo o que vem antes de /o=XXXXX, por exemplo:

URL completa: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#

URL do espaço de trabalho: https://abcd-teste2-test-spcse2.cloud.databricks.com

Criar ou identificar a tabela de destino

Identifique a tabela de destino na qual você deseja inserir os dados. Para criar uma nova tabela de destino, execute o comando SQL CREATE TABLE . Por exemplo:

SQL
    CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);

Crie uma entidade de serviço e conceda permissões.

Uma entidade de serviço é uma identidade especializada que oferece mais segurança do que uma conta personalizada. Mais informações sobre entidades de serviço e como utilizá-las para autenticação podem ser encontradas nestas instruções.

  1. Crie uma entidade de serviço em seu workspace em Configurações > Identidade e Acesso .

  2. Gere e salve o ID do cliente e o segredo do cliente para a entidade de serviço.

  3. Conceda as permissões necessárias para o catálogo, o esquema e a tabela à entidade de serviço:

    1. Na página entidade de serviço, abra a tab Configurações .
    2. Copie o ID do aplicativo (UUID).
    3. Utilize o seguinte SQL, substituindo o UUID e o nome de usuário de exemplo, o nome do esquema e os nomes das tabelas, se necessário:
    SQL
    GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;

Formatos de endpoint

Os formatos de URL endpoint do servidor Zerobus e workspace variam conforme o provedor cloud :

Nuvem

endpointdo servidor

URL do espaço de trabalho

AWS

<workspace-id>.zerobus.<region>.cloud.databricks.com

https://<instance>.cloud.databricks.com

Azure

<workspace-id>.zerobus.<region>.azuredatabricks.net

https://<instance>.azuredatabricks.net

Exemplo para AWS:

Text
Server endpoint: 1234567890123456.zerobus.us-west-2.cloud.databricks.com
Workspace URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com

Exemplo para o Azure:

Text
Server endpoint: 1234567890123456.zerobus.eastus.azuredatabricks.net
Workspace URL: https://adb-1234567890123456.12.azuredatabricks.net

Escreva para um cliente

É necessário o Python 3.9 ou superior. O SDK oferece suporte a JSON (mais simples) e Protocol Buffers (recomendado para produção).

Bash
pip install databricks-zerobus-ingest-sdk

Exemplo de JSON:

Python
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# Configuration - see "Before you begin" section for how to obtain these values.
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"

sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name)
options = StreamConfigurationOptions(record_type=RecordType.JSON)

stream = sdk.create_stream(client_id, client_secret, table_properties, options)

try:
for i in range(100):
record = {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
ack = stream.ingest_record(record) # Pass dict directly, SDK handles serialization.
ack.wait_for_ack()
finally:
stream.close()

Retorno de chamada de confirmação: Para rastrear o progresso da ingestão de forma assíncrona, use a opção ack_callback . A função de retorno recebe um objeto IngestRecordResponse com um atributo durability_ack_up_to_offset (int) indicando que todos os registros até aquele deslocamento foram gravados de forma permanente:

Python
from zerobus.sdk.shared import IngestRecordResponse

def on_ack(response: IngestRecordResponse) -> None:
print(f"Records acknowledged up to offset: {response.durability_ack_up_to_offset}")

options = StreamConfigurationOptions(
record_type=RecordType.JSON,
ack_callback=on_ack
)

Protocol Buffers: Para ingestão com segurança de tipo, use Protocol Buffers com RecordType.PROTO (default). Gere um esquema a partir da sua tabela usando python -m zerobus.tools.generate_proto.

Para obter documentação completa, opções de configuração, API assíncrona e exemplos do Protocol Buffer, consulte o repositório do SDK Python.