メインコンテンツまでスキップ

Spark APIオプションのリファレンス

このページでは、データの読み書きを行うSpark APIsで利用可能な入力および出力オプションを一覧表示します。

DataFrameReaderのオプション

これらのオプションをDataFrameReader.option()DataFrameReader.options()read_filesCOPY INTO 、およびAuto Loaderと組み合わせて使用すると、Databricksがデータファイルを読み込む方法を制御できます。

次の例では、JSON ファイルの読み込み時にmultiLineTrueに設定します。

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")

一般

次のオプションは、すべてのファイル形式に適用されます。

Key

デフォルト

説明

ignoreCorruptFiles

false

破損したファイルを無視するかどうか。もしそうであれば、 Spark破損したファイルに遭遇しても実行を続け、読み取られた内容は引き続き返されます。 COPY INTOの場合、スキップされた破損ファイルは、Delta Lake 履歴のoperationMetrics列でnumSkippedCorruptFilesとして確認できます。Databricks Runtime 11.3 LTS以降で利用可能です。

ignoreMissingFiles

false Auto Loaderの場合、 trueCOPY INTO (レガシー)

欠落したファイルを無視するかどうか。true の場合、ファイルが見つからない場合でもSparkジョブは実行を続行し、コンテンツは引き続き返されます。 Databricks Runtime 11.3 LTS以降で利用可能です。

modifiedAfter

なし

オプションのタイムスタンプをフィルターとして使用することで、指定されたタイムスタンプ以降の更新タイムスタンプを持つファイルのみを取り込むことができます。

modifiedBefore

なし

オプションのタイムスタンプをフィルターとして使用することで、指定されたタイムスタンプより前の更新タイムスタンプを持つファイルのみを取り込むことができます。

pathGlobFilter または fileNamePattern

なし

ファイルを選択するための潜在的なグロブパターン。COPY INTO (レガシー) のPATTERNに相当します。fileNamePattern read_filesで使用できます。

recursiveFileLookup

false

trueの場合、このオプションは、名前がdate=2019-07-01ようなパーティション命名規則に従っていない場合でも、ネストされたディレクトリを検索します。

Avro

Key

デフォルト

説明

avroSchema

なし

ユーザーがAvro形式で提供するオプションのスキーマ。Avroを読み込む際、このオプションは、実際のAvroスキーマとは互換性があるものの異なる、進化したスキーマに設定できます。逆シリアル化スキーマは、進化後のスキーマと一致している。例えば、デフォルト値を持つ追加の列を含む進化したスキーマを設定した場合、読み取り結果にはその新しい列も含まれます。

avroSchemaEvolutionMode

none

スキーマ レジストリを使用する場合のスキーマ進化の処理方法。 有効な値: none (スキーマの変更を無視してジョブを続行します)、 restart (スキーマの変更が検出された場合、 UnknownFieldExceptionを発生させてジョブの再起動が必要です)。

datetimeRebaseMode

LEGACY

ユリウス暦と開始グレゴリオ暦の間での DATE 値と TIMESTAMP 値のリベースを制御します。 有効な値: EXCEPTIONLEGACYCORRECTED

enableStableIdentifiersForUnionType

false

Avro Union型に安定したフィールド名を使用するかどうか。有効にすると、共用型フィールド名は、その型名を小文字にしたものから派生します(例: member_intmember_string )。小文字に変換した後の型名が同じ場合、例外をスローします。

mergeSchema

false

複数のファイルにまたがるスキーマを推測し、各ファイルのスキーマをマージするかどうか。 AvroのmergeSchema では、データ型は緩和されません。

mode

FAILFAST

破損したレコードを処理するためのパーサーモード。有効な値: FAILFAST (例外をスローする)、 PERMISSIVE (不正なフィールドを null に設定する)、 DROPMALFORMED (不正なレコードを黙って削除する)。

readerCaseSensitive

true

rescuedDataColumnが有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。

recursiveFieldMaxDepth

なし

再帰的なAvroフィールドの最大再帰深度。すべての再帰フィールドを切り捨てるには1に設定し、1 レベルの再帰を許可するには2に設定し、以下同様に15まで設定します。未設定または0場合、再帰フィールドは許可されません。有効な値: 015

rescuedDataColumn

なし

データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。

COPY INTO (レガシー)は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

詳細については、 「救出されたデータ列とは何ですか?」を参照してください。

stableIdentifierPrefixForUnionType

member_

enableStableIdentifiersForUnionType=trueの場合に安定共用型フィールド名に使用する接頭辞。

CSV

Key

デフォルト

説明

badRecordsPath

なし

不正なCSVレコードに関する情報を記録するファイルを保存するパス。

charToEscapeQuoteEscaping

\0

引用符のエスケープに使用される文字をエスケープするために使用される文字。たとえば、次のレコードの場合:[ " a\\", b ]

  • '\'をエスケープするための文字が未定義の場合、レコードは解析されません。パーサーは文字[a],[\],["],[,],[ ],[b]を読み取りますが、閉じ引用符が見つからないためエラーをスローします。
  • '\'をエスケープする文字が'\'として定義されている場合、レコードは2つの値[a\][b]で読み取られます。

columnNameOfCorruptRecord

_corrupt_record

Auto Loaderに対応しています。 COPY INTO (レガシー) ではサポートされていません。形式が不正で解析できないレコードを格納するための列。解析用のmodeDROPMALFORMEDに設定されている場合、この列は空になります。

comment

\0

テキスト行の先頭にある場合に、行コメントを表す文字を定義します。コメントのスキップを無効にするには、'\0'を使用します。

dateFormat

yyyy-MM-dd

日付文字列を解析するための形式。

emptyValue

空の文字列

空の値の文字列形式。

enableDateTimeParsingFallback

false

指定された形式で値を解析できない場合に、従来の日付およびタイムスタンプの解析動作に戻すかどうか。falseの場合、解析の失敗はmodeに応じてエラーを発生させるか null を生成します。

encoding または charset

UTF-8

CSVファイルのエンコード名。オプションのリストについては、java.nio.charset.Charsetを参照してください。multilinetrueの場合、 UTF-16UTF-32 は使用できません。

enforceSchema

true

指定したスキーマまたは推論されたスキーマを CSV ファイルに強制的に適用するかどうか。 このオプションを有効にすると、CSV ファイルのヘッダーは無視されます。 このオプションは、 Auto Loader を使用してデータをレスキューし、スキーマ進化を許可する場合、デフォルトで無視されます。

escape

\

データの解析時に使用するエスケープ文字。

extension

csv

想定されるファイル名拡張子。この拡張子を持たないファイルは、読み込み時に除外されます。

failOnUnknownFields

false

CSVレコードにスキーマに存在しない列が含まれている場合に、エラーとして処理するかどうか。falseの場合、認識されない列はrescuedDataColumnに応じてサイレントに削除または救済されます。

failOnWidenedFields

false

フィールド値が、拡張せずに宣言されたスキーマ型として解析できない場合に、エラーとするかどうか。falseの場合、型拡張された値はrescuedDataColumnに応じて暗黙的に救済されます。設定failOnUnknownFields=trueは、このオプションの効果を隠蔽する可能性があります。

header

false

CSVファイルにヘッダーが含まれているかどうか。Auto Loaderは、スキーマを推論するときにファイルにヘッダーがあると想定します。

ignoreLeadingWhiteSpace

false

解析された各値の先頭の空白を無視するかどうか。

ignoreTrailingWhiteSpace

false

解析された各値の末尾の空白を無視するかどうか。

inferSchema

false

解析されたCSVレコードのデータ型を推測するか、すべての列がStringTypeであると仮定するかどうか。trueに設定されている場合は、データの追加パスが必要です。Auto Loaderでは、代わりにcloudFiles.inferColumnTypesを使用します。

inputBufferSize

1048576 (1MB)

CSVパーサーのバッファサイズ(バイト単位)。大きなCSVファイルを解析する際のメモリ使用量を調整するのに役立ちます。有効な値:正の整数。

lineSep

なし、これは\r\r\n 、およびをカバーします \n

連続する2つのCSVレコード間の文字列。

locale

US

java.util.Locale識別子。CSV内のデフォルトの日付、タイムスタンプ、小数点の解析に影響を与えます。

maxCharsPerColumn

-1

解析対象となる値から想定される最大文字数。メモリエラーを回避するために使用できます。デフォルト値は-1で、これは無制限を意味します。有効な値:正の整数、または無制限の場合は-1

maxColumns

20480

レコードが持つことができる列数の上限。有効な値:正の整数。

mergeSchema

false

複数のファイルにまたがるスキーマを推測し、各ファイルのスキーマをマージするかどうか。 スキーマを推論するときに Auto Loader するためにデフォルトで有効になります。

mode

PERMISSIVE

不正な形式のレコードを処理するためのパーサーモード。有効な値: PERMISSIVEDROPMALFORMEDFAILFAST

multiLine

false

CSVレコードが複数行にまたがるかどうか。

nanValue

NaN

FloatType列およびDoubleType列を解析するときの数値以外の値の文字列形式。

negativeInf

-Inf

FloatTypeまたはDoubleType列を解析するときの負の無限大の文字列表現。

nullValue

空の文字列

null値の文字列形式。

parserCaseSensitive (非推奨)

false

