Pular para o conteúdo principal

Arquivo Avro

O Apache Avro é um sistema de serialização de dados. A Avro fornece:

  • Estruturas de dados ricas.
  • Um formato de dados binário compacto e rápido.
  • Um arquivo de contêiner, para armazenar dados persistentes.
  • Chamada de procedimento remoto (RPC).
  • Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos RPC. Geração de código como uma otimização opcional, que só vale a pena implementar para linguagens de tipagem estática.

O site Avro fonte de dados oferece suporte:

  • Conversão de esquema: Conversão automática entre registros do Apache Spark SQL e do Avro.
  • Particionamento: leitura e gravação fáceis de dados particionados sem nenhuma configuração extra.
  • Compression (Compressão): Compressão a ser usada ao gravar o Avro no disco. Os tipos suportados são uncompressed, snappy e deflate. Você também pode especificar o nível de deflação.
  • Nomes de registro: registre o nome e o namespace passando um mapa de parâmetros com recordName e recordNamespace.

Consulte também a transmissão de leitura e gravação Avro data.

Configuração

O senhor pode alterar o comportamento de uma Avro fonte de dados usando vários parâmetros de configuração.

Para ignorar arquivos sem a extensão .avro durante a leitura, o senhor pode definir o parâmetro avro.mapred.ignore.inputs.without.extension na configuração do Hadoop. O endereço default é false.

Scala
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")

Para configurar a compactação durante a gravação, defina as seguintes propriedades do Spark:

  • Codec de compressão: spark.sql.avro.compression.codec. Os codecs suportados são snappy e deflate. O codec default é snappy.
  • Se o codec de compressão for deflate, você pode definir o nível de compressão com: spark.sql.avro.deflate.level. O nível default é -1.

O senhor pode definir essas propriedades na configuração doSpark de clustering ou em tempo de execução usando spark.conf.set(). Por exemplo:

Scala
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Para Databricks Runtime 9.1 LTS e acima, o senhor pode alterar o comportamento de inferência do esquema default em Avro fornecendo a opção mergeSchema ao ler arquivos. Ao definir mergeSchema como true, o senhor deduzirá um esquema de um conjunto de arquivos Avro no diretório de destino e os merge em vez de deduzir o esquema de leitura de um único arquivo.

Tipos compatíveis com a conversão Avro -> Spark SQL

Essa biblioteca suporta a leitura de todos os tipos de Avro. Ele usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:

Tipo Avro

Tipo Spark SQL

boolean

Tipo booleano

int

Tipo de número inteiro

long

Tipo longo

Float

Tipo de flutuação

double

Tipo duplo

bytes

Tipo binário

string

Tipo de string

disco

Tipo de estrutura

enumeração

Tipo de string

matriz

Tipo de matriz

map

Tipo de mapa

Fixo

Tipo binário

união

Consulte Tipos de união.

Tipos de união

O site Avro fonte de dados suporta a leitura de tipos union. A Avro considera os três tipos a seguir como union:

  • union(int, long) mapeia para LongType.
  • union(float, double) mapeia para DoubleType.
  • union(something, null)onde something é qualquer tipo de Avro compatível. Isso mapeia para o mesmo tipo de Spark SQL que o de something, com nullable definido como true.

Todos os outros tipos de union são tipos complexos. Eles mapeiam para StructType em que os nomes dos campos são member0, member1 e assim por diante, em de acordo com os membros do union. Isso é consistente com o comportamento do site ao converter entre Avro e Parquet.

Tipos lógicos

A Avro fonte de dados suporta a leitura dos seguintes tipos lógicosAvro:

Tipo lógico Avro

Tipo Avro

Tipo Spark SQL

Data

int

Tipo de data

timestamp-millis

long

timestampType

timestamp-micros

long

timestampType

Decimal

Fixo

Tipo decimal

Decimal

bytes

Tipo decimal

nota

A Avro fonte de dados ignora documentos, aliases e outras propriedades presentes no arquivo Avro.

Tipos suportados para Spark SQL -> Conversão Avro

Essa biblioteca oferece suporte à gravação de todos os tipos de Spark SQL no Avro. Para a maioria dos tipos , o mapeamento dos tipos Spark para os tipos Avro é direto (por exemplo, IntegerType é convertido em int); a seguir, uma lista dos poucos casos especiais:

Tipo Spark SQL

Tipo Avro

Tipo lógico Avro

Byte Type

int

Tipo curto

int

Tipo binário

bytes

Tipo decimal

Fixo

Decimal

timestampType

long

timestamp-micros

Tipo de data

int

Data

O senhor também pode especificar todo o esquema Avro de saída com a opção avroSchema, de modo que os tipos Spark SQL possam ser convertidos em outros tipos Avro. As conversões a seguir não são aplicadas pelo site default e exigem um esquema Avro especificado pelo usuário:

Tipo Spark SQL

Tipo Avro

Tipo lógico Avro

Byte Type

Fixo

Tipo de string

enumeração

Tipo decimal

bytes

Decimal

timestampType

long

timestamp-millis

Exemplos

Esses exemplos usam o arquivo episódios.avro.

Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

This example demonstrates a custom Avro schema:

Scala
import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()

This example demonstrates Avro compression options:

Scala
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

This example demonstrates partitioned Avro records:

Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

This example demonstrates the record name and namespace:

Scala
val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Notebook exemplo: Ler e gravar arquivos em Avro

O Notebook a seguir demonstra como ler e gravar arquivos Avro.

Ler e gravar arquivos em Avro Notebook

Open notebook in new tab