Pular para o conteúdo principal

Utilize o conector Zerobus Ingest.

Esta página descreve como ingerir uso de dados usando o conector Zerobus Ingest no LakeFlow Connect.

Escolha uma interface

O Zerobus Ingest suporta interfaces gRPC e REST. Os SDKs fornecem clientes personalizados baseados em gRPC com uma interface amigável para desenvolvedores, permitindo a criação de aplicações de alta complexidade. A interface REST lida com restrições arquitetônicas ao lidar com grandes frotas de dispositivos "verbais".

  • Os SDKs com "Taxa de Conexão" do gRPC: o gRPC se especializa em alto desempenho de transferência por meio de conexões persistentes. No entanto, cada transmissão aberta conta para suas cotas de simultaneidade.
  • A REST "Taxa de transferência Tax": REST requer um handshake completo para cada atualização, tornando-a sem estado. REST se alinha bem com casos de uso de dispositivos de borda, onde o status é relatado com pouca frequência.

Utilize gRPC para transmissão de alto volume e REST para grandes conjuntos de dispositivos com baixa frequência ou para idiomas não suportados.

Obtenha o URL do seu workspace e endpointde ingestão do Zerobus.

O URL do seu workspace aparece no navegador quando você log in. Embora o URL completo siga o formato https://<databricks-instance>.com/o=XXXXX, o URL workspace consiste em tudo o que vem antes do /o=XXXXX. Por exemplo, dado o seguinte URL completo, você pode determinar o URL workspace e o ID workspace .

  • URL completa: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
  • URL do espaço de trabalho: https://abcd-teste2-test-spcse2.cloud.databricks.com
  • ID do espaço de trabalho: 2281745829657864

O endpoint do servidor depende do workspace e da região:

  • endpoint do servidor: <workspace-id>.zerobus.<region>.cloud.databricks.com

Para informações sobre disponibilidade regional, consulte as limitações do conector Zerobus Ingest.

Criar ou identificar a tabela de destino

Identifique a tabela de destino na qual você deseja inserir os dados. Para criar uma nova tabela de destino, execute o comando SQL CREATE TABLE . Por exemplo, crie uma nova tabela chamada unity.default.air_quality.

SQL
    CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);

Crie uma entidade de serviço e conceda permissões.

Uma entidade de serviço é uma identidade especializada que oferece mais segurança do que uma conta personalizada. Para obter mais informações sobre entidade de serviço e como usá-las para autenticação, consulte Autorizar acesso de entidade de serviço ao Databricks com OAuth.

  1. Para criar uma entidade de serviço, acesse Configurações > Identidade e Acesso .

  2. Em entidade de serviço , selecione gerenciar .

  3. Clique em Adicionar entidade de serviço .

  4. Na janela Adicionar entidade de serviço , crie uma nova entidade de serviço clicando em Adicionar nova .

  5. Gere e salve o ID do cliente e o segredo do cliente para a entidade de serviço.

  6. Conceda as permissões necessárias para o catálogo, o esquema e a tabela à entidade de serviço.

    1. Na página entidade de serviço , navegue até a tab Configurações .
    2. Copie o ID do aplicativo (UUID).
    3. Utilize o seguinte SQL para conceder permissões, substituindo o UUID de exemplo e os nomes do catálogo, do esquema e das tabelas, se necessário.
    SQL
    GRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;
importante

Você deve conceder privilégios MODIFY e SELECT na tabela, mesmo para tabelas com ALL PRIVILEGES concedido.

Escreva para um cliente

Utilize o SDK da Zerobus na sua linguagem de programação preferida ou a API REST para inserir dados na tabela de destino.

É necessário o Python 3.9 ou superior. O SDK utiliza bindings PyO3 para o SDK Rust de alto desempenho, proporcionando uma taxa de transferência até 40 vezes maior do que Python puro e E/S de rede eficiente através do runtime assíncrono do Rust. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção). O SDK também oferece suporte a implementações síncronas e assíncronas, bem como a 3 métodos de ingestão diferentes (baseado em futuro, baseado em deslocamento e "disparar e esquecer").

Bash
pip install databricks-zerobus-ingest-sdk

Exemplo de JSON:

Python
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"

sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)

table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)

# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()

Retorno de chamada de confirmação: Para rastrear o progresso da ingestão de forma assíncrona, use a opção ack_callback . Passe uma subclasse de AckCallback com os métodos on_ack(offset: int) e on_error(offset: int, error_message: str) , que são chamados quando os registros são reconhecidos ou falham.

Python
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType

class MyAckCallback(AckCallback):
def on_ack(self, offset: int) -> None:
print(f"Record acknowledged at offset: {offset}")

def on_error(self, offset: int, error_message: str) -> None:
print(f"Error at offset {offset}: {error_message}")

options = StreamConfigurationOptions(
record_type = RecordType.JSON,
ack_callback = MyAckCallback()
)

Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordType.PROTO (default) e forneça um descriptorProto nas propriedades da tabela.

Para obter documentação completa, opções de configuração, ingestão de lotes e exemplos do Protocol Buffer, consulte o repositório SDK Python.

Próximos passos