ファイルの読み取り中に、ヘッダーで宣言された列をスキーマの大文字と小文字を区別して配置するかどうか。Auto Loaderのデフォルトでは、これはtrueです。有効にすると、大文字と小文字が異なる列がrescuedDataColumnで救出されます。このオプションは廃止され、readerCaseSensitiveが採用されました。

positiveInf

Inf

FloatTypeまたはDoubleType列を解析するときの正の無限大の文字列表現。

preferDate

true

可能な場合は、文字列をタイムスタンプではなく日付として推測しようとします。またinferSchemaを有効にするか、Auto LoaderでcloudFiles.inferColumnTypesを使用して、スキーマ推論を使用する必要があります。

quote

"

フィールド区切り文字が値の一部である場合に、値をエスケープするために使用される文字。

readerCaseSensitive

true

rescuedDataColumnが有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。

rescuedDataColumn

なし

データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。

COPY INTO (レガシー)は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

sep または delimiter

,

列間の区切り記号文字列。

singleVariantColumn

なし

列名を指定すると、各フィールドを個別の列に解析するのではなく、CSVレコード全体をその名前の単一のVariantType列に読み込みます。必要条件: header=true

skipRows

0

CSVファイルの先頭から無視する行数(コメント行と空行を含む)。headerが真の場合、ヘッダーはスキップもコメントもされていない最初の行になります。有効な値:正の整数または0。

timeFormat

HH:mm:ss

TimeType列の値を解析するためのフォーマット。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ文字列を解析するための形式。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムゾーン( TimestampNTZType )文字列を含まないタイムスタンプを解析するためのフォーマット。

timeZone

なし

タイムスタンプと日付を解析するときに使用するjava.time.ZoneId

unescapedQuoteHandling

STOP_AT_DELIMITER

エスケープされていない引用符を処理するための戦略。許可されるオプション:

  • STOP_AT_CLOSING_QUOTE:入力内でエスケープされていない引用符が見つかった場合は、引用符文字を蓄積し、閉じ引用符が見つかるまで値を引用符で囲まれた値として解析します。
  • BACK_TO_DELIMITER:入力内にエスケープされていない引用符が見つかった場合、その値は引用符で囲まれていない値とみなされます。これにより、パーサーは、sepで定義された区切り文字が見つかるまで、現在の解析値のすべての文字を蓄積します。値に区切り文字が見つからない場合、パーサーは区切り文字または行末が見つかるまで入力から文字を蓄積し続けます。
  • STOP_AT_DELIMITER:入力内にエスケープされていない引用符が見つかった場合、その値は引用符で囲まれていない値とみなされます。これにより、パーサーは、sepで定義された区切り文字、または入力内で行末が見つかるまで、すべての文字を蓄積します。
  • SKIP_VALUE:入力内でエスケープされていない引用符が見つかった場合、指定された値に対して解析されたコンテンツは(次の区切り文字が見つかるまで)スキップされ、代わりにnullValueに設定された値が生成されます。
  • RAISE_ERROR:入力内にエスケープされていない引用符が見つかった場合は、TextParsingExceptionが投げられます。

Excel

Key

デフォルト

説明

dataAddress

なし

Excel構文で読み込むセル範囲。省略した場合、最初のシートから有効なセルをすべて読み取ります。指定したシートから範囲を読み取るには"SheetName!C5:H10" 、最初のシートから範囲を読み取るには"C5:H10" 、特定のシートからすべてのデータを読み取るには"SheetName"使用します。

headerRows

0

列名ヘッダーとして使用する初期行数。dataAddressが指定されている場合、これはセル範囲内に適用されます。0の場合、列名は_c1_c2_c3などとして自動生成されます。有効な値: 01

operation

readSheet

Excelワークブックに対して実行する操作。有効な値: readSheet (シートからデータを読み込む)、 listSheets (各シートに対してフィールドsheetIndex: longsheetName: Stringを持つ構造体を返す)。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

Excelに文字列として保存される、タイムゾーンなしのタイムスタンプ値のカスタム形式文字列。 カスタム日付形式は、Datetime パターンの形式に従います。

dateFormat

yyyy-MM-dd

文字列値のカスタムフォーマット文字列はDateとして読み取られます。カスタム日付形式は、Datetime パターンの形式に従います。

JSON

Key

デフォルト

説明

allowBackslashEscapingAnyCharacter

false

バックスラッシュの後に続く文字のエスケープを許可するかどうか。有効にしない場合、JSON仕様で明示的にリストされている文字のみをエスケープできます。

allowComments

false

構文解析されたコンテンツ内で、Java、C、C++スタイルのコメント('/''*''//'の各種)の使用を許可するかどうか。

allowNonNumericNumbers

true

数値でない(NaN)トークンの集合を有効な浮動小数点数値として許可するかどうか。

allowNumericLeadingZeros

false

整数が追加の(無視できる)ゼロ(例えば、 000001 )で始まることを許可するかどうか。

allowSingleQuotes

true

