Avro ファイル
Apache Avro は、データ シリアル化システムです。 Avro は以下を提供します。
- 豊富なデータ構造。
- コンパクトで高速なバイナリデータ形式。
- 永続データを格納するためのコンテナファイル。
- リモート プロシージャ コール (RPC)。
- 動的言語との簡単な統合。 データ ファイルの読み取りや書き込み、RPC プロトコルの使用や実装にコード生成は必要ありません。 オプションの最適化としてのコード生成は、静的に型指定された言語にのみ実装する価値があります。
Avro データソースは以下をサポートします。
- スキーマ変換: Apache Spark SQL と Avro レコード間の自動変換。
- パーティショニング:追加の設定なしで、パーティショニングされたデータを簡単に読み書きできます。
- 圧縮: Avro をディスクに書き込むときに使用する圧縮。 サポートされているタイプは、
uncompressed
、snappy
、およびdeflate
です。 また、収縮レベルを指定することもできます。 - レコード名:
recordName
とrecordNamespace
を持つパラメーターのマップを渡すことにより、レコード名と名前空間を
「ストリーミング Avro データの読み取りと書き込み」も参照してください。
構成
Avro データソースの動作は、さまざまな構成パラメーターを使用して変更できます。
読み取り時に .avro
拡張子のないファイルを無視するには、Hadoop 設定でパラメーター [ avro.mapred.ignore.inputs.without.extension
] を設定します。 デフォルトは false
です。
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
書き込み時の圧縮を構成するには、次の Spark プロパティを設定します。
- 圧縮コーデック:
spark.sql.avro.compression.codec
。 サポートされているコーデックはsnappy
とdeflate
です。 デフォルトのコーデックはsnappy
です。 - 圧縮コーデックが
deflate
の場合は、spark.sql.avro.deflate.level
で圧縮レベルを設定できます。 デフォルトのレベルは-1
です。
これらのプロパティは、クラスター Spark構成で設定することも、実行時に spark.conf.set()
を使用して設定することもできます。例えば:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Databricks Runtime 9.1 LTS 以降では、ファイルの読み取り時に mergeSchema
オプションを提供することで、Avro でデフォルト スキーマ推論の動作を変更できます。mergeSchema
を true
に設定すると、1 つのファイルから読み取られたスキーマを推論するのではなく、ターゲット ディレクトリ内の一連の Avro ファイルからスキーマを推論してマージします。
Avro -> Spark SQL 変換でサポートされている型
このライブラリは、すべての Avro タイプの読み取りをサポートしています。 次のものを使用します Avro 型から Spark SQL 型へのマッピング:
Avro タイプ | Spark SQL タイプ |
---|---|
ブーリアン | BooleanType |
int | IntegerType |
ロング | LongType |
float | FloatType |
double | DoubleType |
バイト | BinaryType |
string | StringType |
record | StructType |
enum | StringType |
配列 | ArrayType |
マップ | MapType |
固定 | BinaryType |
union | 「ユニオンタイプ」を参照してください。 |
ユニオンの種類
Avro データソースは、union
タイプの読み込みをサポートしています。Avro では、次の 3 つのタイプを union
タイプと位置付けています。
union(int, long)
はLongType
にマップします。union(float, double)
はDoubleType
にマップします。union(something, null)
ここで、something
はサポートされている任意の Avro タイプです。 これは、 と同じ Spark SQL タイプにマップされますsomething
で、nullable
はtrue
に設定されます。
他のすべての union
型は複合型です。 これらは
フィールド名がmember0
、member1
などであるStructType
、
union
のメンバーに準じます。これは、
Avro と Parquet の間で変換するときの動作。
論理タイプ
Avro データソースは、次の Avro 論理タイプの読み込みをサポートしています。
Avro 論理タイプ | Avro タイプ | Spark SQL タイプ |
---|---|---|
日付 | int | DateType |
timestamp-millis | ロング | TimestampType |
timestamp-micros | ロング | TimestampType |
DECIMALタイプ | 固定 | DecimalType |
DECIMALタイプ | バイト | DecimalType |
Avro データソースは、Avro ファイルに存在する docs、エイリアス、およびその他のプロパティを無視します。
Spark SQL -> Avro 変換でサポートされている型
このライブラリは、すべてのSpark SQLタイプのAvroへの書き込みをサポートしています。 ほとんどの人にとって
SparkタイプからAvroタイプへのマッピングは簡単です
(たとえば、 IntegerType
は int
に変換されます)。次に、いくつかの特殊なケースを示します。
Spark SQL タイプ | Avro タイプ | Avro 論理タイプ |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | バイト | |
DecimalType | 固定 | DECIMALタイプ |
TimestampType | ロング | timestamp-micros |
DateType | int | 日付 |
また、オプション avroSchema
を使用して出力 Avro スキーマ全体を指定して、Spark SQL タイプを他の Avro タイプに変換することもできます。次の変換はデフォルトでは適用されず、ユーザー指定の Avro スキーマが必要です。
Spark SQL タイプ | Avro タイプ | Avro 論理タイプ |
---|---|---|
ByteType | 固定 | |
StringType | enum | |
DecimalType | バイト | DECIMALタイプ |
TimestampType | ロング | timestamp-millis |
例
これらの例では、 episodes.avro ファイルを使用します。
- Scala
- Python
- SQL
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
This example demonstrates a custom Avro schema:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
This example demonstrates Avro compression options:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
This example demonstrates partitioned Avro records:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
This example demonstrates the record name and namespace:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
To query Avro data in SQL, register the data file as a table or temporary view:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
ノートブックの例: Avro ファイルの読み取りと書き込み
次のノートブックは、Avro ファイルの読み取りと書き込みの方法を示しています。