Utilize o conector Zerobus Ingest.
Esta página descreve como ingerir uso de dados usando o conector Zerobus Ingest no LakeFlow Connect.
Escolha uma interface
O Zerobus Ingest suporta interfaces gRPC e REST. Os SDKs fornecem clientes personalizados baseados em gRPC com uma interface amigável para desenvolvedores, permitindo a criação de aplicações de alta complexidade. A interface REST lida com restrições arquitetônicas ao lidar com grandes frotas de dispositivos "verbais".
- Os SDKs com "Taxa de Conexão" do gRPC: o gRPC se especializa em alto desempenho de transferência por meio de conexões persistentes. No entanto, cada transmissão aberta conta para suas cotas de simultaneidade.
- A REST "Taxa de transferência Tax": REST requer um handshake completo para cada atualização, tornando-a sem estado. REST se alinha bem com casos de uso de dispositivos de borda, onde o status é relatado com pouca frequência.
Utilize gRPC para transmissão de alto volume e REST para grandes conjuntos de dispositivos com baixa frequência ou para idiomas não suportados.
Obtenha o URL do seu workspace e endpointde ingestão do Zerobus.
O URL do seu workspace aparece no navegador quando você log in. Embora o URL completo siga o formato https://<databricks-instance>.com/o=XXXXX, o URL workspace consiste em tudo o que vem antes do /o=XXXXX. Por exemplo, dado o seguinte URL completo, você pode determinar o URL workspace e o ID workspace .
- URL completa:
https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864# - URL do espaço de trabalho:
https://abcd-teste2-test-spcse2.cloud.databricks.com - ID do espaço de trabalho:
2281745829657864
O endpoint do servidor depende do workspace e da região:
- endpoint do servidor:
<workspace-id>.zerobus.<region>.cloud.databricks.com
Para informações sobre disponibilidade regional, consulte as limitações do conector Zerobus Ingest.
Criar ou identificar a tabela de destino
Identifique a tabela de destino na qual você deseja inserir os dados. Para criar uma nova tabela de destino, execute o comando SQL CREATE TABLE . Por exemplo, crie uma nova tabela chamada unity.default.air_quality.
CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
Crie uma entidade de serviço e conceda permissões.
Uma entidade de serviço é uma identidade especializada que oferece mais segurança do que uma conta personalizada. Para obter mais informações sobre entidade de serviço e como usá-las para autenticação, consulte Autorizar acesso de entidade de serviço ao Databricks com OAuth.
-
Para criar uma entidade de serviço, acesse Configurações > Identidade e Acesso .
-
Em entidade de serviço , selecione gerenciar .
-
Clique em Adicionar entidade de serviço .
-
Na janela Adicionar entidade de serviço , crie uma nova entidade de serviço clicando em Adicionar nova .
-
Gere e salve o ID do cliente e o segredo do cliente para a entidade de serviço.
-
Conceda as permissões necessárias para o catálogo, o esquema e a tabela à entidade de serviço.
- Na página entidade de serviço , navegue até a tab Configurações .
- Copie o ID do aplicativo (UUID).
- Utilize o seguinte SQL para conceder permissões, substituindo o UUID de exemplo e os nomes do catálogo, do esquema e das tabelas, se necessário.
SQLGRANT USE CATALOG ON CATALOG <catalog> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <catalog.schema> TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <catalog.schema.table_name> TO `<UUID>`;
Você deve conceder privilégios MODIFY e SELECT na tabela, mesmo para tabelas com ALL PRIVILEGES concedido.
Escreva para um cliente
Utilize o SDK da Zerobus na sua linguagem de programação preferida ou a API REST para inserir dados na tabela de destino.
- Python SDK
- Rust SDK
- Java SDK
- Go SDK
- TypeScript SDK
- REST API
É necessário o Python 3.9 ou superior. O SDK utiliza bindings PyO3 para o SDK Rust de alto desempenho, proporcionando uma taxa de transferência até 40 vezes maior do que Python puro e E/S de rede eficiente através do runtime assíncrono do Rust. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção). O SDK também oferece suporte a implementações síncronas e assíncronas, bem como a 3 métodos de ingestão diferentes (baseado em futuro, baseado em deslocamento e "disparar e esquecer").
pip install databricks-zerobus-ingest-sdk
Exemplo de JSON:
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
# See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
SERVER_ENDPOINT="https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DATABRICKS_WORKSPACE_URL="https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TABLE_NAME="main.default.air_quality"
CLIENT_ID="your-client-id"
CLIENT_SECRET="your-client-secret"
sdk = ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
)
table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)
try:
for i in range(1000):
record_dict = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
offset = stream.ingest_record_offset(record_dict)
# Optional: Wait for durability confirmation
stream.wait_for_offset(offset)
finally:
stream.close()
Retorno de chamada de confirmação: Para rastrear o progresso da ingestão de forma assíncrona, use a opção ack_callback . Passe uma subclasse de AckCallback com os métodos on_ack(offset: int) e on_error(offset: int, error_message: str) , que são chamados quando os registros são reconhecidos ou falham.
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType
class MyAckCallback(AckCallback):
def on_ack(self, offset: int) -> None:
print(f"Record acknowledged at offset: {offset}")
def on_error(self, offset: int, error_message: str) -> None:
print(f"Error at offset {offset}: {error_message}")
options = StreamConfigurationOptions(
record_type = RecordType.JSON,
ack_callback = MyAckCallback()
)
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordType.PROTO (default) e forneça um descriptorProto nas propriedades da tabela.
Para obter documentação completa, opções de configuração, ingestão de lotes e exemplos do Protocol Buffer, consulte o repositório SDK Python.
É necessário o Rust 1.70 ou superior. O SDK utiliza E/S assíncrona e gRPC para ingestão de alta taxa de transferência e serve como núcleo de todos os outros SDKs. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção).
Primeiro, importe o pacote.
cargo add databricks-zerobus-ingest-sdk
Ou adicione-o ao seu Cargo.toml.
[dependencies]
databricks-zerobus-ingest-sdk = "0.5.0" # Latest version at time of publication
Exemplo de JSON:
use databricks_zerobus_ingest_sdk::{
databricks::zerobus::RecordType, JsonString, StreamConfigurationOptions,
TableProperties, ZerobusSdk,
};
use std::error::Error;
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const DATABRICKS_WORKSPACE_URL: &str = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
const SERVER_ENDPOINT: &str = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com";
const TABLE_NAME: &str = "main.default.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto: None,
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_requests: 100,
record_type: RecordType::Json,
..Default::default()
};
let sdk_handle = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;
let mut stream = sdk_handle
.create_stream(
table_properties.clone(),
CLIENT_ID.to_string(),
CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.await?;
let offset = stream.ingest_record_offset(
JsonString("{
\"device_name\": \"sensor\",
\"temp\": 22,
\"humidity\": 55}".to_string())).await?;
stream.wait_for_offset(offset).await?;
println!("Record ingested successfully");
stream.close().await?;
println!("Stream closed successfully");
Ok(())
}
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordType::Proto (default) e forneça um descriptor_proto nas propriedades da tabela. Gere os arquivos necessários usando a ferramenta generate_proto e importe-os para o seu projeto.
Para obter documentação completa, opções de configuração, ingestão de lotes, ferramenta generate_proto e exemplos de Protocol Buffer, consulte o repositório SDK Rust.
É necessário o Java 8 ou superior. O SDK utiliza ligações JNI (Java Native Interface) com o SDK Rust de alto desempenho, proporcionando menor latência do que o gRPC Java puro e E/S de rede eficiente através do ambiente de execução assíncrono do Rust. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção).
Maven:
<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.2.0</version>
</dependency>
Exemplo de JSON:
import com.databricks.zerobus.*;
public class ZerobusClient {
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
private static final String SERVER_ENDPOINT =
"https://1234567890123456.zerobus.us-west-2.cloud.databricks.com";
private static final String DATABRICKS_WORKSPACE_URL =
"https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
private static final String TABLE_NAME = "main.default.air_quality";
private static final String CLIENT_ID = "your-client-id";
private static final String CLIENT_SECRET = "your-client-secret";
public static void main(String[] args) throws Exception {
ZerobusSdk sdk = new ZerobusSdk(
SERVER_ENDPOINT,
DATABRICKS_WORKSPACE_URL
);
ZerobusJsonStream stream = sdk.createJsonStream(
TABLE_NAME, CLIENT_ID, CLIENT_SECRET
).join();
try {
long lastOffset = 0;
for (int i = 0; i < 100; i++) {
String record = String.format(
"{\"device_name\": \"sensor-%d\", \"temp\": 22, \"humidity\": 55}", i
);
lastOffset = stream.ingestRecordOffset(record);
}
stream.waitForOffset(lastOffset);
} finally {
stream.close();
}
}
}
Protocol Buffers: Para ingestão com segurança de tipo, use ZerobusProtoStream com createProtoStream(). Gere um esquema a partir da sua tabela usando a ferramenta JAR incluída e, em seguida, compile-o com protoc.
Para obter documentação completa, opções de configuração, ingestão de lotes e exemplos do Protocol Buffer, consulte o repositório SDK Java.
É necessário o Go versão 1.21 ou superior. O SDK envolve o Rust SDK de alto desempenho usando CGO e FFI, fornecendo a mesma taxa de transferência e desempenho. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção).
go get github.com/databricks/zerobus-sdk-go@latest
Exemplo de JSON:
Para simplificar, os erros serão ignorados aqui. Em código de produção, sempre verifique os erros.
package main
import (
"fmt"
zerobus "github.com/databricks/zerobus-sdk-go"
)
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const (
ServerEndpoint = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
DatabricksWorkspaceURL = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
TableName = "main.default.air_quality"
ClientID = "your-client-id"
ClientSecret = "your-client-secret"
)
func main() {
sdk, _ := zerobus.NewZerobusSdk(
ServerEndpoint,
DatabricksWorkspaceURL,
)
defer sdk.Free()
options := zerobus.DefaultStreamConfigurationOptions()
options.RecordType = zerobus.RecordTypeJson
stream, _ := sdk.CreateStream(
zerobus.TableProperties{
TableName: TableName,
},
ClientID,
ClientSecret,
options,
)
defer stream.Close()
offset, _ := stream.IngestRecordOffset(`{
"device_name": "sensor-001",
"temp": 20,
"humidity": 60
}`)
_ = stream.WaitForOffset(offset)
fmt.Println("Record ingested successfully")
_ = stream.Close()
fmt.Println("Stream closed successfully")
}
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordTypeProto (default) e forneça um descriptorProto nas propriedades da tabela. Crie um arquivo .proto arquivo correspondente ao esquema da sua tabela e script de execução generate_proto para ajudar você a importar os arquivos para o seu projeto.
Para obter documentação completa, opções de configuração, ingestão de lotes, ferramenta generate_proto e exemplos do Protocol Buffer, consulte o repositório SDK Go.
É necessário o Node.js versão 16 ou superior. O SDK encapsula o SDK Rust de alto desempenho usando vinculações nativas NAPI-RS, fornecendo desempenho nativo com futuros Rust mapeados para Promises JavaScript. Ele suporta JSON (mais simples) e Protocol Buffers (recomendado para produção).
npm install @databricks/zerobus-ingest-sdk
Exemplo de JSON:
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
// See "Get your workspace URL and Zerobus Ingest endpoint" for information on obtaining these values.
const SERVER_ENDPOINT = 'https://1234567890123456.zerobus.us-west-2.cloud.databricks.com';
const DATABRICKS_WORKSPACE_URL = 'https://dbc-a1b2c3d4-e5f6.cloud.databricks.com';
const TABLE_NAME = 'main.default.air_quality';
const CLIENT_ID = 'your-client-id';
const CLIENT_SECRET = 'your-client-secret';
const sdk = new ZerobusSdk(SERVER_ENDPOINT, DATABRICKS_WORKSPACE_URL);
const stream = await sdk.createStream({ tableName: TABLE_NAME }, CLIENT_ID, CLIENT_SECRET, {
recordType: RecordType.Json,
});
try {
let lastOffset = BigInt(0);
for (let i = 0; i < 100; i++) {
const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
lastOffset = await stream.ingestRecordOffset(record);
}
await stream.waitForOffset(lastOffset);
} finally {
await stream.close();
}
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordType.Proto (default) e forneça um descriptorProto nas propriedades da tabela.
Para obter documentação completa, opções de configuração, ingestão de lotes e exemplos do Protocol Buffer, consulte o repositório SDK do TypeScript.
Visualização
A API REST para o conector Zerobus Ingest está em versão Beta.
A API REST permite que você insira um único registro enviando uma solicitação HTTP POST para o endpoint /zerobus/v1/tables/<table-name>/insert . O próprio registro está incluído no corpo da solicitação e deve estar no formato JSON.
Este exemplo mostra como usar o CURL para enviar dados para o Zerobus Ingest usando a API REST.
Cabeçalhos
A solicitação requer dois cabeçalhos HTTP específicos para autenticar e formatar a solicitação corretamente.
-
Content-Type: application/json- Campo obrigatório para especificar o tipo de conteúdo. Atualmente, JSON é o único formato de mensagem suportado.
-
Authorization: Bearer <token>- Substitua
<token>pelos tokens OAuth que você obteve usando o comando curl fornecido posteriormente.
- Substitua
Obter tokens OAuth : Esses tokens expiram a cada hora e precisam ser renovados. Você pode refresh -los buscando novamente os tokens OAuth .
Preencha os seguintes parâmetros:
$CATALOG,$SCHEMA,$TABLE,$WORKSPACE_ID,$WORKSPACE_URL$DATABRICKS_CLIENT_IDe$DATABRICKS_CLIENT_SECRET- Esses dois parâmetros correspondem ao princípio de serviço que você criou.
authorization_details=$(cat <<EOF
[{
"type": "unity_catalog_privileges",
"privileges": ["USE CATALOG"],
"object_type": "CATALOG",
"object_full_path": "$CATALOG"
},
{
"type": "unity_catalog_privileges",
"privileges": ["USE SCHEMA"],
"object_type": "SCHEMA",
"object_full_path": "$CATALOG.$SCHEMA"
},
{
"type": "unity_catalog_privileges",
"privileges": ["SELECT", "MODIFY"],
"object_type": "TABLE",
"object_full_path": "$CATALOG.$SCHEMA.$TABLE"
}]
EOF
)
export OAUTH_TOKEN=$(curl -X POST \
-u "$DATABRICKS_CLIENT_ID:$DATABRICKS_CLIENT_SECRET" \
-d "grant_type=client_credentials" \
-d "scope=all-apis" \
-d "resource=api://databricks/workspaces/$WORKSPACE_ID/zerobusDirectWriteApi" \
--data-urlencode "authorization_details=$authorization_details" \
"$WORKSPACE_URL/oidc/v1/token" | jq -r '.access_token')
Registro de ingestão:
Preencha os seguintes parâmetros:
-
$ZEROBUS_ENDPOINT- Conforme definido na seção Obtenha o URL do seu workspace e o endpoint de ingestão do Zerobus .
-
$CATALOG,$SCHEMA,$TABLE,$WORKSPACE_ID,$WORKSPACE_URL -
$OAUTH_TOKEN- Isso foi criado no passo anterior.
O corpo da requisição deve ser uma lista de objetos JSON.
curl -X POST \
"$ZEROBUS_ENDPOINT/zerobus/v1/tables/$CATALOG.$SCHEMA.$TABLE/insert" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OAUTH_TOKEN" \
-d '[{ "device_name": "device_num_1", "temp": 28, "humidity": 60 },
{ "device_name": "device_num_1", "temp": 28, "humidity": 60 }]'
Se todas as informações forem preenchidas corretamente, você deverá receber uma resposta JSON vazia com um código de status HTTP 200.