文字列(名前と文字列値)の引用符付けに一重引用符(アポストロフィ、文字 '\')の使用を許可するかどうか。

allowUnquotedControlChars

false

JSON文字列に、エスケープされていない制御文字(タブ文字や改行文字を含む、値が32未満のASCII文字)を含めることを許可するかどうか。

allowUnquotedFieldNames

false

JavaScriptでは許可されているが、JSON仕様では許可されていない、引用符なしのフィールド名の使用を許可するかどうか。

alternateVariantEncoding

なし

バリアント値に使用されるエンコードは、 JSONの形式で記述されます。 インラインJSONとして保存される代わりにBase85エンコードされたVariant値をデコードするには、 Z85に設定します。

badRecordsPath

なし

不正なJSONレコードに関する情報を記録するためのファイルを保存するパス。

ファイルベースのデータソースでbadRecordsPathオプションを使用する場合、以下の制限があります。

  • これは取引を伴わないため、一貫性のない結果につながる可能性があります。
  • 一時的なエラーは障害として扱われます。

columnNameOfCorruptRecord

_corrupt_record

形式が正しくなく、解析できないレコードを格納するための列。解析用のmode DROPMALFORMEDに設定されている場合、この列は空になります。

dateFormat

yyyy-MM-dd

日付文字列を解析するための形式。

dropFieldIfAllNull

false

スキーマ推論中に、すべてのNULL値の列、または空の配列および構造体を無視するかどうか。

encoding または charset

UTF-8

JSONファイルのエンコーディングの名前。オプションのリストについては、java.nio.charset.Charsetを参照してください。multilinetrueの場合は、UTF-16UTF-32を使用できません。

inferTimestamp

false

タイムスタンプ文字列をTimestampTypeとして推測するかどうか。trueに設定すると、スキーマ推論に著しく時間がかかる場合があります。Auto Loaderで使用するには、 cloudFiles.inferColumnTypes有効にする必要があります。

lineSep

なし、これは\r\r\n 、およびをカバーします \n

連続する2つのJSONレコード間の文字列。

locale

US

java.util.Locale識別子。JSON内のデフォルトの日付、タイムスタンプ、小数の解析に影響します。

maxNestingDepth

500

JSONオブジェクトおよび配列の最大許容ネスト深度。深くネストされたドキュメントの場合は、この値を増やしてください。有効な値:正の整数。

maxNumLen

1000

JSON入力における数値トークンの最大長。大きな数値リテラルを含むJSONの場合は、この値を増やしてください。有効な値:正の整数。

maxStringLen

無制限

JSON入力における文字列値の最大長。大きな文字列を含むJSONを解析する際のメモリ使用量を制限するように設定します。有効な値:正の整数。

mode

PERMISSIVE

不正な形式のレコードを処理するためのパーサーモード。有効な値: PERMISSIVEDROPMALFORMEDFAILFAST

multiLine

false

JSONレコードが複数行にまたがるかどうか。

prefersDecimal

false

可能な場合は、float型またはdouble型ではなく、文字列をDecimalTypeとして推論しようとします。また、inferSchemaを有効にするか、cloudFiles.inferColumnTypesをAuto Loaderとともに使用することで、スキーマ推論を使用する必要があります。

primitivesAsString

false

数値やブール値などのプリミティブ型をStringTypeとして推論するかどうか。

readerCaseSensitive

true

rescuedDataColumnが有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。Databricks Runtime 13.3以降で利用可能です。

rescuedDataColumn

なし

データ型の不一致またはスキーマの不一致(列の大文字小文字の区別を含む)により解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。

COPY INTO (レガシー)は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

singleVariantColumn

なし

JSONドキュメント全体を取り込み、指定された文字列を列名とする単一のVariant列に解析するかどうか。設定されていない場合、JSONフィールドはそれぞれ独自の列に取り込まれます。有効な値:任意の文字列。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ文字列を解析するための形式。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムゾーン( TimestampNTZType )文字列を含まないタイムスタンプを解析するためのフォーマット。

timeZone

なし

タイムスタンプと日付を解析するときに使用するjava.time.ZoneId

upgradeExceptionAsBadRecord

false

型アップグレード例外(例えば、値を宣言された列型に拡張できない場合など)を例外をスローするのではなく、不良レコードとして扱うかどうか。

ORC

Key

デフォルト

説明

mergeSchema

false

複数ファイルからスキーマを推定し、各ファイルのスキーマをマージするかどうか。

Parquet

Key

デフォルト

説明

datetimeRebaseMode

LEGACY

ユリウス暦と開始グレゴリオ暦の間での DATE 値と TIMESTAMP 値のリベースを制御します。 有効な値: EXCEPTIONLEGACYCORRECTED

int96RebaseMode

LEGACY

ユリウス暦と開始グレゴリオ暦の間での INT96 タイムスタンプ値のリベースを制御します。 有効な値: EXCEPTIONLEGACYCORRECTED

mergeSchema

false

複数ファイルからスキーマを推定し、各ファイルのスキーマをマージするかどうか。

readerCaseSensitive

true

rescuedDataColumnが有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。

rescuedDataColumn

なし

データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。

COPY INTO (レガシー)は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

文章

Key

デフォルト

説明

encoding

UTF-8

TEXTファイルの行区切り文字のエンコーディング名。 オプションの一覧については、 java.nio.charset.Charsetを参照してください。このオプションはファイルの内容に影響を与えず、そのまま読み込まれます。

lineSep

なし、これは\r\r\nおよび \n

連続する2つのTEXTレコード間の文字列。

wholeText

false

ファイルを単一レコードとして読み取るかどうか。

XML

Key

デフォルト

説明

rowTag

なし

行として扱うXMLファイルの行タグ。例の XML <books> <book><book>...<books>では、適切な値はbookです。これは必須オプションです。

samplingRatio

1.0

スキーマ推論に使用する行の割合を定義します。XMLの組み込み関数はこのオプションを無視します。有効な値: 0.01.0

excludeAttribute

false

要素内の属性を除外するかどうか。

mode

なし

解析中に破損したレコードを処理するためのMode 。 PERMISSIVE : 破損したレコードの場合、 columnNameOfCorruptRecordで設定されたフィールドに不正な文字列を格納し、不正なフィールドをnullに設定します。破損したレコードを保持するには、ユーザー定義スキーマでcolumnNameOfCorruptRecordという名前のstring型のフィールドを設定できます。スキーマに該当フィールドが存在しない場合、解析中に破損したレコードは削除されます。スキーマを推論する際、パーサーは出力スキーマに暗黙的にcolumnNameOfCorruptRecordフィールドを追加します。DROPMALFORMED : 破損したレコードを無視します。このモードは、XML 組み込み関数ではサポートされていません。 FAILFAST : パーサーが破損したレコードに遭遇した場合に例外をスローします。

inferSchema

true

trueの場合、結果として得られる各 DataFrame 列に対して適切な型を推測しようとします。falseの場合、結果として得られるすべての列はstring型になります。XMLの組み込み関数はこのオプションを無視します。

columnNameOfCorruptRecord

spark.sql.columnNameOfCorruptRecord

PERMISSIVEモードによって作成された不正な文字列を含む新しいフィールドの名前変更を許可します。

attributePrefix

なし

属性を要素と区別するための、属性の接頭辞。これはフィールド名の接頭辞になります。デフォルトは_です。XMLの読み取り時には空でも構いませんが、書き込み時には空にすることはできません。DataFrameWriter の XML オプションにも適用されます。

valueTag

_VALUE

属性または子要素を持つ要素内の文字データに使用されるタグ。ユーザーはスキーマ内でvalueTagフィールドを指定することもできますし、文字データが他の要素や属性を持つ要素内に存在する場合、スキーマ推論中に自動的に追加されます。DataFrameWriter の XML オプションにも適用されます。

encoding

UTF-8

読み取り時には、指定されたエンコード方式でXMLファイルをデコードします。書き込み時に、保存されるXMLファイルのエンコーディング(文字セット)を指定します。XMLの組み込み関数はこのオプションを無視します。DataFrameWriter の XML オプションにも適用されます。

ignoreSurroundingSpaces

true

値の周囲の空白をスキップする必要があるかどうか。空白文字のみのデータは無視されます。

rowValidationXSDPath

なし

各行のXMLを個別に検証するために使用される、オプションのXSDファイルへのパス。検証に失敗した行は、構文解析エラーとして扱われます。XSDは、提供されたスキーマであろうと推論されたスキーマであろうと、それ以外のスキーマには影響を与えません。

ignoreNamespace

false

trueの場合、XML 要素および属性の名前空間のプレフィックスは無視されます。例えば、タグ<abc:author><def:author> 、両方とも<author>であるかのように扱われます。rowTag要素では名前空間を無視することはできません。無視できるのはその子要素のみです。XML解析はfalseであっても名前空間を認識しません。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

datetime パターン形式に従うカスタムタイムスタンプ形式文字列。これはtimestamp型に適用されます。DataFrameWriter の XML オプションにも適用されます。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムゾーンを含まないタイムスタンプのカスタムフォーマット文字列。datetime パターンフォーマットに従います。これはTimestampNTZType型に適用されます。DataFrameWriter の XML オプションにも適用されます。

dateFormat

yyyy-MM-dd

datetime パターン形式に従うカスタム日付形式文字列。これは日付型に適用されます。DataFrameWriter の XML オプションにも適用されます。

locale

en-US

IETF BCP 47形式の言語タグとしてロケールを設定します。例えば、 locale日付やタイムスタンプを解析する際に使用されます。

nullValue

string null

null値の文字列表現を設定します。これがnullの場合、パーサーはフィールドの属性と要素を書き込みません。DataFrameWriter の XML オプションにも適用されます。

readerCaseSensitive

true

rescuedDataColumn が有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。

rescuedDataColumn

なし

データ型の不一致やスキーマの不一致(列の大文字小文字の区別を含む)により解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。 COPY INTO (レガシー) は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

singleVariantColumn

none

単一バリアント列の名前を指定します。このオプションが読み取り用に指定されている場合、XMLレコード全体を解析して、指定されたオプション文字列値を列名とする単一のVariant列を作成します。このオプションが書き込み用に提供されている場合、単一のVariant列の値をXMLファイルに書き込みます。DataFrameWriter の XML オプションにも適用されます。

useLegacyXMLParser

true

従来のXMLパーサーを使用するかどうか。従来のパーサーは、不正なコンテンツに対する検証が緩い反面、メモリ効率が劣ります。より厳格なデフォルトパーサーを有効にするには、 falseに設定してください。

wildcardColName

xs_any

ワイルドカード( xs:any )スキーマ要素に一致するXML要素をキャプチャするために使用される列名。rescuedDataColumnと併用することはできません。

DataStreamReaderのオプション

これらのオプションをDataStreamReader.option()と組み合わせて使用すると、Delta Lakeテーブルやその他のファイルベースのソースからのストリーミング読み取りを設定できます。

ファイル形式のオプション(JSON、CSV、Parquetなど)については、 DataFrameReaderのオプションを参照してください。

Auto Loader ( cloudFiles.* )のオプションについては、 Auto Loader参照してください。

次の例では、 Delta Lakeテーブル ストリームのmaxFilesPerTrigger10に設定します。

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")

一般

以下のオプションは、Delta Lakeテーブルおよびその他のファイルベースのストリーミングソースに適用されます。

Key

デフォルト

説明

cleanSource

off

ストリーム処理後のソースファイルの取り扱い方法。有効な値: off (何もしない)、 delete (ソースファイルを完全に削除する)、 archive ( sourceArchiveDirに移動)。archiveに設定する場合は、 sourceArchiveDirも設定する必要があります。Delta Lakeテーブル ストリーミングには適用されません。

fileNameOnly

false

既に処理済みのファイルを、フルパスではなくファイル名のみで識別するかどうか。trueの場合、異なるパスにある同じファイル名のファイルは同じファイルとして扱われ、再処理されません。Delta Lakeテーブル ストリーミングには適用されません。

latestFirst

false

各マイクロバッチ内で、最も最近変更されたファイルを最初に処理するかどうか。最新のデータをできるだけ迅速に処理したい場合に役立ちます。truemaxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合、 maxFileAgeは無視されます。Delta Lakeテーブル ストリーミングには適用されません。

maxBytesPerTrigger

なし

マイクロバッチごとに処理されるデータ量のソフト最大値。最小入力単位が制限値を超えた場合、バッチ処理で制限値を超える処理が行われる可能性があります。maxFilesPerTriggerと併用した場合、マイクロバッチはどちらかの制限に先に到達するまでデータを処理します。有効な値:正の整数。

Auto Loaderの場合は、代わりにcloudFiles.maxBytesPerTriggerを使用してください。 共通項を参照。

maxCachedFiles

10000

後続のマイクロバッチ処理のためにキャッシュする未処理ファイルの最大数。キャッシュを無効にするには、 0に設定してください。ソースディレクトリにトリガーごとに多数の新規ファイルが含まれる場合は、この値を増やしてください。Delta Lakeテーブル ストリーミングには適用されません。 有効な値:正の整数または0。

maxFileAge

7d

処理対象となるファイルの最大経過時間は、現在のシステム時刻ではなく、最後に変更されたファイルのタイムスタンプを基準とします。このしきい値よりも古いファイルは無視されます。7d4hのような期間文字列を受け入れます。latestFirsttrueで、かつmaxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合は無視されます。Delta Lakeテーブル ストリーミングには適用されません。

maxFilesPerTrigger

1000 Delta LakeとAuto Loader向け。 その他のファイルベースのソースには上限はありません。

各マイクロバッチで処理される新規ファイル数の上限。maxBytesPerTriggerと併用した場合、マイクロバッチはどちらかの制限に先に到達するまでデータを処理します。有効な値:正の整数。

Auto Loaderの場合は、代わりにcloudFiles.maxFilesPerTriggerを使用してください。 共通項を参照。

sourceArchiveDir

なし

cleanSourcearchiveに設定されている場合のアーカイブディレクトリへのパス。ソースファイルは処理後、元のディレクトリ構造を維持したままこのパスに移動されます。Delta Lakeテーブル ストリーミングには適用されません。

Auto Loader

これらのオプションをcloudFilesソースと組み合わせて使用すると、クラウドストレージからのストリーミング取り込み用にAuto Loaderを設定できます。cloudFilesソースに固有のオプションには、他の構造化ストリーミングソースオプションとは別の名前空間に保持するために、 cloudFilesという接頭辞が付けられます。

一般

Key

デフォルト

説明

cloudFiles.allowOverwrites

false

入力ディレクトリファイルの変更が既存のデータを上書きすることを許可するかどうか。

設定上の注意点については、 「ファイルが追加または上書きされた場合、Auto Loader はファイルを再度処理しますか?」を参照してください。

cloudFiles.backfillInterval

なし

Auto Loader指定された間隔で非同期のバックフィルを実行できます。 例えば、 1 day毎日バックアップ、 1 weekは毎週バックアップを実行します。詳細については、 「cloudFiles.backfillInterval を使用して定期的なバックフィルをトリガーする」を参照してください。

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。

cloudFiles.cleanSource

OFF

処理済みのファイルを入力ディレクトリから自動的に削除するかどうか。OFF (デフォルト)に設定すると、ファイルは削除されません。

DELETEに設定すると、Auto Loaderは処理後30日後にファイルを自動的に削除します。これを行うには、Auto Loader がソースディレクトリへの書き込み権限を持っている必要があります。

MOVEに設定すると、Auto Loaderは処理後cloudFiles.cleanSource.moveDestination日後に指定された場所にファイルを自動的に移動します。これを行うには、Auto Loader がソースディレクトリと移動先ディレクトリの両方に対して書き込み権限を持っている必要があります。

ファイルは、 cloud_files_stateテーブル値関数の結果のcommit_timeに null 以外の値がある場合に処理済みとみなされます。cloud_files_stateテーブル値関数を参照してください。処理後の30日間の追加待機はcloudFiles.cleanSource.retentionDurationを使用して設定できます。

cloudFiles.cleanSourceを有効にする前に、以下の事項を確認してください。

  • ソースの場所からデータを消費するストリームが複数ある場合、 Databricksこのオプションの使用をお勧めしません。これは、最速のコンシューマーがファイルを削除し、遅いソースには取り込まれないためです。
  • この機能を有効にするには、Auto Loader がチェックポイントに追加の状態を保持する必要があり、パフォーマンスのオーバーヘッドが発生しますが、 cloud_files_stateテーブル値関数を通じて監視性を向上させることができます。cloud_files_stateテーブル値関数を参照してください。
  • cleanSource 現在の設定を使用して、特定のファイルをMOVEまたはDELETEするかどうかを決定します。例えば、ファイルが最初に処理されたときの設定がMOVEだったが、30日後にファイルがクリーンアップの対象になったときにDELETEに変更されたとします。この場合、cleanSourceはファイルを削除します。
  • retentionDuration有効期限が切れた直後にファイルが削除されるとは限りません。コストを抑えるため、Auto Loaderはストリーム処理と同時にファイルを削除し、ストリーム処理が完了または終了するとすぐに終了します。ストリーム処理中にクリーンアップの対象となったものの、クリーンアップできなかったファイルは、次回のAuto Loader実行時に処理されます。

Databricks Runtime 16.4以降で利用可能です。

cloudFiles.cleanSource.retentionDuration

30 days

処理されたファイルがアーカイブ候補になるまでの待機時間( cleanSource )。DELETEの場合は 7 日以上である必要があります。MOVEには最低制限はありません。

値はCalendarInterval型の文字列です。例えば、 "14 days""30 days""2 weeks" 、または"1 month"

Databricks Runtime 16.4以降で利用可能です。

cloudFiles.cleanSource.moveDestination

なし

cloudFiles.cleanSourceMOVEに設定された場合の処理済みファイルのアーカイブ先パス。これはクラウドストレージパスまたはUnity Catalogボリュームパス(例: /Volumes/my_catalog/my_schema/my_volume/archive/ )です。

移転先は以下の条件を満たす必要があります。

  • ソースディレクトリの子ディレクトリであってはならない。移動先をソースディレクトリ内に配置した場合、アーカイブされたファイルが再度取り込まれます。
  • ソースと同じ外部ロケーション、ボリューム、またはDBFSマウント上に配置してください。バケット間およびコンテナ間の移動はサポートされておらず、エラーが発生します。

Auto Loaderには、このディレクトリへの書き込み権限が必要です。

Databricks Runtime 16.4以降で利用可能です。

cloudFiles.format

なし(必須オプション)

ソースパス内のデータファイル形式。有効な値は次のとおりです。

cloudFiles.includeExistingFiles

true

ストリーム処理入力パスに既存のファイルを含めるか、初期セットアップ後に到着する新しいファイルのみを処理するかどうか。このオプションは、初めてストリームを開始するときにのみ評価されます。ストリームの再開後にこのオプションを変更しても効果はありません。

cloudFiles.inferColumnTypes

false

スキーマ推論を利用する際に、正確な列型を推論するかどうか。安心により、 JSONやCSVデータセットを推論する際、列は文字列として推論されます。 詳細については、スキーマ推論を参照してください。

cloudFiles.maxBytesPerTrigger

なし

各トリガーで処理される新規バイトの最大数。各マイクロバッチのデータ量を10GBに制限するには、 10gなどのバイト区切りを指定できます。 これは緩やかな上限値です。それぞれ3GBのファイルがある場合、Databricksは12GBのデータをマイクロバッチで処理します。cloudFiles.maxFilesPerTriggerと併用する場合、Databricks はcloudFiles.maxFilesPerTriggerまたはcloudFiles.maxBytesPerTriggerのいずれか早い方の下限まで消費します。このオプションはTrigger.Once()と併用した場合、効果がありません( Trigger.Once()は非推奨です)。

Databricks Runtime 18.0以降では、このオプションは動的に構成されるため、手動で設定する必要はありません。

cloudFiles.maxFileAge

なし

ファイルイベントが重複排除の目的で追跡される期間。Databricks 、1 時間あたり数百万ファイル程度のデータを取り込んでいる場合を除き、この設定を調整することを推奨しません。 詳細については、 「ファイルイベント追跡」のセクションを参照してください。

cloudFiles.maxFileAge過度に調整すると、重複したデータの取り込みやファイルの欠落など、データ品質の問題が発生する可能性があります。そのため、DatabricksはcloudFiles.maxFileAgeに対して90日などの保守的な設定を推奨しており、これは類似のデータ取り込みソリューションが推奨しているものと同様です。

cloudFiles.maxFilesPerTrigger

1000

各トリガーで処理される新しいファイルの最大数。cloudFiles.maxBytesPerTriggerと一緒に使用すると、DatabricksはcloudFiles.maxFilesPerTriggerまたはcloudFiles.maxBytesPerTriggerのどちらか先に到達した方の下限まで消費します。このオプションは、Trigger.Once()(非推奨)と一緒に使用すると効果がありません。

Databricks Runtime 18.0以降では、このオプションは動的に構成されるため、手動で設定する必要はありません。

cloudFiles.partitionColumns

なし

ファイルのディレクトリ構造から推測したい、Hiveスタイルのパーティション列をカンマ区切りで指定します。Hive スタイルのパーティション列は、 <base-path>/a=x/b=1/c=y/file.formatのように等号で結合されたキーと値のペアです。この例では、パーティション列はabcです。デフォルトでは、スキーマ推論を使用している場合、これらの列はスキーマに自動的に追加され、データの読み込み元として<base-path>が提供されます。スキーマを指定する場合、Auto Loader はこれらの列がスキーマに含まれていることを想定します。これらの列をスキーマに含めたくない場合は、 ""を指定してこれらの列を無視することができます。さらに、以下の例のように、複雑なディレクトリ構造内のファイルパスを列に推測させたい場合にも、このオプションを使用できます。

<base-path>/year=2022/week=1/file1.csv <base-path>/year=2022/month=2/day=3/file2.csv <base-path>/year=2022/month=2/day=4/file3.csv

cloudFiles.partitionColumns year,month,dayとして指定すると、 file1.csvに対してyear=2022が返されますが、 monthday列はnullになります。

month file2.csvfile3.csvについては、 dayは正しく解析されます。

cloudFiles.schemaEvolutionMode

addNewColumns スキーマが指定されていない場合はnone、それ以外の場合は

データ内に新しい列が発見された際に、スキーマを進化させるためのモード。安心により、 JSONデータセットを推論する際、列は文字列として推論されます。 詳細については、 「スキーマの進化」を参照してください。

cloudFiles.schemaHints

なし

スキーマ推論時にAuto Loaderに提供するスキーマ情報。 詳細については、スキーマヒントを参照してください。

cloudFiles.schemaLocation

なし(スキーマ推論に必須)

推論されたスキーマとその後の変更を保存する場所。詳細については、スキーマ推論を参照してください。

cloudFiles.useStrictGlobber

false

Apache Sparkの他のファイル ソースの完全なグロビング動作と一致する厳密なglobberを使用するかどうか。 詳細については、 「一般的なデータ読み込みパターン」を参照してください。Databricks Runtime 12.2 LTS以降で利用可能です。

cloudFiles.validateOptions

true

Auto Loaderオプションを検証し、不明なオプションまたは一貫性のないオプションに対してエラーを返すかどうか。

ディレクトリ一覧

Key

デフォルト

説明

cloudFiles.useIncrementalListing (非推奨)

auto Databricks Runtime 17.2以前のバージョンではfalse、 Databricks Runtime 17.3以降のバージョンでは

この機能は廃止されました。Databricksは、 cloudFiles.useIncrementalListingの代わりにファイルイベントでファイル通知モードを使用することを推奨しています。

ディレクトリ一覧表示モードで、完全な一覧表示ではなく、増分一覧表示を使用するかどうか。デフォルトでは、Auto Loader は指定されたディレクトリが増分リストの対象であるかどうかを自動的に検出するよう最大限の努力を払います。明示的に増分リストを使用するか、ディレクトリ全体のリストを使用するかは、それぞれtrueまたはfalseに設定することで選択できます。

辞書順に並んでいないディレクトリでインクリメンタルリストを誤って有効にすると、 Auto Loader新しいファイルを検出できなくなります。

Azureデータレイク ストレージ ( abfss:// )、 S3 ( s3:// )、およびGCS ( gs:// ) で動作します。

Databricks Runtime 9.1 LTS 以降で利用可能です。 使用可能な値:autotruefalse

ファイル通知

必要なクラウド権限、セットアップ手順、認証方法など、ファイル通知モードの構成については、 「 ファイル通知モードでのAuto Loaderストリームの構成 」を参照してください。

Key

デフォルト

説明

cloudFiles.fetchParallelism

1

キューイングサービスからメッセージを取得するときに使用するスレッドの数。

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。

cloudFiles.pathRewrites

なし

複数のS3バケットからファイル通知を受け取るqueueUrlを指定し、これらのコンテナ内のデータにアクセスするために構成されたマウントポイントを使用する場合にのみ必要です。このオプションを使用すると、 bucket/keyパスのプレフィックスをマウントポイントで書き換えることができます。書き換え可能なのは接頭辞のみです。例えば、構成{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}の場合、パスs3://<databricks-mounted-bucket>/path/2017/08/fileA.jsondbfs:/mnt/data-warehouse/2017/08/fileA.jsonに書き換えられます。

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。

cloudFiles.resourceTag

なし

関連リソースの関連付けと識別に役立つ一連のキーと値のタグのペア。次に例を示します。

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

AWSの詳細については、 Amazon SQS コスト割り当てタグ」および「 Amazon SNS トピックのタグの設定」を参照してください。 (1)

Azureに関する詳細情報については、 「キューとメタデータの名前付け」および「イベント サブスクリプション」properties.labelsに関する説明を参照してください。 Auto Loaderは、これらのキーと値のタグのペアをJSONのラベルとして保存します。(1)

GCPに関する詳細情報については、 「ラベルを使用した使用状況の報告」を参照してください。 (1)

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。代わりに、クラウドプロバイダーのコンソールを使用してリソースタグを設定してください。

cloudFiles.useManagedFileEvents

false

trueに設定すると、Auto Loaderはファイルイベントサービスを使用して、外部ロケーションにあるファイルを検出します。このオプションは、ロード パスが外部ロケーションにあり、ファイル イベントが有効になっている場合にのみ使用できます。 ファイルイベントでファイル通知モードを使用する方法については、「ファイルイベントでファイル通知モードを使用する」を参照してください。

ファイルイベントは、ファイル検出において通知レベルのパフォーマンスを提供します。これは、Auto Loaderが前回の実行後に新しいファイルを検出できるためです。ディレクトリ一覧表示とは異なり、この処理ではディレクトリ内のすべてのファイルを一覧表示する必要はありません。

ファイルイベントオプションが有効になっている場合でも、Auto Loaderがディレクトリ一覧表示を使用する状況がいくつかあります。

  • 初期ロード時に、 includeExistingFilestrueに設定されると、ディレクトリ全体のリストが取得され、Auto Loader が起動する前にディレクトリ内に存在していたすべてのファイルが検出されます。
  • ファイルイベントサービスは、最近作成されたファイルをキャッシュすることで、ファイルの検出を最適化します。Auto Loader の実行頻度が低い場合、このキャッシュは期限切れになる可能性があり、その場合、Auto Loader はディレクトリ一覧表示にフォールバックしてファイルを検出し、キャッシュを更新します。この事態を避けるため、少なくとも7日に1回はAuto Loaderを起動してください。

ファイルイベントを使用するAuto Loader 、いつディレクトリ一覧を使用しますか?を参照してください。 このオプションを使用して Auto Loader がディレクトリ一覧を表示する状況の包括的なリストについては、こちらをご覧ください。

Databricks Runtime 14.3 LTS以降で利用可能です。

cloudFiles.listOnStart

false

trueに設定すると、Auto Loader はチェックポイント内の継続トークンから開始する代わりに、ストリームの開始時にディレクトリ全体のリストを実行します。このオプションを使用して、 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKENなどのエラーから回復します。CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKENエラーから復旧するにはどうすればよいですか?」を参照してください。

cloudFiles.useNotifications

false

新しいファイルが存在することを通知するために、ファイル通知モードを使用するかどうか。falseの場合は、ディレクトリ一覧表示モードを使用します。Auto Loaderファイル検出モードの比較を参照してください。

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。

(1) Auto Loaderは、デフォルトではベストエフォートベースで次のキーと値のタグのペアを追加します。

  • vendor: Databricks
  • path: データが読み込まれる場所。ラベル付けの制限のため、GCPでは使用できません。
  • checkpointLocation:ストリームのチェックポイントの位置。表示上の制限により、GCP(医薬品臨床試験の実施に関する基準)では利用できません。
  • streamId: ストリームのグローバル一意識別子。

Databricksはこれらのキー名を予約しており、その値を上書きすることはできません。

クラウド固有の

Auto Loaderは、ファイル通知モード向けにクラウドインフラストラクチャを構成するためのオプションを提供します。必要なクラウド権限とセットアップ手順については、 「ファイル通知モードでのAuto Loaderストリームの構成」を参照してください。

AWS

cloudFiles.useNotifications = trueを選択し、Auto Loaderに通知サービスを設定させたい場合にのみ、以下のオプションを指定してください。

Key

デフォルト

説明

cloudFiles.region

EC2インスタンスのリージョン

ソースS3バケットが存在し、 AWS SNS および SQS サービスを作成するリージョン。

Key

デフォルト

説明

cloudFiles.restrictNotificationSetupToSameAWSAccountId

false

SNSトピックと同じアカウント内のAWS S3バケットからのイベント通知のみを許可する。この設定が有効な場合、Auto Loader は SNS トピックと同じアカウント内の AWS S3 バケットからのイベント通知のみを受け入れます。

falseの場合、アクセス ポリシーはアカウント間バケットと SNS トピックの設定を制限しません。これは、SNSのトピックとバケットパスが異なるアカウントに関連付けられている場合に役立ちます。

Databricks Runtime 17.2以降で利用可能です。

cloudFiles.useNotifications = trueを選択し、すでに設定したキューをAuto Loaderで使用する場合にのみ、次のオプションを指定します。

Key

デフォルト

説明

cloudFiles.queueUrl

なし

SQSキューのURL。提供された場合、Auto Loaderは独自のAWS SNSとSQSサービスをセットアップする代わりに、このキューから直接イベントを消費します。

AWS認証オプション

Databricks資格情報を使用するには、次の認証オプションを指定します。

Key

デフォルト

説明

databricks.serviceCredential

なし

Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。

Databricksのサービス認証情報またはIAMロールが利用できない場合は、代わりに以下の認証オプションを指定できます。

Key

デフォルト

説明

cloudFiles.awsAccessKey

なし

ユーザーのAWSアクセスキーID。cloudFiles.awsSecretKeyを提供する必要があります。

cloudFiles.awsSecretKey

なし

ユーザーのAWSシークレットアクセスキー。cloudFiles.awsAccessKeyを提供する必要があります。

cloudFiles.roleArn

なし

ARNIAM必要に応じて引き受ける ロールの 。このロールは、クラスターのインスタンスプロファイルから、または cloudFiles.awsAccessKeycloudFiles.awsSecretKeyで資格情報を提供することで引き受けることができます。

cloudFiles.roleExternalId

なし

cloudFiles.roleArnを使用してロールを引き受けるときに指定する識別子。

cloudFiles.roleSessionName

なし

cloudFiles.roleArnを使用してロールを引き受ける際に使用するオプションのセッション名。

cloudFiles.stsEndpoint

なし

cloudFiles.roleArnを使用してロールを引き受けるときにAWS STSにアクセスするために提供するオプションのエンドポイント。

Azure

cloudFiles.useNotifications = trueを指定し、Auto Loaderに通知サービスを設定させる場合は、次のすべてのオプションに値を指定する必要があります。

Key

デフォルト

説明

cloudFiles.resourceGroup

なし

ストレージ アカウントが作成されるAzureリソース グループ。

cloudFiles.subscriptionId

なし

リソースグループが作成されたAzureサブスクリプションID。

databricks.serviceCredential

なし

Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。

Databricksサービスの認証情報が利用できない場合は、代わりに以下の認証オプションを指定できます。

Key

デフォルト

説明

cloudFiles.clientId

なし

Databricksサービスプリンシパルのクライアント ID またはアプリケーション ID。

cloudFiles.clientSecret

なし

Databricksサービスプリンシパルのクライアント シークレット。

cloudFiles.connectionString

なし

アカウントアクセスキーあるいは共有アクセス署名(SAS)に基づく、ストレージアカウントの接続文字列。

cloudFiles.tenantId

なし

Databricksサービスプリンシパルが作成されるAzureテナントID。

cloudFiles.useNotifications = trueに設定し、Auto Loader に既存のキューを使用させたい場合にのみ、以下のオプションを指定してください。

Key

デフォルト

説明

cloudFiles.queueName

なし

Azureキューの名前。提供されている場合、クラウド ファイル ソースは、独自のAzure Event Grid およびキュー ストレージ サービスを設定する代わりに、このキューからイベントを直接消費します。 その場合、 databricks.serviceCredentialまたはcloudFiles.connectionStringはキューに対する読み取り権限のみを必要とします。

GCP

Auto Loader Databricksサービスの資格情報を利用して、通知サービスを自動的にセットアップできます。 Databricks サービス認証情報を使用して作成されたサービス アカウントには、ファイル通知モードでの Auto Loader ストリームの構成で指定された権限が必要です。

Key

デフォルト

説明

cloudFiles.projectId

なし

GCSバケットが存在するプロジェクトのID。Google クラウド Pub/Sub サブスクリプションもこのプロジェクト内で作成されます。

databricks.serviceCredential

なし

Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。

Databricks サービスの認証情報を利用できない場合は、Google サービス アカウントを直接使用できます。Google サービスのセットアップに従って、クラスターをサービス アカウントとして構成するか、次の認証オプションを直接提供することができます。

Key

デフォルト

説明

cloudFiles.client

なし

GoogleサービスアカウントのクライアントID。

cloudFiles.clientEmail

なし

Googleサービスアカウントのメールアドレス。

cloudFiles.privateKey

なし

Google サービス アカウント用に生成される秘密キー。

cloudFiles.privateKeyId

なし

Google サービス アカウント用に生成された秘密キーの ID。

cloudFiles.useNotifications = trueを選択し、すでに設定したキューをAuto Loaderで使用する場合にのみ、次のオプションを指定します。

Key

デフォルト

説明

cloudFiles.subscription

なし

Google Cloud Pub/Subサブスクリプションの名前。指定されている場合、クラウドファイルソースは独自のGCS通知サービスとGoogle Cloud Pub/Subサービスを設定する代わりに、このキューからのイベントを消費します。

Delta Lake

spark.readStreamを使用して Delta Lake テーブルから読み込む場合、以下のオプションが適用されます。

Key

デフォルト

説明

allowSourceColumnDrop

なし

Deltaテーブルのバージョン番号または"always"に設定すると、ソース テーブル スキーマから列が削除された後もストリームが継続できるようになります。 バージョン番号を設定すると、そのバージョンまでのすべてのスキーマ変更を認識します。必要条件: schemaTrackingLocationDelta Lakeの列マッピングを使用した列名の変更と削除を参照してください。

allowSourceColumnRename

なし

Deltaテーブルのバージョン番号または"always"に設定すると、ソース テーブルで列の名前が変更された後もストリームが継続できるようになります。 バージョン番号を設定すると、そのバージョンまでのすべてのスキーマ変更を認識します。必要条件: schemaTrackingLocationDelta Lakeの列マッピングを使用した列名の変更と削除を参照してください。

allowSourceColumnTypeChange

なし

Deltaテーブルのバージョン番号または"always"に設定すると、ソース テーブルで列の型が変更された後もストリームが継続できるようになります。 バージョン番号を設定すると、そのバージョンまでのすべてのスキーマ変更を認識します。必要条件: schemaTrackingLocation型の拡張を参照してください。

excludeRegex

なし

正規表現パターン。パスがパターンに一致するファイルは、ストリーミング読み取りから除外されます。想定される命名規則に準拠していないファイルを除外するのに役立ちます。

failOnDataLoss

true

ログ保持( logRetentionDuration )によりソースデータが削除された場合、ストリーミングクエリを失敗させるかどうか。欠損データをスキップして処理を続行するには、 falseに設定してください。タイムトラベルクエリのデータ保持設定については、「タイムトラベルクエリのデータ保持設定」を参照してください。

ignoreChanges (非推奨)

false

Databricks Runtime 11.3 LTS以前のバージョンで利用可能です。UPDATEMERGE INTODELETEOVERWRITEなどの変更操作後に書き換えられたデータファイルを再出力します。変更されていない行が新しい行と同時に出力される可能性があるため、下流のコンシューマーは重複を処理する必要があります。削除操作は下流には伝播されません。Databricks Runtime 12.2 LTS以降ではskipChangeCommitsに置き換えられました。

ignoreDeletes (非推奨)

false

パーティション境界でデータを削除するトランザクション(パーティション全体の削除のみ)は無視します。パーティション以外の削除、更新、その他の変更には対応していません。代わりにskipChangeCommits使用してください。

readChangeFeed または readChangeData

false

ストリーミング クエリの変更データフィードの読み取りを有効にするかどうか。 有効にすると、ストリームは行レベルの変更(挿入、更新、削除)に加えて、追加のメタデータ列を出力します。DatabricksでのDelta Lake変更データフィードの使用」を参照してください。

schemaTrackingLocation

なし

Delta Lakeがストリーミング読み取りのためのスキーマ変更を追跡するディレクトリへのパス。列マッピングが有効になっているテーブルからストリーミングし、スキーマ進化を処理するためにallowSourceColumn*オプションを使用する場合に必要です。 ストリーミングクエリのcheckpointLocation範囲内である必要があります。Delta Lakeの列マッピングを使用した列名の変更と削除を参照してください。

skipChangeCommits

false

既存のレコードを削除または変更するトランザクションは無視し、追加のみを処理します。Databricks 、変更データフィードを使用しないほとんどのワークロードにこのオプションを推奨します。 Databricks Runtime 12.2 LTS以降で利用可能です。skipChangeCommitsを使用してアップストリームの変更コミットをスキップする」を参照してください。

startingTimestamp

最新の情報

読み取りを開始するタイムスタンプ。このストリームは、指定されたタイムスタンプ以降にコミットされたすべてのテーブル変更を読み取ります。タイムスタンプがすべての利用可能なテーブル コミットよりも前にある場合、ストリームは利用可能な最も早いコミットから開始されます。 startingVersionと併用することはできません。ストリーミングチェックポイントが既に存在する場合は無視されます。

"2019-01-01T00:00:00.000Z"のようなタイムスタンプ文字列、または"2019-01-01"のような日付文字列を受け入れます。

startingVersion

最新の情報

Deltaテーブルのバージョンを読み取り開始します。 このストリームは、指定されたバージョン以降にコミットされたすべての変更を読み取ります。最新の変更のみから開始するには、 "latest"指定してください。startingTimestampと併用することはできません。ストリーミングチェックポイントが既に存在する場合は無視されます。テーブル履歴の操作を参照してください。

withEventTimeOrder

false

初期テーブルスナップショットをイベント時間バケットに分割することで、レコードが誤って遅延イベントとしてマークされ、ウォーターマーク付きのステートフルクエリで削除されるのを防ぎます。初期スナップショット処理が開始された後は、チェックポイントを削除しない限り変更できません。Databricks Runtime 11.3 LTS以降で利用可能です。データを削除せずに初期スナップショットを処理する方法については、「データを削除せずに処理する」を参照してください。

DataFrameWriterのオプション

これらのオプションをDataFrameWriter.option()DataFrameWriterV2.option()と組み合わせて使用すると、Databricksがデータを書き込む方法を制御できます。

次の例では、Delta Lake テーブルを書き込む際にmergeSchemaTrueに設定します。

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")

Avro

Key

デフォルト

説明

avroSchema

なし

JSON文字列としての完全なAvroスキーマ。 このオプションを使用して、Spark SQLの型を特定のAvro型に変換します。Avroファイルに適用されます。

avroSchemaUrl

なし

Avroスキーマファイルを指すURL。スキーマが外部に保存されている場合は、 avroSchemaの代わりにこれを使用してください。avroSchemaとは相互排他的である。Avroファイルに適用されます。

compression

snappy

書き込み時に使用する圧縮コーデック。有効な値: uncompresseddeflatesnappybzip2xzzstandardAvroファイルに適用されます。

recordName

topLevelRecord

出力されるAvroスキーマにおける最上位レコード名。Avroファイルに適用されます。

positionalFieldMatching

false

SparkスキーマとAvroスキーマ間の列を、名前ではなくフィールド位置で照合するかどうか。Avroファイルに適用されます。

recordNamespace

空の文字列

出力Avroスキーマにおける最上位レコードの名前空間。Avroファイルに適用されます。

Delta LakeとApache Iceberg

Key

デフォルト

説明

clusterByAuto

false

クエリパターンに基づいてDatabricksがクラスタリング列を選択する自動リキッドクラスタリングを有効にするかどうか。mode("overwrite")の場合のみ有効です。appendモードでは使用できません。Databricks Runtime 16.4以降で利用可能です。テーブルにリキッドクラスタリングを使用するために適用されます。

mergeSchema

なし

書き込み操作においてスキーマ進化を有効にするかどうか。生成されたDataFrameの新しい列が、ターゲットテーブルのスキーマに追加されます。 バッチ追記とストリーミング追記の両方に適用されます。テーブルスキーマの更新に適用されます。

overwriteSchema

なし

上書き時にテーブルスキーマとパーティショニングを置き換えるかどうか。replaceWhereなしでmode("overwrite")が必要です。partitionOverwriteModeと併用できません。テーブルスキーマの更新に適用されます。

partitionOverwriteMode

なし

パーティションの上書きモード。新しいデータを含むパーティションのみを上書きし、その他のパーティションは変更しないよう、この値をdynamicに設定してください。レガシー モード。サーバーレス コンピュートまたはDatabricks SQLではサポートされていません。 有効な値: staticdynamicDelta Lake を使用してデータを選択的に上書きする場合に適用されます。

replaceOn

なし

対象テーブルの行を、パッケージクエリからの行で置き換えるブール式。 対象テーブルとソースクエリの両方の列を参照できます。ターゲット内の行のうち、ソースの行と一致する行は削除され、置き換えられます。ソースが空の場合、削除は行われません。列参照の曖昧さを解消するには、 targetAliasを使用してください。Databricks Runtime 17.1以降で利用可能です。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。

replaceUsing

なし

対象テーブルとソースクエリ間の行を照合するために使用される、カンマ区切りの列名のリスト。ターゲットとソースの両方に、リストされているすべての列が含まれている必要があります。対象レコード内の行のうち、ソースレコードの行と等価比較で一致する行は削除され、置き換えられます。NULL値は等しくないものとみなされ、一致しません。Databricks Runtime 16.3以降で利用可能です。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。

replaceWhere

なし

述語式。述語に一致するレコードのみをアトミックに上書きします。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。

targetAlias

なし

対象テーブルの文字列エイリアス。条件が対象テーブルとソースクエリの両方の列を参照する場合、 replaceOnまたはreplaceWhereと組み合わせて使用すると、列参照の曖昧さを解消できます。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。

txnAppId

なし

foreachBatchの操作で冪等書き込みを行うアプリケーションを識別する一意の文字列。複数の Delta Lake テーブルへの書き込みが正確に 1 回のみ行われるようにするには、 txnVersionと併用してください。適用対象:冪等テーブル書き込みにはforeachBatchを使用します

txnVersion

なし

foreachBatch操作における冪等書き込みのトランザクション バージョンとして使用される、単調増加する数値。複数の Delta Lake テーブルへの書き込みが正確に 1 回のみ行われるようにするには、 txnAppIdと併用してください。適用対象:冪等テーブル書き込みにはforeachBatchを使用します

optimizeWrite

なし

この書き込み操作で自動最適化書き込みを有効にするかどうか。spark.databricks.delta.optimizeWrite.enabled設定を上書きします。DatabricksにおけるDelta Lakeとは何か?に適用されます。

userMetadata

なし

書き込み操作のコミットメタデータに追加される、ユーザー定義の文字列。DESCRIBE HISTORYの出力に表示されます。カスタムメタデータを使用してテーブルを拡張する場合に適用されます。

CSV

Key

デフォルト

説明

charToEscapeQuoteEscaping

\0 (無効)

エスケープ文字が引用文字と異なる場合に、エスケープ文字をエスケープするために使用される文字。csv (DataFrameWriter)に適用されます。

compression

none

書き込み時に使用する圧縮コーデック。有効な値: nonebzip2gziplz4snappydeflatezstdcsv (DataFrameWriter)に適用されます。

dateFormat

yyyy-MM-dd

日付列の値の書式指定文字列。csv (DataFrameWriter)に適用されます。

emptyValue

空の文字列

空の値(null以外の値)に対して書き込まれる文字列。csv (DataFrameWriter)に適用されます。

encoding

UTF-8

出力ファイルの文字エンコーディング。csv (DataFrameWriter)に適用されます。

escape

\

引用符で囲まれた値をエスケープするために使用される文字。csv (DataFrameWriter)に適用されます。

escapeQuotes

true

引用符で囲まれたフィールド値内の引用符文字をエスケープするかどうか。csv (DataFrameWriter)に適用されます。

header

false

出力の最初の行に列名を表示するかどうか。csv (DataFrameWriter)に適用されます。

ignoreLeadingWhiteSpace

false

値を書き込む際に、先頭の空白文字を削除するかどうか。csv (DataFrameWriter)に適用されます。

ignoreTrailingWhiteSpace

false

値を書き込む際に、末尾の空白文字を削除するかどうか。csv (DataFrameWriter)に適用されます。

lineSep

\n

レコード間で使用される行区切り文字列。csv (DataFrameWriter)に適用されます。

locale

en-US

java.util.Locale識別子。書き込み時の日付とタイムスタンプの値の書式設定に影響します。

nullValue

空の文字列

null値に対して書き込まれた文字列。csv (DataFrameWriter)に適用されます。

quote

"

区切り文字を含むフィールド値を引用するために使用される文字。csv (DataFrameWriter)に適用されます。

quoteAll

false

内容に関わらず、すべてのフィールド値を引用符で囲むかどうか。csv (DataFrameWriter)に適用されます。

sep

,

フィールド区切り文字。csv (DataFrameWriter)に適用されます。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ列の値の書式指定文字列。csv (DataFrameWriter)に適用されます。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムゾーン( TimestampNTZType )列の値を含まないタイムスタンプのフォーマット文字列。

Excel

Key

デフォルト

説明

dataAddress

なし

書き込みを開始するシート名または開始セル。省略した場合、 Sheet1という名前のシートのセルA1から書き込みを開始します。シート名( "SheetName" )または単一セル参照( "SheetName!A1" )を受け入れます。セル範囲指定による書き込みはサポートされていません。

dateFormatInWrite

yyyy-mm-dd

Date列にExcelセル書式文字列が適用されました。Excel形式の構文を使用します。

headerRows

0

列名を最初の行に表示するかどうか。有効な値: 01

timestampNTZFormat

yyyy-mm-dd hh:mm:ss

TimestampNTZTimestamp列にExcelのセル書式文字列が適用されました。Excel形式の構文を使用します。

version

xlsx

書き込むExcelファイルの形式バージョン。有効な値: xlsxxls

JSON

Key

デフォルト

説明

compression

none

書き込み時に使用する圧縮コーデック。有効な値: nonebzip2gziplz4snappydeflatezstdJSON (DataFrameWriter)に適用されます。

dateFormat

yyyy-MM-dd

日付列の値の書式指定文字列。JSON (DataFrameWriter)に適用されます。

encoding

UTF-8

出力ファイルの文字エンコーディング。JSON (DataFrameWriter)に適用されます。

ignoreNullFields

spark.sql.jsonGenerator.ignoreNullFields

JSON出力からnull値を持つフィールドを除外するかどうか。JSON (DataFrameWriter)に適用されます。

lineSep

\n

レコード間で使用される行区切り文字列。JSON (DataFrameWriter)に適用されます。

locale

en-US

java.util.Locale識別子。書き込み時の日付とタイムスタンプの値の書式設定に影響します。

pretty

false

整形された(インデント付き、複数行)JSON出力を有効にするかどうか。

sortKeys

false

出力時にJSONオブジェクトのキーをアルファベット順に並べ替えるかどうか。決定論的な出力を生成するのに役立ちます。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ列の値の書式指定文字列。JSON (DataFrameWriter)に適用されます。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムゾーン( TimestampNTZType )列の値を含まないタイムスタンプのフォーマット文字列。

writeNonAsciiCharacterAsCodePoint

false

出力時に、非ASCII文字をリテラルUTF-8文字ではなく、Unicodeエスケープシーケンス\uXXXXとしてエンコードするかどうか。

ORC

Key

デフォルト

説明

compression

zstd

書き込み時に使用する圧縮コーデック。有効な値: noneuncompressedsnappyzliblzozstdlz4brotliorc (DataFrameWriter)に適用されます。

Parquet

Key

デフォルト

説明

compression

snappy

書き込み時に使用する圧縮コーデック。有効な値: noneuncompressedsnappygziplzobrotlilz4lz4_rawzstdParquet (DataFrameWriter)に適用されます。

spark.sql.parquet.outputTimestampType

INT96

タイムスタンプ列をエンコードするために使用される物理型。有効な値: INT96TIMESTAMP_MICROSTIMESTAMP_MILLIS 。標準のタイムスタンプ型をサポートしていない旧式の Parquet リーダーとの互換性を確保するには、 INT96使用してください。

文章

Key

デフォルト

説明

compression

none

書き込み時に使用する圧縮コーデック。有効な値: nonebzip2gziplz4snappydeflatezstdテキスト(DataFrameWriter)に適用されます。

encoding

UTF-8

出力ファイルの文字エンコーディング。

lineSep

\n

レコード間で使用される行区切り文字列。テキスト(DataFrameWriter)に適用されます。

XML

Key

デフォルト

説明

arrayElementName

item

明示的な名前を持たない配列要素の要素名。XML (DataFrameWriter)に適用されます。

attributePrefix

_

XML属性に対応するフィールド名の前に付加される接頭辞。XML (DataFrameWriter)に適用されます。

compression

none

書き込み時に使用する圧縮コーデック。有効な値: nonebzip2gziplz4snappydeflatezstdXML (DataFrameWriter)に適用されます。

dateFormat

yyyy-MM-dd

日付列の値の書式指定文字列。XML (DataFrameWriter)に適用されます。

declaration

version="1.0" encoding="UTF-8" standalone="yes"

各出力ファイルの先頭に記述されるXML宣言文字列。宣言を抑制するには、空の文字列を設定します。XML (DataFrameWriter)に適用されます。

encoding

UTF-8

出力ファイルの文字エンコーディング。XML (DataFrameWriter)に適用されます。

indent

4つのスペース

出力において子要素をインデントするために使用される文字列。インデントをオフにして各行を1行に表示するには、空の文字列に設定します。

locale

en-US

java.util.Locale識別子。書き込み時の日付とタイムスタンプの値の書式設定に影響します。

nullValue

null

null値に対して書き込まれる文字列。nullに設定すると、nullフィールドの属性と子要素は省略されます。XML (DataFrameWriter)に適用されます。

rootTag

ROWS

出力内のすべての行要素を囲むルート要素タグ。XML (DataFrameWriter)に適用されます。

rowTag

ROW

出力における行を表す要素タグ。XML (DataFrameWriter)に適用されます。

singleVariantColumn

なし

XMLファイルに書き込む単一のバリアント列の名前。XML (DataFrameWriter)に適用されます。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ列の値の書式指定文字列。XML (DataFrameWriter)に適用されます。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムゾーン列の値を含まないタイムスタンプのフォーマット文字列。XML (DataFrameWriter)に適用されます。

validateName

true

列名が有効な XML 要素識別子でない場合に例外をスローするかどうか。XML (DataFrameWriter)に適用されます。

valueTag

_VALUE

属性または子要素を持つXML要素内の文字データに使用されるフィールド名。XML (DataFrameWriter)に適用されます。

DataStreamWriterのオプション

ストリーミング書き込みを設定するには、これらのオプションをDataStreamWriter.option()と組み合わせて使用します。

次の例は、ストリームのチェックポイントの位置を設定するものです。

Python
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))

