Arquivo Avro
Apache Avro é um sistema de serialização de dados. Avro fornece:
Estruturas de dados ricas.
Um formato de dados binário compacto e rápido.
Um arquivo contêiner para armazenar dados persistentes.
Chamada de procedimento remoto (RPC).
Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos RPC. Geração de código como uma otimização opcional, que só vale a pena implementar para linguagens de tipo estaticamente.
A fonte de dados Avro suporta:
Conversão de esquema: conversão automática entre registros Apache Spark SQL e Avro.
Particionamento: Ler e gravar facilmente dados particionados sem qualquer configuração extra.
Compactação: compactação a ser usada ao gravar Avro no disco. Os tipos suportados são
uncompressed
,snappy
edeflate
. Você também pode especificar o nível de esvaziamento.Nomes de registro: registre o nome e o namespace passando um mapa de parâmetros com
recordName
erecordNamespace
.
Consulte também Ler e gravar dados de transmissão Avro.
Configuração
Você pode alterar o comportamento de uma fonte de dados Avro usando vários parâmetros de configuração.
Para ignorar arquivos sem a extensão .avro
durante a leitura, você pode definir o parâmetro avro.mapred.ignore.inputs.without.extension
na configuração do Hadoop. O default é false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Para configurar a compactação ao gravar, defina as seguintes propriedades do Spark:
Codec de compactação:
spark.sql.avro.compression.codec
. Os codecs suportados sãosnappy
edeflate
. O codec default ésnappy
.Se o codec de compactação for
deflate
, você poderá definir o nível de compactação com:spark.sql.avro.deflate.level
. O nível default é-1
.
Você pode definir essas propriedades na configuração do Spark ou clusters no tempo de execução usando spark.conf.set()
. Por exemplo:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Para Databricks Runtime 9.1 LTSe acima, você pode alterar o comportamento de inferência de esquema default no Avro fornecendo a opção mergeSchema
ao ler arquivos. Definir mergeSchema
como true
inferirá um esquema de um conjunto de arquivos Avro no diretório de destino e os merge em vez de inferir o esquema de leitura de um único arquivo.
Tipos suportados para conversão Avro -> Spark SQL
Esta biblioteca suporta a leitura de todos os tipos Avro. Ele usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:
Tipo Avro |
Tipo Spark SQL |
---|---|
boolean |
BooleanType |
int |
Tipo Inteiro |
long |
LongType |
flutuador |
FloatType |
double |
Tipo Duplo |
bytes |
TipoBinário |
string |
StringType |
registro |
TipoEstrutura |
enumeração |
StringType |
variedade |
ArrayType |
map |
Tipo de mapa |
Fixo |
TipoBinário |
União |
Consulte Tipos de união. |
Tipos de união
A fonte de dados Avro suporta a leitura de tipos union
. Avro considera os três tipos a seguir como union
tipos:
union(int, long)
mapeia paraLongType
.union(float, double)
mapeia paraDoubleType
.union(something, null)
, ondesomething
é qualquer tipo Avro compatível. Isso mapeia para o mesmo tipo Spark SQL desomething
, comnullable
definido comotrue
.
Todos os outros tipos union
são tipos complexos. Eles mapeiam para StructType
onde os nomes dos campos são member0
, member1
e assim por diante, de acordo com os membros do union
. Isso é consistente com o comportamento ao converter entre Avro e Parquet.
Tipos lógicos
A Avro fonte de dados suporta a leitura dos seguintes tipos lógicosAvro:
Tipo lógico Avro |
Tipo Avro |
Tipo Spark SQL |
---|---|---|
Data |
int |
Tipo de data |
timestamp-milis |
long |
timestampType |
timestamp-micros |
long |
timestampType |
decimal |
Fixo |
Tipo Decimal |
decimal |
bytes |
Tipo Decimal |
Observação
A fonte de dados Avro ignora documentos, aliases e outras propriedades presentes no arquivo Avro.
Tipos suportados para Spark SQL -> conversão Avro
Esta biblioteca oferece suporte à gravação de todos os tipos Spark SQL no Avro. Para a maioria dos tipos, o mapeamento dos tipos Spark para os tipos Avro é direto (por exemplo IntegerType
é convertido em int
); a seguir está uma lista de alguns casos especiais:
Tipo Spark SQL |
Tipo Avro |
Tipo lógico Avro |
---|---|---|
ByteType |
int |
|
Tipo curto |
int |
|
TipoBinário |
bytes |
|
Tipo Decimal |
Fixo |
decimal |
timestampType |
long |
timestamp-micros |
Tipo de data |
int |
Data |
Você também pode especificar todo o esquema Avro de saída com a opção avroSchema
, para que os tipos Spark SQL possam ser convertidos em outros tipos Avro. As seguintes conversões não são aplicadas por default e exigem o esquema Avro especificado pelo usuário:
Tipo Spark SQL |
Tipo Avro |
Tipo lógico Avro |
---|---|---|
ByteType |
Fixo |
|
StringType |
enumeração |
|
Tipo Decimal |
bytes |
decimal |
timestampType |
long |
timestamp-milis |
Exemplos
Esses exemplos usam o arquivo Episodes.avro .
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Este exemplo demonstra um esquema Avro personalizado:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Este exemplo demonstra as opções de compactação Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Este exemplo demonstra registros Avro particionados:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Este exemplo demonstra o nome do registro e o namespace:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
Para query dados Avro em SQL, registre o arquivo de dados como uma tabela ou view temporária:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes