Parquet ファイルの読み取りと書き込み
Apache Parquet は、分析ワークロードに最適化された列指向ファイル形式です。これにより、クエリエンジンは必要な列のみを読み取り、不要な行グループをスキップできます。ParquetはDelta Lake(/delta/index.md)の基盤となるストレージ形式であり、Databricksに格納されるデータの最も一般的な形式となっています。Databricksは、Apache SparkでのParquetの読み取りと書き込みの両方をサポートしており、スキーマ指定、パーティション分割、および書き込み圧縮が含まれます。
前提条件
Databricks は Parquet ファイルを使用するのに、追加の設定は不要です。ただし、Parquet ファイルをストリームするには、Auto Loaderが必要です。
オプション
DataFrameReaderとDataFrameWriterの.option()メソッドと.options()メソッドを使用して、 Parquetデータ ソースを構成します。 サポートされているオプションの完全なリストについては、 DataFrameReader Parquet オプションとDataFrameWriter Parquet オプションを参照してください。
使い方
以下の例では、Wanderbricksサンプルデータセットを使用して、Spark DataFrame APIとSQLを使用したParquetファイルの読み書きを実演します。
SQLを使用してParquetファイルを読み取る
read_files を使用して、テーブルを作成せずに、クラウドストレージから Parquet ファイルを SQL で直接クエリします。
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_parquet',
format => 'parquet'
)
Parquet ファイルの読み取りと書き込み
次の例では、Wanderbricks のレビューを Parquet 形式で書き込み、DataFrame に読み戻し、上書きモードを示します。
- Python
- Scala
- SQL
# Write wanderbricks reviews to Parquet format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("parquet").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
# Read a Parquet file into a DataFrame
df = spark.read.format("parquet").load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
display(df)
# Write with overwrite mode
df.write.format("parquet").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
// Write wanderbricks reviews to Parquet format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("parquet").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
// Read a Parquet file into a DataFrame
val df = spark.read.format("parquet").load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.show()
// Write with overwrite mode
df.write.format("parquet").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
-- Write wanderbricks reviews to Parquet format
CREATE TABLE reviews_parquet
USING PARQUET
AS SELECT * FROM samples.wanderbricks.reviews;
SELECT * FROM reviews_parquet;
スキーマの指定
Parquetファイルを読み込む際は、スキーマ推論のオーバーヘッドを避けるためにスキーマを指定してください。例えば、review_id、rating、comment のフィールドを持つスキーマを定義し、reviews_parquet を DataFrame に読み込みます。
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.format("parquet").schema(schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.printSchema()
df.show()
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)
))
val df = spark.read.format("parquet").schema(schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.printSchema()
df.show()
-- Create a table with an explicit schema from Parquet files
CREATE TABLE reviews_parquet (
review_id STRING,
rating INT,
comment STRING
)
USING PARQUET
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews_parquet");
SELECT * FROM reviews_parquet;
パーティション分割された Parquet ファイルの書き込み
大規模なデータセットでのクエリパフォーマンスを最適化するため、パーティション分割されたParquetファイルを書き込みます。たとえば、samples.wanderbricks.bookingsを読み込み、check_in列から派生したyearとmonthでパーティション化されたbookings_parquet_partitionedに書き込みます。
- Python
- Scala
- SQL
from pyspark.sql.functions import year, month
df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("parquet").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned")
import org.apache.spark.sql.functions.{year, month}
val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("parquet").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned")
-- Write partitioned Parquet files by year and month
CREATE TABLE bookings_parquet_partitioned
USING PARQUET
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;
その他のリソース
- DatabricksにおけるDelta Lakeとは?:Parquetの列指向パフォーマンスに加え、ACIDトランザクション、スキーマ強制、またはタイムトラベルが必要な場合、Delta LakeはDatabricksに保存されるデータの推奨フォーマットです。