Utilize o conector Zerobus Ingest.
Visualização
O conector Zerobus Ingest está em versão prévia pública. Para experimentar, entre em contato com seu representante account Databricks .
Esta página descreve como ingerir o uso de dados do conector de gravação direta Zerobus no LakeFlow Connect.
Obtenha o URL do seu workspace .
Ao visualizar seu workspace Databricks após fazer login, observe o URL em seu navegador com o seguinte formato: https://<databricks-instance>.com/o=XXXXX. A URL consiste em tudo o que vem antes de /o=XXXXX, por exemplo:
URL completa: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
URL do espaço de trabalho: https://abcd-teste2-test-spcse2.cloud.databricks.com
Criar ou identificar a tabela de destino
Identifique a tabela de destino na qual você deseja inserir os dados. Para criar uma nova tabela de destino, execute o comando SQL CREATE TABLE . Por exemplo:
CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
Crie uma entidade de serviço e conceda permissões.
Uma entidade de serviço é uma identidade especializada que oferece mais segurança do que uma conta personalizada. Mais informações sobre entidades de serviço e como utilizá-las para autenticação podem ser encontradas nestas instruções.
-
Crie uma entidade de serviço em seu workspace em Configurações > Identidade e Acesso .
-
Gere e salve o ID do cliente e o segredo do cliente para a entidade de serviço.
-
Conceda as permissões necessárias para o catálogo, o esquema e a tabela à entidade de serviço:
- Na página entidade de serviço, abra a tab Configurações .
- Copie o ID do aplicativo (UUID).
- Utilize o seguinte SQL, substituindo o UUID e o nome de usuário de exemplo, o nome do esquema e os nomes das tabelas, se necessário:
SQLGRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;
Formatos de endpoint
Os formatos de URL endpoint do servidor Zerobus e workspace variam conforme o provedor cloud :
Nuvem | endpointdo servidor | URL do espaço de trabalho |
|---|---|---|
AWS |
|
|
Azure |
|
|
Exemplo para AWS:
Server endpoint: 1234567890123456.zerobus.us-west-2.cloud.databricks.com
Workspace URL: https://dbc-a1b2c3d4-e5f6.cloud.databricks.com
Exemplo para o Azure:
Server endpoint: 1234567890123456.zerobus.eastus.azuredatabricks.net
Workspace URL: https://adb-1234567890123456.12.azuredatabricks.net
Escreva para um cliente
- Python SDK
- Rust SDK
- Java SDK
- Go SDK
- TypeScript SDK
É necessário o Python 3.9 ou superior. O SDK oferece suporte a JSON (mais simples) e Protocol Buffers (recomendado para produção).
pip install databricks-zerobus-ingest-sdk
Exemplo de JSON:
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
# Configuration - see "Before you begin" section for how to obtain these values.
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"
sdk = ZerobusSdk(server_endpoint, workspace_url)
table_properties = TableProperties(table_name)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
try:
for i in range(100):
record = {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
ack = stream.ingest_record(record) # Pass dict directly, SDK handles serialization.
ack.wait_for_ack()
finally:
stream.close()
Retorno de chamada de confirmação: Para rastrear o progresso da ingestão de forma assíncrona, use a opção ack_callback . A função de retorno recebe um objeto IngestRecordResponse com um atributo durability_ack_up_to_offset (int) indicando que todos os registros até aquele deslocamento foram gravados de forma permanente:
from zerobus.sdk.shared import IngestRecordResponse
def on_ack(response: IngestRecordResponse) -> None:
print(f"Records acknowledged up to offset: {response.durability_ack_up_to_offset}")
options = StreamConfigurationOptions(
record_type=RecordType.JSON,
ack_callback=on_ack
)
Protocol Buffers: Para ingestão com segurança de tipo, use Protocol Buffers com RecordType.PROTO (default). Gere um esquema a partir da sua tabela usando python -m zerobus.tools.generate_proto.
Para obter documentação completa, opções de configuração, API assíncrona e exemplos do Protocol Buffer, consulte o repositório do SDK Python.
É necessário o Rust 1.70 ou superior. O SDK oferece suporte a JSON (mais simples) e Protocol Buffers (recomendado para produção).
cargo add databricks-zerobus-ingest-sdk
cargo add tokio --features macros,rt-multi-thread
Exemplo de JSON:
use std::error::Error;
use databricks_zerobus_ingest_sdk::{
RecordType, StreamConfigurationOptions, TableProperties, ZerobusSdk
};
// Configuration - see "Before you begin" section for how to obtain these values.
const ZEROBUS_ENDPOINT: &str = "workspace-id.zerobus.region.cloud.databricks.com";
const WORKSPACE_URL: &str = "https://your-workspace.cloud.databricks.com";
const TABLE_NAME: &str = "catalog.schema.air_quality";
const CLIENT_ID: &str = "your-client-id";
const CLIENT_SECRET: &str = "your-client-secret";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto: None,
};
let options = StreamConfigurationOptions {
record_type: RecordType::Json,
..Default::default()
};
let sdk = ZerobusSdk::new(ZEROBUS_ENDPOINT.to_string(), WORKSPACE_URL.to_string())?;
let mut stream = sdk.create_stream(
table_properties, CLIENT_ID.to_string(), CLIENT_SECRET.to_string(), Some(options)
).await?;
for i in 0..100 {
let record = format!(r#"{{"device_name": "sensor-{}", "temp": 22, "humidity": 55}}"#, i);
let ack = stream.ingest_record(record.into_bytes()).await?;
ack.await?;
}
stream.close().await?;
Ok(())
}
Protocol Buffers: Para ingestão com segurança de tipo, use Protocol Buffers com RecordType::Proto (default). Gere um esquema usando a ferramenta tools/generate_files no repositório.
Para obter documentação completa, opções de configuração, ingestão de lotes e exemplos do Protocol Buffer, consulte o repositório SDK Rust.
É necessário o Java 8 ou superior. O SDK utiliza Protocol Buffers para definição de esquemas com segurança de tipos.
Maven:
<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.1.0</version>
</dependency>
Configuração do Protocol Buffer:
Gere um esquema proto a partir da sua tabela e, em seguida, compile-o:
# Generate proto from Unity Catalog table schema.
java -jar zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "<client-id>" --client-secret "<client-secret>" \
--table "<catalog.schema.table>" --output "record.proto"
# Compile proto to Java.
protoc --java_out=src/main/java record.proto
Exemplo:
import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;
public class ZerobusClient {
public static void main(String[] args) throws Exception {
// Configuration - see "Before you begin" section for how to obtain these values.
ZerobusSdk sdk = new ZerobusSdk(
"1234567890123456.zerobus.us-west-2.cloud.databricks.com",
"https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
);
TableProperties<AirQuality> tableProperties = new TableProperties<>(
"main.default.air_quality",
AirQuality.getDefaultInstance()
);
ZerobusStream<AirQuality> stream = sdk.createStream(
tableProperties, "your-client-id", "your-client-secret"
).join();
try {
for (int i = 0; i < 100; i++) {
AirQuality record = AirQuality.newBuilder()
.setDeviceName("sensor-" + i)
.setTemp(22)
.setHumidity(55)
.build();
stream.ingestRecord(record).join();
}
} finally {
stream.close();
}
}
}
Para obter documentação completa, opções de configuração, ingestão não bloqueante e configuração de registro, consulte o repositório do SDK Java.
É necessário o Go versão 1.21 ou superior. O SDK oferece suporte a JSON (mais simples) e Protocol Buffers (recomendado para produção).
go get github.com/databricks/zerobus-go-sdk
Exemplo de JSON:
package main
import (
"fmt"
"log"
zerobus "github.com/databricks/zerobus-go-sdk/sdk"
)
func main() {
// Configuration - see "Before you begin" section for how to obtain these values.
sdk, _ := zerobus.NewZerobusSdk(
"1234567890123456.zerobus.us-west-2.cloud.databricks.com",
"https://dbc-a1b2c3d4-e5f6.cloud.databricks.com",
)
defer sdk.Free()
options := zerobus.DefaultStreamConfigurationOptions()
options.RecordType = zerobus.RecordTypeJson
stream, _ := sdk.CreateStream(
zerobus.TableProperties{TableName: "main.default.air_quality"},
"your-client-id", "your-client-secret", options,
)
defer stream.Close()
for i := 0; i < 100; i++ {
record := fmt.Sprintf(`{"device_name": "sensor-%d", "temp": 22, "humidity": 55}`, i)
ack, _ := stream.IngestRecord(record)
ack.Await()
}
stream.Flush()
}
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordTypeProto (default) e forneça um DescriptorProto em TableProperties.
Para obter documentação completa, opções de configuração e exemplos do Protocol Buffer, consulte o repositório do SDK Go.
É necessário o Node.js versão 16 ou superior. O SDK oferece suporte a JSON (mais simples) e Protocol Buffers (recomendado para produção).
npm install @databricks/zerobus-ingest-sdk
Exemplo de JSON:
import { ZerobusSdk, RecordType } from '@databricks/zerobus-ingest-sdk';
// Configuration - see "Before you begin" section for how to obtain these values.
const sdk = new ZerobusSdk(
'1234567890123456.zerobus.us-west-2.cloud.databricks.com',
'https://dbc-a1b2c3d4-e5f6.cloud.databricks.com',
);
const stream = await sdk.createStream(
{ tableName: 'main.default.air_quality' },
'your-client-id',
'your-client-secret',
{ recordType: RecordType.Json },
);
try {
for (let i = 0; i < 100; i++) {
const record = { device_name: `sensor-${i}`, temp: 22, humidity: 55 };
await stream.ingestRecord(record);
}
await stream.flush();
} finally {
await stream.close();
}
Protocol Buffers: Para ingestão segura de tipos, use Protocol Buffers com RecordType.Proto (default) e forneça um descriptorProto nas propriedades da tabela.
Para obter documentação completa, opções de configuração e exemplos do Protocol Buffer, consulte o repositório do SDK do TypeScript.