DataFrameWriterV2 クラス
v2 API を使用して DataFrame を外部ストレージに書き込むために使用されるインターフェース。
Databricks テーブルと Delta Lake のほとんどのユース ケースでは、DataFrameWriterV2 は元の DataFrameWriter よりも強力で柔軟なオプションを提供します。
- より優れたテーブルプロパティのサポート
- パーティショニングをより細かく制御
- 条件付き上書き機能
- クラスタリングのサポート
- 作成または置換操作のより明確なセマンティクス
Spark Connectをサポート
構文
このインターフェースにアクセスするにはDataFrame.writeTo(table)を使用します。
方法
手法 | 説明 |
|---|---|
基礎となる出力データソースのプロバイダーを指定します。 | |
書き込みオプションを追加します。たとえば、マネージドテーブルを作成するには: | |
書き込みオプションを追加します。 | |
テーブルプロパティを追加します。たとえば、 | |
指定された列または変換を使用して、create、createOrReplace、または replace によって作成された出力テーブルをパーティション分割します。 | |
| 指定された列ごとにデータをクラスター化し、クエリのパフォーマンスを最適化します。 |
データフレームの内容から新しいテーブルを作成します。 | |
既存のテーブルをデータフレームの内容に置き換えます。 | |
新しいテーブルを作成するか、既存のテーブルをデータ フレームの内容に置き換えます。 | |
データフレームの内容を出力テーブルに追加します。 | |
指定されたフィルタ条件に一致する行を、出力テーブルのデータフレームの内容で上書きします。 | |
データ フレームに少なくとも 1 行が含まれているすべてのパーティションを、出力テーブルのデータ フレームの内容で上書きします。 |
例
新しいテーブルを作成する
# Create a new table with DataFrame contents
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.writeTo("my_table").create()
# Create with a specific provider
df.writeTo("my_table").using("parquet").create()
データのパーティション分割
# Partition by single column
df.writeTo("my_table") \
.partitionedBy("year") \
.create()
# Partition by multiple columns
df.writeTo("my_table") \
.partitionedBy("year", "month") \
.create()
# Partition using transform functions
from pyspark.sql.functions import years, months, days
df.writeTo("my_table") \
.partitionedBy(years("date"), months("date")) \
.create()
テーブルプロパティの設定
# Add table properties
df.writeTo("my_table") \
.tableProperty("key1", "value1") \
.tableProperty("key2", "value2") \
.create()
オプションの使用
# Add write options
df.writeTo("my_table") \
.option("compression", "snappy") \
.option("maxRecordsPerFile", "10000") \
.create()
# Add multiple options at once
df.writeTo("my_table") \
.options(compression="snappy", maxRecordsPerFile="10000") \
.create()
データのクラスタリング
# Cluster by columns for query optimization
df.writeTo("my_table") \
.clusterBy("user_id", "timestamp") \
.create()
置換操作
# Replace existing table
df.writeTo("my_table") \
.using("parquet") \
.replace()
# Create or replace (safe operation)
df.writeTo("my_table") \
.using("parquet") \
.createOrReplace()
追加操作
# Append to existing table
df.writeTo("my_table").append()
上書き操作
from pyspark.sql.functions import col
# Overwrite specific rows based on condition
df.writeTo("my_table") \
.overwrite(col("date") == "2025-01-01")
# Overwrite entire partitions
df.writeTo("my_table") \
.overwritePartitions()
メソッドチェーン
# Combine multiple configurations
df.writeTo("my_table") \
.using("parquet") \
.option("compression", "snappy") \
.tableProperty("description", "User data table") \
.partitionedBy("year", "month") \
.clusterBy("user_id") \
.createOrReplace()