メインコンテンツまでスキップ

Avro ファイル

Apache Avro は、データ シリアル化システムです。 Avro は以下を提供します。

  • 豊富なデータ構造。
  • コンパクトで高速なバイナリデータ形式。
  • 永続データを格納するためのコンテナファイル。
  • リモート プロシージャ コール (RPC)。
  • 動的言語との簡単な統合。 データ ファイルの読み取りや書き込み、RPC プロトコルの使用や実装にコード生成は必要ありません。 オプションの最適化としてのコード生成は、静的に型指定された言語にのみ実装する価値があります。

Avro データソースは以下をサポートします。

  • スキーマ変換: Apache Spark SQL と Avro レコード間の自動変換。
  • パーティショニング:追加の設定なしで、パーティショニングされたデータを簡単に読み書きできます。
  • 圧縮: Avro をディスクに書き込むときに使用する圧縮。 サポートされているタイプは、 uncompressedsnappy、および deflateです。 また、収縮レベルを指定することもできます。
  • レコード名: recordNamerecordNamespaceを持つパラメーターのマップを渡すことにより、レコード名と名前空間を

「ストリーミング Avro データの読み取りと書き込み」も参照してください。

構成

Avro データソースの動作は、さまざまな構成パラメーターを使用して変更できます。

読み取り時に .avro 拡張子のないファイルを無視するには、Hadoop 設定でパラメーター [ avro.mapred.ignore.inputs.without.extension ] を設定します。 デフォルトは falseです。

Scala
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")

書き込み時の圧縮を構成するには、次の Spark プロパティを設定します。

  • 圧縮コーデック: spark.sql.avro.compression.codec。 サポートされているコーデックは snappydeflateです。 デフォルトのコーデックは snappyです。
  • 圧縮コーデックが deflateの場合は、 spark.sql.avro.deflate.levelで圧縮レベルを設定できます。 デフォルトのレベルは -1です。

これらのプロパティは、クラスター Spark構成で設定することも、実行時に spark.conf.set() を使用して設定することもできます。例えば:

Scala
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Databricks Runtime 9.1 LTS 以降では、ファイルの読み取り時に mergeSchema オプションを提供することで、Avro でデフォルト スキーマ推論の動作を変更できます。mergeSchematrue に設定すると、1 つのファイルから読み取られたスキーマを推論するのではなく、ターゲット ディレクトリ内の一連の Avro ファイルからスキーマを推論してマージします。

Avro -> Spark SQL 変換でサポートされている型

このライブラリは、すべての Avro タイプの読み取りをサポートしています。 次のものを使用します Avro 型から Spark SQL 型へのマッピング:

Avro タイプ

Spark SQL タイプ

ブーリアン

BooleanType

int

IntegerType

ロング

LongType

float

FloatType

double

DoubleType

バイト

BinaryType

string

StringType

record

StructType

enum

StringType

配列

ArrayType

マップ

MapType

固定

BinaryType

union

「ユニオンタイプ」を参照してください。

ユニオンの種類

Avro データソースは、unionタイプの読み込みをサポートしています。Avro では、次の 3 つのタイプを union タイプと位置付けています。

  • union(int, long)LongTypeにマップします。
  • union(float, double)DoubleTypeにマップします。
  • union(something, null)ここで、 something はサポートされている任意の Avro タイプです。 これは、 と同じ Spark SQL タイプにマップされます somethingで、 nullabletrueに設定されます。

他のすべての union 型は複合型です。 これらは フィールド名がmember0member1などであるStructTypeunionのメンバーに準じます。これは、 Avro と Parquet の間で変換するときの動作。

論理タイプ

Avro データソースは、次の Avro 論理タイプの読み込みをサポートしています。

Avro 論理タイプ

Avro タイプ

Spark SQL タイプ

日付

int

DateType

timestamp-millis

ロング

TimestampType

timestamp-micros

ロング

TimestampType

DECIMALタイプ

固定

DecimalType

DECIMALタイプ

バイト

DecimalType

注記

Avro データソースは、Avro ファイルに存在する docs、エイリアス、およびその他のプロパティを無視します。

Spark SQL -> Avro 変換でサポートされている型

このライブラリは、すべてのSpark SQLタイプのAvroへの書き込みをサポートしています。 ほとんどの人にとって SparkタイプからAvroタイプへのマッピングは簡単です (たとえば、 IntegerTypeintに変換されます)。次に、いくつかの特殊なケースを示します。

Spark SQL タイプ

Avro タイプ

Avro 論理タイプ

ByteType

int

ShortType

int

BinaryType

バイト

DecimalType

固定

DECIMALタイプ

TimestampType

ロング

timestamp-micros

DateType

int

日付

また、オプション avroSchemaを使用して出力 Avro スキーマ全体を指定して、Spark SQL タイプを他の Avro タイプに変換することもできます。次の変換はデフォルトでは適用されず、ユーザー指定の Avro スキーマが必要です。

Spark SQL タイプ

Avro タイプ

Avro 論理タイプ

ByteType

固定

StringType

enum

DecimalType

バイト

DECIMALタイプ

TimestampType

ロング

timestamp-millis

これらの例では、 episodes.avro ファイルを使用します。

Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

This example demonstrates a custom Avro schema:

Scala
import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()

This example demonstrates Avro compression options:

Scala
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

This example demonstrates partitioned Avro records:

Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

This example demonstrates the record name and namespace:

Scala
val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

ノートブックの例: Avro ファイルの読み取りと書き込み

次のノートブックは、Avro ファイルの読み取りと書き込みの方法を示しています。

Avro ファイル ノートブックの読み取りと書き込み

Open notebook in new tab