Arquivo Avro
O Apache Avro é um sistema de serialização de dados. A Avro fornece:
- Estruturas de dados ricas.
- Um formato de dados binário compacto e rápido.
- Um arquivo de contêiner, para armazenar dados persistentes.
- Chamada de procedimento remoto (RPC).
- Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos RPC. Geração de código como uma otimização opcional, que só vale a pena implementar para linguagens de tipagem estática.
O site Avro fonte de dados oferece suporte:
- Conversão de esquema: Conversão automática entre registros do Apache Spark SQL e do Avro.
- Particionamento: leitura e gravação fáceis de dados particionados sem nenhuma configuração extra.
- Compression (Compressão): Compressão a ser usada ao gravar o Avro no disco. Os tipos suportados são
uncompressed
,snappy
edeflate
. Você também pode especificar o nível de deflação. - Nomes de registro: registre o nome e o namespace passando um mapa de parâmetros com
recordName
erecordNamespace
.
Consulte também a transmissão de leitura e gravação Avro data.
Configuração
O senhor pode alterar o comportamento de uma Avro fonte de dados usando vários parâmetros de configuração.
Para ignorar arquivos sem a extensão .avro
durante a leitura, o senhor pode definir o parâmetro avro.mapred.ignore.inputs.without.extension
na configuração do Hadoop. O endereço default é false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Para configurar a compactação durante a gravação, defina as seguintes propriedades do Spark:
- Codec de compressão:
spark.sql.avro.compression.codec
. Os codecs suportados sãosnappy
edeflate
. O codec default ésnappy
. - Se o codec de compressão for
deflate
, você pode definir o nível de compressão com:spark.sql.avro.deflate.level
. O nível default é-1
.
O senhor pode definir essas propriedades na configuração doSpark de clustering ou em tempo de execução usando spark.conf.set()
. Por exemplo:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Para Databricks Runtime 9.1 LTS e acima, o senhor pode alterar o comportamento de inferência do esquema default em Avro fornecendo a opção mergeSchema
ao ler arquivos. Ao definir mergeSchema
como true
, o senhor deduzirá um esquema de um conjunto de arquivos Avro no diretório de destino e os merge em vez de deduzir o esquema de leitura de um único arquivo.
Tipos compatíveis com a conversão Avro -> Spark SQL
Essa biblioteca suporta a leitura de todos os tipos de Avro. Ele usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:
Tipo Avro | Tipo Spark SQL |
---|---|
boolean | Tipo booleano |
int | Tipo de número inteiro |
long | Tipo longo |
Float | Tipo de flutuação |
double | Tipo duplo |
bytes | Tipo binário |
string | Tipo de string |
disco | Tipo de estrutura |
enumeração | Tipo de string |
matriz | Tipo de matriz |
map | Tipo de mapa |
Fixo | Tipo binário |
união | Consulte Tipos de união. |
Tipos de união
O site Avro fonte de dados suporta a leitura de tipos union
. A Avro considera os três tipos a seguir como union
:
union(int, long)
mapeia paraLongType
.union(float, double)
mapeia paraDoubleType
.union(something, null)
ondesomething
é qualquer tipo de Avro compatível. Isso mapeia para o mesmo tipo de Spark SQL que o desomething
, comnullable
definido comotrue
.
Todos os outros tipos de union
são tipos complexos. Eles mapeiam para
StructType
em que os nomes dos campos são member0
, member1
e assim por diante, em
de acordo com os membros do union
. Isso é consistente com o comportamento do site
ao converter entre Avro e Parquet.
Tipos lógicos
A Avro fonte de dados suporta a leitura dos seguintes tipos lógicosAvro:
Tipo lógico Avro | Tipo Avro | Tipo Spark SQL |
---|---|---|
Data | int | Tipo de data |
timestamp-millis | long | timestampType |
timestamp-micros | long | timestampType |
Decimal | Fixo | Tipo decimal |
Decimal | bytes | Tipo decimal |
A Avro fonte de dados ignora documentos, aliases e outras propriedades presentes no arquivo Avro.
Tipos suportados para Spark SQL -> Conversão Avro
Essa biblioteca oferece suporte à gravação de todos os tipos de Spark SQL no Avro. Para a maioria dos tipos
, o mapeamento dos tipos Spark para os tipos Avro é direto
(por exemplo, IntegerType
é convertido em int
); a seguir, uma lista dos poucos casos especiais:
Tipo Spark SQL | Tipo Avro | Tipo lógico Avro |
---|---|---|
Byte Type | int | |
Tipo curto | int | |
Tipo binário | bytes | |
Tipo decimal | Fixo | Decimal |
timestampType | long | timestamp-micros |
Tipo de data | int | Data |
O senhor também pode especificar todo o esquema Avro de saída com a opção avroSchema
, de modo que os tipos Spark SQL possam ser convertidos em outros tipos Avro.
As conversões a seguir não são aplicadas pelo site default e exigem um esquema Avro especificado pelo usuário:
Tipo Spark SQL | Tipo Avro | Tipo lógico Avro |
---|---|---|
Byte Type | Fixo | |
Tipo de string | enumeração | |
Tipo decimal | bytes | Decimal |
timestampType | long | timestamp-millis |
Exemplos
Esses exemplos usam o arquivo episódios.avro.
- Scala
- Python
- SQL
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
This example demonstrates a custom Avro schema:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
This example demonstrates Avro compression options:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
This example demonstrates partitioned Avro records:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
This example demonstrates the record name and namespace:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
To query Avro data in SQL, register the data file as a table or temporary view:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Notebook exemplo: Ler e gravar arquivos em Avro
O Notebook a seguir demonstra como ler e gravar arquivos Avro.