Ler e gravar arquivos Parquet
O Apache Parquet é um formato de arquivo em coluna otimizado para cargas de trabalho analíticas. Ele permite que os mecanismos de consulta leiam apenas as colunas necessárias e pulem grupos de linhas irrelevantes. Parquet é o formato de armazenamento subjacente para Delta Lake(/delta/index.md), tornando-o o formato mais comum para dados armazenados no Databricks. O Databricks suporta Parquet para leitura e gravação com Apache Spark, incluindo especificação de esquema, particionamento e compressão de gravação.
Pré-requisitos
Databricks não exige configuração adicional para usar arquivos Parquet. No entanto, para transmitir arquivos Parquet, você precisa de Auto Loader.
Opções
Use os métodos .option() e .options() de DataFrameReader e DataFrameWriter para configurar a fonte de dados Parquet . Para obter uma lista completa das opções suportadas, consulte DataFrameReader Opções do Parquet e DataFrameWriter Opções do Parquet.
Uso
Os exemplos a seguir usam o dataset de exemplo Wanderbricks para demonstrar a leitura e gravação de arquivos Parquet usando a API Spark DataFrame e SQL.
Ler arquivos Parquet com SQL
Use read_files para consultar arquivos Parquet diretamente do armazenamento em cloud usando SQL, sem criar uma tabela.
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_parquet',
format => 'parquet'
)
Ler e gravar arquivos Parquet
Os exemplos a seguir gravam as avaliações do Wanderbricks no formato Parquet, os leem de volta em um DataFrame, e demonstram o modo de substituição.
- Python
- Scala
- SQL
# Write wanderbricks reviews to Parquet format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("parquet").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
# Read a Parquet file into a DataFrame
df = spark.read.format("parquet").load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
display(df)
# Write with overwrite mode
df.write.format("parquet").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
// Write wanderbricks reviews to Parquet format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("parquet").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
// Read a Parquet file into a DataFrame
val df = spark.read.format("parquet").load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.show()
// Write with overwrite mode
df.write.format("parquet").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
-- Write wanderbricks reviews to Parquet format
CREATE TABLE reviews_parquet
USING PARQUET
AS SELECT * FROM samples.wanderbricks.reviews;
SELECT * FROM reviews_parquet;
Especificar esquema
Especifique um esquema ao ler arquivos Parquet para evitar a sobrecarga da inferência de esquema. Por exemplo, defina um esquema com os campos review_id, rating e comment e leia reviews_parquet em um DataFrame.
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.format("parquet").schema(schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.printSchema()
df.show()
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)
))
val df = spark.read.format("parquet").schema(schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.printSchema()
df.show()
-- Create a table with an explicit schema from Parquet files
CREATE TABLE reviews_parquet (
review_id STRING,
rating INT,
comment STRING
)
USING PARQUET
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews_parquet");
SELECT * FROM reviews_parquet;
Escrever arquivos Parquet particionados
Grave arquivos Parquet particionados para otimizar o desempenho de consultas em grandes conjuntos de dados. Por exemplo, leia samples.wanderbricks.bookings e o grave em bookings_parquet_partitioned particionado por year e month derivado da coluna check_in.
- Python
- Scala
- SQL
from pyspark.sql.functions import year, month
df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("parquet").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned")
import org.apache.spark.sql.functions.{year, month}
val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("parquet").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned")
-- Write partitioned Parquet files by year and month
CREATE TABLE bookings_parquet_partitioned
USING PARQUET
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;
Recursos adicionais
- O que é o Delta Lake no Databricks?: Se precisar de transações ACID, imposição de esquema ou viagem do tempo juntamente com o desempenho colunar do Parquet, o Delta Lake é o formato recomendado para dados armazenados no Databricks.