一般

Key

デフォルト

説明

checkpointLocation

なし(必須)

ストリーミングクエリのチェックポイントディレクトリへのパス。耐障害性と、厳密に1回のみの処理を保証するために必要です。各ストリーミングクエリは、固有のチェックポイント位置を使用する必要があります。Databricks 、チェックポイントをUnity Catalogボリュームまたはクラウド ストレージ パスに保存することをお勧めします。 構造化ストリーミングのチェックポイントを参照してください。

path

なし

Parquetなどのファイルベースのストリーミングシンクの出力パス。ファイルベースのフォーマットにのみ適用されます。

コンソールシンク

Key

デフォルト

説明

numRows

20

コンソールシンクに書き込む際に、マイクロバッチごとに表示する行数。

truncate

true

行を表示する際に、長い文字列を切り詰めるかどうか。文字列値全体を表示するには、 falseに設定してください。

Delta Lake

format("delta")を使用してストリームを Delta Lake テーブルに書き込む場合、以下のオプションが適用されます。overwriteSchemareplaceWherepartitionOverwriteModeなどの上書き専用オプションは、ストリーミング書き込みではサポートされていません。

Key

デフォルト

説明

mergeSchema

false

ストリーミングDataFrameに新しい列が含まれている場合に、 Delta Lakeテーブル スキーマを進化させるかどうか。 追記出力モードにのみ適用されます。テーブルスキーマの更新に適用されます。

