Arquivo Avro

Apache Avro é um sistema de serialização de dados. Avro fornece:

  • Estruturas de dados ricas.

  • Um formato de dados binário compacto e rápido.

  • Um arquivo contêiner para armazenar dados persistentes.

  • Chamada de procedimento remoto (RPC).

  • Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos RPC. Geração de código como uma otimização opcional, que só vale a pena implementar para linguagens de tipo estaticamente.

A fonte de dados Avro suporta:

  • Conversão de esquema: conversão automática entre registros Apache Spark SQL e Avro.

  • Particionamento: Ler e gravar facilmente dados particionados sem qualquer configuração extra.

  • Compactação: compactação a ser usada ao gravar Avro no disco. Os tipos suportados são uncompressed, snappy e deflate. Você também pode especificar o nível de esvaziamento.

  • Nomes de registro: registre o nome e o namespace passando um mapa de parâmetros com recordName e recordNamespace.

Consulte também Ler e gravar dados de transmissão Avro.

Configuração

Você pode alterar o comportamento de uma fonte de dados Avro usando vários parâmetros de configuração.

Para ignorar arquivos sem a extensão .avro durante a leitura, você pode definir o parâmetro avro.mapred.ignore.inputs.without.extension na configuração do Hadoop. O default é false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Para configurar a compactação ao gravar, defina as seguintes propriedades do Spark:

  • Codec de compactação: spark.sql.avro.compression.codec. Os codecs suportados são snappy e deflate. O codec default é snappy.

  • Se o codec de compactação for deflate, você poderá definir o nível de compactação com: spark.sql.avro.deflate.level. O nível default é -1.

Você pode definir essas propriedades na configuração do Spark ou clusters no tempo de execução usando spark.conf.set(). Por exemplo:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Para Databricks Runtime 9.1 LTSe acima, você pode alterar o comportamento de inferência de esquema default no Avro fornecendo a opção mergeSchema ao ler arquivos. Definir mergeSchema como true inferirá um esquema de um conjunto de arquivos Avro no diretório de destino e os merge em vez de inferir o esquema de leitura de um único arquivo.

Tipos suportados para conversão Avro -> Spark SQL

Esta biblioteca suporta a leitura de todos os tipos Avro. Ele usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:

Tipo Avro

Tipo Spark SQL

boolean

BooleanType

int

Tipo Inteiro

long

LongType

flutuador

FloatType

double

Tipo Duplo

bytes

TipoBinário

string

StringType

registro

TipoEstrutura

enumeração

StringType

variedade

ArrayType

map

Tipo de mapa

Fixo

TipoBinário

União

Consulte Tipos de união.

Tipos de união

A fonte de dados Avro suporta a leitura de tipos union . Avro considera os três tipos a seguir como union tipos:

  • union(int, long) mapeia para LongType.

  • union(float, double) mapeia para DoubleType.

  • union(something, null), onde something é qualquer tipo Avro compatível. Isso mapeia para o mesmo tipo Spark SQL de something, com nullable definido como true.

Todos os outros tipos union são tipos complexos. Eles mapeiam para StructType onde os nomes dos campos são member0, member1 e assim por diante, de acordo com os membros do union. Isso é consistente com o comportamento ao converter entre Avro e Parquet.

Tipos lógicos

A fonte de dados Avro suporta a leitura dos seguintes tipos lógicos Avro:

Tipo lógico Avro

Tipo Avro

Tipo Spark SQL

Data

int

Tipo de data

timestamp-milis

long

timestampType

timestamp-micros

long

timestampType

decimal

Fixo

Tipo Decimal

decimal

bytes

Tipo Decimal

Observação

A fonte de dados Avro ignora documentos, aliases e outras propriedades presentes no arquivo Avro.

Tipos suportados para Spark SQL -> conversão Avro

Esta biblioteca oferece suporte à gravação de todos os tipos Spark SQL no Avro. Para a maioria dos tipos, o mapeamento dos tipos Spark para os tipos Avro é direto (por exemplo IntegerType é convertido em int); a seguir está uma lista de alguns casos especiais:

Tipo Spark SQL

Tipo Avro

Tipo lógico Avro

ByteType

int

Tipo curto

int

TipoBinário

bytes

Tipo Decimal

Fixo

decimal

timestampType

long

timestamp-micros

Tipo de data

int

Data

Você também pode especificar todo o esquema Avro de saída com a opção avroSchema, para que os tipos Spark SQL possam ser convertidos em outros tipos Avro. As seguintes conversões não são aplicadas por default e exigem o esquema Avro especificado pelo usuário:

Tipo Spark SQL

Tipo Avro

Tipo lógico Avro

ByteType

Fixo

StringType

enumeração

Tipo Decimal

bytes

decimal

timestampType

long

timestamp-milis

Exemplos

Esses exemplos usam o arquivo Episodes.avro .

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Este exemplo demonstra um esquema Avro personalizado:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Este exemplo demonstra as opções de compactação Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Este exemplo demonstra registros Avro particionados:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Este exemplo demonstra o nome do registro e o namespace:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

Para query dados Avro em SQL, registre o arquivo de dados como uma tabela ou view temporária:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Exemplo Notebook : ler e gravar arquivos Avro

O Notebook a seguir demonstra como ler e gravar arquivos Avro.

Ler e escrever arquivos Avro Notebook

Abra o bloco de anotações em outra guia