メインコンテンツまでスキップ

CSV ファイルの読み取りと書き込み

CSV(comma-separated values)は、データ交換、ETLパイプライン、汎用的なデータストレージに広く利用されているプレーンテキスト形式の表形式です。Databricks は、Apache Spark を使用した CSV の読み取りと書き込みの両方で、スキーマ推論、圧縮、不正な形式のレコード処理、およびレスキューデータをサポートしています。

注記

Databricks では、SQL ユーザーが CSV ファイルを読み取るためにread_files テーブル値関数を推奨していますread_files は Databricks Runtime 13.3 LTS 以降で使用できます。

一時的なビューを使用することもできます。一時ビューまたはread_filesを使用せずにSQLを使用してCSVデータを直接読み取る場合は、次の制限が適用されます。

前提条件

Databricks は CSV ファイルを使用するために、追加設定は必要ありません。ただし、CSVファイルをストリームするには、Auto Loaderが必要です。

オプション

CSVデータ ソースを構成するには、 DataFrameReaderDataFrameWriter.option()メソッドと.options()メソッドを使用します。 サポートされているオプションの完全なリストについては、 DataFrameReader CSV オプションDataFrameWriter CSV オプションを参照してください。

使い方

以下の例では、CSV ファイルの読み取りと書き込み、スキーマの指定、不正なレコードの処理方法を示します。

CSV ファイルの読み取り

以下の例では、ワンダーブリックス サンプルデータセットを使用しています。レビューデータを CSV に書き込み、再度読み込みます。

Python
# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

# Read the CSV file into a DataFrame
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()

SQLを使用したCSVファイルの読み取り

次のSQL例では、 read_files を使用してCSVファイルを読み取ります。

SQL
-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
's3://<bucket>/<path>/<file>.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')

スキーマの指定

CSVファイルのスキーマがわかっている場合は、 schema オプションを用いてCSVリーダーに目的のスキーマを指定できます。

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()

列のサブセットを読み取る

CSV パーサーの動作は、読み取られる列によって異なります。指定されたスキーマがファイルレイアウトと一致しない場合、アクセスされる列によって結果が大きく異なる可能性があります。CSVには列名のメタデータがないため、Sparkはスキーマフィールドを位置で列にマッピングします。そのため、スキーマが一致しないと、値が誤ったフィールドにずれてしまいます。

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Read only a subset of columns by specifying a partial schema
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)

不正な形式の CSV レコードを処理する

指定されたスキーマを持つCSVファイルを読み取る場合、ファイル内のデータがスキーマと一致しない可能性があります。例えば、都市の名前を含むフィールドは整数として解析されません。パーサーが実行されるモードによって結果は異なります。

  • PERMISSIVE (デフォルト): 正しく解析されなかったフィールドにnullが挿入されます
  • DROPMALFORMED: 解析されなかったフィールドを含む行を削除します
  • FAILFAST: 不正な形式のデータが見つかった場合に読み取りを中止します

モードを設定するには、mode オプションを使用します。

Python
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)

PERMISSIVE モードでは、次のいずれかの方法を使用して、正しく解析されなかった行を検査することができます。

  • 破損したレコードをファイルに記録するために badRecordsPath オプションにカスタムパスを指定できます。
  • データフレームReaderに提供されたスキーマに列 _corrupt_record を追加して、結果のデータフレーム内の破損したレコードを確認できます。
注記

badRecordsPath オプションは _corrupt_recordよりも優先されます。つまり、指定されたパスに書き込まれた不正な形式の行は、結果のデータフレームには表示されません。

不正な形式のレコードのデフォルト動作は、 レスキューされたデータ列を使用すると変更されます。

_corrupt_recordを使用して形式が正しくない行を検査するには、スキーマに追加して、空値以外の値でフィルタリングします

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])

df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))

レスキューされたデータ列を有効にする

注記

この機能は、Databricks Runtime 8.3以降でサポートされています。

PERMISSIVE モードを使用すると、レスキューされたデータ列を有効にして、レコード内の 1 つ以上のフィールドに次のいずれかの問題があるために解析されなかったデータをキャプチャできます。

  • 指定されたスキーマに存在しない
  • 指定されたスキーマのデータ型と一致しない
  • 指定されたスキーマのフィールド名と大文字小文字の組み合わせが一致しない

レスキューされたデータ列は、レスキューされた列とレコードのソースファイルパスを含むJSONドキュメントとして返されます。

レスキューされたデータ列を有効にするには、読み取るときに rescuedDataColumn オプションを列名に設定します。

Python
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

レスキューされたデータ列からソースファイルパスを削除するには、設定します:

Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

CSVパーサーは、レコードの解析時に PERMISSIVEDROPMALFORMED、および FAILFASTの3つのモードで対応します。rescuedDataColumnと組み合わせて使用すると、データ型の不一致によってDROPMALFORMEDモードでレコードが削除されたり、FAILFASTモードでエラーがスローされたりすることはありません。破損したレコード(不完全または不正な形式のCSV)のみが削除されるか、エラーがスローされます。

rescuedDataColumnPERMISSIVE モードで使用すると、破損したレコードには次のルールが適用されます

  • ファイルの最初の行(ヘッダー行またはデータ行)は、予想される行の長さを設定します。
  • 列数が異なる行は不完全と見なされます。
  • データ型の不一致は、破損したレコードとは見なされません。
  • 不完全で不正な形式のCSVレコードのみが破損していると見なされ、 _corrupt_record 列または badRecordsPathに記録されます。

その他のリソース

  • Parquet ファイルの読み書き: ワークロードでより優れたクエリパフォーマンスやより効率的なストレージが必要な場合、Parquet のカラム型レイアウトは、CSV のプレーンテキスト形式よりも大きな利点を提供します。