CSV ファイルの読み取りと書き込み
CSV(comma-separated values)は、データ交換、ETLパイプライン、汎用的なデータストレージに広く利用されているプレーンテキスト形式の表形式です。Databricks は、Apache Spark を使用した CSV の読み取りと書き込みの両方で、スキーマ推論、圧縮、不正な形式のレコード処理、およびレスキューデータをサポートしています。
Databricks では、SQL ユーザーが CSV ファイルを読み取るために、read_files テーブル値関数を推奨しています。read_files は Databricks Runtime 13.3 LTS 以降で使用できます。
一時的なビューを使用することもできます。一時ビューまたはread_filesを使用せずにSQLを使用してCSVデータを直接読み取る場合は、次の制限が適用されます。
- データソースのオプションは指定できません。
- データの スキーマを指定する ことはできません。
前提条件
Databricks は CSV ファイルを使用するために、追加設定は必要ありません。ただし、CSVファイルをストリームするには、Auto Loaderが必要です。
オプション
CSVデータ ソースを構成するには、 DataFrameReaderとDataFrameWriterの.option()メソッドと.options()メソッドを使用します。 サポートされているオプションの完全なリストについては、 DataFrameReader CSV オプションとDataFrameWriter CSV オプションを参照してください。
使い方
以下の例では、CSV ファイルの読み取りと書き込み、スキーマの指定、不正なレコードの処理方法を示します。
CSV ファイルの読み取り
以下の例では、ワンダーブリックス サンプルデータセットを使用しています。レビューデータを CSV に書き込み、再度読み込みます。
- Python
- Scala
- R
# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
# Read the CSV file into a DataFrame
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()
// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
// Read the CSV file into a DataFrame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()
df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)
SQLを使用したCSVファイルの読み取り
次のSQL例では、 read_files を使用してCSVファイルを読み取ります。
-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
'gs://<bucket>/<path>/<file>.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
スキーマの指定
CSVファイルのスキーマがわかっている場合は、 schema オプションを用いてCSVリーダーに目的のスキーマを指定できます。
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int, comment string'
)
列のサブセットを読み取る
CSV パーサーの動作は、読み取られる列によって異なります。指定されたスキーマがファイルレイアウトと一致しない場合、アクセスされる列によって結果が大きく異なる可能性があります。CSVには列名のメタデータがないため、Sparkはスキーマフィールドを位置で列にマッピングします。そのため、スキーマが一致しないと、値が誤ったフィールドにずれてしまいます。
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Read only a subset of columns by specifying a partial schema
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int'
)
不正な形式の CSV レコードを処理する
指定されたスキーマを持つCSVファイルを読み取る場合、ファイル内のデータがスキーマと一致しない可能性があります。例えば、都市の名前を含むフィールドは整数として解析されません。パーサーが実行されるモードによって結果は異なります。
PERMISSIVE(デフォルト): 正しく解析されなかったフィールドにnullが挿入されますDROPMALFORMED: 解析されなかったフィールドを含む行を削除しますFAILFAST: 不正な形式のデータが見つかった場合に読み取りを中止します
モードを設定するには、mode オプションを使用します。
- Python
- Scala
- SQL
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE'
)
PERMISSIVE モードでは、次のいずれかの方法を使用して、正しく解析されなかった行を検査することができます。
- 破損したレコードをファイルに記録するために
badRecordsPathオプションにカスタムパスを指定できます。 - データフレームReaderに提供されたスキーマに列
_corrupt_recordを追加して、結果のデータフレーム内の破損したレコードを確認できます。
badRecordsPath オプションは _corrupt_recordよりも優先されます。つまり、指定されたパスに書き込まれた不正な形式の行は、結果のデータフレームには表示されません。
不正な形式のレコードのデフォルト動作は、 レスキューされたデータ列を使用すると変更されます。
_corrupt_recordを使用して形式が正しくない行を検査するには、スキーマに追加して、空値以外の値でフィルタリングします
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true),
StructField("_corrupt_record", StringType, nullable = true)
))
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.filter(df("_corrupt_record").isNotNull).show()
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL
レスキューされたデータ列を有効にする
この機能は、Databricks Runtime 8.3以降でサポートされています。
PERMISSIVE モードを使用すると、レスキューされたデータ列を有効にして、レコード内の 1 つ以上のフィールドに次のいずれかの問題があるために解析されなかったデータをキャプチャできます。
- 指定されたスキーマに存在しない
- 指定されたスキーマのデータ型と一致しない
- 指定されたスキーマのフィールド名と大文字小文字の組み合わせが一致しない
レスキューされたデータ列は、レスキューされた列とレコードのソースファイルパスを含むJSONドキュメントとして返されます。
レスキューされたデータ列を有効にするには、読み取るときに rescuedDataColumn オプションを列名に設定します。
- Python
- Scala
- SQL
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
rescuedDataColumn => '_rescued_data'
)
レスキューされたデータ列からソースファイルパスを削除するには、設定します:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
CSVパーサーは、レコードの解析時に PERMISSIVE、 DROPMALFORMED、および FAILFASTの3つのモードで対応します。rescuedDataColumnと組み合わせて使用すると、データ型の不一致によってDROPMALFORMEDモードでレコードが削除されたり、FAILFASTモードでエラーがスローされたりすることはありません。破損したレコード(不完全または不正な形式のCSV)のみが削除されるか、エラーがスローされます。
rescuedDataColumn を PERMISSIVE モードで使用すると、破損したレコードには次のルールが適用されます。
- ファイルの最初の行(ヘッダー行またはデータ行)は、予想される行の長さを設定します。
- 列数が異なる行は不完全と見なされます。
- データ型の不一致は、破損したレコードとは見なされません。
- 不完全で不正な形式のCSVレコードのみが破損していると見なされ、
_corrupt_record列またはbadRecordsPathに記録されます。
その他のリソース
- Parquet ファイルの読み書き: ワークロードでより優れたクエリパフォーマンスやより効率的なストレージが必要な場合、Parquet のカラム型レイアウトは、CSV のプレーンテキスト形式よりも大きな利点を提供します。