プロトコルバッファの読み取りと書き込み
プロトコル バッファー (protobuf) は、Google によって開発された、言語に依存しないバイナリシリアル化形式です。Databricks ユーザーは、Apache Kafka などのイベントストリーミングシステムからのバイナリエンコードされたレコードを処理する際に、多くの場合遭遇します。Databricks は、Apache Spark を使用した protobuf データの読み取りと書き込みを、バイナリ protobuf と Spark SQL 構造体タイプ間を変換する from_protobuf および to_protobuf 関数を介してサポートしており、ストリーミングおよびバッチ ワークロードの両方に対応しています。
前提条件
Protobuf 関数には Databricks Runtime 12.2 LTS 以降が必要です。
関数構文
from_protobuf を使用してバイナリ列を構造体にキャストし、to_protobuf を使用して構造体列をバイナリにキャストします。descFilePath 引数で識別される記述子ファイル、または options 引数で指定されるスキーマレジストリのいずれかを提供する必要があります。オプションの完全なリストについては、Protobufをご覧ください。
- Python
- Scala
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
オプション
from_protobuf と to_protobuf に、options 引数を使用してオプションを渡します。サポートされているオプションの完全なリストについては、Protobufを参照してください。
スキーマレジストリオプション
以下のオプションはスキーマレジストリの使用に特有のものであり、一般的なオプションのリファレンスには記載されていません。
オプション | 必須 | デフォルト | 説明 |
|---|---|---|---|
| No |
| 受信レコードでより新しいスキーマIDが検出された場合のスキーマ変更の処理方法。 |
| No | — | プレフィックス |
使い方
以下の例では、Wanderbricks データセット を使用し、to_protobuf() で Apache Spark 構造体をバイナリ protobuf にシリアル化し、from_protobuf() でバイナリ protobuf レコードを逆シリアル化する方法を示します。
protobuf と Confluent Schema Registry の併用
Databricks では、 Confluent Schema Registry を使用した Protobuf の定義がサポートされています。
- Python
- Scala
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
reviewsRestoredDF.show()
外部の Confluent Schema Registry に対する認証
外部の Confluent Schema Registry に対して認証を行うには、認証認証情報と API キーが含まれるようにスキーマレジストリオプションを更新します。
- Python
- Scala
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
トラストストアとキーストアのファイルを Unity Catalog ボリュームで使用する
Databricks Runtime 14.3 LTS 以降では、Unity Catalog ボリューム内のトラストストア ファイルとキーストア ファイルを使用して、Confluent スキーマ レジストリに対する認証を行うことができます。 次の例に従って、スキーマレジストリオプションを更新します。
- Python
- Scala
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
記述子ファイルでの Protobuf の使用
また、コンピュート クラスターで使用できる protobuf 記述子ファイルを参照することもできます。 ファイルの場所に応じて、ファイルを読み取るための適切な権限があることを確認してください。
- Python
- Scala
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
descriptor_file = "/path/to/proto_descriptor.desc"
# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
val descriptorFile = "/path/to/proto_descriptor.desc"
// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()
その他のリソース
- ストリーミング Avro データの読み書き:ストリーミングワークロードが Protobuf ではなく Avro シリアル化を使用する場合は、同等の
from_avroおよびto_avro関数については Avro ストリーミング関数を参照してください。