メインコンテンツまでスキップ

DataFrameWriter クラス

DataFrameを外部ストレージシステムに書き込むために使用されるインターフェース(例:ファイルシステム、キー値ストアなど)。

Spark Connectをサポート

構文

このインターフェースにアクセスするにはDataFrame.writeを使用します。

方法

手法

説明

mode(saveMode)

データまたはテーブルがすでに存在する場合の動作を指定します。

format(source)

基礎となる出力データ ソースを指定します。

option(key, value)

基礎となるデータソースの出力オプションを追加します。

options(**options)

基礎となるデータソースの出力オプションを追加します。

partitionBy(*cols)

ファイル システム上の指定された列ごとに出力をパーティション分割します。

bucketBy(numBuckets, col, *cols)

指定された列ごとに出力をバケット化します。

sortBy(col, *cols)

各バケットの出力を、ファイル システム上の指定された列で並べ替えます。

clusterBy(*cols)

指定された列ごとにデータをクラスター化し、クエリのパフォーマンスを最適化します。

save(path, format, mode, partitionBy, **options)

DataFrameの内容をデータ ソースに保存します。

insertInto(tableName, overwrite)

DataFrame の内容を指定されたテーブルに挿入します。

saveAsTable(name, format, mode, partitionBy, **options)

DataFrame の内容を指定されたテーブルとして保存します。

json(path, mode, compression, ...)

DataFrame の内容を JSON 形式で指定されたパスに保存します。

parquet(path, mode, partitionBy, compression)

指定されたパスに、DataFrame の内容を Parquet 形式で保存します。

text(path, compression, lineSep)

指定されたパスのテキスト ファイルに DataFrame の内容を保存します。

csv(path, mode, compression, sep, ...)

指定されたパスに DataFrame の内容を CSV 形式で保存します。

xml(path, rowTag, mode, ...)

指定されたパスに DataFrame の内容を XML 形式で保存します。

orc(path, mode, partitionBy, compression)

指定されたパスに DataFrame の内容を ORC 形式で保存します。

excel(path, mode, dataAddress, headerRows)

DataFrame の内容を指定されたパスに Excel 形式で保存します。

jdbc(url, table, mode, properties)

DataFrame の内容を JDBC 経由で外部データベース テーブルに保存します。

保存モード

mode()メソッドは次のオプションをサポートしています。

  • append : この DataFrame の内容を既存のデータに追加します。
  • overwrite : 既存のデータを上書きします。
  • error または errorifexists : データがすでに存在する場合は例外をスローします (デフォルト)。
  • ignore : データがすでに存在する場合は、この操作を黙って無視します。

異なるデータソースへの書き込み

Python
# Access DataFrameWriter through DataFrame
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.write

# Write to JSON file
df.write.json("path/to/output.json")

# Write to CSV file with options
df.write.option("header", "true").csv("path/to/output.csv")

# Write to Parquet file
df.write.parquet("path/to/output.parquet")

# Write to a table
df.write.saveAsTable("table_name")

フォーマットして保存する

Python
# Specify format explicitly
df.write.format("json").save("path/to/output.json")

# With options
df.write.format("csv") \
.option("header", "true") \
.option("compression", "gzip") \
.save("path/to/output.csv")

保存モードの指定

Python
# Overwrite existing data
df.write.mode("overwrite").parquet("path/to/output.parquet")

# Append to existing data
df.write.mode("append").parquet("path/to/output.parquet")

# Ignore if data exists
df.write.mode("ignore").json("path/to/output.json")

# Error if data exists (default)
df.write.mode("error").csv("path/to/output.csv")

データのパーティション分割

Python
# Partition by single column
df.write.partitionBy("year").parquet("path/to/output.parquet")

# Partition by multiple columns
df.write.partitionBy("year", "month").parquet("path/to/output.parquet")

# Partition with bucketing
df.write \
.bucketBy(10, "id") \
.sortBy("age") \
.saveAsTable("bucketed_table")

JDBCへの書き込み

Python
# Write to database table
df.write.jdbc(
url="jdbc:postgresql://localhost:5432/mydb",
table="users",
mode="overwrite",
properties={"user": "myuser", "password": "mypassword"}
)

メソッドチェーン

Python
# Chain multiple configuration methods
df.write \
.format("parquet") \
.mode("overwrite") \
.option("compression", "snappy") \
.partitionBy("year", "month") \
.save("path/to/output")

テーブルへの書き込み

Python
# Save as managed table
df.write.saveAsTable("my_table")

# Save as managed table with options
df.write \
.mode("overwrite") \
.format("parquet") \
.partitionBy("year") \
.saveAsTable("partitioned_table")

# Insert into existing table
df.write.insertInto("existing_table")

# Insert into existing table with overwrite
df.write.insertInto("existing_table", overwrite=True)