Pular para o conteúdo principal

Utilize o conector Zerobus Ingest.

Esta página descreve como ingerir uso de dados usando o conector Zerobus Ingest no LakeFlow Connect.

Escolha uma interface

O Zerobus Ingest suporta interfaces gRPC, REST e OpenTelemetry (OTLP). Os SDKs fornecem clientes personalizados baseados em gRPC com uma interface amigável para desenvolvedores, permitindo a criação de aplicações de alta complexidade. A interface REST lida com restrições arquitetônicas ao lidar com grandes frotas de dispositivos "verbais". A interface OTLP aceita dados OpenTelemetry padrão sem a necessidade de uma biblioteca personalizada.

  • Os SDKs com "Taxa de Conexão" do gRPC: o gRPC se especializa em alto desempenho de transferência por meio de conexões persistentes. No entanto, cada transmissão aberta conta para suas cotas de simultaneidade.
  • A REST "Taxa de transferência Tax": REST requer um handshake completo para cada atualização, tornando-a sem estado. REST se alinha bem com casos de uso de dispositivos de borda, onde o status é relatado com pouca frequência.
  • OpenTelemetry (OTLP): Se você já usa os SDKs ou coletores do OpenTelemetry, o endpoint OTLP ingere rastreamentos, logs e métricas em tabelas Delta Unity Catalog sem necessidade de integração personalizada. Para obter mais informações, consulte Ingerir dados do OpenTelemetry com o Zerobus Ingest.

Utilize gRPC para transmissão de alto volume, REST para grandes conjuntos de dispositivos de baixa frequência e OTLP para ambientes já instrumentados com OpenTelemetry.

Obtenha o URL do seu workspace e endpointde ingestão do Zerobus.

O URL do seu workspace aparece no navegador quando você log in. Embora o URL completo siga o formato https://<databricks-instance>.gcp.databricks.com/?o=XXXXX, o URL workspace consiste em tudo o que vem antes do /?o=XXXXX. Por exemplo, dado o seguinte URL completo, você pode determinar o URL workspace e o ID workspace .

  • URL completa: https://1234567890123456.0.gcp.databricks.com/?o=1234567890123456#
  • URL do espaço de trabalho: https://1234567890123456.0.gcp.databricks.com
  • ID do espaço de trabalho: 1234567890123456

O endpoint do servidor depende do workspace e da região:

  • endpoint do servidor: <workspace-id>.zerobus.<region>.gcp.databricks.com

Para informações sobre disponibilidade regional, consulte as limitações do conector Zerobus Ingest.

Criar ou identificar a tabela de destino

Identifique a tabela de destino na qual você deseja inserir os dados. Para criar uma nova tabela de destino, execute o comando SQL CREATE TABLE . Por exemplo, crie uma nova tabela chamada unity.default.air_quality.

SQL
    CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
nota

Para a ingestão de dados do OpenTelemetry, as tabelas devem usar esquemas predefinidos para cada tipo de sinal (rastreamentos, logs, métricas). Consulte Criar tabelas de destino no Unity Catalog.

Crie uma entidade de serviço e conceda permissões.

Uma entidade de serviço é uma identidade especializada que oferece mais segurança do que uma conta personalizada. Para obter mais informações sobre entidade de serviço e como usá-las para autenticação, consulte Autorizar acesso de entidade de serviço ao Databricks com OAuth.

  1. Para criar uma entidade de serviço, acesse Configurações > Identidade e Acesso .

  2. Em entidade de serviço , selecione gerenciar .

  3. Clique em Adicionar entidade de serviço .

  4. Na janela Adicionar entidade de serviço , crie uma nova entidade de serviço clicando em Adicionar nova .

  5. Gere e salve o ID do cliente e o segredo do cliente para a entidade de serviço.

  6. Conceda as permissões necessárias para o catálogo, o esquema e a tabela à entidade de serviço.

    1. Na página entidade de serviço , navegue até a tab Configurações .
    2. Copie o ID do aplicativo (UUID).
    3. Utilize o seguinte SQL para conceder permissões, substituindo o UUID de exemplo e os nomes do catálogo, do esquema e das tabelas, se necessário.
    SQL
    GRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
    GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`;
    GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;
importante

Você deve conceder privilégios MODIFY e SELECT na tabela, mesmo para tabelas com ALL PRIVILEGES concedido.

Escreva para um cliente

Utilize o SDK da Zerobus na sua linguagem de programação preferida ou a API REST para inserir dados na tabela de destino.

É necessário o Python 3.9 ou superior. O SDK utiliza bindings PyO3 para o SDK Rust de alto desempenho, proporcionando uma taxa de transferência até 40 vezes maior do que Python puro e E/S de rede eficiente através do runtime assíncrono do Rust. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção). O SDK também oferece suporte a implementações síncronas e assíncronas, bem como a 3 métodos de ingestão diferentes (baseado em futuro, baseado em deslocamento e "disparar e esquecer").

Bash
pip install databricks-zerobus-ingest-sdk

Exemplo de JSON:

Python
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.us-east1.gcp.databricks.com"
DATABRICKS_WORKSPACE_URL="https://1234567890123456.0.gcp.databricks.com"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"

sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)

table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)

try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)

# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()

Retorno de chamada de confirmação: Para rastrear o progresso da ingestão de forma assíncrona, use a opção ack_callback . Passe uma subclasse de AckCallback com os métodos on_ack(offset: int) e on_error(offset: int, error_message: str) , que são chamados quando os registros são reconhecidos ou falham.

Python
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType

class MyAckCallback(AckCallback):
def on_ack(self, offset: int) -> None:
print(f"Record acknowledged at offset: {offset}")

def on_error(self, offset: int, error_message: str) -> None:
print(f"Error at offset {offset}: {error_message}")

options = StreamConfigurationOptions(
record_type = RecordType.JSON,
ack_callback = MyAckCallback()
)

Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordType.PROTO (default) e forneça um descriptorProto nas propriedades da tabela.

Para obter documentação completa, opções de configuração, ingestão de lotes e exemplos do Protocol Buffer, consulte o repositório SDK Python.

Próximos passos