Avro ファイル

Apache Avro は、データのシリアル化システムです。 Avro は以下を提供します。

  • 豊富なデータ構造。

  • コンパクトで高速なバイナリデータ形式。

  • 永続データを格納するためのコンテナー ファイル。

  • リモート プロシージャ コール (RPC)。

  • 動的言語とのシンプルな統合。 コード生成は、データ ファイルの読み取りや書き込み、RPC プロトコルの使用や実装には必要ありません。 オプションの最適化としてのコード生成は、静的型付け言語にのみ実装する価値があります。

Avro データソースは以下をサポートします。

  • スキーマ変換: Apache Spark SQL と Avro レコード間の自動変換。

  • パーティション分割: パーティション分割されたデータを簡単に読み書きでき、追加の構成は必要ありません。

  • 圧縮: Avro をディスクに書き出すときに使用する圧縮。 サポートされているタイプは、 uncompressedsnappy、および deflateです。 また、デフレート レベルを指定することもできます。

  • レコード名: recordNamerecordNamespaceを含むパラメーターのマップを渡して、レコード名と名前空間。

ストリーミング Avro データの読み取りと書き込み」も参照してください。

設定

Avro データソースの動作は、さまざまな構成パラメーターを使用して変更できます。

読み取り時に .avro 拡張子のないファイルを無視するには、Hadoop構成でパラメーター avro.mapred.ignore.inputs.without.extension を設定します。 デフォルトは falseです。

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

書き込み時に圧縮を構成するには、次の Spark プロパティを設定します。

  • 圧縮コーデック: spark.sql.avro.compression.codec. サポートされているコーデックは snappydeflateです。 デフォルトのコーデックは snappyです。

  • 圧縮コーデックが deflateの場合は、 spark.sql.avro.deflate.levelを使用して圧縮レベルを設定できます。 デフォルトのレベルは -1です。

これらのプロパティは、クラスター の Spark 構成 で設定することも、実行時に spark.conf.set(). 例えば:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Databricks Runtime 9.1 LTS 以降では、ファイルを読み取るときに mergeSchema オプションを指定することで、Avro での既定のスキーマ推論動作を変更できます。mergeSchematrue に設定すると、1 つのファイルから読み取りスキーマを推測するのではなく、ターゲット ディレクトリ内の一連の Avro ファイルからスキーマが推測され、それらがマージされます。

Avro -> Spark SQL 変換でサポートされる型

このライブラリは、すべての Avro 型の読み取りをサポートしています。 Avro 型から Spark SQL 型への次のマッピングを使用します。

Avro タイプ

Spark SQL の種類

boolean

BooleanType

int

IntegerType

long

LongType

float

FloatType

double

DoubleType

bytes

BinaryType

string

StringType

record

StructType

enum

StringType

array

ArrayType

map

MapType

fixed

BinaryType

union

共用体型」を参照してください。

共用体型

Avro データソースは、 union タイプの読み取りをサポートしています。 Avro では、次の 3 つのタイプを union タイプと見なします。

  • union(int, long) LongTypeにマップします。

  • union(float, double) DoubleTypeにマップします。

  • union(something, null)ここで、 something はサポートされている任意の Avro タイプです。 これは、 somethingと同じ Spark SQL 型にマップされ、 nullabletrueに設定されます。

他のすべての union 型は複合型です。 これらは、unionのメンバーに従って、フィールド名が member0member1などであるStructTypeにマップされます。これは、Avro と Parquet の間で変換するときの動作と一致しています。

論理型

Avro データソースは、次の Avro 論理タイプの読み込みをサポートしています。

Avro 論理型

Avro タイプ

Spark SQL の種類

date

int

DateType

timestamp-millis

long

timestampType

timestamp-micros

long

timestampType

decimal

fixed

DecimalType

decimal

bytes

DecimalType

注:

Avro データソースは、Avro ファイルに存在するドキュメント、エイリアス、およびその他のプロパティを無視します。

Spark SQL -> Avro 変換でサポートされる型

このライブラリは、すべての Spark SQL 型の Avro への書き込みをサポートしています。 ほとんどの型では、Spark 型から Avro 型へのマッピングは簡単です (たとえば、 IntegerTypeintに変換されます)。以下は、いくつかの特殊なケースのリストです。

Spark SQL の種類

Avro タイプ

Avro 論理型

ByteType

int

ShortType

int

BinaryType

bytes

DecimalType

fixed

decimal

timestampType

long

timestamp-micros

DateType

int

date

また、オプション avroSchemaを使用して出力 Avro スキーマ全体を指定して、Spark SQL 型を他の Avro 型に変換することもできます。 次の変換はデフォルトでは適用されず、ユーザー指定の Avro スキーマが必要です。

Spark SQL の種類

Avro タイプ

Avro 論理型

ByteType

fixed

StringType

enum

DecimalType

bytes

decimal

timestampType

long

timestamp-millis

例:

これらの例では、 episodes.avro ファイルを使用します。

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

この例は、カスタム Avro スキーマを示しています。

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

次の例は、Avro 圧縮オプションを示しています。

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

次の例は、パーティション分割された Avro レコードを示しています。

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

次の例は、レコード名と名前空間を示しています。

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL で Avro データをクエリーするには、データファイルをテーブルまたは一時ビューとして登録します。

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

ノートブックの例: Avro ファイルの読み取りと書き込み

次のノートブックは、Avro ファイルの読み取りと書き込みの方法を示しています。

Avro ファイル ノートブックの読み取りと書き込み

ノートブックを新しいタブで開く