Avro ファイル
Apache Avro は、データのシリアル化システムです。 Avro は以下を提供します。
豊富なデータ構造。
コンパクトで高速なバイナリデータ形式。
永続データを格納するためのコンテナー ファイル。
リモート プロシージャ コール (RPC)。
動的言語とのシンプルな統合。 コード生成は、データ ファイルの読み取りや書き込み、RPC プロトコルの使用や実装には必要ありません。 オプションの最適化としてのコード生成は、静的型付け言語にのみ実装する価値があります。
Avro データソースは以下をサポートします。
スキーマ変換: Apache Spark SQL と Avro レコード間の自動変換。
パーティション分割: パーティション分割されたデータを簡単に読み書きでき、追加の構成は必要ありません。
圧縮: Avro をディスクに書き出すときに使用する圧縮。 サポートされているタイプは、
uncompressed
、snappy
、およびdeflate
です。 また、デフレート レベルを指定することもできます。レコード名:
recordName
とrecordNamespace
を含むパラメーターのマップを渡して、レコード名と名前空間。
「 ストリーミング Avro データの読み取りと書き込み」も参照してください。
設定
Avro データソースの動作は、さまざまな構成パラメーターを使用して変更できます。
読み取り時に .avro
拡張子のないファイルを無視するには、Hadoop構成でパラメーター avro.mapred.ignore.inputs.without.extension
を設定します。 デフォルトは false
です。
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
書き込み時に圧縮を構成するには、次の Spark プロパティを設定します。
圧縮コーデック:
spark.sql.avro.compression.codec
. サポートされているコーデックはsnappy
とdeflate
です。 デフォルトのコーデックはsnappy
です。圧縮コーデックが
deflate
の場合は、spark.sql.avro.deflate.level
を使用して圧縮レベルを設定できます。 デフォルトのレベルは-1
です。
これらのプロパティは、クラスター の Spark 構成 で設定することも、実行時に spark.conf.set()
. 例えば:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Databricks Runtime 9.1 LTS 以降では、ファイルを読み取るときに mergeSchema
オプションを指定することで、Avro での既定のスキーマ推論動作を変更できます。mergeSchema
を true
に設定すると、1 つのファイルから読み取りスキーマを推測するのではなく、ターゲット ディレクトリ内の一連の Avro ファイルからスキーマが推測され、それらがマージされます。
Avro -> Spark SQL 変換でサポートされる型
このライブラリは、すべての Avro 型の読み取りをサポートしています。 Avro 型から Spark SQL 型への次のマッピングを使用します。
Avro タイプ |
Spark SQL の種類 |
---|---|
boolean |
BooleanType |
int |
IntegerType |
long |
LongType |
float |
FloatType |
double |
DoubleType |
bytes |
BinaryType |
string |
StringType |
record |
StructType |
enum |
StringType |
array |
ArrayType |
map |
MapType |
fixed |
BinaryType |
union |
「共用体型」を参照してください。 |
共用体型
Avro データソースは、 union
タイプの読み取りをサポートしています。 Avro では、次の 3 つのタイプを union
タイプと見なします。
union(int, long)
LongType
にマップします。union(float, double)
DoubleType
にマップします。union(something, null)
ここで、something
はサポートされている任意の Avro タイプです。 これは、something
と同じ Spark SQL 型にマップされ、nullable
はtrue
に設定されます。
他のすべての union
型は複合型です。 これらは、union
のメンバーに従って、フィールド名が member0
、 member1
などであるStructType
にマップされます。これは、Avro と Parquet の間で変換するときの動作と一致しています。
論理型
Avro データソースは、次の Avro 論理タイプの読み込みをサポートしています。
Avro 論理型 |
Avro タイプ |
Spark SQL の種類 |
---|---|---|
date |
int |
DateType |
timestamp-millis |
long |
timestampType |
timestamp-micros |
long |
timestampType |
decimal |
fixed |
DecimalType |
decimal |
bytes |
DecimalType |
注:
Avro データソースは、Avro ファイルに存在するドキュメント、エイリアス、およびその他のプロパティを無視します。
Spark SQL -> Avro 変換でサポートされる型
このライブラリは、すべての Spark SQL 型の Avro への書き込みをサポートしています。 ほとんどの型では、Spark 型から Avro 型へのマッピングは簡単です (たとえば、 IntegerType
は int
に変換されます)。以下は、いくつかの特殊なケースのリストです。
Spark SQL の種類 |
Avro タイプ |
Avro 論理型 |
---|---|---|
ByteType |
int |
|
ShortType |
int |
|
BinaryType |
bytes |
|
DecimalType |
fixed |
decimal |
timestampType |
long |
timestamp-micros |
DateType |
int |
date |
また、オプション avroSchema
を使用して出力 Avro スキーマ全体を指定して、Spark SQL 型を他の Avro 型に変換することもできます。 次の変換はデフォルトでは適用されず、ユーザー指定の Avro スキーマが必要です。
Spark SQL の種類 |
Avro タイプ |
Avro 論理型 |
---|---|---|
ByteType |
fixed |
|
StringType |
enum |
|
DecimalType |
bytes |
decimal |
timestampType |
long |
timestamp-millis |
例:
これらの例では、 episodes.avro ファイルを使用します。
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
この例は、カスタム Avro スキーマを示しています。
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
次の例は、Avro 圧縮オプションを示しています。
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
次の例は、パーティション分割された Avro レコードを示しています。
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
次の例は、レコード名と名前空間を示しています。
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL で Avro データをクエリーするには、データファイルをテーブルまたは一時ビューとして登録します。
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes