Avro ファイルの読み取りと書き込み
Apache Avro は、豊富なデータ構造とコンパクトで高速なバイナリエンコーディングを提供する行ベースのデータシリアル化形式です。Databricks ユーザーは、Avro が主要なシリアル化フォーマットである Apache Kafka や Google Pub/Sub などのイベント ストリーミング システムからデータを取り込む際に、それに最も多く遭遇します。Databricks は、Avro と Spark SQL のタイプ間の自動スキーマ変換、パーティショニング、圧縮、カスタムレコード名を含め、Apache Spark を使用した Avro の読み書きの両方をサポートしています。
Apache Kafkaまたは別のメッセージバスからファイルからではなくAvroエンコードされたレコードを読み込む場合は、ストリーミング Avro データの読み取りと書き込みを参照してください。このドキュメントでは、ストリーミング逆シリアル化に使用される from_avro と to_avro の関数を扱っています。
前提条件
Databricks は Avro ファイルを使用するために追加の構成は必要ありません。ただし、Auto Loaderを使用しないと、Avroファイルをストリームできません。
オプション
DataFrameReader と DataFrameWriter の .option() および .options() メソッドを使用して、Avro データソースを構成します。サポートされているオプションの完全なリストについては、DataFrameReader Avro オプションおよびDataFrameWriter Avro オプションを参照してください。
使い方
以下の例では、Wanderbricks データセットを使用して、Spark DataFrame API と SQL を用いた Avro ファイルの読み取りと書き込みを実演します。
SQL を使用して Avro ファイルを読み取る
テーブルを登録せずにAvroファイルにクエリを実行するには、read_filesを使用します。Unity Catalog の外部ロケーションに対するアクセス許可は自動的に適用されます。
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_avro',
format => 'avro'
)
Avro ファイルの読み取りと書き込み
ダウンストリームシステム用にAvroファイルを読み書きする必要がある場合、ロードする前に変換を適用する場合、または書き込み時にパーティショニングやスキーマなどのオプションを制御する場合に、Apache Spark DataFrame APIを使用してください。
以下の例では、Wanderbricks サンプル データセットを使用しています。
- Python
- Scala
- SQL
from pyspark.sql.functions import year, month
# Write wanderbricks reviews to Avro format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
# Read an Avro file into a DataFrame
df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
display(df)
# Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
# Read using a custom Avro schema to select specific fields
avro_schema = """
{
"type": "record",
"name": "Review",
"fields": [
{"name": "review_id", "type": "string"},
{"name": "rating", "type": "int"},
{"name": "comment", "type": ["null", "string"]}
]
}
"""
df = spark.read.format("avro").option("avroSchema", avro_schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
# Write partitioned Avro files by year and month
df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")
# Write with a custom record name and namespace for Schema Registry compatibility
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").options(
recordName="Review",
recordNamespace="com.wanderbricks"
).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
import org.apache.spark.sql.functions.{year, month}
// Write wanderbricks reviews to Avro format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
// Read an Avro file into a DataFrame
val df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
df.show()
// Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
// Read using a custom Avro schema to select specific fields
val avroSchema = """
{
"type": "record",
"name": "Review",
"fields": [
{"name": "review_id", "type": "string"},
{"name": "rating", "type": "int"},
{"name": "comment", "type": ["null", "string"]}
]
}
"""
val filtered = spark.read.format("avro").option("avroSchema", avroSchema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
// Write partitioned Avro files by year and month
val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")
// Write with a custom record name and namespace for Schema Registry compatibility
reviews.write.format("avro").options(Map(
"recordName" -> "Review",
"recordNamespace" -> "com.wanderbricks"
)).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
-- Write wanderbricks reviews to Avro format
CREATE TABLE reviews_avro
USING AVRO
AS SELECT * FROM samples.wanderbricks.reviews;
-- Write partitioned Avro files by year and month
CREATE TABLE bookings_avro_partitioned
USING AVRO
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;
SELECT * FROM bookings_avro_partitioned;
その他のリソース
- Parquet ファイルの読み取りと書き込み: ワークロードがストリーミングや書き込み中心ではなく、主に分析的で読み込み中心である場合、Parquet のカラム形式のレイアウトは、Avro の行ベースストレージよりも効率的なクエリパフォーマンスを提供します。