Read and write protocol buffers
Databricks provides native support for serialization and deserialization between Apache Spark structs and protocol buffers (protobuf). Protobuf support is implemented as an Apache Spark DataFrame transformer and can be used with Structured Streaming or for batch operations.
How to deserialize and serialize protocol buffers
In Databricks Runtime 12.2 LTS and above, you can use from_protobuf
and to_protobuf
functions to serialize and deserialize data. Protobuf serialization is commonly used in streaming workloads.
The basic syntax for protobuf functions is similar for read and write functions. You must import these functions before use.
from_protobuf
casts a binary column to a struct, and to_protobuf
casts a struct column to binary. You must provide either a schema registry specified with the options
argument or a descriptor file identified by the descFilePath
argument.
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
The following examples illustrate processing binary protobuf records with from_protobuf()
and converting Spark SQL struct to binary protobuf with to_protobuf()
.
Use protobuf with Confluent Schema Registry
Databricks supports using the Confluent Schema Registry to define Protobuf.
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Authenticate to an external Confluent Schema Registry
To authenticate to an external Confluent Schema Registry, update your schema registry options to include auth credentials and API keys.
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Use truststore and keystore files in Unity Catalog volumes
In Databricks Runtime 14.3 LTS and above, you can use truststore and keystore files in Unity Catalog volumes to authenticate to a Confluent Schema Registry. Update your schema registry options according to the following example:
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Use Protobuf with a descriptor file
You can also reference a protobuf descriptor file that is available to your compute cluster. Make sure you have proper permissions to read the file, depending on its location.
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Supported options in Protobuf functions
The following options are supported in Protobuf functions.
mode: Determines how errors while deserializing Protobuf records are handled. The errors might be caused by various types of malformed records including a mismatch between the actual schema of the record and the expected schema provided in
from_protobuf()
.Values:
FAILFAST
(default): An error is thrown when a malformed record is encountered and the task fails.PERMISSIVE
: A NULL is returned for malformed records. Use this option carefully since it can result in dropping many records. This is useful when a small fraction of the records in the source are incorrect.
recursive.fields.max.depth: Adds support for recursive fields. Spark SQL schemas do not support recursive fields. When this option is not specified, recursive fields are not permitted. In order to support recursive fields in Protobufs, they need to be expanding to a specified depth.
Values:
-1 (default): Recursive fields are not allowed.
0: Recursive fields are dropped.
1: Allows a single level of recursion.
[2-10]: Specify a threshold for multiple recursion, up to 10 levels.
Setting a value to greater than 0 allows recursive fields by expanding the nested fields to the configured depth. Values larger than 10 are not allowed in order to avoid inadvertently creating very large schemas. If a Protobuf message has depth beyond the configured limit, the Spark struct returned is truncated after the recursion limit.
Example: Consider a Protobuf with the following recursive field:
message Person { string name = 1; Person friend = 2; }
The following lists the end schema with different values for this setting:
Option set to 1:
STRUCT<name: STRING>
Option set to 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
Option set to 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
convert.any.fields.to.json: This option enables converting Protobuf Any fields to JSON. This feature should be enabled carefully. JSON conversion and processing are inefficient. In addition, the JSON string field loses Protobuf schema safety making downstream processing prone to errors.
Values:
False (default): At runtime, such wildcard fields can contain arbitrary Protobuf messages as binary data. By default such fields are handled like a normal Protobuf message. It has two fields with schema
(STRUCT<type_url: STRING, value: BINARY>)
. By default, the binaryvalue
field is not interpreted in any way. But the binary data might not be convenient in practice to work in some applications.True: Setting this value to True enables converting
Any
fields to JSON strings at runtime. With this option, the binary is parsed and the Protobuf message is deserialized into a JSON string.
Example: Consider two Protobuf types defined as follows:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
With this option enabled, the schema for
from_protobuf("col", messageName ="ProtoWithAny")
would be:STRUCT<event_name: STRING, details: STRING>
.At run time, if
details
field containsPerson
Protobuf message, the returned value looks like this:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
.Requirements:
The definitions for all the possible Protobuf types that are used in
Any
fields should be available in the Protobuf descriptor file passed tofrom_protobuf()
.If
Any
Protobuf is not found, it will result in an error for that record.This feature is currently not supported with schema-registry.
emit.default.values: Enables rendering fields with zero values when deserializing Protobuf to a Spark struct. This option should be used sparingly. It is usually not advisable to depend on such finer differences in semantics.
Values
False (default): When a field is empty in the serialized Protobuf, the resulting field in the Spark struct is by default null. It is simpler to not enable this option and treat
null
as the default value.True: When this option is enabled, such fields are filled with corresponding default values.
Example: Consider the following Protobuf with the Protobuf constructed like
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
With this option set to False, the Spark struct after calling
from_protobuf()
would be all nulls:{"name": null, "age": null, "middle_name": "", "salary": null}
. Even though two fields (age
andmiddle_name
) had values set, Protobuf does not include them in wire-format since they are default values.With this option set to True, the Spark struct after calling
from_protobuf()
would be:{"name": "", "age": 0, "middle_name": "", "salary": null}
. Thesalary
field remains null since it is explicitly declaredoptional
and it is not set in the input record.
enums.as.ints: When enabled, enum fields in Protobuf are rendered as integer fields in Spark.
Values
False (default)
True: When enabled, enum fields in Protobuf are rendered as integer fields in Spark.
Example: Consider the following Protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Given a Protobuf message like
Person(job = ENGINEER)
:With this option disabled, the corresponding Spark struct would be
{"job": "ENGINEER"}
.With this option enabled, the corresponding Spark struct would be
{"job": 1}
.
Notice that the schema for these fields is different in each case (integer rather than default string). Such a change can affect the schema of downstream tables.
Schema Registry Options
The following schema registry options are relevant while using schema registry with Protobuf functions.
schema.registry.subject
Required
Specifies subject for schema in Schema Registry, such as “client-event”
schema.registry.address
Required
URL for schema registry, such as
https://schema-registry.example.com:8081
schema.registry.protobuf.name
Optional
Default:
<NONE>
.A schema-registry entry for a subject can contain multiple Protobuf definitions, just like a single
proto
file. When this option is not specified, the first Protobuf is used for the schema. Specify the name of the Protobuf message when it is not the first one in the entry. For example, consider an entry with two Protobuf definitions: “Person” and “Location” in that order. If the stream corresponds to “Location” rather than “Person”, set this option to “Location” (or its full name including package “com.example.protos.Location”).
schema.registry.schema.evolution.mode
Default: “restart”.
Supported modes:
“restart”
“none”
This option sets schema-evolution mode for
from_protobuf()
. At the start of a query, Spark records the latest schema-id for the given subject. This determines the schema forfrom_protobuf()
. A new schema might be published to the schema registry after the query starts. When a newer schema-id is noticed in an incoming record, it indicates a change to the schema. This option determines how such a change to schema is handled:restart (default): Triggers an
UnknownFieldException
when a newer schema-id is noticed. This terminates the query. Databricks recommends configuring jobs to restart on query failure to pick up schema changes.none: Schema-id changes are ignored. The records with newer schema-id are parsed with the same schema that was observed at the start of the query. Newer Protobuf definitions are expected to be backward compatible, and new fields are ignored.
confluent.schema.registry.
<schema-registy-client-option>
Optional
Schema-registry connects to Confluent schema-registry using the Confluent Schema Registry client. Any configuration options supported by the client can be specified with the prefix “confluent.schema.registry”. For example, the following two settings provide “USER_INFO” authentication credentials:
“confluent.schema.registry.basic.auth.credentials.source”: ‘USER_INFO’
“confluent.schema.registry.basic.auth.user.info”: “
<KEY>
:<SECRET>
”