メインコンテンツまでスキップ

ストリーミング Avro データの読み取りと書き込み

Apache Avro は、ストリーミングの世界で一般的に使用されるデータ シリアル化システムです。 一般的なソリューションは、 形式のデータをAvro ApacheKafkaConfluent Schema Registry の メタデータに配置し、Kafka と Schema Registry の両方に接続するストリーミングフレームワークを使用してクエリを実行することです。

Databricks は、Kafka の Avro データと Schema Registry のメタデータを使用してストリーミングパイプラインを構築するための from_avro 関数と to_avro 関数 をサポートしています。 この関数は、列を Avro 形式のバイナリとしてエンコードし to_avro Avro バイナリ データを列にデコード from_avro 。 どちらの関数も 1 つの列を別の列に変換し、入力/出力 SQL データ型は複合型またはプリミティブ型にすることができます。

注記

from_avroto_avro の機能は次のとおりです。

  • Python、Scala、Java で利用できます。
  • バッチクエリとストリーミングクエリの両方でSQL関数に渡すことができます。

Avro file データソースも参照してください。

手動で指定したスキーマの例

from_jsonやto_jsonと同様に from_avro、 とto_avro はどのバイナリ列でも使用できます。次の例のように、Avro スキーマを手動で指定できます。

Scala
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

jsonFormatSchema の例

スキーマを JSON 文字列として指定することもできます。 たとえば、 /tmp/user.avsc が次の場合です。

JSON
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "favorite_color", "type": ["string", "null"] }
]
}

JSON文字列を作成できます。

Python
from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

次に、次のスキーマを使用しますfrom_avro

Python
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))

スキーマレジストリの例

クラスターに Schema Registry サービスがある場合は、from_avro で操作できるため、 Avro スキーマを手動で指定する必要はありません。

次の例は、キーと値が Schema Registry に STRING 型と INT型のサブジェクト t-key" と t-value" として既に登録されていると仮定して、Kafka トピック "t" を読み取る方法を示しています。

Scala
import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))

to_avroの場合、デフォルトの出力 Avro スキーマは、次の理由により、Schema Registry サービスのターゲットサブジェクトのスキーマと一致しない場合があります。

  • Spark SQL タイプから Avro スキーマへのマッピングは 1 対 1 ではありません。 Spark SQL -> Avro 変換でサポートされているタイプを参照してください。
  • 変換された出力 Avro スキーマがレコード・タイプの場合、レコード名は topLevelRecord され、デフォルトでは名前空間はありません。

デフォルトの出力スキーマ to_avro がターゲット・サブジェクトのスキーマと一致する場合は、次の操作を実行できます。

Scala
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

それ以外の場合は、 to_avro 関数でターゲット・サブジェクトのスキーマを指定する必要があります。

Scala
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

外部の Confluent Schema Registry に対する認証

Databricks Runtime 12.2 LTS 以降では、外部の Confluent スキーマ レジストリに対して認証できます。 次の例は、認証認証情報と API キーを含めるようにスキーマレジストリオプションを設定する方法を示しています。

Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

トラストストアとキーストアのファイルを Unity Catalog ボリュームで使用する

Databricks Runtime 14.3 LTS 以降では、Unity Catalog ボリューム内のトラストストア ファイルとキーストア ファイルを使用して、Confluent スキーマ レジストリに対する認証を行うことができます。 前の例の設定を次の構文を使用して更新します。

Scala
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")

スキーマ進化モードを次のように使用します。 from_avro

Databricks Runtime 14.2 以降では、 from_avroでスキーマ進化モードを使用できます。 スキーマ進化モードを有効にすると、スキーマ進化を検出した後にジョブが UnknownFieldException をスローします。 Databricks では、スキーマ進化モードを使用してジョブを構成して、タスクの失敗時に自動的に再起動することをお勧めします。 構造化ストリーミングについては、本番運用に関する考慮事項を参照してください。

スキーマの進化は、ソース データのスキーマが時間の経過と共に進化し、データソースからすべてのフィールドを取り込むと予想される場合に便利です。 クエリでデータソースでクエリするフィールドをすでに明示的に指定している場合、追加されたフィールドはスキーマの進化に関係なく無視されます。

avroSchemaEvolutionModeオプションを使用して、スキーマの進化を有効にします。次の表では、スキーマ進化モードのオプションについて説明します。

オプション

挙動

none

デフォルト 。 スキーマの進化を無視し、ジョブを続行します。

restart

スキーマの進化を検出するときに UnknownFieldException をスローします。 ジョブの再起動が必要です。

注記

この構成は、ストリーミング ジョブ間で変更し、同じチェックポイントを再利用できます。 スキーマ進化を無効にすると、列が削除される可能性があります。

解析モードを構成する

解析モードを構成して、スキーマ進化モードが無効になり、スキーマが下位互換性のない方法で進化するときに、失敗させるか、null レコードを出力するかを決定できます。 デフォルト設定では、互換性のないスキーマの変更が観察されると、 from_avro は失敗します。

解析モードを指定するには、 mode オプションを使用します。 次の表では、解析モードのオプションについて説明します。

オプション

挙動

FAILFAST

デフォルト 。 解析エラーは、errorClassMALFORMED_AVRO_MESSAGESparkExceptionをスローします。

PERMISSIVE

解析エラーは無視され、ヌル・レコードが出力されます。

注記

スキーマ進化を有効にすると、レコードが破損している場合にのみ FAILFAST が例外をスローします。

スキーマ進化の使用例と解析モードの設定

次の例は、スキーマ進化を有効にし、Confluent Schema Registry で FAILFAST 解析モードを指定する方法を示しています。

Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))