Utilize o conector Zerobus Ingest.
Esta página descreve como ingerir uso de dados usando o conector Zerobus Ingest no LakeFlow Connect.
Escolha uma interface
O Zerobus Ingest suporta interfaces gRPC, REST e OpenTelemetry (OTLP). Os SDKs fornecem clientes personalizados baseados em gRPC com uma interface amigável para desenvolvedores, permitindo a criação de aplicações de alta complexidade. A interface REST lida com restrições arquitetônicas ao lidar com grandes frotas de dispositivos "verbais". A interface OTLP aceita dados OpenTelemetry padrão sem a necessidade de uma biblioteca personalizada.
- Os SDKs com "Taxa de Conexão" do gRPC: o gRPC se especializa em alto desempenho de transferência por meio de conexões persistentes. No entanto, cada transmissão aberta conta para suas cotas de simultaneidade.
- A REST "Taxa de transferência Tax": REST requer um handshake completo para cada atualização, tornando-a sem estado. REST se alinha bem com casos de uso de dispositivos de borda, onde o status é relatado com pouca frequência.
- OpenTelemetry (OTLP): Se você já usa os SDKs ou coletores do OpenTelemetry, o endpoint OTLP ingere rastreamentos, logs e métricas em tabelas Delta Unity Catalog sem necessidade de integração personalizada. Para obter mais informações, consulte Ingerir dados do OpenTelemetry com o Zerobus Ingest.
Utilize gRPC para transmissão de alto volume, REST para grandes conjuntos de dispositivos de baixa frequência e OTLP para ambientes já instrumentados com OpenTelemetry.
Obtenha o URL do seu workspace e endpointde ingestão do Zerobus.
O URL do seu workspace aparece no navegador quando você log in. Embora o URL completo siga o formato https://<databricks-instance>.gcp.databricks.com/?o=XXXXX, o URL workspace consiste em tudo o que vem antes do /?o=XXXXX. Por exemplo, dado o seguinte URL completo, você pode determinar o URL workspace e o ID workspace .
- URL completa:
https://1234567890123456.0.gcp.databricks.com/?o=1234567890123456# - URL do espaço de trabalho:
https://1234567890123456.0.gcp.databricks.com - ID do espaço de trabalho:
1234567890123456
O endpoint do servidor depende do workspace e da região:
- endpoint do servidor:
<workspace-id>.zerobus.<region>.gcp.databricks.com
Para informações sobre disponibilidade regional, consulte as limitações do conector Zerobus Ingest.
Criar ou identificar a tabela de destino
Identifique a tabela de destino na qual você deseja inserir os dados. Para criar uma nova tabela de destino, execute o comando SQL CREATE TABLE . Por exemplo, crie uma nova tabela chamada unity.default.air_quality.
CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
Para a ingestão de dados do OpenTelemetry, as tabelas devem usar esquemas predefinidos para cada tipo de sinal (rastreamentos, logs, métricas). Consulte Criar tabelas de destino no Unity Catalog.
Crie uma entidade de serviço e conceda permissões.
Uma entidade de serviço é uma identidade especializada que oferece mais segurança do que uma conta personalizada. Para obter mais informações sobre entidade de serviço e como usá-las para autenticação, consulte Autorizar acesso de entidade de serviço ao Databricks com OAuth.
-
Para criar uma entidade de serviço, acesse Configurações > Identidade e Acesso .
-
Em entidade de serviço , selecione gerenciar .
-
Clique em Adicionar entidade de serviço .
-
Na janela Adicionar entidade de serviço , crie uma nova entidade de serviço clicando em Adicionar nova .
-
Gere e salve o ID do cliente e o segredo do cliente para a entidade de serviço.
-
Conceda as permissões necessárias para o catálogo, o esquema e a tabela à entidade de serviço.
- Na página entidade de serviço , navegue até a tab Configurações .
- Copie o ID do aplicativo (UUID).
- Utilize o seguinte SQL para conceder permissões, substituindo o UUID de exemplo e os nomes do catálogo, do esquema e das tabelas, se necessário.
SQLGRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;
Você deve conceder privilégios MODIFY e SELECT na tabela, mesmo para tabelas com ALL PRIVILEGES concedido.
Escreva para um cliente
Utilize o SDK da Zerobus na sua linguagem de programação preferida ou a API REST para inserir dados na tabela de destino.
- Python SDK
- Rust SDK
- Java SDK
- Go SDK
- TypeScript SDK
- REST API
É necessário o Python 3.9 ou superior. O SDK utiliza bindings PyO3 para o SDK Rust de alto desempenho, proporcionando uma taxa de transferência até 40 vezes maior do que Python puro e E/S de rede eficiente através do runtime assíncrono do Rust. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção). O SDK também oferece suporte a implementações síncronas e assíncronas, bem como a 3 métodos de ingestão diferentes (baseado em futuro, baseado em deslocamento e "disparar e esquecer").
pip install databricks-zerobus-ingest-sdk
Exemplo de JSON:
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.us-east1.gcp.databricks.com"
DATABRICKS_WORKSPACE_URL="https://1234567890123456.0.gcp.databricks.com"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"
sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)
table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)
try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)
# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()
Retorno de chamada de confirmação: Para rastrear o progresso da ingestão de forma assíncrona, use a opção ack_callback . Passe uma subclasse de AckCallback com os métodos on_ack(offset: int) e on_error(offset: int, error_message: str) , que são chamados quando os registros são reconhecidos ou falham.
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType
class MyAckCallback(AckCallback):
def on_ack(self, offset: int) -> None:
print(f"Record acknowledged at offset: {offset}")
def on_error(self, offset: int, error_message: str) -> None:
print(f"Error at offset {offset}: {error_message}")
options = StreamConfigurationOptions(
record_type = RecordType.JSON,
ack_callback = MyAckCallback()
)
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordType.PROTO (default) e forneça um descriptorProto nas propriedades da tabela.
Para obter documentação completa, opções de configuração, ingestão de lotes e exemplos do Protocol Buffer, consulte o repositório SDK Python.
É necessário o Rust 1.70 ou superior. O SDK utiliza E/S assíncrona e gRPC para ingestão de alta taxa de transferência e serve como núcleo de todos os outros SDKs. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção).
Primeiro, importe o pacote.
cargo add databricks-zerobus-ingest-sdk
Ou adicione-o ao seu Cargo.toml.
[dependencies]
databricks-zerobus-ingest-sdk = "0.5.0" # Latest version at time of publication
Exemplo de JSON:
use databricks_zerobus_ingest_sdk::{
databricks::zerobus::RecordType, JsonString, StreamConfigurationOptions,
TableProperties, ZerobusSdk,
};
use std::error::Error;
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const DATABRICKS_WORKSPACE_URL: &str = "https://1234567890123456.0.gcp.databricks.com";
const SERVER_ENDPOINT: &str = "https://1234567890123456.zerobus.us-east1.gcp.databricks.com";
const TABLE_NAME: &str = "main.default.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto: None,
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_requests: 100,
record_type: RecordType::Json,
..Default::default()
};
let sdk_handle = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;
let mut stream = sdk_handle
.create_stream(
table_properties.clone(),
CLIENT_ID.to_string(),
CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.await?;
let offset = stream.ingest_record_offset(
JsonString("{
\"device_name\": \"sensor\",
\"temp\": 22,
\"humidity\": 55}".to_string())).await?;
stream.wait_for_offset(offset).await?;
println!("Record ingested successfully");
stream.close().await?;
println!("Stream closed successfully");
Ok(())
}
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordType::Proto (default) e forneça um descriptor_proto nas propriedades da tabela. Gere os arquivos necessários usando a ferramenta generate_proto e importe-os para o seu projeto.
Para obter documentação completa, opções de configuração, ingestão de lotes, ferramenta generate_proto e exemplos de Protocol Buffer, consulte o repositório SDK Rust.
É necessário o Java 8 ou superior. O SDK utiliza ligações JNI (Java Native Interface) com o SDK Rust de alto desempenho, proporcionando menor latência do que o gRPC Java puro e E/S de rede eficiente através do ambiente de execução assíncrono do Rust. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção).
Maven:
<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.2.0</version>
</dependency>
Exemplo de JSON:
import com.databricks.zerobus.*;
public class ZerobusClient {
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
private static final String SERVER_ENDPOINT =
"https://1234567890123456.zerobus.us-east1.gcp.databricks.com";
private static final String DATABRICKS_WORKSPACE_URL =
"https://1234567890123456.0.gcp.databricks.com";
private static final String TABLE_NAME = "main.default.air_quality";
private static final String CLIENT_ID = "your-client-id";
private static final String CLIENT_SECRET = "your-client-secret";
public static void main(String[] args) throws Exception {
ZerobusSdk sdk = new ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
);
ZerobusJsonStream stream = sdk.createJsonStream(
TABLE_NAME, CLIENT_ID, CLIENT_SECRET
).join();
try {
long lastOffset = 0;
for (int i = 0; i < 100; i++) {
String record = String.format(
"{\"device_name\": \"sensor-%d\", \"temp\": 22, \"humidity\": 55}", i
);
lastOffset = stream.ingestRecordOffset(record);
}
stream.waitForOffset(lastOffset);
} finally {
stream.close();
}
}
}
Protocol Buffers: Para ingestão com segurança de tipo, use ZerobusProtoStream com createProtoStream(). Gere um esquema a partir da sua tabela usando a ferramenta JAR incluída e, em seguida, compile-o com protoc.
Para obter documentação completa, opções de configuração, ingestão de lotes e exemplos do Protocol Buffer, consulte o repositório SDK Java.
É necessário o Go versão 1.21 ou superior. O SDK envolve o Rust SDK de alto desempenho usando CGO e FFI, fornecendo a mesma taxa de transferência e desempenho. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção).
go get github.com/databricks/zerobus-sdk-go@latest
Exemplo de JSON:
Para simplificar, os erros serão ignorados aqui. Em código de produção, sempre verifique os erros.
package main
import (
"fmt"
zerobus "github.com/databricks/zerobus-sdk-go"
)
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const (
ServerEndpoint = "https://1234567890123456.zerobus.us-east1.gcp.databricks.com"
DatabricksWorkspaceURL = "https://1234567890123456.0.gcp.databricks.com"
TableName = "main.default.air_quality"
ClientID = "your-client-id"
ClientSecret = "your-client-secret"
)
func main() {
sdk, _ := zerobus.NewZerobusSdk(
ServerEndpoint,
DatabricksWorkspaceURL,
)
defer sdk.Free()
options := zerobus.DefaultStreamConfigurationOptions()
options.RecordType = zerobus.RecordTypeJson
stream, _ := sdk.CreateStream(
zerobus.TableProperties{
TableName: TableName,
},
ClientID,
ClientSecret,
options,
)
defer stream.Close()
offset, _ := stream.IngestRecordOffset(`{
"device_name": "sensor-001",
"temp": 20,
"humidity": 60
}`)
_ = stream.WaitForOffset(offset)
fmt.Println("Record ingested successfully")
_ = stream.Close()
fmt.Println("Stream closed successfully")
}
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordTypeProto (default) e forneça um descriptorProto nas propriedades da tabela. Crie um arquivo .proto arquivo correspondente ao esquema da sua tabela e script de execução generate_proto para ajudar você a importar os arquivos para o seu projeto.
Para obter documentação completa, opções de configuração, ingestão de lotes, ferramenta generate_proto e exemplos do Protocol Buffer, consulte o repositório SDK Go.
É necessário o Node.js versão 16 ou superior. O SDK encapsula o SDK Rust de alto desempenho usando vinculações nativas NAPI-RS, fornecendo desempenho nativo com futuros Rust mapeados para Promises JavaScript. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção).
npm install @databricks/zerobus-ingest-sdk
Exemplo de JSON:
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const SERVER_ENDPOINT = 'https://1234567890123456.zerobus.us-east1.gcp.databricks.com';
const DATABRICKS_WORKSPACE_URL = 'https://1234567890123456.0.gcp.databricks.com';
const TABLE_NAME = 'main.default.air_quality';
const CLIENT_ID = 'your-client-id';
const CLIENT_SECRET = 'your-client-secret';
const sdk = new ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL);
const stream = await sdk.createStream({ tableName: TABLE_NAME }, CLIENT_ID, CLIENT_SECRET, {
recordType: RecordType.Json,
});
try {
let lastOffset = BigInt(0);
for (let i = 0; i < 100; i++) {
const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
lastOffset = await stream.ingestRecordOffset(record);
}
await stream.waitForOffset(lastOffset);
} finally {
await stream.close();
}
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordType.Proto (default) e forneça um descriptorProto nas propriedades da tabela.
Para obter documentação completa, opções de configuração, ingestão de lotes e exemplos do Protocol Buffer, consulte o repositório SDK do TypeScript.
Visualização
A API REST para o conector Zerobus Ingest está em versão Beta.
A API REST permite que você insira um único registro enviando uma solicitação HTTP POST para o endpoint /zerobus/v1/tables/<table-name>/insert . O próprio registro está incluído no corpo da solicitação e deve estar no formato JSON.
Este exemplo mostra como usar o CURL para enviar dados para o Zerobus Ingest usando a API REST.
Cabeçalhos
A solicitação requer dois cabeçalhos HTTP específicos para autenticar e formatar a solicitação corretamente.
-
Content-Type: application/json- Campo obrigatório para especificar o tipo de conteúdo. Atualmente, JSON é o único formato de mensagem suportado.
-
Authorization: Bearer <token>- Substitua
<token>pelos tokens OAuth que você obteve usando o comando curl fornecido posteriormente.
- Substitua
Obter tokens OAuth : Esses tokens expiram a cada hora e precisam ser renovados. Você pode refresh -los buscando novamente os tokens OAuth .
Preencha os seguintes parâmetros:
$CATALOG,$SCHEMA,$TABLE,$WORKSPACE_ID,$WORKSPACE_URL$DATABRICKS_CLIENT_IDe$DATABRICKS_CLIENT_SECRET- Esses dois parâmetros correspondem ao princípio de serviço que você criou.
authorization_details=$(cat <<EOF
[{
"type": "unity_catalog_privileges",
"privileges": ["USE CATALOG"],
"object_type": "CATALOG",
"object_full_path": "$CATALOG"
},
{
"type": "unity_catalog_privileges",
"privileges": ["USE SCHEMA"],
"object_type": "SCHEMA",
"object_full_path": "$CATALOG.$SCHEMA"
},
{
"type": "unity_catalog_privileges",
"privileges": ["SELECT", "MODIFY"],
"object_type": "TABLE",
"object_full_path": "$CATALOG.$SCHEMA.$TABLE"
}]
EOF
)
export OAUTH_TOKEN=$(curl -X POST \
-u "$DATABRICKS_CLIENT_ID:$DATABRICKS_CLIENT_SECRET" \
-d "grant_type=client_credentials" \
-d "scope=all-apis" \
-d "resource=api://databricks/workspaces/$WORKSPACE_ID/zerobusDirectWriteApi" \
--data-urlencode "authorization_details=$authorization_details" \
"$WORKSPACE_URL/oidc/v1/token" | jq -r '.access_token')
Registro de ingestão:
Preencha os seguintes parâmetros:
-
$ZEROBUS_ENDPOINT- Conforme definido na seção Obtenha o URL do seu workspace e o endpoint de ingestão do Zerobus .
-
$CATALOG,$SCHEMA,$TABLE,$WORKSPACE_ID,$WORKSPACE_URL -
$OAUTH_TOKEN- Isso foi criado no passo anterior.
O corpo da requisição deve ser uma lista de objetos JSON.
curl -X POST \
"$ZEROBUS_ENDPOINT/zerobus/v1/tables/$CATALOG.$SCHEMA.$TABLE/insert" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OAUTH_TOKEN" \
-d '[{ "device_name": "device_num_1", "temp": 28, "humidity": 60 },
{ "device_name": "device_num_1", "temp": 28, "humidity": 60 }]'
Se todas as informações forem preenchidas corretamente, você deverá receber uma resposta JSON vazia com um código de status HTTP 200.