メインコンテンツまでスキップ

DataFrameWriterV2 クラス

v2 API を使用して DataFrame を外部ストレージに書き込むために使用されるインターフェース。

Databricks テーブルと Delta Lake のほとんどのユース ケースでは、DataFrameWriterV2 は元の DataFrameWriter よりも強力で柔軟なオプションを提供します。

  • より優れたテーブルプロパティのサポート
  • パーティショニングをより細かく制御
  • 条件付き上書き機能
  • クラスタリングのサポート
  • 作成または置換操作のより明確なセマンティクス

Spark Connectをサポート

構文

このインターフェースにアクセスするにはDataFrame.writeTo(table)を使用します。

方法

手法

説明

using(provider)

基礎となる出力データソースのプロバイダーを指定します。

option(key, value)

書き込みオプションを追加します。たとえば、マネージドテーブルを作成するには: df.writeTo("test").using("delta").option("path", "s3://test").createOrReplace()

options(**options)

書き込みオプションを追加します。

tableProperty(property, value)

テーブルプロパティを追加します。たとえば、 tableProperty("location", "s3://test") (アンマネージドテーブル.

partitionedBy(col, *cols)

指定された列または変換を使用して、create、createOrReplace、または replace によって作成された出力テーブルをパーティション分割します。

clusterBy(col, *cols)

指定された列ごとにデータをクラスター化し、クエリのパフォーマンスを最適化します。

create()

データフレームの内容から新しいテーブルを作成します。

replace()

既存のテーブルをデータフレームの内容に置き換えます。

createOrReplace()

新しいテーブルを作成するか、既存のテーブルをデータ フレームの内容に置き換えます。

append()

データフレームの内容を出力テーブルに追加します。

overwrite(condition)

指定されたフィルタ条件に一致する行を、出力テーブルのデータフレームの内容で上書きします。

overwritePartitions()

データ フレームに少なくとも 1 行が含まれているすべてのパーティションを、出力テーブルのデータ フレームの内容で上書きします。

新しいテーブルを作成する

Python
# Create a new table with DataFrame contents
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.writeTo("my_table").create()

# Create with a specific provider
df.writeTo("my_table").using("parquet").create()

データのパーティション分割

Python
# Partition by single column
df.writeTo("my_table") \
.partitionedBy("year") \
.create()

# Partition by multiple columns
df.writeTo("my_table") \
.partitionedBy("year", "month") \
.create()

# Partition using transform functions
from pyspark.sql.functions import years, months, days

df.writeTo("my_table") \
.partitionedBy(years("date"), months("date")) \
.create()

テーブルプロパティの設定

Python
# Add table properties
df.writeTo("my_table") \
.tableProperty("key1", "value1") \
.tableProperty("key2", "value2") \
.create()

オプションの使用

Python
# Add write options
df.writeTo("my_table") \
.option("compression", "snappy") \
.option("maxRecordsPerFile", "10000") \
.create()

# Add multiple options at once
df.writeTo("my_table") \
.options(compression="snappy", maxRecordsPerFile="10000") \
.create()

データのクラスタリング

Python
# Cluster by columns for query optimization
df.writeTo("my_table") \
.clusterBy("user_id", "timestamp") \
.create()

置換操作

Python
# Replace existing table
df.writeTo("my_table") \
.using("parquet") \
.replace()

# Create or replace (safe operation)
df.writeTo("my_table") \
.using("parquet") \
.createOrReplace()

追加操作

Python
# Append to existing table
df.writeTo("my_table").append()

上書き操作

Python
from pyspark.sql.functions import col

# Overwrite specific rows based on condition
df.writeTo("my_table") \
.overwrite(col("date") == "2025-01-01")

# Overwrite entire partitions
df.writeTo("my_table") \
.overwritePartitions()

メソッドチェーン

Python
# Combine multiple configurations
df.writeTo("my_table") \
.using("parquet") \
.option("compression", "snappy") \
.tableProperty("description", "User data table") \
.partitionedBy("year", "month") \
.clusterBy("user_id") \
.createOrReplace()