Buffers de protocolo de leitura e gravação
Buffers de protocolo (protobuf) é um formato de serialização binária independente de linguagem desenvolvido pelo Google. Os usuários do Databricks o encontram com mais frequência ao processar registros codificados em binário de sistemas de transmissão de eventos, como Apache Kafka. A Databricks dá suporte à leitura e gravação de dados protobuf com o Apache Spark por meio das funções from_protobuf e to_protobuf, que convertem entre tipos protobuf binário e struct do Spark SQL para cargas de trabalho de transmissão e em lotes.
Pré-requisitos
As funções Protobuf requerem Databricks Runtime 12.2 LTS ou acima.
Sintaxe da função
Use from_protobuf para converter uma coluna binária em uma estrutura e to_protobuf para converter uma coluna de estrutura em binário. Deve-se fornecer um arquivo descritor identificado pelo argumento descFilePath ou um registro de esquema especificado com o argumento options. Para uma lista completa de opções, consulte Protobuf.
- Python
- Scala
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Opções
Passar opções para from_protobuf e to_protobuf usando o argumento options. Para obter uma lista completa de opções compatíveis, consulte Protobuf.
Opções do Registro de Esquema
As opções a seguir são específicas para o uso do registro de esquema e não são abordadas na referência geral de opções.
Opção | Obrigatório | Padrão | Descrição |
|---|---|---|---|
| Não |
| Como as alterações de esquema são tratadas quando um ID de esquema mais recente é detectado em um registro de entrada. |
| Não | — | Passe qualquer opção de cliente do Registro de Esquema Confluent usando o prefixo |
Uso
Os exemplos a seguir usam o dataset Wanderbricks para demonstrar a serialização de structs do Apache Spark para protobuf binário com to_protobuf() e a desserialização de registros protobuf binários com from_protobuf().
Use protobuf com o Confluent Schema Registry
O Databricks suporta o uso do Confluent Schema Registry para definir o Protobuf.
- Python
- Scala
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
reviewsRestoredDF.show()
Autentique-se em um Confluent Schema Registry externo
Para autenticar em um Confluent Schema Registry externo, atualize as opções de registro de esquema para incluir credenciais de autenticação e a chave API.
- Python
- Scala
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Usar arquivos truststore e keystore nos volumes do Unity Catalog
Em Databricks Runtime 14.3 LTS e acima, o senhor pode usar arquivos truststore e keystore em volumes Unity Catalog para autenticar em um Confluent Schema Registry. Atualize suas opções de registro do esquema de acordo com o exemplo a seguir:
- Python
- Scala
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Usar o Protobuf com um arquivo descritor
O senhor também pode fazer referência a um arquivo descritor protobuf que esteja disponível para o seu clustering compute. Verifique se você tem as permissões adequadas para ler o arquivo, dependendo de sua localização.
- Python
- Scala
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
descriptor_file = "/path/to/proto_descriptor.desc"
# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
val descriptorFile = "/path/to/proto_descriptor.desc"
// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()
Recursos adicionais
- Ler e gravar dados de transmissão Avro: Se sua carga de trabalho de transmissão usar serialização Avro em vez de Protobuf, consulte as funções de transmissão Avro para as funções
from_avroeto_avroequivalentes.