プロトコルバッファの読み取りと書き込み
Databricks は、Apache Spark 構造体とプロトコル バッファー (protobuf) 間のシリアル化と逆シリアル化をネイティブにサポートします。 Protobuf のサポートは Apache Spark データフレーム トランスフォーマーとして実装され、構造化ストリーミングまたはバッチ操作で使用できます。
プロトコル バッファーの逆シリアル化とシリアル化の方法
Databricks Runtime 12.2 LTS 以降では、 from_protobuf
関数と to_protobuf
関数を使用してデータをシリアル化および逆シリアル化できます。 Protobuf シリアル化は、ストリーミング ワークロードで一般的に使用されます。
protobuf関数の基本的な構文は、読み取り関数と書き込み関数で似ています。 これらの関数は使用する前にインポートする必要があります。
from_protobuf
バイナリ列を構造体にキャストし、to_protobuf
は構造体列をバイナリにキャストします。 options
引数で指定されたスキーマレジストリか、descFilePath
引数で指定された記述子ファイルのいずれかを指定する必要があります。
- Python
- Scala
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
次の例は、 from_protobuf()
を使用してバイナリ protobuf レコードを処理し、 to_protobuf()
を使用して Spark SQL 構造体をバイナリ protobuf に変換する方法を示しています。
protobuf と Confluent Schema Registry の併用
Databricks では、 Confluent Schema Registry を使用した Protobuf の定義がサポートされています。
- Python
- Scala
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
外部の Confluent Schema Registry に対する認証
外部の Confluent Schema Registry に対して認証を行うには、認証認証情報と API キーが含まれるようにスキーマレジストリオプションを更新します。
- Python
- Scala
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
トラストストアとキーストアのファイルを Unity Catalog ボリュームで使用する
Databricks Runtime 14.3 LTS 以降では、Unity Catalog ボリューム内のトラストストア ファイルとキーストア ファイルを使用して、Confluent スキーマ レジストリに対する認証を行うことができます。 次の例に従って、スキーマレジストリオプションを更新します。
- Python
- Scala
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
記述子ファイルでの Protobuf の使用
また、コンピュート クラスターで使用できる protobuf 記述子ファイルを参照することもできます。 ファイルの場所に応じて、ファイルを読み取るための適切な権限があることを確認してください。
- Python
- Scala
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Protobuf 関数でサポートされているオプション
Protobuf 関数では、次のオプションがサポートされています。
-
mode : Protobuf レコードの逆シリアル化中のエラーの処理方法を決定します。 エラーは、レコードの実際のスキーマと で提供される予期されるスキーマとの間の不一致など、さまざまな種類の不正なレコードが原因で発生する可能性があります
from_protobuf()
。- 値 :
FAILFAST
(デフォルト): 不正な形式のレコードが検出され、タスクが失敗すると、エラーがスローされます。PERMISSIVE
: 不正な形式のレコードに対して NULL が返されます。 このオプションは、多くのレコードがドロップされる可能性があるため、慎重に使用してください。 これは、ソース内のレコードのごく一部が正しくない場合に便利です。
- 値 :
-
recursive.fields.max.depth : 再帰フィールドのサポートを追加します。 Spark SQL スキーマは、再帰フィールドをサポートしていません。 このオプションを指定しない場合、再帰的なフィールドは許可されません。 Protobufsで再帰フィールドをサポートするには、指定された深さまで拡張する必要があります。
-
値 :
-
-1 (デフォルト): 再帰的なフィールドは許可されません。
-
0: 再帰フィールドは削除されます。
-
1: 単一レベルの再帰を許可します。
-
[2-10]:複数の再帰のしきい値を 10 レベルまで指定します。
値を 0 より大きい値に設定すると、ネストされたフィールドを設定された深さまで拡張することで、フィールドを再帰的にすることができます。 10 より大きい値は、非常に大きなスキーマを誤って作成しないようにするために許可されません。 Protobuf メッセージの深さが構成された制限を超える場合、返される Spark 構造体は再帰制限の後に切り捨てられます。
-
-
例 : 次の再帰フィールドを持つ Protobuf について考えてみます。
message Person { string name = 1; Person friend = 2; }
以下は、この設定のさまざまな値を持つエンドスキーマを示しています。
- オプションを 1 に設定:
STRUCT<name: STRING>
- オプションを 2 に設定:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- オプションを 3 に設定:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- オプションを 1 に設定:
-
-
convert.any.fields.to.JSON :このオプションを使用すると、Protobuf Any フィールドを JSON に変換できます。 この機能は慎重に有効にする必要があります。 JSON の変換と処理は非効率的です。 さらに、JSON 文字列フィールドは Protobuf スキーマの安全性を失い、ダウンストリーム処理でエラーが発生しやすくなります。
-
値 :
- False (デフォルト): ランタイムでは、このようなワイルドカード フィールドには、バイナリ データとして任意の Protobuf メッセージを含めることができます。 デフォルトでは、このようなフィールドは通常の Protobuf メッセージと同様に処理されます。 スキーマ
(STRUCT<type_url: STRING, value: BINARY>)
を持つ 2 つのフィールドがあります。 デフォルトでは、バイナリvalue
フィールドはまったく解釈されません。 ただし、バイナリ データは、実際には一部のアプリケーションで動作するのに便利ではない場合があります。 - True: この値を True に設定すると、実行時に
Any
フィールドを JSON 文字列に変換できます。 このオプションを使用すると、バイナリが解析され、Protobuf メッセージが JSON 文字列に逆シリアル化されます。
- False (デフォルト): ランタイムでは、このようなワイルドカード フィールドには、バイナリ データとして任意の Protobuf メッセージを含めることができます。 デフォルトでは、このようなフィールドは通常の Protobuf メッセージと同様に処理されます。 スキーマ
-
例 : 次のように定義された 2 つの Protobuf タイプについて考えてみます。
message ProtoWithAny {
string event_name = 1;
google.protobuf.Any details = 2;
}
message Person {
string name = 1;
int32 id = 2;
}このオプションを有効にすると、
from_protobuf("col", messageName ="ProtoWithAny")
のスキーマはSTRUCT<event_name: STRING, details: STRING>
になります。実行時に、
details
フィールドに Protobuf メッセージが含まれている場合Person
戻り値は (('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
) のようになります。 -
必要条件 :
Any
フィールドで使用されるすべての Protobuf タイプの定義は、from_protobuf()
に渡される Protobuf 記述子ファイルで使用できる必要があります。- Protobuf
Any
見つからない場合、そのレコードでエラーが発生します。 - この機能は現在、schema-registry ではサポートされていません。
-
-
emit.デフォルト.values : Protobuf を Spark 構造体に逆シリアル化するときに、値が 0 のフィールドのレンダリングを有効にします。 このオプションは慎重に使用してください。 通常、このようなセマンティクスの細かい違いに依存することはお勧めできません。
-
値
- False (デフォルト): シリアル化された Protobufのフィールドが空の場合、 Spark 構造体の結果のフィールドはデフォルト null になります。 このオプションを有効にせず、
null
デフォルト値として扱う方が簡単です。 - True: このオプションを有効にすると、そのようなフィールドには対応するデフォルト値が入力されます。
- False (デフォルト): シリアル化された Protobufのフィールドが空の場合、 Spark 構造体の結果のフィールドはデフォルト null になります。 このオプションを有効にせず、
-
例 : 次のように構成された Protobuf を持つ次の Protobuf について考えてみます
Person(age=0, middle_name="")
syntax = "proto3";
message Person {
string name = 1;
int64 age = 2;
optional string middle_name = 3;
optional int64 salary = 4;
}- このオプションを False に設定すると、
from_protobuf()
を呼び出した後の Spark 構造体はすべて null になります ({"name": null, "age": null, "middle_name": "", "salary": null}
. 2 つのフィールド (age
とmiddle_name
) に値が設定されていても、 Protobuf はデフォルトの値であるため、ワイヤ形式では含めません。 - このオプションを True に設定すると、
from_protobuf()
を呼び出した後の Spark 構造体は{"name": "", "age": 0, "middle_name": "", "salary": null}
になります。salary
フィールドは、optional
明示的に宣言され、入力レコードで設定されていないため、ヌルのままです。
- このオプションを False に設定すると、
-
-
enums.as.ints: 有効にすると、Protobuf の列挙型フィールドは Spark で整数フィールドとしてレンダリングされます。
-
値
- False (デフォルト)
- True: 有効にすると、Protobuf の列挙型フィールドは Spark の整数フィールドとしてレンダリングされます。
-
例 : 次の Protobuf について考えてみます。
syntax = "proto3";
message Person {
enum Job {
NONE = 0;
ENGINEER = 1;
DOCTOR = 2;
NURSE = 3;
}
Job job = 1;
}次のような Protobuf メッセージがあるとします
Person(job = ENGINEER)
- このオプションを無効にすると、対応する Spark 構造体は
{"job": "ENGINEER"}
になります。 - このオプションを有効にすると、対応する Spark 構造体は
{"job": 1}
になります。
これらのフィールドのスキーマは、それぞれのケースで異なることに注意してください (デフォルトの文字列ではなく整数)。 このような変更は、ダウンストリームテーブルのスキーマに影響を与える可能性があります。
- このオプションを無効にすると、対応する Spark 構造体は
-
スキーマレジストリオプション
次のスキーマレジストリオプションは、Protobuf 関数でスキーマレジストリを使用する場合に関連します。
-
schema.registry.subjectです。
- 必須
- スキーマレジストリのスキーマのサブジェクトを指定します (例: "client-event")。
-
schema.registry.addressの
- 必須
- スキーマレジストリの URL (
https://schema-registry.example.com:8081
-
スキーマ、レジストリ、プロトタイプ、名前
- オプション
- デフォルト:
<NONE>
。 - サブジェクトのスキーマレジストリエントリには、1 つの
proto
ファイルと同様に、複数の Protobuf 定義を含めることができます。 このオプションを指定しない場合、最初の Protobuf がスキーマに使用されます。 Protobuf メッセージの名前は、エントリの最初のメッセージでない場合は指定します。 たとえば、Protobuf の 2 つの定義 ("Person" と "Location" の順) を持つエントリについて考えてみます。 ストリームが「Person」ではなく「Location」に対応する場合は、このオプションを「Location」(またはパッケージ「com.example.protos.Location」を含むフルネーム)に設定します。
-
schema.registry.スキーマ進化.mode
-
デフォルト: "restart"。
-
対応モード:
- 「再起動」
- 「なし」
-
このオプションは、
from_protobuf()
のスキーマ進化モードを設定します。 クエリの開始時に、Spark は指定されたサブジェクトの最新の schema-id を記録します。 これにより、from_protobuf()
のスキーマが決まります。 クエリの開始後に、新しいスキーマがスキーマレジストリに発行される場合があります。 受信レコードで新しい schema-id が認識された場合、それはスキーマの変更を示しています。 このオプションは、スキーマに対するこのような変更の処理方法を決定します。- restart (デフォルト): 新しいスキーマ ID が検出されたときに
UnknownFieldException
をトリガーします。 これにより、クエリが終了します。 Databricks では、スキーマの変更を取得するために、クエリの失敗時にジョブが再起動するように構成することをお勧めします。 - none : スキーマ ID の変更は無視されます。 新しい schema-id を持つレコードは、クエリの開始時に観察されたのと同じスキーマで解析されます。 新しい Protobuf 定義は下位互換性があることが予想され、新しいフィールドは無視されます。
- restart (デフォルト): 新しいスキーマ ID が検出されたときに
-
-
confluent.schema.registry です。
<schema-registy-client-option>
- オプション
- スキーマレジストリは、Confluentスキーマレジストリクライアントを使用してConfluentスキーマレジストリに接続します。 クライアントがサポートする構成オプションはすべて、プレフィックス「confluent.schema.registry」で指定できます。 たとえば、次の2つの設定は「USER_INFO」の認証資格情報を提供します。
- "confluent.schema.registry.basic.auth.credentials.ソース": 「USER_INFO」
- 「confluent.schema.registry.basic.auth.user.info」: 「
<KEY>
:<SECRET>
」