Pular para o conteúdo principal

Use o conector Zerobus Ingest

info

Visualização

O conector Zerobus Ingest está em visualização pública. Para experimentar, entre em contato com seu representante account Databricks .

Esta página descreve como ingerir uso de dados no conector de gravação direta Zerobus no LakeFlow Connect.

Obtenha a URL do seu workspace

Ao visualizar seu workspace Databricks após efetuar login, observe a URL no seu navegador com o seguinte formato: https://<databricks-instance>.com/o=XXXXX. A URL consiste em tudo antes de /o=XXXXX, por exemplo:

URL completa: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#

URL do espaço de trabalho: https://abcd-teste2-test-spcse2.cloud.databricks.com

Crie ou identifique a tabela de destino

Identifique a tabela de destino que você deseja ingerir. Para criar uma nova tabela de destino, execute o comando SQL CREATE TABLE . Por exemplo:

SQL
    CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);

Crie uma entidade de serviço e conceda permissões

Uma entidade de serviço é uma identidade especializada que fornece mais segurança do que uma conta personalizada. Mais informações sobre entidades de serviço e como usá-las para autenticação podem ser encontradas nestas instruções.

  1. Crie uma entidade de serviço no seu workspace em Configurações > Identidade e acesso .

  2. Gere e salve o ID do cliente e o segredo do cliente para a entidade de serviço.

  3. Conceda as permissões necessárias para o catálogo, o esquema e a tabela à entidade de serviço:

    1. Na página entidade de serviço, abra a tab Configurações .
    2. Copie o ID do aplicativo (UUID).
    3. Use o seguinte SQL, substituindo o UUID de exemplo e o nome de usuário, o nome do esquema e os nomes das tabelas, se necessário:
    SQL
    GRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;

Escreva para um cliente

É necessário Python 3.9 ou superior.

  1. Instale o SDK do Python.

    Bash
    pip install databricks-zerobus-ingest-sdk

    Como alternativa, instale a partir da fonte:

    Bash
    git clone https://github.com/databricks/zerobus-sdk-py.git
    cd zerobus-sdk-py
    pip install -e .
  2. Gerar uma definição de Buffer de Protocolo.

    Gere automaticamente uma definição de protótipo a partir do esquema da tabela do Unity Catalog usando a ferramenta generate_proto :

    Bash
    python -m zerobus.tools.generate_proto \
    --uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
    --client-id "<service-principal-application-id>" \
    --client-secret "<service-principal-secret>" \
    --table "<catalog.schema.table_name>" \
    --output "record.proto"

    Exemplo de saída para uma tabela:

    SQL
    CREATE TABLE main.default.air_quality (
    device_name STRING,
    temp INT,
    humidity BIGINT
    )
    USING DELTA;

    Gera record.proto:

    Proto
    syntax = "proto2";

    message air_quality {
    optional string device_name = 1;
    optional int32 temp = 2;
    optional int64 humidity = 3;
    }

    Você pode aprender mais sobre mensagens Protobuf aqui.

  3. Compile a definição do Protocol Buffer.

    Instale grpcio-tools e compile seu arquivo proto:

    Bash
    pip install "grpcio-tools>=1.60.0,<2.0"
    python -m grpc_tools.protoc --python_out=. --proto_path=. record.proto

    Isso gera um arquivo record_pb2.py contendo a definição do buffer de protocolo compilado.

  4. Crie um cliente Python.

    Para usar o SDK, você precisa das seguintes informações:

    • URL workspace Databricks
    • ID do espaço de trabalho (encontrado na URL do seu workspace após /o=)
    • Nome da tabela
    • entidade de serviço ID do cliente e segredo do cliente
    • endpoint do Zerobus no formato https://<workspace_id>.zerobus.<region>.cloud.databricks.com

    Exemplo síncrono:

    Python
    import logging
    from zerobus.sdk.sync import ZerobusSdk
    from zerobus.sdk.shared import TableProperties
    import record_pb2

    # Configure logging (optional).
    logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    # Configuration.
    server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
    table_name = "main.default.air_quality"
    client_id = "your-service-principal-application-id"
    client_secret = "your-service-principal-secret"

    # Initialize SDK.
    sdk = ZerobusSdk(server_endpoint, workspace_url)

    # Configure table properties.
    table_properties = TableProperties(
    table_name,
    record_pb2.AirQuality.DESCRIPTOR
    )

    # Create stream.
    stream = sdk.create_stream(
    client_id,
    client_secret,
    table_properties
    )

    try:
    # Ingest records.
    for i in range(100):
    record = record_pb2.AirQuality(
    device_name=f"sensor-{i % 10}",
    temp=20 + (i % 15),
    humidity=50 + (i % 40)
    )

    ack = stream.ingest_record(record)
    ack.wait_for_ack() # Wait for durability.

    print(f"Ingested record {i + 1}")

    print("Successfully ingested 100 records!")
    finally:
    stream.close()

Opções de configuração

O SDK suporta várias opções de configuração via StreamConfigurationOptions:

Opção

Padrão

Descrição

máximo_registros_de_voo

50000

Número máximo de registros não reconhecidos

recuperação

True

Habilitar recuperação automática de transmissão

tempo_limite_de_recuperação_ms

15000

Tempo limite para operações de recuperação (ms)

recuperação_recuo_ms

2000

Atraso entre tentativas de recuperação (ms)

tentativas_de_recuperação

3

Número máximo de tentativas de recuperação

tempo_limite_de_descarga_ms

300000

Tempo limite para operações de limpeza (ms)

servidor_falta_de_tempo_limite_de_recepção_ms

60000

Tempo limite de confirmação do servidor (ms)

ack_callback

Nenhuma

Retorno de chamada invocado na confirmação do registro

Tratamento de erros

O SDK fornece dois tipos de exceção:

  • ZerobusException: Erros recuperáveis (problemas de rede, falhas temporárias)
  • NonRetriableException: Erros não recuperáveis (credenciais inválidas, tabela ausente)
Python
from zerobus.sdk.shared import ZerobusException, NonRetriableException

try:
stream.ingest_record(record)
except NonRetriableException as e:
print(f"Fatal error: {e}")
raise
except ZerobusException as e:
print(f"Retriable error: {e}")
# Implement retry logic with backoff.