userMetadata

なし

書き込み操作のコミットメタデータに追加される、ユーザー定義の文字列。DESCRIBE HISTORYの出力に表示されます。カスタムメタデータを使用してテーブルを拡張する場合に適用されます。

ファイルシンク

以下のオプションは、ストリームをファイルベースの形式(Parquet、JSON、CSV、ORC、テキスト)に書き込む場合に適用されます。フォーマット固有のオプションについては、 DataFrameWriter オプションを参照してください。

Key

デフォルト

説明

retention

なし

耐障害性および圧縮に使用されるシンクメタデータファイルをどのくらいの期間保持するか。7 days24 hoursのような時間文字列を受け入れます。設定されていない場合、メタデータファイルは無期限に保持されます。

Kafkaシンク

Kafka にストリームを書き込むためのオプションの完全なリストについては、 「オプション」を参照してください。

Key

デフォルト

説明

kafka.bootstrap.servers

なし

必須。カンマ区切りの Kafka ブローカーhost:portアドレスのリスト。

topic

なし

すべての行の対象となるKafkaトピック。DataFrameにtopic列が含まれていない場合は必須です。

kafka.*

なし

kafka.で始まるKafka プロデューサー構成。例えば、 kafka.compression.type

メモリシンク

Key

デフォルト

説明

