DataFrameWriter クラス
DataFrameを外部ストレージシステムに書き込むために使用されるインターフェース(例:ファイルシステム、キー値ストアなど)。
Spark Connectをサポート
構文
このインターフェースにアクセスするにはDataFrame.writeを使用します。
方法
手法 | 説明 |
|---|---|
データまたはテーブルがすでに存在する場合の動作を指定します。 | |
基礎となる出力データ ソースを指定します。 | |
基礎となるデータソースの出力オプションを追加します。 | |
基礎となるデータソースの出力オプションを追加します。 | |
ファイル システム上の指定された列ごとに出力をパーティション分割します。 | |
指定された列ごとに出力をバケット化します。 | |
各バケットの出力を、ファイル システム上の指定された列で並べ替えます。 | |
| 指定された列ごとにデータをクラスター化し、クエリのパフォーマンスを最適化します。 |
DataFrameの内容をデータ ソースに保存します。 | |
DataFrame の内容を指定されたテーブルに挿入します。 | |
DataFrame の内容を指定されたテーブルとして保存します。 | |
DataFrame の内容を JSON 形式で指定されたパスに保存します。 | |
指定されたパスに、DataFrame の内容を Parquet 形式で保存します。 | |
指定されたパスのテキスト ファイルに DataFrame の内容を保存します。 | |
指定されたパスに DataFrame の内容を CSV 形式で保存します。 | |
| 指定されたパスに DataFrame の内容を XML 形式で保存します。 |
指定されたパスに DataFrame の内容を ORC 形式で保存します。 | |
| DataFrame の内容を指定されたパスに Excel 形式で保存します。 |
DataFrame の内容を JDBC 経由で外部データベース テーブルに保存します。 |
保存モード
mode()メソッドは次のオプションをサポートしています。
- append : この DataFrame の内容を既存のデータに追加します。
- overwrite : 既存のデータを上書きします。
- error または errorifexists : データがすでに存在する場合は例外をスローします (デフォルト)。
- ignore : データがすでに存在する場合は、この操作を黙って無視します。
例
異なるデータソースへの書き込み
# Access DataFrameWriter through DataFrame
df = spark.createDataFrame([{"name": "Alice", "age": 30}])
df.write
# Write to JSON file
df.write.json("path/to/output.json")
# Write to CSV file with options
df.write.option("header", "true").csv("path/to/output.csv")
# Write to Parquet file
df.write.parquet("path/to/output.parquet")
# Write to a table
df.write.saveAsTable("table_name")
フォーマットして保存する
# Specify format explicitly
df.write.format("json").save("path/to/output.json")
# With options
df.write.format("csv") \
.option("header", "true") \
.option("compression", "gzip") \
.save("path/to/output.csv")
保存モードの指定
# Overwrite existing data
df.write.mode("overwrite").parquet("path/to/output.parquet")
# Append to existing data
df.write.mode("append").parquet("path/to/output.parquet")
# Ignore if data exists
df.write.mode("ignore").json("path/to/output.json")
# Error if data exists (default)
df.write.mode("error").csv("path/to/output.csv")
データのパーティション分割
# Partition by single column
df.write.partitionBy("year").parquet("path/to/output.parquet")
# Partition by multiple columns
df.write.partitionBy("year", "month").parquet("path/to/output.parquet")
# Partition with bucketing
df.write \
.bucketBy(10, "id") \
.sortBy("age") \
.saveAsTable("bucketed_table")
JDBCへの書き込み
# Write to database table
df.write.jdbc(
url="jdbc:postgresql://localhost:5432/mydb",
table="users",
mode="overwrite",
properties={"user": "myuser", "password": "mypassword"}
)
メソッドチェーン
# Chain multiple configuration methods
df.write \
.format("parquet") \
.mode("overwrite") \
.option("compression", "snappy") \
.partitionBy("year", "month") \
.save("path/to/output")
テーブルへの書き込み
# Save as managed table
df.write.saveAsTable("my_table")
# Save as managed table with options
df.write \
.mode("overwrite") \
.format("parquet") \
.partitionBy("year") \
.saveAsTable("partitioned_table")
# Insert into existing table
df.write.insertInto("existing_table")
# Insert into existing table with overwrite
df.write.insertInto("existing_table", overwrite=True)