Read and write protocol buffers

Databricks provides native support for serialization and deserialization between Apache Spark structs and protocol buffers (protobuf). Protobuf support is implemented as an Apache Spark DataFrame transformer and can be used with Structured Streaming or for batch operations.

How to deserialize and serialize protocol buffers

In Databricks Runtime 12.1 and above, you can use the from_protobuf and to_protobuf functions to deserialize and serialize data. Protobuf serialization is commonly used in streaming workloads.

The basic syntax for protobuf functions is similar for read and write functions. You must import these functions before use.

from_protobuf casts a binary column to a struct, and to_protobuf casts a struct column to binary. You must provide either a schema registry specified with the options argument or a descriptor file identified by the descFilePath argument.

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

// With a schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with a Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// With a schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with a Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

The following examples illustrate processing binary protobuf records with from_protobuf() and converting a Spark SQL struct to binary protobuf with to_protobuf().

Use protobuf with Confluent Schema Registry

Databricks supports using the Confluent Schema Registry to define Protobuf schemas.

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Authenticate to an external Confluent Schema Registry

To authenticate to an external Confluent Schema Registry, update your schema registry options to include auth credentials and API keys.

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
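
Rather than embedding the API key and secret in notebook source, the credentials can be assembled at run time. The following is a minimal sketch, not a prescribed pattern: the helper name build_registry_options is hypothetical, and the commented-out lines assume a Databricks secret scope named "confluent" with keys "api-key" and "api-secret", none of which come from this article.

```python
from typing import Dict


def build_registry_options(subject: str, address: str,
                           api_key: str, api_secret: str) -> Dict[str, str]:
    """Return schema registry options that use USER_INFO basic authentication."""
    return {
        "schema.registry.subject": subject,
        "schema.registry.address": address,
        "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
        # Formatted as key:secret, matching the example options above.
        "confluent.schema.registry.basic.auth.user.info": f"{api_key}:{api_secret}",
    }


# In a Databricks notebook the credentials could be read from a secret scope.
# The scope and key names below are assumptions for illustration:
# api_key = dbutils.secrets.get(scope="confluent", key="api-key")
# api_secret = dbutils.secrets.get(scope="confluent", key="api-secret")
schema_registry_options = build_registry_options(
    "app-events-value",
    "https://remote-schema-registry-endpoint",
    api_key="confluentApiKey",        # placeholder value
    api_secret="confluentApiSecret",  # placeholder value
)
```

The resulting dictionary can be passed as the options argument of from_protobuf() or to_protobuf() in the same way as the literal dictionaries shown earlier.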

Use truststore and keystore files in Unity Catalog volumes

In Databricks Runtime 14.3 LTS and above, you can use truststore and keystore files in Unity Catalog volumes to authenticate to a Confluent Schema Registry. Update your schema registry options according to the following example:

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
}

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Use Protobuf with a descriptor file

You can also reference a protobuf descriptor file that is available to your compute cluster. Make sure you have proper permissions to read the file, depending on its location.

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )
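
A descriptor file is typically produced with the protoc compiler before it is made available to the cluster. The sketch below builds such a command from Python. The file names events.proto and proto_descriptor.desc and both helper names are hypothetical, and the write_descriptor function assumes protoc is installed on the machine where it runs.

```python
import shutil
import subprocess
from typing import List


def protoc_descriptor_command(proto_file: str, desc_file: str,
                              proto_path: str = ".") -> List[str]:
    """Build a protoc command that writes a descriptor set for proto_file."""
    return [
        "protoc",
        f"--proto_path={proto_path}",
        # Bundle imported .proto files so the descriptor is self-contained.
        "--include_imports",
        f"--descriptor_set_out={desc_file}",
        proto_file,
    ]


def write_descriptor(proto_file: str, desc_file: str,
                     proto_path: str = ".") -> None:
    """Run protoc; raises if protoc is missing or compilation fails."""
    if shutil.which("protoc") is None:
        raise RuntimeError("protoc was not found on PATH")
    subprocess.run(
        protoc_descriptor_command(proto_file, desc_file, proto_path),
        check=True,
    )


# Only the command is built here; call write_descriptor() to run it.
command = protoc_descriptor_command("events.proto", "proto_descriptor.desc")
print(" ".join(command))
```

The generated file can then be copied to a location the cluster can read and passed as the descFilePath argument.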

Supported options in Protobuf functions

The following options are supported in Protobuf functions.

  • mode: Determines how errors while deserializing Protobuf records are handled. The errors might be caused by various types of malformed records including a mismatch between the actual schema of the record and the expected schema provided in from_protobuf().

    • Values:

      • FAILFAST (default): An error is thrown when a malformed record is encountered and the task fails.

      • PERMISSIVE: A NULL is returned for malformed records. Use this option carefully since it can result in dropping many records. This is useful when a small fraction of the records in the source are incorrect.

  • recursive.fields.max.depth: Adds support for recursive fields. Spark SQL schemas do not support recursive fields, so recursive fields in Protobufs must be expanded to a specified depth. When this option is not specified, recursive fields are not permitted.

    • Values:

      • -1 (default): Recursive fields are not allowed.

      • 0: Recursive fields are dropped.

      • 1: Allows a single level of recursion.

      • [2-10]: Specify a threshold for multiple recursion, up to 10 levels.

        Setting a value greater than 0 allows recursive fields by expanding the nested fields to the configured depth. Values larger than 10 are not allowed, to avoid inadvertently creating very large schemas. If a Protobuf message has depth beyond the configured limit, the Spark struct returned is truncated after the recursion limit.

    • Example: Consider a Protobuf with the following recursive field:

      message Person { string name = 1; Person friend = 2; }
      

      The following lists the end schema with different values for this setting:

      • Option set to 1: STRUCT<name: STRING>

      • Option set to 2: STRUCT<name: STRING, friend: STRUCT<name: STRING>>

      • Option set to 3: STRUCT<name: STRING, friend: STRUCT<name: STRING, friend: STRUCT<name: STRING>>>

  • convert.any.fields.to.json: This option enables converting Protobuf Any fields to JSON. This feature should be enabled carefully. JSON conversion and processing are inefficient. In addition, the JSON string field loses Protobuf schema safety making downstream processing prone to errors.

    • Values:

      • False (default): At runtime, such wildcard fields can contain arbitrary Protobuf messages as binary data. By default, an Any field is handled like a normal Protobuf message with two fields, with the schema STRUCT<type_url: STRING, value: BINARY>. The binary value field is not interpreted in any way, which can make the data inconvenient to work with in some applications.

      • True: Setting this value to True enables converting Any fields to JSON strings at runtime. With this option, the binary is parsed and the Protobuf message is deserialized into a JSON string.

    • Example: Consider two Protobuf types defined as follows:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      With this option enabled, the schema for from_protobuf("col", messageName ="ProtoWithAny") would be: STRUCT<event_name: STRING, details: STRING>.

      At run time, if the details field contains a Person Protobuf message, the returned value looks like this: ('click', '{"@type":"type.googleapis.com/...Person","name":"Mario","id":100}').

    • Requirements:

      • The definitions for all the possible Protobuf types that are used in Any fields should be available in the Protobuf descriptor file passed to from_protobuf().

      • If the Protobuf type for an Any field is not found, it results in an error for that record.

      • This feature is currently not supported with a schema registry.

  • emit.default.values: Enables rendering fields with zero values when deserializing Protobuf to a Spark struct. This option should be used sparingly. It is usually not advisable to depend on such finer differences in semantics.

    • Values:

      • False (default): When a field is empty in the serialized Protobuf, the resulting field in the Spark struct is by default null. It is simpler to not enable this option and treat null as the default value.

      • True: When this option is enabled, such fields are filled with corresponding default values.

    • Example: Consider the following Protobuf definition, with a message constructed like Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • With this option set to False, the Spark struct after calling from_protobuf() would be mostly nulls: {"name": null, "age": null, "middle_name": "", "salary": null}. Although age was set to 0, Protobuf does not include it in the wire format because 0 is its default value, so it is read as null. The middle_name field is declared optional, so its explicitly set empty string is preserved.

      • With this option set to True, the Spark struct after calling from_protobuf() would be: {"name": "", "age": 0, "middle_name": "", "salary": null}. The salary field remains null since it is explicitly declared optional and it is not set in the input record.

  • enums.as.ints: When enabled, enum fields in Protobuf are rendered as integer fields in Spark.

    • Values:

      • False (default): Enum fields in Protobuf are rendered as string fields in Spark, containing the name of the enum value.

      • True: Enum fields in Protobuf are rendered as integer fields in Spark.

    • Example: Consider the following Protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Given a Protobuf message like Person(job = ENGINEER):

      • With this option disabled, the corresponding Spark struct would be {"job": "ENGINEER"}.

      • With this option enabled, the corresponding Spark struct would be {"job": 1}.

      Notice that the schema for these fields is different in each case (integer rather than default string). Such a change can affect the schema of downstream tables.
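
The options described above are passed to the Protobuf functions as a dictionary of strings. As a rough sketch, a small helper can check values against the ranges listed in this section before a query starts. The helper name protobuf_options and its keyword arguments are hypothetical and not part of the Spark API, and writing boolean values as the strings "true" and "false" is an assumption of this sketch.

```python
from typing import Dict

VALID_MODES = ("FAILFAST", "PERMISSIVE")


def protobuf_options(mode: str = "FAILFAST",
                     recursive_depth: int = -1,
                     any_to_json: bool = False,
                     emit_defaults: bool = False,
                     enums_as_ints: bool = False) -> Dict[str, str]:
    """Build an options dictionary using the defaults listed in this section."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    # -1 disallows recursive fields; values above 10 are not allowed.
    if not -1 <= recursive_depth <= 10:
        raise ValueError("recursive_depth must be between -1 and 10")

    def flag(value: bool) -> str:
        return "true" if value else "false"

    return {
        "mode": mode,
        "recursive.fields.max.depth": str(recursive_depth),
        "convert.any.fields.to.json": flag(any_to_json),
        "emit.default.values": flag(emit_defaults),
        "enums.as.ints": flag(enums_as_ints),
    }


# Tolerate malformed records and expand recursive fields to depth 2.
options = protobuf_options(mode="PERMISSIVE", recursive_depth=2)

# The dictionary would then be passed along with a descriptor file, e.g.:
# from_protobuf("value", "BasicMessage", descFilePath=descriptor_file, options=options)
```

Validating up front keeps a mistyped value from surfacing only after the query has started.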

Schema registry options

The following schema registry options are relevant when using a schema registry with Protobuf functions.

  • schema.registry.subject

    • Required

    • Specifies the subject for the schema in Schema Registry, such as “client-event”

  • schema.registry.address

    • Required

    • URL for schema registry, such as https://schema-registry.example.com:8081

  • schema.registry.protobuf.name

    • Optional

    • Default: <NONE>.

    • A schema-registry entry for a subject can contain multiple Protobuf definitions, just like a single proto file. When this option is not specified, the first Protobuf is used for the schema. Specify the name of the Protobuf message when it is not the first one in the entry. For example, consider an entry with two Protobuf definitions: “Person” and “Location” in that order. If the stream corresponds to “Location” rather than “Person”, set this option to “Location” (or its full name including package “com.example.protos.Location”).

  • schema.registry.schema.evolution.mode

    • Default: “restart”.

    • Supported modes:

      • “restart”

      • “none”

    • This option sets schema-evolution mode for from_protobuf(). At the start of a query, Spark records the latest schema-id for the given subject. This determines the schema for from_protobuf(). A new schema might be published to the schema registry after the query starts. When a newer schema-id is noticed in an incoming record, it indicates a change to the schema. This option determines how such a change to schema is handled:

      • restart (default): Triggers an UnknownFieldException when a newer schema-id is noticed. This terminates the query. Databricks recommends configuring workflows to restart on query failure to pick up schema changes.

      • none: Schema-id changes are ignored. The records with newer schema-id are parsed with the same schema that was observed at the start of the query. Newer Protobuf definitions are expected to be backward compatible, and new fields are ignored.

  • confluent.schema.registry.<schema-registry-client-option>

    • Optional

    • The schema registry integration connects to the Confluent Schema Registry using the Confluent Schema Registry client. Any configuration options supported by the client can be specified with the prefix “confluent.schema.registry”. For example, the following two settings provide “USER_INFO” authentication credentials:

      • “confluent.schema.registry.basic.auth.credentials.source”: “USER_INFO”

      • “confluent.schema.registry.basic.auth.user.info”: “<KEY>:<SECRET>”
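
With the restart schema evolution mode, the query stops when a newer schema-id appears and must be started again to pick up the new schema. Databricks recommends handling this at the workflow level. Purely as an illustration of the same idea, a driver-side loop might look like the sketch below. The function run_with_restarts and the start_query callable are hypothetical: start_query is assumed to start the streaming query, block until it terminates, and raise on failure.

```python
import time
from typing import Callable, List


def run_with_restarts(start_query: Callable[[], None],
                      max_restarts: int = 3,
                      backoff_seconds: float = 0.0) -> int:
    """Run start_query, starting it again after a failure.

    Returns the number of restarts that were needed and re-raises the
    last error once max_restarts is exhausted.
    """
    restarts = 0
    while True:
        try:
            start_query()
            return restarts
        except Exception:
            # A real job would likely catch only the schema-change error
            # rather than every exception.
            if restarts >= max_restarts:
                raise
            restarts += 1
            time.sleep(backoff_seconds)


# Stand-in for a streaming query that fails once, then runs cleanly.
attempts: List[int] = []


def fake_query() -> None:
    attempts.append(1)
    if len(attempts) == 1:
        raise RuntimeError("newer schema-id observed")


restarts_used = run_with_restarts(fake_query)
```

Because the schema is recorded at the start of a query, each restart in this sketch would resolve the latest schema for the subject again.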