queryName

なし(必須)

クエリが書き込みを行うインメモリテーブルの名前。メモリシンクに必要です。.queryName()を介して設定することも可能です。

mode

exactlyonce

メモリシンクの配送保証。exactlyonce 、厳密に一度だけ実行されるセマンティクスを持つマイクロバッチモードを使用します。atleastonce 、少なくとも 1 回という意味論を持つ連続モードを使用します。有効な値: exactlyonceatleastonce

Spark関数オプション

Spark SQLの一部の組み込み関数は、解析またはシリアル化の動作を制御するoptionsマップを受け入れます。オプションをPython dictまたはScala Map[String, String]として渡します。

次の例は、JSON列を解析しながら、不正な形式のレコードを削除します。

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))

Avro

Avro関数は、対応するDataFrameオプションと同じオプションを受け入れます。

次の例は、スキーマ進化が有効になっているAvroカラムをデコードするものです。

Python
from pyspark.sql.functions import from_avro

df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))

さらに、スキーマレジストリのバリアントfrom_avroto_avroは、以下のオプションを受け入れます。

Key

デフォルト

説明

schemaId

なし

jsonFormatSchemaと互換性のないスキーマでエンコードされた Avro データをデコードする際に使用する Confluent Schema Registry のスキーマ ID。from_avroのみに適用されます。

