Use o conector Zerobus Ingest
Visualização
O conector Zerobus Ingest está em visualização pública. Para experimentar, entre em contato com seu representante account Databricks .
Esta página descreve como ingerir uso de dados no conector de gravação direta Zerobus no LakeFlow Connect.
Obtenha a URL do seu workspace
Ao visualizar seu workspace Databricks após efetuar login, observe a URL no seu navegador com o seguinte formato: https://<databricks-instance>.com/o=XXXXX. A URL consiste em tudo antes de /o=XXXXX, por exemplo:
URL completa: https://abcd-teste2-test-spcse2.cloud.databricks.com/?o=2281745829657864#
URL do espaço de trabalho: https://abcd-teste2-test-spcse2.cloud.databricks.com
Crie ou identifique a tabela de destino
Identifique a tabela de destino que você deseja ingerir. Para criar uma nova tabela de destino, execute o comando SQL CREATE TABLE . Por exemplo:
CREATE TABLE unity.default.air_quality (
device_name STRING, temp INT, humidity LONG);
Crie uma entidade de serviço e conceda permissões
Uma entidade de serviço é uma identidade especializada que fornece mais segurança do que uma conta personalizada. Mais informações sobre entidades de serviço e como usá-las para autenticação podem ser encontradas nestas instruções.
-
Crie uma entidade de serviço no seu workspace em Configurações > Identidade e acesso .
-
Gere e salve o ID do cliente e o segredo do cliente para a entidade de serviço.
-
Conceda as permissões necessárias para o catálogo, o esquema e a tabela à entidade de serviço:
- Na página entidade de serviço, abra a tab Configurações .
- Copie o ID do aplicativo (UUID).
- Use o seguinte SQL, substituindo o UUID de exemplo e o nome de usuário, o nome do esquema e os nomes das tabelas, se necessário:
SQLGRANT USE CATALOG ON CATALOG <username> TO `<UUID>`;
GRANT USE SCHEMA ON SCHEMA <username.default> TO `<UUID>`;
GRANT MODIFY, SELECT ON TABLE <username.default.table_name> TO `<UUID>`;
Escreva para um cliente
- Python SDK
- Rust SDK
- Java SDK
É necessário Python 3.9 ou superior.
-
Instale o SDK do Python.
Bashpip install databricks-zerobus-ingest-sdkComo alternativa, instale a partir da fonte:
Bashgit clone https://github.com/databricks/zerobus-sdk-py.git
cd zerobus-sdk-py
pip install -e . -
Gerar uma definição de Buffer de Protocolo.
Gere automaticamente uma definição de protótipo a partir do esquema da tabela do Unity Catalog usando a ferramenta
generate_proto:Bashpython -m zerobus.tools.generate_proto \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "<service-principal-application-id>" \
--client-secret "<service-principal-secret>" \
--table "<catalog.schema.table_name>" \
--output "record.proto"Exemplo de saída para uma tabela:
SQLCREATE TABLE main.default.air_quality (
device_name STRING,
temp INT,
humidity BIGINT
)
USING DELTA;Gera
record.proto:Protosyntax = "proto2";
message air_quality {
optional string device_name = 1;
optional int32 temp = 2;
optional int64 humidity = 3;
}Você pode aprender mais sobre mensagens Protobuf aqui.
-
Compile a definição do Protocol Buffer.
Instale
grpcio-toolse compile seu arquivo proto:Bashpip install "grpcio-tools>=1.60.0,<2.0"
python -m grpc_tools.protoc --python_out=. --proto_path=. record.protoIsso gera um arquivo
record_pb2.pycontendo a definição do buffer de protocolo compilado. -
Crie um cliente Python.
Para usar o SDK, você precisa das seguintes informações:
- URL workspace Databricks
- ID do espaço de trabalho (encontrado na URL do seu workspace após
/o=) - Nome da tabela
- entidade de serviço ID do cliente e segredo do cliente
- endpoint do Zerobus no formato
https://<workspace_id>.zerobus.<region>.cloud.databricks.com
Exemplo síncrono:
Pythonimport logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import TableProperties
import record_pb2
# Configure logging (optional).
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Configuration.
server_endpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com"
workspace_url = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"
table_name = "main.default.air_quality"
client_id = "your-service-principal-application-id"
client_secret = "your-service-principal-secret"
# Initialize SDK.
sdk = ZerobusSdk(server_endpoint, workspace_url)
# Configure table properties.
table_properties = TableProperties(
table_name,
record_pb2.AirQuality.DESCRIPTOR
)
# Create stream.
stream = sdk.create_stream(
client_id,
client_secret,
table_properties
)
try:
# Ingest records.
for i in range(100):
record = record_pb2.AirQuality(
device_name=f"sensor-{i % 10}",
temp=20 + (i % 15),
humidity=50 + (i % 40)
)
ack = stream.ingest_record(record)
ack.wait_for_ack() # Wait for durability.
print(f"Ingested record {i + 1}")
print("Successfully ingested 100 records!")
finally:
stream.close()
Opções de configuração
O SDK suporta várias opções de configuração via StreamConfigurationOptions:
Opção | Padrão | Descrição |
|---|---|---|
máximo_registros_de_voo | 50000 | Número máximo de registros não reconhecidos |
recuperação | True | Habilitar recuperação automática de transmissão |
tempo_limite_de_recuperação_ms | 15000 | Tempo limite para operações de recuperação (ms) |
recuperação_recuo_ms | 2000 | Atraso entre tentativas de recuperação (ms) |
tentativas_de_recuperação | 3 | Número máximo de tentativas de recuperação |
tempo_limite_de_descarga_ms | 300000 | Tempo limite para operações de limpeza (ms) |
servidor_falta_de_tempo_limite_de_recepção_ms | 60000 | Tempo limite de confirmação do servidor (ms) |
ack_callback | Nenhuma | Retorno de chamada invocado na confirmação do registro |
Tratamento de erros
O SDK fornece dois tipos de exceção:
ZerobusException: Erros recuperáveis (problemas de rede, falhas temporárias)NonRetriableException: Erros não recuperáveis (credenciais inválidas, tabela ausente)
from zerobus.sdk.shared import ZerobusException, NonRetriableException
try:
stream.ingest_record(record)
except NonRetriableException as e:
print(f"Fatal error: {e}")
raise
except ZerobusException as e:
print(f"Retriable error: {e}")
# Implement retry logic with backoff.
-
Clone o repositório do Zerobus SDK (necessário para a ferramenta de geração de esquema e exemplos).
Bashgit clone https://github.com/databricks/zerobus-sdk-rs.git
cd zerobus-sdk-rsVocê deverá ver algo assim:
Textzerobus-sdk-rs/
├── sdk/ # Core Zerobus SDK library
│ ├── src/ # Core SDK logic
│ ├── zerobus_service.proto # gRPC protocol definition
│ ├── build.rs # Build script for protobuf compilation
│ └── Cargo.toml
│
├── tools/
│ └── generate_files/ # Schema generation CLI tool
│ ├── src/ # Unity Catalog -> Proto conversion
│ ├── README.md # Tool documentation
│ └── Cargo.toml
│
├── examples/
│ └── basic_example/ # Working example application
│ ├── README.md # Example documentation
│ ├── src/main.rs # Example usage code
│ ├── output/ # Generated schema files
│ │ ├── orders.proto
│ │ ├── orders.rs
│ │ └── orders.descriptor
│ └── Cargo.toml
│
└── README.md # Main README file -
Crie seu projeto Rust.
Bashcd .. # Go back to your workspace directory
cargo new example_project
cd example_projectVocê deve obter algo parecido com o seguinte:
Textexample_project/
├── Cargo.toml
└── src/
└── main.rs -
Adicione dependências ao seu
Cargo.toml:Toml[dependencies]
zerobus-ingest-sdk = "0.1.0" # From crates.io
prost = "0.13.3"
prost-types = "0.13.3"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] } -
execução a ferramenta de geração de esquema para gerar arquivos protobuf necessários para ingestão. Esta ferramenta ajuda a gerar a definição proto para sua tabela, juntamente com um descritor e o arquivo Rust correspondente.
Bash# From your example_project directory
cd ../zerobus-sdk-rs/tools/generate_files
cargo run --
--uc-endpoint "https://your-workspace.cloud.databricks.com"
--client-id "your-client-id"
--client-secret "your-client-secret"
--table "catalog.schema.table"
--output-dir "../../output"Isso criará uma pasta
outputno seu projeto com 3 arquivos dentro:Textexample_project/
├── Cargo.toml
├── output/
│ ├── <table_name>.rs
│ ├── <table_name>.proto
│ └── <table_name>.descriptor
└── src/
└── main.rs -
Crie seu aplicativo cliente.
Para usar o SDK, você precisa das seguintes informações:
- URL workspace Databricks (por exemplo,
https://your-workspace.cloud.databricks.com) - Nome da tabela (por exemplo,
catalog.schema.table) - ID e segredo do cliente da entidade de serviço
- endpoint do Zerobus no formato
https://<workspace_id>.zerobus.<region>.cloud.databricks.com
Exemplo com uma tabela simulada
air_quality:Depois de gerar todos os arquivos, sua árvore de arquivos fica assim:
Textexample_project/
├── Cargo.toml
├── output/
│ ├── air_quality.rs
│ ├── air_quality.proto
│ └── air_quality.descriptor
└── src/
└── main.rsair_quality.protoProtosyntax = "proto2";
message table_AirQuality {
optional string device_name = 1;
optional int32 temp = 2;
optional int64 humidity = 3;
}air_quality.rs(gerado automaticamente)Rust// @generated by prost-build
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct table_AirQuality {
#[prost(string, optional, tag = "1")]
pub device_name: Option<String>,
#[prost(int32, optional, tag = "2")]
pub temp: Option<i32>,
#[prost(int64, optional, tag = "3")]
pub humidity: Option<i64>,
}main.rs(seu código de aplicação):Rustuse std::error::Error;
use databricks_zerobus_ingest_sdk::{
StreamConfigurationOptions, TableProperties, ZerobusSdk
};
use prost_types::DescriptorProto;
use std::fs;
use prost::Message;
// Change constants to match your setup
const DATABRICKS_WORKSPACE_URL: &str = "https://your-workspace.cloud.databricks.com";
const TABLE_NAME: &str = "catalog.schema.air_quality";
const DATABRICKS_CLIENT_ID: &str = "your-client-id";
const DATABRICKS_CLIENT_SECRET: &str = "your-client-secret";
const ZEROBUS_ENDPOINT: &str = "https://workspace-id.zerobus.region.cloud.databricks.com";
pub mod air_quality {
include!("../output/air_quality.rs");
}
use crate::air_quality::table_AirQuality;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let descriptor_proto = load_descriptor_proto(
"output/air_quality.descriptor",
"air_quality.proto",
"table_AirQuality"
);
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto,
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_records: 1000,
..Default::default()
};
let sdk = ZerobusSdk::new(
ZEROBUS_ENDPOINT.to_string(),
DATABRICKS_WORKSPACE_URL.to_string(),
)?;
let mut stream = sdk
.create_stream(
table_properties,
DATABRICKS_CLIENT_ID.to_string(),
DATABRICKS_CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.await?;
println!("Stream created successfully!");
let record = table_AirQuality {
device_name: Some("sensor-001".to_string()),
temp: Some(22),
humidity: Some(55),
};
let ack_future = stream
.ingest_record(record.encode_to_vec())
.await?;
println!("Record submitted for ingestion");
let offset_id = ack_future.await?;
println!("Record acknowledged at offset: {}", offset_id);
stream.close().await?;
println!("Stream closed successfully");
Ok(())
}
fn load_descriptor_proto(
path: &str,
file_name: &str,
message_name: &str
) -> DescriptorProto {
let descriptor_bytes = fs::read(path)
.expect("Failed to read proto descriptor file");
let file_descriptor_set = prost_types::FileDescriptorSet::decode(
descriptor_bytes.as_ref()
).unwrap();
let file_descriptor_proto = file_descriptor_set
.file
.into_iter()
.find(|f| f.name.as_ref().map(|n| n.as_str()) == Some(file_name))
.expect("File descriptor not found");
file_descriptor_proto
.message_type
.into_iter()
.find(|m| m.name.as_ref().map(|n| n.as_str()) == Some(message_name))
.expect("Message descriptor not found")
} - URL workspace Databricks (por exemplo,
-
execução da sua aplicação:
Bashcd example_project
cargo runVocê deverá ver uma saída como esta:
TextStream created successfully!
Record submitted for ingestion
Record acknowledged at offset: 0
Stream closed successfully
É necessário Java 8 ou superior.
-
Instale o Java SDK.
Opção A: Maven
Adicione a dependência ao seu
pom.xml:XML<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.1.0</version>
</dependency>Para usar o JAR gordo com todas as dependências agrupadas:
XML<dependency>
<groupId>com.databricks</groupId>
<artifactId>zerobus-ingest-sdk</artifactId>
<version>0.1.0</version>
<classifier>jar-with-dependencies</classifier>
</dependency>Opção B: Construir a partir do código-fonte
Clone e construa o SDK:
Bashgit clone https://github.com/databricks/zerobus-sdk-java.git
cd zerobus-sdk-java
mvn clean packageIsso gera dois arquivos JAR no diretório
target/:-
Fat JAR (recomendado) :
zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar(~18 MB)- Autocontido com todas as dependências agrupadas
- Nenhuma dependência adicional necessária
-
JAR regular :
zerobus-ingest-sdk-0.1.0.jar(~144 KB)- Somente classes SDK
- Requer dependências no classpath
Dependências (gerenciadas automaticamente pelo Maven ou necessárias se estiver usando JAR regular manualmente):
protobuf-java3.24.0grpc-netty-shaded1.58.0grpc-protobuf1.58.0grpc-stub1.58.0javax.annotation-api1.3.2slf4j-api1.7.36
-
-
Gerar uma definição de Buffer de Protocolo.
Gere automaticamente uma definição de protótipo a partir do esquema de tabela do Unity Catalog usando a ferramenta integrada.
Se estiver usando o JAR gordo da compilação:
Bashjava -jar zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "<service-principal-application-id>" \
--client-secret "<service-principal-secret>" \
--table "<catalog.schema.table_name>" \
--output "record.proto" \
--proto-msg "AirQuality"Se estiver usando dependência Maven:
Bashmvn dependency:copy-dependencies
java -jar target/dependency/zerobus-ingest-sdk-0.1.0-jar-with-dependencies.jar \
--uc-endpoint "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com" \
--client-id "<service-principal-application-id>" \
--client-secret "<service-principal-secret>" \
--table "<catalog.schema.table_name>" \
--output "record.proto" \
--proto-msg "AirQuality"Exemplo de saída para uma tabela:
SQLCREATE TABLE main.default.air_quality (
device_name STRING,
temp INT,
humidity BIGINT
)
USING DELTA;Gera
record.proto:Protosyntax = "proto2";
package com.example;
option java_package = "com.example.proto";
option java_outer_classname = "Record";
message AirQuality {
optional string device_name = 1;
optional int32 temp = 2;
optional int64 humidity = 3;
}Você pode aprender mais sobre mensagens Protobuf aqui.
-
Compile a definição do Protocol Buffer.
Usando Maven
Adicione o plugin protobuf ao seu
pom.xml:XML<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.24.0:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>Coloque seu
record.protoemsrc/main/proto/e execute:Bashmvn compileUsando protoc diretamente
Instalar o compilador Protocol Buffers (protoc 3.24.0) e compilar:
Bashprotoc --java_out=src/main/java src/main/proto/record.protoIsso gera um arquivo Java (por exemplo,
src/main/java/com/example/proto/Record.java). -
Crie um cliente Java.
Para usar o SDK, você precisa das seguintes informações:
- URL workspace Databricks
- ID do espaço de trabalho (encontrado na URL do seu workspace após
/o=) - Nome da tabela
- entidade de serviço ID do cliente e segredo do cliente
- endpoint do Zerobus no formato
https://<workspace_id>.zerobus.<region>.cloud.databricks.com
Exemplo de ingestão de bloqueio:
Javapackage com.example;
import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;
public class ZerobusClient {
public static void main(String[] args) throws Exception {
// Configuration.
String serverEndpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com";
String workspaceUrl = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
String tableName = "main.default.air_quality";
String clientId = "your-service-principal-application-id";
String clientSecret = "your-service-principal-secret";
// Initialize SDK.
ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl);
// Configure table properties.
TableProperties<AirQuality> tableProperties = new TableProperties<>(
tableName,
AirQuality.getDefaultInstance()
);
// Create stream.
ZerobusStream<AirQuality> stream = sdk.createStream(
tableProperties,
clientId,
clientSecret
).join();
try {
// Ingest records.
for (int i = 0; i < 100; i++) {
AirQuality record = AirQuality.newBuilder()
.setDeviceName("sensor-" + (i % 10))
.setTemp(20 + (i % 15))
.setHumidity(50 + (i % 40))
.build();
stream.ingestRecord(record).join(); // Wait for durability.
System.out.println("Ingested record " + (i + 1));
}
System.out.println("Successfully ingested 100 records!");
} finally {
stream.close();
}
}
}Exemplo de ingestão não bloqueadora:
Para ingestão com alta taxa de transferência:
Javapackage com.example;
import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class NonBlockingClient {
public static void main(String[] args) throws Exception {
// Configuration.
String serverEndpoint = "1234567890123456.zerobus.us-west-2.cloud.databricks.com";
String workspaceUrl = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com";
String tableName = "main.default.air_quality";
String clientId = "your-service-principal-application-id";
String clientSecret = "your-service-principal-secret";
// Initialize SDK.
ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl);
// Configure table properties.
TableProperties<AirQuality> tableProperties = new TableProperties<>(
tableName,
AirQuality.getDefaultInstance()
);
// Configure stream options.
StreamConfigurationOptions options = StreamConfigurationOptions.builder()
.setMaxInflightRecords(50000)
.setAckCallback(response ->
System.out.println("Acknowledged offset: " +
response.getDurabilityAckUpToOffset()))
.build();
// Create stream.
ZerobusStream<AirQuality> stream = sdk.createStream(
tableProperties,
clientId,
clientSecret,
options
).join();
List<CompletableFuture<Void>> futures = new ArrayList<>();
try {
// Ingest records.
for (int i = 0; i < 100000; i++) {
AirQuality record = AirQuality.newBuilder()
.setDeviceName("sensor-" + (i % 10))
.setTemp(20 + (i % 15))
.setHumidity(50 + (i % 40))
.build();
futures.add(stream.ingestRecord(record));
}
// Flush and wait for all records to be acknowledged.
stream.flush();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
System.out.println("Successfully ingested 100,000 records!");
} finally {
stream.close();
}
}
} -
execução do seu cliente.
Usando Maven:
Bashmvn compile exec:java -Dexec.mainClass="com.example.ZerobusClient"Compilar e executar manualmente:
Bashjavac -cp "lib/*" -d out src/main/java/com/example/ZerobusClient.java src/main/java/com/example/proto/Record.java
java -cp "lib/*:out" com.example.ZerobusClient
Opções de configuração
O SDK suporta várias opções de configuração via StreamConfigurationOptions:
Opção | Padrão | Descrição |
|---|---|---|
maxInflightRecords | 50000 | Número máximo de registros não reconhecidos |
recuperação | True | Habilitar recuperação automática de transmissão |
tempo limite de recuperação Sra. | 15000 | Tempo limite para operações de recuperação (ms) |
recuperaçãoRecuoMs | 2000 | Atraso entre tentativas de recuperação (ms) |
tentativas de recuperação | 3 | Número máximo de tentativas de recuperação |
flushTimeoutMs | 300000 | Tempo limite para operações de limpeza (ms) |
serverLackOfAckTimeoutMs | 60000 | Tempo limite de confirmação do servidor (ms) |
ackRetorno de chamada | null | Retorno de chamada invocado na confirmação do registro |
Configuração de exemplo:
StreamConfigurationOptions options = StreamConfigurationOptions.builder()
.setMaxInflightRecords(10000)
.setRecovery(true)
.setRecoveryTimeoutMs(20000)
.setAckCallback(response ->
System.out.println("Ack: " + response.getDurabilityAckUpToOffset()))
.build();
Registro
O SDK usa SLF4J para registro.
Habilitar registro de depuração:
java -Dorg.slf4j.simpleLogger.log.com.databricks.zerobus=debug -cp "lib/*:out" com.example.ZerobusClient
Com Maven:
mvn exec:java -Dexec.mainClass="com.example.ZerobusClient" -Dorg.slf4j.simpleLogger.log.com.databricks.zerobus=debug
Níveis de log disponíveis: trace, debug, info, warn, error
Tratamento de erros
O SDK fornece dois tipos de exceção:
ZerobusException: Erros recuperáveis (problemas de rede, falhas temporárias)NonRetriableException: Erros não recuperáveis (credenciais inválidas, tabela ausente)
try {
stream.ingestRecord(record);
} catch (NonRetriableException e) {
System.err.println("Fatal error: " + e.getMessage());
throw e;
} catch (ZerobusException e) {
System.err.println("Retriable error: " + e.getMessage());
// Implement retry logic with backoff.
}