メインコンテンツまでスキップ

Parquet ファイルの読み取りと書き込み

Apache Parquet は、分析ワークロードに最適化された列指向ファイル形式です。これにより、クエリエンジンは必要な列のみを読み取り、不要な行グループをスキップできます。ParquetはDelta Lake(/delta/index.md)の基盤となるストレージ形式であり、Databricksに格納されるデータの最も一般的な形式となっています。Databricksは、Apache SparkでのParquetの読み取りと書き込みの両方をサポートしており、スキーマ指定、パーティション分割、および書き込み圧縮が含まれます。

前提条件

Databricks は Parquet ファイルを使用するのに、追加の設定は不要です。ただし、Parquet ファイルをストリームするには、Auto Loaderが必要です。

オプション

DataFrameReaderDataFrameWriter.option()メソッドと.options()メソッドを使用して、 Parquetデータ ソースを構成します。 サポートされているオプションの完全なリストについては、 DataFrameReader Parquet オプションDataFrameWriter Parquet オプションを参照してください。

使い方

以下の例では、Wanderbricksサンプルデータセットを使用して、Spark DataFrame APIとSQLを使用したParquetファイルの読み書きを実演します。

SQLを使用してParquetファイルを読み取る

read_files を使用して、テーブルを作成せずに、クラウドストレージから Parquet ファイルを SQL で直接クエリします。

SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_parquet',
format => 'parquet'
)

Parquet ファイルの読み取りと書き込み

次の例では、Wanderbricks のレビューを Parquet 形式で書き込み、DataFrame に読み戻し、上書きモードを示します。

Python
# Write wanderbricks reviews to Parquet format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("parquet").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")

# Read a Parquet file into a DataFrame
df = spark.read.format("parquet").load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
display(df)

# Write with overwrite mode
df.write.format("parquet").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")

スキーマの指定

Parquetファイルを読み込む際は、スキーマ推論のオーバーヘッドを避けるためにスキーマを指定してください。例えば、review_idratingcomment のフィールドを持つスキーマを定義し、reviews_parquet を DataFrame に読み込みます。

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])

df = spark.read.format("parquet").schema(schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_parquet")
df.printSchema()
df.show()

パーティション分割された Parquet ファイルの書き込み

大規模なデータセットでのクエリパフォーマンスを最適化するため、パーティション分割されたParquetファイルを書き込みます。たとえば、samples.wanderbricks.bookingsを読み込み、check_in列から派生したyearmonthでパーティション化されたbookings_parquet_partitionedに書き込みます。

Python
from pyspark.sql.functions import year, month

df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("parquet").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_parquet_partitioned")

その他のリソース

  • DatabricksにおけるDelta Lakeとは?:Parquetの列指向パフォーマンスに加え、ACIDトランザクション、スキーマ強制、またはタイムトラベルが必要な場合、Delta LakeはDatabricksに保存されるデータの推奨フォーマットです。