confluent.schema.registry.*

なし

Confluent Schema Registryクライアント構成プロパティ。Confluent SRクライアントのプロパティを渡す際は、このプレフィックスを使用します。例えば、基本認証情報の場合はconfluent.schema.registry.basic.auth.user.info使用します。スキーマレジストリのバリアントfrom_avroおよびto_avroに必要です。

CSV

CSV関数は、対応するDataFrameのオプションと同じオプションを受け入れます。

以下の例は、カスタム区切り文字とNULL値を含むCSVファイルを読み込む例です。

Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))

JSON

JSON関数は、対応するDataFrameオプションと同じオプションを受け入れます。

次の例では、 NULLのフィールドを無視し、整形フォーマットを有効にしたJSONを書き込みます。

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))

Protobuf

from_protobuf また、 to_protobufファイルベースのデータソースを使用しません。これらの関数を使用すると、Protobufデータは常にバイナリ列として読み書きされます。オプションはMap[String, String]として渡され、大文字と小文字が区別されます。

次の例は、PERMISSIVEモードを使用してProtobufカラムをデコードする例です。

Python
from pyspark.sql.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))

Protobuf関数は以下のオプションを使用します。

Key

デフォルト

説明

mode

FAILFAST

破損した記録の対処方法。FAILFAST例外をスローします。PERMISSIVE不正なフィールドを null に設定します。有効な値: FAILFASTPERMISSIVEfrom_protobufに適用されます。

recursive.fields.max.depth

-1 (無効)

再帰的なProtobufフィールドの最大再帰深度。再帰フィールドのサポートを無効にするには、 0に設定してください。有効な値: 010from_protobufに適用されます。

convert.any.fields.to.json

false

Protobuf AnyフィールドをSTRUCTではなく JSON 文字列に変換するかどうか。from_protobufに適用されます。

emit.default.values

false

フィールドをゼロまたはデフォルト値で出力するかどうか(proto3セマンティクス)。falseの場合、デフォルト値を持つフィールドは出力から省略されます。from_protobufに適用されます。

enums.as.ints

false

列挙型フィールドを文字列ではなく整数値としてレンダリングするかどうか。from_protobufに適用されます。

upcast.unsigned.ints

false

整数オーバーフローを防ぐために、 uint32Longに、 uint64Decimal(20,0)にアップキャストするかどうか。from_protobufに適用されます。

unwrap.primitive.wrapper.types

false

google.protobufラッパー型 (例えばInt32ValueStringValue ) を対応するプリミティブ Spark 型に展開するかどうか。from_protobufに適用されます。

retain.empty.message.types

false

出力スキーマに空のProtobufメッセージ型を保持するかどうか(ダミー列を挿入して)。from_protobufに適用されます。

schema.registry.subject

なし

スキーマレジストリのサブジェクト名。スキーマレジストリのバリアントfrom_protobufおよびto_protobufを使用する場合に必要です。

schema.registry.address

なし

スキーマレジストリのアドレス(ホスト名とポート番号)。スキーマレジストリのバリアントfrom_protobufおよびto_protobufを使用する場合に必要です。

schema.registry.protobuf.name

なし

スキーマレジストリのサブジェクトに複数のメッセージが含まれている場合に、どのProtobufメッセージを使用するかを指定します。任意。

XML

XML関数は、対応するDataFrameオプションと同じオプションを受け入れます。

次の例では、カスタムのルートタグと行タグを使用してXMLを書き込みます。

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))