メインコンテンツまでスキップ

Spark APIオプションのリファレンス

このページでは、データの読み書きを行うSpark APIsで利用可能な入力および出力オプションを一覧表示します。

DataFrameReaderのオプション

これらのオプションをDataFrameReader.option()DataFrameReader.options()read_filesCOPY INTO 、およびAuto Loaderと組み合わせて使用すると、Databricksがデータファイルを読み込む方法を制御できます。

次の例では、JSON ファイルの読み込み時にmultiLineTrueに設定します。

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")

一般

次のオプションは、すべてのファイル形式に適用されます。

Key

デフォルト

有効な値

説明

ignoreCorruptFiles

false

true, false

破損したファイルを無視するかどうか。もしそうであれば、 Spark破損したファイルに遭遇しても実行を続け、読み取られた内容は引き続き返されます。 COPY INTOの場合、スキップされた破損ファイルは、Delta Lake 履歴のoperationMetrics列でnumSkippedCorruptFilesとして確認できます。Databricks Runtime 11.3 LTS以降で利用可能です。

ignoreMissingFiles

false Auto Loaderの場合、 trueCOPY INTO (レガシー)

true, false

欠落したファイルを無視するかどうか。true の場合、ファイルが見つからない場合でもSparkジョブは実行を続行し、コンテンツは引き続き返されます。 Databricks Runtime 11.3 LTS以降で利用可能です。

modifiedAfter

なし

タイムスタンプ文字列

指定されたタイムスタンプより後の変更タイムスタンプを持つファイルのみを、フィルターとして取り込むためのオプションのタイムスタンプ。

modifiedBefore

なし

タイムスタンプ文字列

指定されたタイムスタンプより前の変更タイムスタンプを持つファイルのみをフィルタリングして取り込むためのオプションのタイムスタンプ。

pathGlobFilter または fileNamePattern

なし

glob パターン文字列

ファイルを選択するための潜在的なglobパターンです。COPY INTOPATTERN に相当します (レガシー)。fileNamePatternread_filesで使用できます。

recursiveFileLookup

false

true, false

trueの場合、このオプションは、名前がdate=2019-07-01ようなパーティション命名規則に従っていない場合でも、ネストされたディレクトリを検索します。

Avro

以下のオプションは、Avroファイルを読み取る際に適用されます。

Key

デフォルト

有効な値

説明

avroSchema

なし

Avroスキーマの文字列

ユーザーがAvroフォーマットで指定するオプションのスキーマAvroを読み取る際、このオプションは、互換性はあるが実際のAvroスキーマとは異なる、進化したスキーマに設定することができます。逆シリアル化スキーマは、進化したスキーマと一致します。たとえば、デフォルト値を持つ1つの追加列を含む進化したスキーマを設定すると、読み取り結果には新しい列も含まれます。

avroSchemaEvolutionMode

none

none, restart

スキーマレジストリ利用時のスキーマ進化の対処方法noneはスキーマ変更を無視し、ジョブを続行します。スキーマの変更が検出されると、restartUnknownFieldException を発生させ、ジョブの再起動が必要になります。

datetimeRebaseMode

LEGACY

EXCEPTIONLEGACYCORRECTED

ユリウス暦と先発グレゴリオ暦の間でのDATE値とTIMESTAMP値のリベースを制御します。

enableStableIdentifiersForUnionType

false

true, false

Avro Union型に安定したフィールド名を使用するかどうか。有効にすると、共用型フィールド名は、その型名を小文字にしたものから派生します(例: member_intmember_string )。小文字に変換した後の型名が同じ場合、例外をスローします。

mergeSchema

false

true, false

複数のファイルにまたがるスキーマを推測し、各ファイルのスキーマをマージするかどうか。 AvroのmergeSchema では、データ型は緩和されません。

mode

FAILFAST

FAILFASTPERMISSIVEDROPMALFORMED

不正な形式のレコードの処理に関するパーサーモード。FAILFASTは例外をスローします。PERMISSIVE は不正な形式のフィールドを null に設定します。DROPMALFORMED は不正なレコードを自動的に破棄します。

readerCaseSensitive

true

true, false

rescuedDataColumnが有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。

recursiveFieldMaxDepth

なし

0 - 15

再帰的なAvroフィールドの最大深度すべての再帰フィールドを切り捨てるには1に設定し、1レベルの再帰を許可するには2に設定します。以降、15まで設定できます。未設定の場合、または0の場合、再帰フィールドは許可されません。

rescuedDataColumn

なし

列名文字列

データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。

COPY INTO (レガシー)は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

詳細については、 「救出されたデータ列とは何ですか?」を参照してください。

stableIdentifierPrefixForUnionType

member_

任意の文字列

enableStableIdentifiersForUnionType=trueの場合に安定共用型フィールド名に使用する接頭辞。

CSV

CSVファイルを読み取る際に、次のオプションが適用されます。

Key

デフォルト

有効な値

説明

badRecordsPath

なし

パス文字列

不正なCSVレコードに関する情報を記録するファイルを保存するパス。

charToEscapeQuoteEscaping

\0

1文字

引用符のエスケープに使用される文字をエスケープするために使用される文字。たとえば、次のレコードの場合:[ " a\\", b ]

  • '\'をエスケープするための文字が未定義の場合、レコードは解析されません。パーサーは文字[a],[\],["],[,],[ ],[b]を読み取りますが、閉じ引用符が見つからないためエラーをスローします。
  • '\'をエスケープする文字が'\'として定義されている場合、レコードは2つの値[a\][b]で読み取られます。

columnNameOfCorruptRecord

_corrupt_record

列名文字列

Auto Loaderに対応しています。 COPY INTO (レガシー) ではサポートされていません。形式が不正で解析できないレコードを格納するための列。解析用のmodeDROPMALFORMEDに設定されている場合、この列は空になります。

comment

\0

1文字

テキスト行の先頭にある場合に、行コメントを表す文字を定義します。コメントのスキップを無効にするには、'\0'を使用します。

dateFormat

yyyy-MM-dd

日付形式文字列

日付文字列を解析するための形式。

emptyValue

空の文字列

任意の文字列

空の値の文字列形式。

enableDateTimeParsingFallback

false

true, false

指定された形式で値を解析できない場合に、従来の日付およびタイムスタンプの解析動作に戻すかどうか。falseの場合、解析の失敗はmodeに応じてエラーを発生させるか null を生成します。

encoding または charset

UTF-8

java.nio.charset.Charsetの名前

CSVファイルのエンコード名。オプションのリストについては、java.nio.charset.Charsetを参照してください。multilinetrueの場合、 UTF-16UTF-32 は使用できません。

enforceSchema

true

true, false

指定したスキーマまたは推論されたスキーマを CSV ファイルに強制的に適用するかどうか。 このオプションを有効にすると、CSV ファイルのヘッダーは無視されます。 このオプションは、 Auto Loader を使用してデータをレスキューし、スキーマ進化を許可する場合、デフォルトで無視されます。

escape

\

1文字

データの解析時に使用するエスケープ文字。

extension

csv

ファイル拡張子文字列

リードの想定されるファイル拡張子です。この拡張子がないファイルは除外されます。

failOnUnknownFields

false

true, false

CSVレコードにスキーマに存在しない列が含まれている場合に、エラーとして処理するかどうか。falseの場合、認識されない列はrescuedDataColumnに応じてサイレントに削除または救済されます。

failOnWidenedFields

false

true, false

フィールド値が、拡張せずに宣言されたスキーマ型として解析できない場合に、エラーとするかどうか。falseの場合、型拡張された値はrescuedDataColumnに応じて暗黙的に救済されます。設定failOnUnknownFields=trueは、このオプションの効果を隠蔽する可能性があります。

header

false

true, false

CSVファイルにヘッダーが含まれているかどうか。Auto Loaderは、スキーマを推論するときにファイルにヘッダーがあると想定します。

ignoreLeadingWhiteSpace

false

true, false

解析された各値の先頭の空白を無視するかどうか。

ignoreTrailingWhiteSpace

false

true, false

解析された各値の末尾の空白を無視するかどうか。

inferSchema

false

true, false

解析されたCSVレコードのデータ型を推測するか、すべての列がStringTypeであると仮定するかどうか。trueに設定されている場合は、データの追加パスが必要です。Auto Loaderでは、代わりにcloudFiles.inferColumnTypesを使用します。

inputBufferSize

1048576 (1MB)

正の整数

CSVパーサーのバッファサイズ(バイト単位)。大規模なCSVファイルを解析する際のメモリ使用量の調整に役立ちます。

lineSep

なし、これは\r\r\n 、およびをカバーします \n

文字列

連続する2つのCSVレコード間の文字列。

locale

US

java.util.Locale識別子

Javaロケール識別子。CSV内でのデフォルトの日付、タイムスタンプ、および小数点の解析に影響を与えます。

maxCharsPerColumn

-1

正の整数、または(-1は無制限)

解析する値から予想される最大文字数。メモリエラーを回避するために使用できます。デフォルトは-1で、無制限を意味します。

maxColumns

20480

正の整数

レコードに含めることができる列数のハードリミット。

mergeSchema

false

true, false

複数のファイルにまたがるスキーマを推測し、各ファイルのスキーマをマージするかどうか。 スキーマを推論するときに Auto Loader するためにデフォルトで有効になります。

mode

PERMISSIVE

PERMISSIVEDROPMALFORMEDFAILFAST

不正な形式のレコードの処理に関するパーサーモード。

multiLine

false

true, false

CSVレコードが複数行にまたがるかどうか。

nanValue

NaN

任意の文字列

FloatType列およびDoubleType列を解析するときの数値以外の値の文字列形式。

negativeInf

-Inf

任意の文字列

FloatTypeまたはDoubleType列を解析するときの負の無限大の文字列表現。

nullValue

空の文字列

任意の文字列

null値の文字列形式。

parserCaseSensitive (非推奨)

false

true, false

ファイルの読み取り中に、ヘッダーで宣言された列をスキーマの大文字と小文字を区別して配置するかどうか。Auto Loaderのデフォルトでは、これはtrueです。有効にすると、大文字と小文字が異なる列がrescuedDataColumnで救出されます。このオプションは廃止され、readerCaseSensitiveが採用されました。

positiveInf

Inf

任意の文字列

FloatTypeまたはDoubleType列を解析するときの正の無限大の文字列表現。

preferDate

true

true, false

可能な場合は、文字列をタイムスタンプではなく日付として推測しようとします。またinferSchemaを有効にするか、Auto LoaderでcloudFiles.inferColumnTypesを使用して、スキーマ推論を使用する必要があります。

quote

"

1文字

フィールド区切り文字が値の一部である場合に、値をエスケープするために使用される文字。

readerCaseSensitive

true

true, false

rescuedDataColumnが有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。

rescuedDataColumn

なし

列名文字列

データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。

COPY INTO (レガシー)は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

sep または delimiter

,

文字列

列間の区切り記号文字列。

singleVariantColumn

なし

列名文字列

列名を指定すると、各フィールドを個別の列に解析するのではなく、CSVレコード全体をその名前の単一のVariantType列に読み込みます。必要条件: header=true

skipRows

0

正の整数または 0

CSVファイルの先頭から無視すべき行数(コメント行や空行を含む)。headerがtrueの場合、ヘッダーはスキップされていない、コメントされていない最初の行になります。

timeFormat

HH:mm:ss

時刻形式文字列

TimeType列の値を解析するためのフォーマット。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ形式の文字列

タイムスタンプ文字列を解析するための形式。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムスタンプ形式の文字列

タイムゾーン( TimestampNTZType )文字列を含まないタイムスタンプを解析するためのフォーマット。

timeZone

なし

java.time.ZoneId 文字列

タイムスタンプと日付を解析するときに使用するjava.time.ZoneId

unescapedQuoteHandling

STOP_AT_DELIMITER

STOP_AT_CLOSING_QUOTEBACK_TO_DELIMITERSTOP_AT_DELIMITERSKIP_VALUERAISE_ERROR

エスケープされていない引用符を処理するための戦略。各許可オプションの動作は次のとおりです:

  • STOP_AT_CLOSING_QUOTE:入力内でエスケープされていない引用符が見つかった場合は、引用符文字を蓄積し、閉じ引用符が見つかるまで値を引用符で囲まれた値として解析します。
  • BACK_TO_DELIMITER:入力内にエスケープされていない引用符が見つかった場合、その値は引用符で囲まれていない値とみなされます。これにより、パーサーは、sepで定義された区切り文字が見つかるまで、現在の解析値のすべての文字を蓄積します。値に区切り文字が見つからない場合、パーサーは区切り文字または行末が見つかるまで入力から文字を蓄積し続けます。
  • STOP_AT_DELIMITER:入力内にエスケープされていない引用符が見つかった場合、その値は引用符で囲まれていない値とみなされます。これにより、パーサーは、sepで定義された区切り文字、または入力内で行末が見つかるまで、すべての文字を蓄積します。
  • SKIP_VALUE:入力内でエスケープされていない引用符が見つかった場合、指定された値に対して解析されたコンテンツは(次の区切り文字が見つかるまで)スキップされ、代わりにnullValueに設定された値が生成されます。
  • RAISE_ERROR:入力内にエスケープされていない引用符が見つかった場合は、TextParsingExceptionが投げられます。

Excel

次のオプションは、Excelファイルを読み込む場合に適用されます。

Key

デフォルト

有効な値

説明

dataAddress

なし

セル範囲またはシート名の文字列

Excel構文で読み込むセル範囲。省略した場合、最初のシートから有効なセルをすべて読み取ります。指定したシートから範囲を読み取るにはSheetName!C5:H10 、最初のシートから範囲を読み取るにはC5:H10 、特定のシートからすべてのデータを読み取るにはSheetName使用します。

headerRows

0

0, 1

列名ヘッダーとして使用する先頭行数dataAddressが指定されている場合、セル範囲内で適用されます。0の場合、列名が_c1_c2_c3などのように自動生成されます。

ignoreMissingSheet

false

true, false

dataAddressで指定されたシートが含まれていないファイルをスキップするかどうか。falseの場合、ファイルに要求されたシートがないと、エラーがスローされます。dataAddress でシート名が指定されている場合にのみ適用されます。

includePhoneticRuns

false

true, false

XLSXファイルを読み込む際に、セル文字列値にピンインやふりがななどの発音表記を連結して含めるかどうか。

operation

readSheet

readSheet, listSheets

Excelワークブック上で実行する操作。readSheet はシートからデータを読み込みます。listSheets各シートに対してフィールドsheetIndex: longsheetName: Stringを持つ構造体を返します。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムスタンプ形式の文字列

Excelに文字列として保存される、タイムゾーンなしのタイムスタンプ値のカスタム形式文字列。 カスタム日付形式は、Datetime パターンの形式に従います。

dateFormat

yyyy-MM-dd

日付形式文字列

文字列値のカスタムフォーマット文字列はDateとして読み取られます。カスタム日付形式は、Datetime パターンの形式に従います。

JSON

次のオプションは、JSONファイルを読み取る際に適用されます。

Key

デフォルト

有効な値

説明

allowBackslashEscapingAnyCharacter

false

true, false

バックスラッシュの後に続く文字のエスケープを許可するかどうか。有効にしない場合、JSON仕様で明示的にリストされている文字のみをエスケープできます。

allowComments

false

true, false

構文解析されたコンテンツ内で、Java、C、C++スタイルのコメント('/''*''//'の各種)の使用を許可するかどうか。

allowNonNumericNumbers

true

true, false

数値でない(NaN)トークンの集合を有効な浮動小数点数値として許可するかどうか。

allowNumericLeadingZeros

false

true, false

整数が追加の(無視できる)ゼロ(例えば、 000001 )で始まることを許可するかどうか。

allowSingleQuotes

true

true, false

文字列(名前と文字列値)の引用符付けに一重引用符(アポストロフィ、文字 '\')の使用を許可するかどうか。

allowUnquotedControlChars

false

true, false

JSON文字列に、エスケープされていない制御文字(タブ文字や改行文字を含む、値が32未満のASCII文字)を含めることを許可するかどうか。

allowUnquotedFieldNames

false

true, false

JavaScriptでは許可されているが、JSON仕様では許可されていない、引用符なしのフィールド名の使用を許可するかどうか。

alternateVariantEncoding

なし

Z85

バリアント値に使用されるエンコードは、 JSONの形式で記述されます。 インラインJSONとして保存される代わりにBase85エンコードされたVariant値をデコードするには、 Z85に設定します。

badRecordsPath

なし

パス文字列

不正なJSONレコードに関する情報を記録するためのファイルを保存するパス。

ファイルベースのデータソースでbadRecordsPathオプションを使用する場合、以下の制限があります。

  • これは取引を伴わないため、一貫性のない結果につながる可能性があります。
  • 一時的なエラーは障害として扱われます。

columnNameOfCorruptRecord

_corrupt_record

列名文字列

形式が正しくなく、解析できないレコードを格納するための列。解析用のmode DROPMALFORMEDに設定されている場合、この列は空になります。

dateFormat

yyyy-MM-dd

日付形式文字列

日付文字列を解析するための形式。

dropFieldIfAllNull

false

true, false

スキーマ推論中に、すべてのNULL値の列、または空の配列および構造体を無視するかどうか。

encoding または charset

UTF-8

java.nio.charset.Charsetの名前

JSONファイルのエンコーディングの名前。オプションのリストについては、java.nio.charset.Charsetを参照してください。multilinetrueの場合は、UTF-16UTF-32を使用できません。

inferTimestamp

false

true, false

タイムスタンプ文字列をTimestampTypeとして推測するかどうか。trueに設定すると、スキーマ推論に著しく時間がかかる場合があります。Auto Loaderで使用するには、 cloudFiles.inferColumnTypes有効にする必要があります。

lineSep

なし、これは\r\r\n 、およびをカバーします \n

文字列

連続する2つのJSONレコード間の文字列。

locale

US

java.util.Locale識別子

Javaロケール識別子は、JSON内のデフォルトの日付、タイムスタンプ、小数の解析に影響します。

maxNestingDepth

500

正の整数

JSONオブジェクトと配列の最大ネスト深度です。深く入れ子になったドキュメントに対して、この値を増やしてください。

maxNumLen

1000

正の整数

JSON入力における数値トークンの最大長。大きい数値リテラルを持つJSONの場合、この値を増やしてください。

maxStringLen

無制限

正の整数

JSON入力の文字列の最大長です。長い文字列を含むJSONの解析時に、メモリ使用量を制限するように設定します。

mode

PERMISSIVE

PERMISSIVEDROPMALFORMEDFAILFAST

不正な形式のレコードの処理に関するパーサーモード。

multiLine

false

true, false

JSONレコードが複数行にまたがるかどうか。

prefersDecimal

false

true, false

可能な場合は、float型またはdouble型ではなく、文字列をDecimalTypeとして推論しようとします。また、inferSchemaを有効にするか、cloudFiles.inferColumnTypesをAuto Loaderとともに使用することで、スキーマ推論を使用する必要があります。

primitivesAsString

false

true, false

数値やブール値などのプリミティブ型をStringTypeとして推論するかどうか。

readerCaseSensitive

true

true, false

rescuedDataColumnが有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。Databricks Runtime 13.3以降で利用可能です。

rescuedDataColumn

なし

列名文字列

データ型の不一致またはスキーマの不一致(列の大文字小文字の区別を含む)により解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。

COPY INTO (レガシー)は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

singleVariantColumn

なし

列名文字列

JSONドキュメント全体を指定された文字列を列名とする単一のバリアント列に解析して取り込むかどうか。設定されていない場合、JSONフィールドは個別の列として取り込まれます。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ形式の文字列

タイムスタンプ文字列を解析するための形式。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムスタンプ形式の文字列

タイムゾーン( TimestampNTZType )文字列を含まないタイムスタンプを解析するためのフォーマット。

timeZone

なし

java.time.ZoneId 文字列

タイムスタンプと日付を解析するときに使用するjava.time.ZoneId

upgradeExceptionAsBadRecord

false

true, false

型アップグレード例外(例えば、値を宣言された列型に拡張できない場合など)を例外をスローするのではなく、不良レコードとして扱うかどうか。

Kafka

Kafkaリーダーのオプションの全リストについては、 「DataStreamReader Kafkaオプション」を参照してください。以下のオプションは、 spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されます。

Key

デフォルト

有効な値

説明

endingOffsets

latest

latest、またはJSONオフセット文字列

どこで読み取りを停止するか。JSON 文字列では、-1 が最新のオフセットです。-2は、最も早いオフセットであるため、終了オフセットとして許可されていません。これはJSONオフセット文字列の例です:{"topicA":{"0":50,"1":-1}}

endingOffsetsByTimestamp

なし

JSONタイムスタンプ文字列

パーティションごとの終了オフセットは、ミリ秒単位のタイムスタンプとして指定されます。例:{"topicA":{"0":2000,"1":3000}}

endingTimestamp

なし

正の整数または 0

すべてのパーティションに適用される、ミリ秒単位のグローバル終了タイムスタンプ。

ORC

以下のオプションは、ORCファイルを読み込む際に適用されます。

Key

デフォルト

有効な値

説明

mergeSchema

false

true, false

複数ファイルからスキーマを推定し、各ファイルのスキーマをマージするかどうか。

Parquet

Parquet ファイルを読み取る際に、以下のオプションが適用されます。

Key

デフォルト

有効な値

説明

datetimeRebaseMode

LEGACY

EXCEPTIONLEGACYCORRECTED

ユリウス暦と先発グレゴリオ暦の間でのDATE値とTIMESTAMP値のリベースを制御します。

int96RebaseMode

LEGACY

EXCEPTIONLEGACYCORRECTED

INT96のタイムスタンプ値をユリウス暦と先発グレゴリオ暦の間でリベースすることを制御します。

mergeSchema

false

true, false

複数ファイルからスキーマを推定し、各ファイルのスキーマをマージするかどうか。

readerCaseSensitive

true

true, false

rescuedDataColumnが有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。

rescuedDataColumn

なし

列名文字列

データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。

COPY INTO (レガシー)は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

状態ストア

これらのオプションをspark.read.format("statestore")またはread_statestoreテーブル値関数と組み合わせて使用すると、構造化ストリーミング状態データを読み込むことができます。構造化ストリーミングの状態情報の読み取りを参照してください。

Key

デフォルト

有効な値

説明

batchId

最新のバッチID

正の整数または 0

読み取り元のターゲットバッチです。以前のクエリの状態を照会します。バッチはコミットされている必要がありますが、まだクリーンアップされていません。

operatorId

0

正の整数または 0

読み取り元のターゲットオペレーターです。クエリに複数のステートフル演算子がある場合に使用します。

storeName

DEFAULT

任意の文字列

読み取り元のターゲットの状態ストア名。ステートフル演算子に複数の状態ストアインスタンスがある場合に使用します。ストリーム-ストリームJOINには、storeNameまたはjoinSideのいずれかを指定する必要がありますが、両方を指定することはできません。

joinSide

なし

left, right

ストリーム-ストリーム結合における読み取り元のターゲット側。ストリーム-ストリームJOINには、storeNameまたはjoinSideのいずれかを指定する必要がありますが、両方を指定することはできません。

snapshotStartBatchId

なし

正の整数または 0

状態を読み取る際の開始点として使用するスナップショットのバッチ ID です。リーダーは、このスナップショットからの変更をbatchIdまでリプレイすることで、状態を再構築します。スナップショットが破損しているときに役立ちます。snapshotPartitionIdと合わせて指定する必要があります。readChangeFeed とは使用できません。HDFS を基盤とした状態ストアと、変更ログチェックポイントが有効化された RocksDB 状態ストアをサポートします。Databricks Runtime 15.4 LTS 以降で利用可能です。

snapshotPartitionId

なし

正の整数または 0

指定されている場合、クエリはこのパーティションのみを読み込みます。snapshotStartBatchId と一緒に指定する必要があります。readChangeFeedと一緒に使用することはできません。Databricks Runtime 15.4 LTS 以降で利用可能です。

readChangeFeed

false

true, false

true の際に、changeStartBatchId から changeEndBatchId の間の指定された範囲のバッチにわたる状態の変更を返します。changeStartBatchId が必要です。joinSidebatchIdsnapshotStartBatchId、またはsnapshotPartitionIdとは使用できません。Databricks Runtime 16.4 LTS 以降で利用可能です。

詳細については、 「構造化ストリーミングの状態変化の読み取り」を参照してください。

changeStartBatchId

なし

正の整数または 0

変更フィード範囲の開始バッチ ID。readChangeFeedtrueの場合に必須readChangeFeedtrue に設定されている場合にのみ適用されます。Databricks Runtime 16.4 LTS 以降で利用可能です。

changeEndBatchId

最新のバッチID

正の整数または 0

チェンジフィード範囲の最終バッチIDです。changeStartBatchId以上である必要があります。readChangeFeedtrue に設定されている場合にのみ適用されます。Databricks Runtime 16.4 LTS 以降で利用可能です。

stateVarName

なし

任意の文字列

読み込む状態変数名です。状態変数名は、transformWithState演算子によって使用されるStatefulProcessorinit関数内の各変数の固有の名前です。transformWithState演算子を使用する場合に必須です。Databricks Runtime 16.4 LTS 以降で利用可能です。

readRegisteredTimers

false

true, false

trueの場合、transformWithState演算子が使用する登録済みタイマーを読み取ります。transformWithState オペレーターにのみ適用されます。Databricks Runtime 16.4 LTS 以降で利用可能です。

flattenCollectionTypes

true

true, false

true のとき、マップおよびリストの状態変数に対して返されるレコードを平坦化します。false のとき、Spark SQL Array または Map としてレコードを返します。transformWithState オペレーターにのみ適用されます。Databricks Runtime 16.4 LTS 以降で利用可能です。

文章

次のオプションは、テキストファイルの読み取り時に適用されます。

Key

デフォルト

有効な値

説明

encoding

UTF-8

java.nio.charset.Charsetの名前

TEXTファイルのエンコーディングの行区切り文字の名前。ファイルの内容は、このオプションの影響を受けず、そのまま読み込まれます。

lineSep

なし、これは\r\r\nおよび \n

文字列

連続する2つのTEXTレコード間の文字列。

wholeText

false

true, false

ファイルを単一レコードとして読み取るかどうか。

XML

次のオプションは、XML ファイルの読み取り時に適用されます。

Key

デフォルト

有効な値

説明

rowTag

なし

任意の文字列

行として扱うXMLファイルの行タグ。例の XML <book> <page><page>...<book>では、適切な値はpageです。これは必須オプションです。

samplingRatio

1.0

0.0 - 1.0

スキーマ推論に使用される行の割合を定義します。XML 組み込み関数は、このオプションを無視します。

excludeAttribute

false

true, false

要素内の属性を除外するかどうか。

mode

なし

PERMISSIVEDROPMALFORMEDFAILFAST

解析中に破損したレコードを処理するモード

  • PERMISSIVE破損したレコードの場合、columnNameOfCorruptRecord で構成されたフィールドに不正な形式の文字列を格納し、不正な形式のフィールドを null に設定します。破損したレコードを保持するには、ユーザー定義スキーマでcolumnNameOfCorruptRecordという名前のstring型フィールドを設定できます。スキーマにそのフィールドがない場合、解析中に破損したレコードは削除されます。スキーマを推測する際、パーサーは出力スキーマにcolumnNameOfCorruptRecordフィールドを暗黙的に追加します。
  • DROPMALFORMED: 破損したレコードを無視します。このモードは XML 組み込み関数ではサポートされていません。
  • FAILFAST:パーサーが破損したレコードを検出すると、例外をスローします。

inferSchema

true

true, false

trueの場合、結果として得られる各 DataFrame 列に対して適切な型を推測しようとします。falseの場合、結果として得られるすべての列はstring型になります。XMLの組み込み関数はこのオプションを無視します。

columnNameOfCorruptRecord

spark.sql.columnNameOfCorruptRecord

列名文字列

PERMISSIVEモードによって作成された不正な文字列を含む新しいフィールドの名前変更を許可します。

attributePrefix

なし

任意の文字列

属性を要素と区別するための、属性の接頭辞。これはフィールド名の接頭辞になります。デフォルトは_です。XMLの読み取り時には空でも構いませんが、書き込み時には空にすることはできません。DataFrameWriter の XML オプションにも適用されます。

valueTag

_VALUE

任意の文字列

属性または子要素を持つ要素内の文字データに使用されるタグ。ユーザーはスキーマ内でvalueTagフィールドを指定することもできますし、文字データが他の要素や属性を持つ要素内に存在する場合、スキーマ推論中に自動的に追加されます。DataFrameWriter の XML オプションにも適用されます。

encoding

UTF-8

java.nio.charset.Charsetの名前

読み取り時には、指定されたエンコード方式でXMLファイルをデコードします。書き込み時に、保存されるXMLファイルのエンコーディング(文字セット)を指定します。XMLの組み込み関数はこのオプションを無視します。DataFrameWriter の XML オプションにも適用されます。

ignoreSurroundingSpaces

true

true, false

値の周囲の空白をスキップする必要があるかどうか。空白文字のみのデータは無視されます。

rowValidationXSDPath

なし

ファイルパス文字列

各行の XML を個別に検証するために使用される、オプションの XSD ファイルへのパス。検証に失敗した行は、解析エラーのように扱われます。それ以外の場合、XSD は、指定または推論されたスキーマに影響を与えません。

ignoreNamespace

false

true, false

trueの場合、XML 要素および属性の名前空間のプレフィックスは無視されます。例えば、タグ<abc:author><def:author> 、両方とも<author>であるかのように扱われます。rowTag要素では名前空間を無視することはできません。無視できるのはその子要素のみです。XML解析はfalseであっても名前空間を認識しません。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ形式の文字列

datetime パターン形式に従うカスタムタイムスタンプ形式文字列。これはtimestamp型に適用されます。DataFrameWriter の XML オプションにも適用されます。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムスタンプ形式の文字列

タイムゾーンを含まないタイムスタンプのカスタムフォーマット文字列。datetime パターンフォーマットに従います。これはTimestampNTZType型に適用されます。DataFrameWriter の XML オプションにも適用されます。

dateFormat

yyyy-MM-dd

日付形式文字列

datetime パターン形式に従うカスタム日付形式文字列。これは日付型に適用されます。DataFrameWriter の XML オプションにも適用されます。

locale

en-US

IETF BCP 47形式の言語タグ

IETF BCP 47形式の言語タグとしてロケールを設定します。例えば、 locale日付やタイムスタンプを解析する際に使用されます。

nullValue

string null

任意の文字列

null値の文字列表現を設定します。これがnullの場合、パーサーはフィールドの属性と要素を書き込みません。DataFrameWriter の XML オプションにも適用されます。

readerCaseSensitive

true

true, false

rescuedDataColumn が有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。

rescuedDataColumn

なし

列名文字列

データ型の不一致やスキーマの不一致(列の大文字小文字の区別を含む)により解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。 COPY INTO (レガシー) は、 COPY INTOを使用してスキーマを手動で設定できないため、救出されたデータ列をサポートしていません。Databricksは、ほとんどのデータ取り込みシナリオにおいてAuto Loaderの使用を推奨しています。

singleVariantColumn

none

列名文字列

単一のバリアント列名を指定します。このオプションが読み込み用に指定されている場合、指定されたオプション文字列の値を列名として、XMLレコード全体を単一のバリアント列に解析します。書き込み用にこのオプションが指定されている場合、単一バリアント列の値をXMLファイルに書き込みます。データフレームWriter の XML オプションにも適用されます。

useLegacyXMLParser

true

true, false

従来のXMLパーサーを使用するかどうか。従来のパーサーは、不正なコンテンツに対する検証が緩い反面、メモリ効率が劣ります。より厳格なデフォルトパーサーを有効にするには、 falseに設定してください。

wildcardColName

xs_any

列名文字列

ワイルドカード( xs:any )スキーマ要素に一致するXML要素をキャプチャするために使用される列名。rescuedDataColumnと併用することはできません。

DataStreamReaderのオプション

これらのオプションをDataStreamReader.option()と組み合わせて使用すると、Delta Lakeテーブルやその他のファイルベースのソースからのストリーミング読み取りを設定できます。

ファイル形式のオプション(JSON、CSV、Parquetなど)については、 DataFrameReaderのオプションを参照してください。

Auto Loader ( cloudFiles.* )のオプションについては、 Auto Loader参照してください。

次の例では、 Delta Lakeテーブル ストリームのmaxFilesPerTrigger10に設定します。

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")

一般

以下のオプションは、Delta Lakeテーブルおよびその他のファイルベースのストリーミングソースに適用されます。

Key

デフォルト

有効な値

説明

cleanSource

off

offdeletearchive

ストリームで処理されたソースファイルの取り扱い方法。off 操作を行いません。delete がソースファイルを完全に削除します。archive はファイルを sourceArchiveDir に移動します。archiveに設定されている場合、sourceArchiveDirも設定する必要があります。Delta Lake テーブル ストリーミングには適用されません。

fileNameOnly

false

true, false

既に処理済みのファイルを、フルパスではなくファイル名のみで識別するかどうか。trueの場合、異なるパスにある同じファイル名のファイルは同じファイルとして扱われ、再処理されません。Delta Lakeテーブル ストリーミングには適用されません。

latestFirst

false

true, false

各マイクロバッチ内で、最も最近変更されたファイルを最初に処理するかどうか。最新のデータをできるだけ迅速に処理したい場合に役立ちます。truemaxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合、 maxFileAgeは無視されます。Delta Lakeテーブル ストリーミングには適用されません。

maxBytesPerTrigger

なし

正の整数

各マイクロバッチで処理されるデータの量の実質的な上限。最小の入力単位が制限を超える場合、バッチは制限を超える量を処理する可能性があります。maxFilesPerTriggerと一緒に使用すると、マイクロバッチはいずれかの制限に最初に到達するまでデータを処理します。

Auto Loaderの場合は、代わりにcloudFiles.maxBytesPerTriggerを使用してください。 共通項を参照。

maxCachedFiles

10000

正の整数または 0

後続のマイクロバッチ用にキャッシュする未処理ファイルの最大数0に設定すると、キャッシュをオフにできます。各トリガーでソースディレクトリに大量の新しいファイルが含まれる場合は、この値を大きくしてください。Delta Lake テーブル ストリーミングには適用されません。

maxFileAge

7d

期間を表す文字列。例:7d または 4h

現在のシステム時刻ではなく、最後に変更されたファイルのタイムスタンプを基準とした、処理対象ファイルの最大期間。このしきい値より古いファイルは無視されます。latestFirsttrue であり、maxFilesPerTrigger または maxBytesPerTrigger が設定されている場合、無視されます。Delta Lake テーブル ストリーミングには適用されません。

maxFilesPerTrigger

1000 Delta LakeとAuto Loader向け。 その他のファイルベースのソースには上限はありません。

正の整数

各マイクロバッチで処理される新しいファイルの数の上限maxBytesPerTriggerと一緒に使用すると、マイクロバッチはいずれかの制限に最初に到達するまでデータを処理します。

Auto Loaderの場合は、代わりにcloudFiles.maxFilesPerTriggerを使用してください。 共通項を参照。

sourceArchiveDir

なし

パス文字列

cleanSourcearchiveに設定されている場合のアーカイブディレクトリへのパス。ソースファイルは処理後、元のディレクトリ構造を維持したままこのパスに移動されます。Delta Lakeテーブル ストリーミングには適用されません。

Auto Loader

これらのオプションをcloudFilesソースと組み合わせて使用すると、クラウドストレージからのストリーミング取り込み用にAuto Loaderを設定できます。cloudFilesソースに固有のオプションには、他の構造化ストリーミングソースオプションとは別の名前空間に保持するために、 cloudFilesという接頭辞が付けられます。

一般

次のオプションは、すべてのAuto Loader構成に適用されます。

Key

デフォルト

有効な値

説明

cloudFiles.allowOverwrites

false

true, false

入力ディレクトリファイルの変更が既存のデータを上書きすることを許可するかどうか。

設定上の注意点については、 「ファイルが追加または上書きされた場合、Auto Loader はファイルを再度処理しますか?」を参照してください。

cloudFiles.backfillInterval

なし

期間を表す文字列。例:1 day または 1 week

Auto Loader は、特定の間隔で非同期バックフィルをトリガーできます。詳細については、「cloudFiles.backfillInterval を使用して定期的なバックフィルをトリガーする」を参照してください。

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。

cloudFiles.cleanSource

OFF

OFFDELETEMOVE

入力ディレクトリから処理されたファイルを自動的に削除または移動するかどうか。OFFに設定した場合(デフォルト)、ファイルは削除されません。

DELETEに設定すると、Auto Loaderは処理後30日後にファイルを自動的に削除します。これを行うには、Auto Loader がソースディレクトリへの書き込み権限を持っている必要があります。

MOVEに設定すると、Auto Loaderは処理後cloudFiles.cleanSource.moveDestination日後に指定された場所にファイルを自動的に移動します。これを行うには、Auto Loader がソースディレクトリと移動先ディレクトリの両方に対して書き込み権限を持っている必要があります。

ファイルは、 cloud_files_stateテーブル値関数の結果のcommit_timeに null 以外の値がある場合に処理済みとみなされます。cloud_files_stateテーブル値関数を参照してください。処理後の30日間の追加待機はcloudFiles.cleanSource.retentionDurationを使用して設定できます。

cloudFiles.cleanSourceを有効にする前に、以下の事項を確認してください。

  • ソースの場所からデータを消費するストリームが複数ある場合、 Databricksこのオプションの使用をお勧めしません。これは、最速のコンシューマーがファイルを削除し、遅いソースには取り込まれないためです。
  • この機能を有効にするには、Auto Loader がチェックポイントに追加の状態を保持する必要があり、パフォーマンスのオーバーヘッドが発生しますが、 cloud_files_stateテーブル値関数を通じて監視性を向上させることができます。cloud_files_stateテーブル値関数を参照してください。
  • cleanSource 現在の設定を使用して、特定のファイルをMOVEまたはDELETEするかどうかを決定します。例えば、ファイルが最初に処理されたときの設定がMOVEだったが、30日後にファイルがクリーンアップの対象になったときにDELETEに変更されたとします。この場合、cleanSourceはファイルを削除します。
  • retentionDuration有効期限が切れた直後にファイルが削除されるとは限りません。コストを抑えるため、Auto Loaderはストリーム処理と同時にファイルを削除し、ストリーム処理が完了または終了するとすぐに終了します。ストリーム処理中にクリーンアップの対象となったものの、クリーンアップできなかったファイルは、次回のAuto Loader実行時に処理されます。

Databricks Runtime 16.4以降で利用可能です。

cloudFiles.cleanSource.retentionDuration

30 days

CalendarInterval のような文字列(14 days2 weeks など) 1 month

処理されたファイルがアーカイブ候補になるまでの待機時間( cleanSource )。DELETEの場合は 7 日以上である必要があります。MOVEには最低制限はありません。

Databricks Runtime 16.4以降で利用可能です。

cloudFiles.cleanSource.moveDestination

なし

クラウドストレージまたはUnity Catalogボリュームパス

cloudFiles.cleanSourceMOVEに設定された場合の処理済みファイルのアーカイブ先パス。これはクラウドストレージパスまたはUnity Catalogボリュームパス(例: /Volumes/my_catalog/my_schema/my_volume/archive/ )です。

移転先は以下の条件を満たす必要があります。

  • ソースディレクトリの子ディレクトリであってはならない。移動先をソースディレクトリ内に配置した場合、アーカイブされたファイルが再度取り込まれます。
  • ソースと同じ外部ロケーション、ボリューム、またはDBFSマウント上に配置してください。バケット間およびコンテナ間の移動はサポートされておらず、エラーが発生します。

Auto Loaderには、このディレクトリへの書き込み権限が必要です。

Databricks Runtime 16.4以降で利用可能です。

cloudFiles.format

なし(必須オプション)

avrobinaryFilecsvjsonorcparquettextxml

ソースパス内のデータファイル形式。有効な値は次のとおりです。

cloudFiles.includeExistingFiles

true

true, false

ストリーム処理入力パスに既存のファイルを含めるか、初期セットアップ後に到着する新しいファイルのみを処理するかどうか。このオプションは、初めてストリームを開始するときにのみ評価されます。ストリームの再開後にこのオプションを変更しても効果はありません。

cloudFiles.inferColumnTypes

false

true, false

スキーマ推論を利用する際に、正確な列型を推論するかどうか。安心により、 JSONやCSVデータセットを推論する際、列は文字列として推論されます。 詳細については、スキーマ推論を参照してください。

cloudFiles.maxBytesPerTrigger

なし

バイトを表す文字列。例: 10g

各トリガーで処理される新しいバイトの最大数。これはソフトマキシマムです。3GBのファイルがある場合、Databricksはマイクロバッチで12GBを処理します。cloudFiles.maxFilesPerTriggerと一緒に使用すると、DatabricksはcloudFiles.maxFilesPerTriggerまたはcloudFiles.maxBytesPerTriggerのどちらか先に到達した方の下限まで消費します。このオプションは、Trigger.Once()Trigger.Once()は非推奨です)と一緒に使用すると効果がありません。

Databricks Runtime 18.0以降では、このオプションは動的に構成されるため、手動で設定する必要はありません。

cloudFiles.maxFileAge

なし

期間を表す文字列

ファイルイベントが重複排除の目的で追跡される期間。Databricks 、1 時間あたり数百万ファイル程度のデータを取り込んでいる場合を除き、この設定を調整することを推奨しません。 詳細については、 「ファイルイベント追跡」のセクションを参照してください。

cloudFiles.maxFileAge過度に調整すると、重複したデータの取り込みやファイルの欠落など、データ品質の問題が発生する可能性があります。そのため、DatabricksはcloudFiles.maxFileAgeに対して90日などの保守的な設定を推奨しており、これは類似のデータ取り込みソリューションが推奨しているものと同様です。

cloudFiles.maxFilesPerTrigger

1000

正の整数

各トリガーで処理される新しいファイルの最大数。cloudFiles.maxBytesPerTriggerと一緒に使用すると、DatabricksはcloudFiles.maxFilesPerTriggerまたはcloudFiles.maxBytesPerTriggerのどちらか先に到達した方の下限まで消費します。このオプションは、Trigger.Once()(非推奨)と一緒に使用すると効果がありません。

Databricks Runtime 18.0以降では、このオプションは動的に構成されるため、手動で設定する必要はありません。

cloudFiles.partitionColumns

なし

列名のコンマ区切りリスト

ファイルのディレクトリ構造から推測したいHiveスタイルのパーティション列のコンマ区切りのリスト。Hiveスタイルのパーティション列は、<base-path>/a=x/b=1/c=y/file.formatなどの等号で結合されたキーと値のペアです。この例では、パーティション列はab、およびcです。デフォルトでは、スキーマ推論を使用し、データをロードする<base-path>を指定している場合、これらの列は自動的にスキーマに追加されます。スキーマを指定すると、Auto Loader はこれらの列がスキーマに含まれることを期待します。これらの列をスキーマの一部にしたくない場合は、"" を指定してこれらの列を無視できます。さらに、次の例のように、複雑なディレクトリ構造で列のファイル パスを推測する場合に、このオプションを使用できます。

<base-path>/year=2022/week=1/file1.csv <base-path>/year=2022/month=2/day=3/file2.csv <base-path>/year=2022/month=2/day=4/file3.csv

cloudFiles.partitionColumns year,month,dayとして指定すると、 file1.csvに対してyear=2022が返されますが、 monthday列はnullになります。

month file2.csvfile3.csvについては、 dayは正しく解析されます。

cloudFiles.schemaEvolutionMode

addNewColumns スキーマが指定されていない場合、それ以外の場合はnone

addNewColumnsnonerescuefailOnNewColumns

データ内に新しい列が発見された際に、スキーマを進化させるためのモード。安心により、 JSONデータセットを推論する際、列は文字列として推論されます。 詳細については、 「スキーマの進化」を参照してください。

cloudFiles.schemaHints

なし

スキーマ文字列

スキーマ推論中にAuto Loaderに提供するスキーマ情報。詳細については、スキーマのヒントを参照してください。

cloudFiles.schemaLocation

なし(スキーマ推論に必須)

パス文字列

推論されたスキーマとその後の変更を保存する場所。詳細については、スキーマ推論を参照してください。

cloudFiles.useStrictGlobber

false

true, false

Apache Sparkの他のファイル ソースの完全なグロビング動作と一致する厳密なglobberを使用するかどうか。 詳細については、 「一般的なデータ読み込みパターン」を参照してください。Databricks Runtime 12.2 LTS以降で利用可能です。

cloudFiles.validateOptions

true

true, false

Auto Loaderオプションを検証し、不明なオプションまたは一貫性のないオプションに対してエラーを返すかどうか。

ディレクトリ一覧

以下のオプションは、ディレクトリリストモードを使用する際に適用されます。

Key

デフォルト

有効な値

説明

cloudFiles.useIncrementalListing (非推奨)

auto Databricks Runtime 17.2以前のバージョンではfalse、 Databricks Runtime 17.3以降のバージョンでは

autotruefalse

この機能は廃止されました。Databricksは、 cloudFiles.useIncrementalListingの代わりにファイルイベントでファイル通知モードを使用することを推奨しています。

ディレクトリ一覧表示モードで、完全な一覧表示ではなく、増分一覧表示を使用するかどうか。デフォルトでは、Auto Loader は指定されたディレクトリが増分リストの対象であるかどうかを自動的に検出するよう最大限の努力を払います。明示的に増分リストを使用するか、ディレクトリ全体のリストを使用するかは、それぞれtrueまたはfalseに設定することで選択できます。

辞書順に並んでいないディレクトリでインクリメンタルリストを誤って有効にすると、 Auto Loader新しいファイルを検出できなくなります。

Azureデータレイク ストレージ ( abfss:// )、 S3 ( s3:// )、およびGCS ( gs:// ) で動作します。

Databricks Runtime 9.1 LTS 以降で利用可能です。

ファイル通知

必要なクラウド権限、セットアップ手順、認証方法など、ファイル通知モードの構成については、 「 ファイル通知モードでのAuto Loaderストリームの構成 」を参照してください。

Key

デフォルト

有効な値

説明

cloudFiles.fetchParallelism

1

正の整数

キューイングサービスからメッセージを取得するときに使用するスレッドの数。

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。

cloudFiles.pathRewrites

なし

JSON マップ文字列

複数のS3バケットからファイル通知を受け取るqueueUrlを指定し、これらのコンテナ内のデータにアクセスするために構成されたマウントポイントを使用する場合にのみ必要です。このオプションを使用すると、 bucket/keyパスのプレフィックスをマウントポイントで書き換えることができます。書き換え可能なのは接頭辞のみです。例えば、構成{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}の場合、パスs3://<databricks-mounted-bucket>/path/2017/08/fileA.jsondbfs:/mnt/data-warehouse/2017/08/fileA.jsonに書き換えられます。

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。

cloudFiles.resourceTag

なし

キーと値タグ文字列

関連リソースの関連付けと識別に役立つ一連のキーと値のタグのペア。次に例を示します。

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。代わりに、クラウドプロバイダーのコンソールを使用してリソースタグを設定してください。

詳細については、「クラウドプロバイダーのリソースタグ」を参照してください。

cloudFiles.useManagedFileEvents

false

true, false

trueに設定すると、Auto Loaderはファイルイベントサービスを使用して、外部ロケーションにあるファイルを検出します。このオプションは、ロード パスが外部ロケーションにあり、ファイル イベントが有効になっている場合にのみ使用できます。 ファイルイベントでファイル通知モードを使用する方法については、「ファイルイベントでファイル通知モードを使用する」を参照してください。

ファイルイベントは、ファイル検出において通知レベルのパフォーマンスを提供します。これは、Auto Loaderが前回の実行後に新しいファイルを検出できるためです。ディレクトリ一覧表示とは異なり、この処理ではディレクトリ内のすべてのファイルを一覧表示する必要はありません。

ファイルイベントオプションが有効になっている場合でも、Auto Loaderがディレクトリ一覧表示を使用する状況がいくつかあります。

  • 初期ロード時に、 includeExistingFilestrueに設定されると、ディレクトリ全体のリストが取得され、Auto Loader が起動する前にディレクトリ内に存在していたすべてのファイルが検出されます。
  • ファイルイベントサービスは、最近作成されたファイルをキャッシュすることで、ファイルの検出を最適化します。Auto Loader の実行頻度が低い場合、このキャッシュは期限切れになる可能性があり、その場合、Auto Loader はディレクトリ一覧表示にフォールバックしてファイルを検出し、キャッシュを更新します。この事態を避けるため、少なくとも7日に1回はAuto Loaderを起動してください。

ファイルイベントを使用するAuto Loader 、いつディレクトリ一覧を使用しますか?を参照してください。 このオプションを使用して Auto Loader がディレクトリ一覧を表示する状況の包括的なリストについては、こちらをご覧ください。

Databricks Runtime 14.3 LTS以降で利用可能です。

cloudFiles.listOnStart

false

true, false

trueに設定すると、Auto Loader はチェックポイント内の継続トークンから開始する代わりに、ストリームの開始時にディレクトリ全体のリストを実行します。このオプションを使用して、 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKENなどのエラーから回復します。CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKENエラーから復旧するにはどうすればよいですか?」を参照してください。

cloudFiles.useNotifications

false

true, false

新しいファイルが存在することを通知するために、ファイル通知モードを使用するかどうか。falseの場合は、ディレクトリ一覧表示モードを使用します。Auto Loaderファイル検出モードの比較を参照してください。

cloudFiles.useManagedFileEventstrueに設定されている場合は使用しないでください。

クラウド プロバイダー リソース タグ

Auto Loaderは、デフォルトではベストエフォート方式で次のキーと値のタグのペアを追加します。

  • vendor: Databricks
  • path: データが読み込まれる場所。ラベル付けの制限のため、GCPでは使用できません。
  • checkpointLocation:ストリームのチェックポイントの位置。表示上の制限により、GCP(医薬品臨床試験の実施に関する基準)では利用できません。
  • streamId: ストリームのグローバル一意識別子。

Databricksはこれらのキー名を予約しており、その値を上書きすることはできません。

AWSの詳細については、Amazon SQSコスト割り当てタグおよびAmazon SNSトピックのタグの設定を参照してください。

クラウド固有の

Auto Loader には、ファイル通知モード向けにクラウドインフラを構成するためのオプションが用意されています。必要なクラウド権限とセットアップ手順については、「ファイル通知モードでの Auto Loader ストリームの構成」を参照してください。

AWS

cloudFiles.useNotifications = trueを選択し、Auto Loaderで通知サービスを設定する場合にのみ、次のオプションを指定します。

Key

デフォルト

有効な値

説明

cloudFiles.region

EC2インスタンスのリージョン

AWSリージョン文字列

ソースS3バケットが存在し、 AWS SNS および SQS サービスを作成するリージョン。

Key

デフォルト

有効な値

説明

cloudFiles.restrictNotificationSetupToSameAWSAccountId

false

true, false

SNSトピックと同じアカウント内のAWS S3バケットからのイベント通知のみを許可する。この設定が有効な場合、Auto Loader は SNS トピックと同じアカウント内の AWS S3 バケットからのイベント通知のみを受け入れます。

falseの場合、アクセス ポリシーはアカウント間バケットと SNS トピックの設定を制限しません。これは、SNSのトピックとバケットパスが異なるアカウントに関連付けられている場合に役立ちます。

Databricks Runtime 17.2以降で利用可能です。

cloudFiles.useNotifications = trueを選択し、すでに設定したキューをAuto Loaderで使用する場合にのみ、次のオプションを指定します。

Key

デフォルト

有効な値

説明

cloudFiles.queueUrl

なし

URL 文字列

SQSキューのURL。指定された場合、Auto Loaderは独自のAWS SNSとSQSサービスをセットアップする代わりに、このキューから直接イベントを消費します。

AWS認証オプション

Databricksサービス資格情報を使用するには、次の認証オプションを指定してください。

Key

デフォルト

有効な値

説明

databricks.serviceCredential

なし

任意の文字列

Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。

Databricksサービス資格情報またはIAMロールが利用できない場合、代わりに、次の認証オプションを指定できます。

Key

デフォルト

有効な値

説明

cloudFiles.awsAccessKey

なし

任意の文字列

ユーザーのAWSアクセスキーID。cloudFiles.awsSecretKeyで指定する必要があります。

cloudFiles.awsSecretKey

なし

任意の文字列

ユーザーのAWSシークレットアクセスキー。cloudFiles.awsAccessKeyで指定する必要があります。

cloudFiles.roleArn

なし

ARN文字列

ARNIAM必要に応じて引き受ける ロールの 。このロールは、クラスターのインスタンスプロファイルから、または cloudFiles.awsAccessKeycloudFiles.awsSecretKeyで資格情報を提供することで引き受けることができます。

cloudFiles.roleExternalId

なし

任意の文字列

cloudFiles.roleArnを使用してロールを引き受けるときに使用する識別子。

cloudFiles.roleSessionName

なし

任意の文字列

cloudFiles.roleArnを使用してロールを引き受ける際に使用するオプションのセッション名。

cloudFiles.stsEndpoint

なし

URL 文字列

cloudFiles.roleArn を使用してロールを引き受けるときに AWS STS にアクセスするために使用するオプションのエンドポイント

Azure

cloudFiles.useNotifications = trueを指定し、Auto Loaderに通知サービスを設定させる場合は、次のすべてのオプションに値を指定する必要があります。

Key

デフォルト

有効な値

説明

cloudFiles.resourceGroup

なし

任意の文字列

ストレージ アカウントが作成されるAzureリソース グループ。

cloudFiles.subscriptionId

なし

任意の文字列

リソースグループが作成されたAzureサブスクリプションID。

databricks.serviceCredential

なし

任意の文字列

Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。

Databricks サービスの認証情報が利用できない場合、次の認証オプションを指定できます。

Key

デフォルト

有効な値

説明

cloudFiles.clientId

なし

任意の文字列

Databricksサービスプリンシパルのクライアント ID またはアプリケーション ID。

cloudFiles.clientSecret

なし

任意の文字列

Databricksサービスプリンシパルのクライアント シークレット。

cloudFiles.connectionString

なし

接続文字列

アカウントアクセスキーあるいは共有アクセス署名(SAS)に基づく、ストレージアカウントの接続文字列。

cloudFiles.tenantId

なし

任意の文字列

Databricksサービスプリンシパルが作成されるAzureテナントID。

cloudFiles.useNotifications = trueを選択し、Auto Loaderで既存のキューを使用する場合にのみ、次のオプションを指定します。

Key

デフォルト

有効な値

説明

cloudFiles.queueName

なし

任意の文字列

Azureキューの名前。指定されている場合、クラウドファイルソースは、独自のAzure Event Gridサービスとキューストレージサービスを設定する代わりに、このキューからイベントを直接消費します。その場合、databricks.serviceCredentialまたはcloudFiles.connectionStringにはキューに対する読み取り権限のみが必要です。

GCP

Auto Loader Databricksサービスの資格情報を利用して、通知サービスを自動的にセットアップできます。 Databricks サービス認証情報を使用して作成されたサービス アカウントには、ファイル通知モードでの Auto Loader ストリームの構成で指定された権限が必要です。

Key

デフォルト

有効な値

説明

cloudFiles.projectId

なし

任意の文字列

GCSバケットが存在するプロジェクトのID。Google クラウド Pub/Sub サブスクリプションもこのプロジェクト内で作成されます。

databricks.serviceCredential

なし

任意の文字列

Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。

Databricks サービスの認証情報を利用できない場合は、Google サービス アカウントを直接使用できます。Google サービスのセットアップに従ってクラスターをサービス アカウントとして構成するか、次の認証オプションを指定できます。

Key

デフォルト

有効な値

説明

cloudFiles.client

なし

任意の文字列

GoogleサービスアカウントのクライアントID。

cloudFiles.clientEmail

なし

メールアドレス文字列

Googleサービスアカウントのメールアドレス。

cloudFiles.privateKey

なし

プライベートキー文字列

Google サービス アカウント用に生成される秘密キー。

cloudFiles.privateKeyId

なし

任意の文字列

Google サービス アカウント用に生成された秘密キーの ID。

cloudFiles.useNotifications = trueを選択し、すでに設定したキューをAuto Loaderで使用する場合にのみ、次のオプションを指定します。

Key

デフォルト

有効な値

説明

cloudFiles.subscription

なし

任意の文字列

Google Cloud Pub/Subサブスクリプションの名前。指定されている場合、クラウドファイルソースは独自のGCS通知サービスとGoogle Cloud Pub/Subサービスを設定する代わりに、このキューからのイベントを消費します。

Delta Lake

spark.readStreamを使用して Delta Lake テーブルから読み込む場合、以下のオプションが適用されます。

Key

デフォルト

有効な値

説明

allowSourceColumnDrop

なし

バージョン番号または always

Deltaテーブルのバージョン番号またはalwaysに設定すると、ソース テーブル スキーマから列が削除された後もストリームが継続できるようになります。 バージョン番号を設定すると、そのバージョンまでのすべてのスキーマ変更を認識します。必要条件: schemaTrackingLocationDelta Lakeの列マッピングを使用した列名の変更と削除を参照してください。

allowSourceColumnRename

なし

バージョン番号または always

Deltaテーブルのバージョン番号またはalwaysに設定すると、ソース テーブルで列の名前が変更された後もストリームが継続できるようになります。 バージョン番号を設定すると、そのバージョンまでのすべてのスキーマ変更を認識します。必要条件: schemaTrackingLocationDelta Lakeの列マッピングを使用した列名の変更と削除を参照してください。

allowSourceColumnTypeChange

なし

バージョン番号または always

Deltaテーブルのバージョン番号またはalwaysに設定すると、ソース テーブルで列の型が変更された後もストリームが継続できるようになります。 バージョン番号を設定すると、そのバージョンまでのすべてのスキーマ変更を認識します。必要条件: schemaTrackingLocation型の拡張を参照してください。

excludeRegex

なし

Javaの正規表現文字列

正規表現パターン。パスがパターンに一致するファイルは、ストリーミング読み取りから除外されます。想定される命名規則に準拠していないファイルを除外するのに役立ちます。

failOnDataLoss

true

true, false

ログ保持( logRetentionDuration )によりソースデータが削除された場合、ストリーミングクエリを失敗させるかどうか。欠損データをスキップして処理を続行するには、 falseに設定してください。タイムトラベルクエリのデータ保持設定については、「タイムトラベルクエリのデータ保持設定」を参照してください。

ignoreChanges (非推奨)

false

true, false

Databricks Runtime 11.3 LTS以前のバージョンで利用可能です。UPDATEMERGE INTODELETEOVERWRITEなどの変更操作後に書き換えられたデータファイルを再出力します。変更されていない行が新しい行と同時に出力される可能性があるため、下流のコンシューマーは重複を処理する必要があります。削除操作は下流には伝播されません。Databricks Runtime 12.2 LTS以降ではskipChangeCommitsに置き換えられました。

ignoreDeletes (非推奨)

false

true, false

パーティション境界でデータを削除するトランザクション(パーティション全体の削除のみ)は無視します。パーティション以外の削除、更新、その他の変更には対応していません。代わりにskipChangeCommits使用してください。

readChangeFeed または readChangeData

false

true, false

ストリーミングクエリのチェンジデータフィードの読み取りを有効にするかどうか。有効にすると、ストリームは、追加のメタデータ列を伴う行レベルの変更(挿入、更新、および削除)を出力します。Databricksでのチェンジデータフィードの使用 を参照してください。

schemaTrackingLocation

なし

パス文字列

Delta Lakeがストリーミング読み取りのためのスキーマ変更を追跡するディレクトリへのパス。列マッピングが有効になっているテーブルからストリーミングし、スキーマ進化を処理するためにallowSourceColumn*オプションを使用する場合に必要です。 ストリーミングクエリのcheckpointLocation範囲内である必要があります。Delta Lakeの列マッピングを使用した列名の変更と削除を参照してください。

skipChangeCommits

false

true, false

既存のレコードを削除または変更するトランザクションは無視し、追加のみを処理します。Databricks 、変更データフィードを使用しないほとんどのワークロードにこのオプションを推奨します。 Databricks Runtime 12.2 LTS以降で利用可能です。skipChangeCommitsを使用してアップストリームの変更コミットをスキップする」を参照してください。

startingTimestamp

最新の情報

タイムスタンプ文字列(例:2019-01-01T00:00:00.000Z)または日付文字列(例: 2019-01-01

読み取りを開始するタイムスタンプ。このストリームは、指定されたタイムスタンプ以降にコミットされたすべてのテーブル変更を読み取ります。タイムスタンプがすべての利用可能なテーブル コミットよりも前にある場合、ストリームは利用可能な最も早いコミットから開始されます。 startingVersionと併用することはできません。ストリーミングチェックポイントが既に存在する場合は無視されます。

startingVersion

最新の情報

正の整数、0、または latest

Deltaテーブルのバージョンを読み取り開始します。 このストリームは、指定されたバージョン以降にコミットされたすべての変更を読み取ります。最新の変更のみから開始するには、 latest指定してください。startingTimestampと併用することはできません。ストリーミングチェックポイントが既に存在する場合は無視されます。テーブル履歴の操作を参照してください。

withEventTimeOrder

false

true, false

初期テーブルスナップショットをイベント時間バケットに分割することで、レコードが誤って遅延イベントとしてマークされ、ウォーターマーク付きのステートフルクエリで削除されるのを防ぎます。初期スナップショット処理が開始された後は、チェックポイントを削除しない限り変更できません。Databricks Runtime 11.3 LTS以降で利用可能です。データを削除せずに初期スナップショットを処理する方法については、「データを削除せずに処理する」を参照してください。

Kafka

これらのオプションはspark.readStream.format("kafka")またはspark.read.format("kafka")のいずれかと組み合わせて使用してください。

Key

デフォルト

有効な値

説明

assign

なし

JSON文字列など {"topicA":[0,1],"topicB":[2,4]}

コンシュームする特定のパーティションです。subscribesubscribePattern、またはassignのオプションのうち、きっかり1つを指定する必要があります。

failOnDataLoss

true

true, false

データが失われた可能性がある場合、例えば、削除されたトピックやオフセットの切り捨てが原因で、クエリーを失敗させるかどうか。false に設定して、欠落しているデータをスキップし、続行します。

Databricksは、データが失われた可能性について控えめな見積もりを行っています。しかし、これは誤報を引き起こす可能性がある。

fetchoffset.numretries

3

正の整数または 0

Kafka オフセットの取得に失敗した場合の再試行回数。

fetchoffset.retryintervalms

1000

正の整数または 0

オフセットフェッチの再試行間のミリ秒単位の間隔。

groupIdPrefix

spark-kafka-source (ストリーミング)、 spark-kafka-relation (バッチ)

任意の文字列

自動生成された Kafka コンシューマーグループの ID に使用するカスタマイズされたプレフィックス。kafka.group.id が明示的に設定されている場合、コネクタはこのオプションを無視します。

kafka.group.id

なし

任意の文字列

読み取りに使用する Kafka コンシューマーグループの ID。注意:同じグループ ID を共有するクエリは相互に干渉し、データの一部のみを読み取る可能性があります。これは、並列バッチおよびストリーミングワークロードを実行している場合、またはクエリを素早く再起動している場合に発生する可能性があります。設定されている場合、groupIdPrefix は無視されます。問題を最小限に抑えるには、Kafka コンシューマー構成 session.timeout.ms を小さい値に設定します。

includeHeaders

false

true, false

出力に Kafka メッセージヘッダーを列として含めるかどうか。

kafkaconsumer.polltimeoutms

なし

正の整数

Kafka コンシューマー poll() 呼び出しのミリ秒単位のタイムアウトです。

kafka.bootstrap.servers

なし

カンマ区切りの host:port 文字列のリスト

ホストのカンマ区切りリスト Kafkaブローカーのアドレス。Kafkaクライアントのbootstrap.serversプロパティを設定します。

Kafkaからデータが取得できない場合は、このブローカーアドレスリストを確認して、アドレスに誤りがないか確認してください。ブローカーのアドレスリストが間違っている場合、エラーが発生しない可能性があります。Kafkaクライアントは、ブローカーがいずれ利用可能になると想定し、ネットワークエラーが発生した場合は永久に再試行します。

maxRecordsPerPartition

なし

正の整数

各Sparkパーティションの最大レコード数です。設定されている場合、コネクタはKafkaパーティションを分割し、各Sparkパーティションが読み込むレコードの最大数がこの値になるようにします。

このオプションはminPartitionsでも使用できます。両方のオプションが設定されている場合、Sparkはより多くのパーティションが生成される方のオプションを使用します。

minPartitions

なし

正の整数

Kafka から読み取る Spark パーティションの最小数設定すると、コネクタは大きな Kafka パーティションを分割して並列処理を向上させます。設定されていない場合、Spark は各 Kafka トピックパーティションに対して 1 つのパーティションを作成します。データスキューまたはピーク負荷の処理に役立ちます。

このオプションは、トリガーごとにKafkaコンシューマーを再初期化するため、SSL使用時のパフォーマンスに影響を与える可能性があります。

startingOffsets

latest (ストリーミング)、 earliest (バッチ)

earliestlatest、またはJSONオフセット文字列

クエリが読み取りを開始するオフセット。JSON 文字列では、-1 が最新のオフセットです。-2 は最も古いオフセットです。例:{"topicA":{"0":23,"1":-2}}

ストリーミングクエリの場合、このオプションは新しいクエリが開始されたときにのみ適用されます。再開されたクエリは常にチェックポイントを使用します。クエリ実行中、新しいパーティションは最も早いオフセットから読み取りを開始します。

バッチクエリの場合、 latestは使用できません。

startingOffsetsByTimestamp

なし

JSON タイムスタンプ文字列のような {"topicA":{"0":1000,"1":2000}}

ミリ秒単位のタイムスタンプとして指定される、各パーティションの開始オフセットのリストです。タイムスタンプにオフセットが存在しない場合、クエリの動作はstartingOffsetsByTimestampStrategyによって決定されます。

ストリーミングクエリの場合、このオプションは新しいクエリが開始されたときにのみ適用されます。再開されたクエリは常にチェックポイントを使用します。クエリ実行中、新しいパーティションは最も早いオフセットから読み取りを開始します。

startingOffsetsByTimestampStrategy

error

error, latest

startingOffsetsByTimestampまたはstartingTimestampで指定されたタイムスタンプのオフセットが見つからない場合に使用する戦略。error が例外を発生させます。latest は利用可能な最新のオフセットを使用します。

startingTimestamp

なし

正の整数または 0

すべてのパーティションに適用されるグローバルな開始タイムスタンプ (ミリ秒単位)。タイムスタンプにオフセットが存在しない場合、動作はstartingOffsetsByTimestampStrategyによって制御されます。

subscribe

なし

トピック名のコンマ区切りリスト

サブスクライブするトピックです。subscribesubscribePattern、またはassignのオプションのうち、きっかり1つを指定する必要があります。

subscribePattern

なし

Javaの正規表現文字列

トピックをサブスクライブするのに使われるパターンです。subscribesubscribePattern、またはassignのオプションのうち、きっかり1つを指定する必要があります。たとえば、topic.*です。

以下のオプションは、 spark.readStream.format("kafka")のストリーミング読み取りにのみ適用されます。

Key

デフォルト

有効な値

説明

bytesEstimateWindowLength

300s

期間を表す文字列(10m など)、または 600s

estimatedTotalBytesBehindLatest メトリクスの残りのバイトを推定するために使用される時間枠。Kafka メトリクスの取得を参照してください。

maxOffsetsPerTrigger

なし

正の整数

トリガー間隔ごとに処理されるオフセットの最大数。オフセットはトピックパーティション間で比例して分配されます。

maxTriggerDelay

15m

期間を表す文字列(10m など)、または 600s

minOffsetsPerTriggerが蓄積されるまでに待機する最大時間です。

minOffsetsPerTrigger

なし

正の整数

マイクロバッチをトリガーするまでに蓄積する最小オフセット数maxTriggerDelay に到達した場合、マイクロバッチは実行されます。

spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されるオフセットオプションについては、 DataFrameReader Kafka オプションを参照してください。

認証

Databricks は、クラウドマネージド Kafka サービス(AWS MSK、Azure Event Hubs、または Google Cloud Managed Kafka)への認証に Unity Catalog サービス資格情報を使用することを推奨します。

Key

デフォルト

有効な値

説明

databricks.serviceCredential

なし

任意の文字列

クラウド管理のKafkaサービスを認証するための Unity Catalog サービス資格情報の名前Databricks Runtime 16.1 以降で利用可能です。

databricks.serviceCredential.scope

なし

任意の文字列

サービス認証情報のOAuthスコープ。Databricks がお客様の Kafka サービスに対するスコープを自動的に推論できない場合にのみ、これを設定してください。

サービス資格情報が利用できない場合、SASL/SSLオプションを使用します(kafka.*プロパティを介して渡されます)。サービス認証情報を使用する場合、kafka.sasl.mechanismkafka.sasl.jaas.config、またはkafka.security.protocolを指定する必要はありません。

Key

デフォルト

有効な値

説明

kafka.security.protocol

なし

セキュリティプロトコル文字列。例: SASL_SSLSSL など PLAINTEXT

ブローカー通信のセキュリティプロトコル。

kafka.sasl.mechanism

なし

SASLメカニズム文字列、例えばPLAINSCRAM-SHA-256SCRAM-SHA-512OAUTHBEARERAWS_MSK_IAM

SASLメカニズムです。

kafka.sasl.jaas.config

なし

JAAS 構成文字列

JAASログイン設定文字列です。

kafka.sasl.login.callback.handler.class

なし

完全修飾クラス名

SASL認証用のログインコールバックハンドラーの完全修飾クラス名です。

kafka.sasl.client.callback.handler.class

なし

完全修飾クラス名

SASL認証用のクライアントコールバックハンドラーの完全修飾クラス名です。

kafka.ssl.truststore.location

なし

ファイルパス文字列

SSLトラストストアファイルへのパス。

kafka.ssl.truststore.password

なし

任意の文字列

SSLトラストストアファイルのパスワードです。

kafka.ssl.keystore.location

なし

ファイルパス文字列

SSLキーストアファイルのパス。

kafka.ssl.keystore.password

なし

任意の文字列

SSLキーストアファイルのパスワード。

完全な認証設定手順については、認証を参照してください。

Kinesis

Amazon Kinesis Data ストリームから読み取るには、これらのオプションを spark.readStream.format("kinesis") と共に使用します。streamName または streamARN のいずれかを指定する必要があります。ただし、両方は指定できません。

Key

デフォルト

有効な値

説明

streamName

なし

ストリーム名のコンマ区切りリスト

購読する Kinesis ストリーム名のコンマ区切りリスト

streamARN

なし

Kinesis ストリーム ARNs のコンマ区切りリスト

Kinesis ストリーム ARNs のコンマ区切りリストたとえば、arn:aws:kinesis:myarn1,arn:aws:kinesis:myarn2。Databricks Runtime 16.1 以降で利用可能です。

次のオプションも使用できます。

Key

デフォルト

有効な値

説明

awsAccessKey

なし

任意の文字列

AWSアクセスキーID。awsSecretKeyで指定する必要があります。

awsSecretKey

なし

任意の文字列

awsAccessKey に対応する AWS シークレットアクセスキーです。

coalesceBinSize

128000000

正の整数

結合後のおおよその目標ブロックサイズ (バイト単位)。

coalesceThresholdBlockSize

10000000

正の整数

自動結合が発生するしきい値。平均ブロックサイズがこの値より小さい場合、プリフェッチされたブロックが coalesceBinSize になるように結合されます。

consumerMode

polling

polling, efo

コンシューマータイプです。efo は、シャードあたり専用の 2 MB/秒のスループットで拡張ファンアウトを提供します。Databricks Runtime 11.3 LTS以降で利用可能です。

consumerName

ストリーミングクエリーID

単一のコンシューマー名、またはストリーム数に対応するカンマ区切りのリスト

EFOモードでKinesisサービスにクエリを登録するために使用されるコンシューマー名Databricks Runtime 11.3 LTS以降で利用可能です。

consumerNamePrefix

databricks_

任意の文字列

EFOモードでコンシューマーを登録する際に consumerName に付加されるプレフィックスです。Databricks Runtime 16.0以降で利用可能です。

consumerRefreshInterval

300s (最大3600s

期間を表す文字列。例: 1s

EFOコンシューマー登録が確認および更新される間隔。Databricks Runtime 11.3 LTS以降で利用可能です。

endpoint

ローカルで解決されたリージョン

任意の文字列

Kinesis Data Streams のリージョンエンドポイント

fetchBufferSize

20gb

バイトを表す文字列。例:2gb10mb

次のトリガーのためにバッファーするデータの量。これは停止条件であり、厳密な上限ではありません。指定された量よりも多くのデータがバッファーされる可能性があります。

initialPosition

latest

latesttrim_horizonearliestat_timestamp

ストリーム内のどこから読み取りを開始するか。trim_horizonearliest のエイリアスです。

at_timestamp については、Javaタイムスタンプ形式でJSON文字列を、{"at_timestamp": "06/25/2020 10:23:45 PDT"} のように指定します。カスタム形式も指定できます:{"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}

maxFetchDuration

10s

期間を表す文字列。例: 1m

プリフェッチされたデータが処理可能になるまでのバッファー時間。

maxFetchRate

1.0 (最大2.0

正の10進数

シャードごとのデータの最大プリフェッチ速度 (MB/s)。これは、フェッチのレートを制限し、Kinesis のスロットリングを回避します。Kinesis で許可される最大レートは 2.0 MB/s です。

maxRecordsPerFetch

10000

正の整数

Kinesis API リクエストごとに読み取られるレコードの数。Kinesis Producer Library を使用してサブレコードが集約されている場合、返されるレコードの数は実際にはさらに多くなる場合があります。

maxShardsPerDescribe

100

〜までの正の整数 10000

シャードを一覧表示する際に、APIコールごとに読み取るシャードの最大数。

minFetchPeriod

400ms (最小200ms

期間を表す文字列。例: 1s

連続するプリフェッチ試行間の最小待ち時間。これは、フェッチの頻度を制限し、Kinesis のスロットリングを回避するためのものです。Kinesis では 1 秒あたり最大 5 回のフェッチが許可されるため、最小値は 200ms となります。

region

ローカルで解決されたリージョン

任意の文字列

ストリームが定義されているリージョン

registeredConsumerId

なし

コンシューマー名またはARNsのコンマ区切りリスト

既存のEFOコンシューマーの識別子のカンマ区切りリスト。Databricks Runtime 16.1 以降で利用可能です。

registeredConsumerIdType

なし

name, ARN

registeredConsumerId 内の識別子が、コンシューマー名であるか、ARNs であるか。Databricks Runtime 16.1 以降で利用可能です。

requireConsumerDeregistration

false

true, false

クエリー終了時に拡張ファンアウトコンシューマーの登録を解除するかどうか。consumerModeefoに設定されていることが必要です。Databricks Runtime 11.3 LTS以降で利用可能です。

roleArn

なし

ARN文字列

Kinesisにアクセスするときに引き受けるIAMロールのARN。

roleExternalId

なし

任意の文字列

roleArnを使用してロールを引き受ける際に使用するオプションの外部ID。「外部 ID の使用方法」を参照してください。

roleSessionName

なし

任意の文字列

引き受けられるロールセッションの識別子。同じロールが異なるプリンシパルによって引き受けられる場合に、セッションを一意に識別します。

serviceCredential

なし

任意の文字列

Kinesis の認証に使用する Databricks サービス資格情報の名前。Databricks Runtime 16.1 以降で利用可能です。

stsEndpoint

なし

URL 文字列

roleArnを使用してロールを引き受けるときにAWS STSにアクセスするためのカスタムエンドポイント。

shardFetchInterval

1s

期間を表す文字列。例: 2m

リシャーディングイベントのために Kinesis をポーリングする間隔です。

shardsPerTask

5

正の整数

1 つの Spark タスクで並行してプリフェッチする Kinesis シャードの数。レイテンシーを最小限に抑えるには、# cores in cluster >= # Kinesis shards / shardsPerTaskを確認してください。

Kinesisからの読み取りの詳細については、「Amazon Kinesisに接続する」を参照してください。

パブ/サブ

spark.readStream.format("pubsub") と共にこれらのオプションを使用して、Google Pub/Sub を購読します。オプションsubscriptionIdtopicId、およびprojectIdが必要です。

Key

デフォルト

有効な値

説明

subscriptionId

なし

任意の文字列

必須。Pub/Sub サブスクリプション ID。コネクタは、サブスクリプションが存在しない場合は作成します。

topicId

なし

任意の文字列

必須。パブ/サブ トピックID。

projectId

なし

任意の文字列

必須。Google Cloud プロジェクト ID です。

numFetchPartitions

ストリーム初期化時に利用できるエグゼキューターの半分

正の整数

サブスクリプションから行を取得する並列Sparkタスクの数。

maxBytesPerTrigger

なし

正の整数

マイクロバッチごとに処理されるバイト数のソフトリミット。

maxRecordsPerFetch

1000

正の整数

処理する前にタスクごとに取得する行数。

maxFetchPeriod

10s

期間を表す文字列。例:1s または 1m

各タスクが行を処理する前に取得にかかる時間。Databricksはデフォルト値を使用することをお勧めします。

deleteSubscriptionOnStreamStop

false

true, false

true の場合、subscriptionId からのサブスクリプションは、ストリーミングクエリの終了時に削除されます。

serviceCredential

なし

任意の文字列

Pub/Subへの認証に使用するDatabricks サービス認証情報の名前。Databricks Runtime 16.1 以降で利用可能です。

clientEmail

なし

メールアドレス文字列

GoogleサービスアカウントのEメールアドレス。サービス認証情報を使用しない場合、必要です。

clientId

なし

任意の文字列

Google サービス アカウントのクライアントID。サービス認証情報を使用しない場合、必要です。

privateKey

なし

プライベートキー文字列

Google サービスアカウントの秘密鍵。サービス認証情報を使用しない場合、必要です。

privateKeyId

なし

任意の文字列

Googleサービスアカウントの秘密鍵のID。サービス認証情報を使用しない場合、必要です。

Pub/Subに関する詳細については、Google Pub/Sub をサブスクライブするをご覧ください。

Pulsar

spark.readStream.format("pulsar")でこれらのオプションを使用して、Apache Pulsarからストリームします。Databricks Runtime 14.1 以降で利用可能です。

以下のオプションは必須です。topictopics、またはtopicsPatternのいずれか1つを指定する必要があります。

Key

デフォルト

有効な値

説明

service.url

なし

PulsarサービスのURL文字列

Pulsarサービス用のPulsar serviceURL です。例えば pulsar://broker.example.com:6650

topic

なし

任意の文字列

コンシュームする単一のトピック名です。

topics

なし

トピック名のコンマ区切りリスト

消費対象のトピック名のコンマ区切りリスト

topicsPattern

なし

Javaの正規表現文字列

トピック名に一致するJavaの正規表現文字列です。

次のオプションもサポートされています。

Key

デフォルト

有効な値

説明

admin.url

なし

URL 文字列

Pulsar管理サービスのHTTP URL。maxBytesPerTriggerが設定されている場合に必須です。

allowDifferentTopicSchemas

false

true, false

複数の異なるスキーマを持つトピックが読み込まれる場合、自動スキーマに基づくトピック値の逆シリアル化を無効にするには、このオプションを使用してください。これがtrueの場合、生の値のみが返されます。

failOnDataLoss

true

true, false

データが失われた場合にクエリーを失敗させるかどうか。例えば、保持ポリシーによってトピックが削除されたり、メッセージの有効期限が切れたりすると、データが失われる可能性があります。

maxBytesPerTrigger

なし

正の整数

マイクロバッチごとに処理されるバイト数のソフトリミット。admin.url が必須です。

pollTimeoutMs

120000

正の整数

Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位)

predefinedSubscription

なし

任意の文字列

コネクタがSparkアプリケーションの進捗状況を追跡するために使用する事前定義されたサブスクリプション名

startingOffsets

latest

latestearliest、またはJSONオフセット文字列

どこから読み取りを開始するか。

subscriptionPrefix

なし

任意の文字列

Sparkアプリケーションの進捗を追跡するためのランダムなサブスクリプションを生成する際に、コネクタによって使用されるプレフィックスです。

waitingForNonExistedTopic

false

true, false

コネクターが目的のトピックが作成されるまで待機するかどうか。

以下のオプションパターンを使用して、追加のPulsarクライアント、管理、およびリーダー構成を指定できます。

パターン

構成オプション

pulsar.admin.*

Pulsar管理構成

pulsar.client.*

pulsar.client.authPluginClassNameおよびpulsar.client.authParamsなどの認証オプションを含むPulsarクライアント構成

pulsar.reader.*

Pulsarリーダーの構成

Pulsarクライアントおよび管理者認証オプションの詳細については、「認証」を参照してください。

認証

DatabricksはPulsarへのトラストストアとキーストア認証をサポートしています。Databricks では、シークレットを使用して認証情報を格納することを推奨しています。「 シークレットの管理」を参照してください。

Key

デフォルト

有効な値

説明

pulsar.client.authPluginClassName

なし

完全修飾クラス名

認証プラグインの完全修飾クラス名。たとえば、org.apache.pulsar.client.impl.auth.AuthenticationTls

pulsar.client.authParams

なし

認証情報文字列

認証プラグインに認証資格情報が文字列として渡されます。たとえば、tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem

pulsar.client.useKeyStoreTls

false

true, false

trueにすると、PEM形式のファイルの代わりにKeyStoreベースのTLS構成を有効にします。

pulsar.client.tlsTrustStoreType

なし

任意の文字列

TLSトラストストアファイルの形式です。たとえば、JKS

pulsar.client.tlsTrustStorePath

なし

ファイルパス文字列

信頼されたCA証明書を含むTLSトラストストアファイルのパス。pulsar.client.useKeyStoreTlstrueの場合に必須

pulsar.client.tlsTrustStorePassword

なし

任意の文字列

TLSトラストストアファイルのパスワードです。

ストリームがPulsarAdminを使用している場合、次のオプションを設定することもできます。

Key

デフォルト

有効な値

説明

pulsar.admin.authPluginClassName

なし

完全修飾クラス名

Pulsar管理クライアント用の認証プラグインの完全修飾クラス名

pulsar.admin.authParams

なし

認証情報文字列

Pulsar管理クライアント認証プラグインの認証資格情報

pulsar.admin.useTls

なし

true, false

Pulsar管理クライアント接続にTLSを使用するかどうかの設定。

pulsar.admin.tlsAllowInsecureConnection

なし

true, false

Pulsar管理クライアントに対し、安全でないTLS接続を許可するかどうか。

pulsar.admin.tlsTrustCertsFilePath

なし

ファイルパス文字列

Pulsar管理クライアント用の信頼されたTLS証明書ファイルのパス。

pulsar.admin.useKeyStoreTls

なし

true, false

Pulsar管理クライアントにKeyStoreベースのTLSを使用するかどうか。

pulsar.admin.tlsTrustStoreType

なし

任意の文字列

Pulsar adminクライアントのTLSトラストストアの形式。たとえば、JKS

pulsar.admin.tlsTrustStorePath

なし

ファイルパス文字列

Pulsar管理クライアント用のTLSトラストストアファイルへのパス。pulsar.admin.useKeyStoreTlstrueの場合に必須

pulsar.admin.tlsTrustStorePassword

なし

任意の文字列

Pulsar管理クライアントのTLSトラストストアのパスワード。

認証の例については、Pulsarへの認証を参照してください。

DataFrameWriterのオプション

これらのオプションをDataFrameWriter.option()DataFrameWriterV2.option()と組み合わせて使用すると、Databricksがデータを書き込む方法を制御できます。

次の例では、Delta Lake テーブルを書き込む際にmergeSchemaTrueに設定します。

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")

Avro

Avroファイルを書き込む場合、以下のオプションが適用されます。

Key

デフォルト

有効な値

説明

avroSchema

なし

JSONスキーマの文字列

完全な Avro スキーマを JSON 文字列として。このオプションを使用して、Spark SQL の型を特定の Avro 型に変換してください。Avro ファイルの読み取りと書き込みに適用されます。

avroSchemaUrl

なし

URL 文字列

AvroスキーマファイルへのURL。外部に保存されているスキーマの場合、avroSchemaの代わりに使用します。avroSchemaと相互に排他的です。Avro ファイルの読み取りと書き込みに適用されます。

compression

snappy

uncompresseddeflatesnappy (default)bzip2xzzstandard

書き込み時に使用する圧縮コーデック。Avro ファイルの読み取りと書き込みに適用されます。

recordName

topLevelRecord

任意の文字列

出力 Avro スキーマにおけるトップレベルのレコード名です。Avro ファイルの読み取りと書き込みに適用されます。

positionalFieldMatching

false

true, false

SparkスキーマとAvroスキーマの間で、名前ではなくフィールド位置で列を一致させるかどうか。Avro ファイルの読み取りと書き込みに適用されます。

recordNamespace

空の文字列

任意の文字列

出力 Avro スキーマ内のトップレベルレコードのネームスペースです。Avro ファイルの読み取りと書き込みに適用されます。

Delta LakeとApache Iceberg

Delta LakeおよびApache Icebergテーブルを書き込む場合、以下のオプションが適用されます。

Key

デフォルト

有効な値

説明

clusterByAuto

false

true, false

クエリパターンに基づいてDatabricksがクラスタリング列を選択する自動リキッドクラスタリングを有効にするかどうか。mode("overwrite")の場合のみ有効です。appendモードでは使用できません。Databricks Runtime 16.4以降で利用可能です。テーブルにリキッドクラスタリングを使用するために適用されます。

mergeSchema

なし

true, false

書き込み操作においてスキーマ進化を有効にするかどうか。生成されたDataFrameの新しい列が、ターゲットテーブルのスキーマに追加されます。 バッチ追記とストリーミング追記の両方に適用されます。テーブルスキーマの更新に適用されます。

overwriteSchema

なし

true, false

上書き時にテーブルスキーマとパーティショニングを置き換えるかどうか。replaceWhereなしでmode("overwrite")が必要です。partitionOverwriteModeと併用できません。テーブルスキーマの更新に適用されます。

partitionOverwriteMode

なし

static, dynamic

パーティション上書きモード。これをdynamicに設定すると、新しいデータを含むパーティションのみが上書きされ、その他のすべてのパーティションは変更されずに残ります。レガシーモードは、サーバレス コンピュートまたはDatabricks SQLではサポートされていません。Delta Lakeでデータを選択的に上書きするに適用されます。

replaceOn

なし

ブーリアン式文字列

対象テーブルの行を、パッケージクエリからの行で置き換えるブール式。 対象テーブルとソースクエリの両方の列を参照できます。ターゲット内の行のうち、ソースの行と一致する行は削除され、置き換えられます。ソースが空の場合、削除は行われません。列参照の曖昧さを解消するには、 targetAliasを使用してください。Databricks Runtime 17.1以降で利用可能です。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。

replaceUsing

なし

列名のコンマ区切りリスト

対象テーブルとソースクエリ間の行を照合するために使用される、カンマ区切りの列名のリスト。ターゲットとソースの両方に、リストされているすべての列が含まれている必要があります。対象レコード内の行のうち、ソースレコードの行と等価比較で一致する行は削除され、置き換えられます。NULL値は等しくないものとみなされ、一致しません。Databricks Runtime 16.3以降で利用可能です。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。

replaceWhere

なし

述語式文字列

述語式。述語に一致するレコードのみをアトミックに上書きします。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。

targetAlias

なし

任意の文字列

対象テーブルの文字列エイリアス。条件が対象テーブルとソースクエリの両方の列を参照する場合、 replaceOnまたはreplaceWhereと組み合わせて使用すると、列参照の曖昧さを解消できます。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。

txnAppId

なし

任意の文字列

foreachBatchの操作で冪等書き込みを行うアプリケーションを識別する一意の文字列。複数の Delta Lake テーブルへの書き込みが正確に 1 回のみ行われるようにするには、 txnVersionと併用してください。適用対象:冪等テーブル書き込みにはforeachBatchを使用します

txnVersion

なし

単調増加の整数

foreachBatch操作における冪等書き込みのトランザクション バージョンとして使用される、単調増加する数値。複数の Delta Lake テーブルへの書き込みが正確に 1 回のみ行われるようにするには、 txnAppIdと併用してください。適用対象:冪等テーブル書き込みにはforeachBatchを使用します

optimizeWrite

なし

true, false

この書き込み操作で自動最適化書き込みを有効にするかどうか。spark.databricks.delta.optimizeWrite.enabled設定を上書きします。DatabricksにおけるDelta Lakeとは何か?に適用されます。

userMetadata

なし

任意の文字列

書き込み操作のコミットメタデータに追加される、ユーザー定義の文字列。DESCRIBE HISTORYの出力に表示されます。カスタムメタデータを使用してテーブルを拡張する場合に適用されます。

CSV

次のオプションは、CSVファイルの書き込み時に適用されます。

Key

デフォルト

有効な値

説明

charToEscapeQuoteEscaping

\0 (無効)

1文字

エスケープ文字が引用文字と異なる場合に、エスケープ文字をエスケープするために使用される文字。csv (DataFrameWriter)に適用されます。

compression

none

none (default)bzip2gziplz4snappydeflatezstd

書き込み時に使用する圧縮コーデック。csv (データフレームWriter) に適用されます。

dateFormat

yyyy-MM-dd

日付形式文字列

日付列の値の書式指定文字列。csv (DataFrameWriter)に適用されます。

emptyValue

空の文字列

任意の文字列

空の値(null以外の値)に対して書き込まれる文字列。csv (DataFrameWriter)に適用されます。

encoding

UTF-8

java.nio.charset.Charsetの名前

出力ファイルの文字エンコーディング。csv (DataFrameWriter)に適用されます。

escape

\

1文字

引用符で囲まれた値をエスケープするために使用される文字。csv (DataFrameWriter)に適用されます。

escapeQuotes

true

true, false

引用符で囲まれたフィールド値内の引用符文字をエスケープするかどうか。csv (DataFrameWriter)に適用されます。

header

false

true, false

出力の最初の行に列名を表示するかどうか。csv (DataFrameWriter)に適用されます。

ignoreLeadingWhiteSpace

false

true, false

値を書き込む際に、先頭の空白文字を削除するかどうか。csv (DataFrameWriter)に適用されます。

ignoreTrailingWhiteSpace

false

true, false

値を書き込む際に、末尾の空白文字を削除するかどうか。csv (DataFrameWriter)に適用されます。

lineSep

\n

文字列

レコード間で使用される行区切り文字列。csv (DataFrameWriter)に適用されます。

locale

en-US

java.util.Locale識別子

java.util.Locale識別子。Javaロケール識別子。CSV内でのデフォルトの日付、タイムスタンプ、および小数点の解析に影響を与えます。

nullValue

空の文字列

任意の文字列

null値に対して書き込まれた文字列。csv (DataFrameWriter)に適用されます。

quote

"

1文字

区切り文字を含むフィールド値を引用するために使用される文字。csv (DataFrameWriter)に適用されます。

quoteAll

false

true, false

内容に関わらず、すべてのフィールド値を引用符で囲むかどうか。csv (DataFrameWriter)に適用されます。

sep

,

文字列

フィールド区切り文字。csv (DataFrameWriter)に適用されます。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ形式の文字列

タイムスタンプ列の値の書式指定文字列。csv (DataFrameWriter)に適用されます。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムスタンプ形式の文字列

タイムゾーン( TimestampNTZType )列の値を含まないタイムスタンプのフォーマット文字列。

Excel

次のオプションはExcelファイルの書き込み時に適用されます。

Key

デフォルト

有効な値

説明

dataAddress

なし

シート名またはセル参照文字列

書き込みを開始するシート名または開始セル。省略した場合、 Sheet1という名前のシートのセルA1から書き込みを開始します。シート名( SheetName )または単一セル参照( SheetName!A1 )を受け入れます。セル範囲指定による書き込みはサポートされていません。

dateFormatInWrite

yyyy-mm-dd

Excelの日付形式文字列

Date列にExcelセル書式文字列が適用されました。Excel形式の構文を使用します。

headerRows

0

0, 1

列名を最初の行として書き込むかどうか。

timestampNTZFormat

yyyy-mm-dd hh:mm:ss

Excel タイムスタンプ書式文字列

TimestampNTZTimestamp列にExcelのセル書式文字列が適用されました。Excel形式の構文を使用します。

version

xlsx

xlsx, xls

書き込む Excel ファイル形式のバージョン。

JSON

JSON ファイルを書き込む際に、次のオプションが適用されます。

Key

デフォルト

有効な値

説明

compression

none

nonebzip2gziplz4snappydeflatezstd

書き込み時に使用する圧縮コーデック。JSON(データフレームWriter) に適用されます。

dateFormat

yyyy-MM-dd

日付形式文字列

日付列の値の書式指定文字列。JSON (DataFrameWriter)に適用されます。

encoding

UTF-8

java.nio.charset.Charsetの名前

出力ファイルの文字エンコーディング。JSON (DataFrameWriter)に適用されます。

ignoreNullFields

spark.sql.jsonGenerator.ignoreNullFields

true, false

JSON出力からnull値を持つフィールドを除外するかどうか。JSON (DataFrameWriter)に適用されます。

lineSep

\n

文字列

レコード間で使用される行区切り文字列。JSON (DataFrameWriter)に適用されます。

locale

en-US

java.util.Locale識別子

Javaロケール識別子は、JSON内のデフォルトの日付、タイムスタンプ、小数の解析に影響します。

pretty

false

true, false

整形された(インデント付き、複数行)JSON出力を有効にするかどうか。

sortKeys

false

true, false

出力時にJSONオブジェクトのキーをアルファベット順に並べ替えるかどうか。決定論的な出力を生成するのに役立ちます。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ形式の文字列

タイムスタンプ列の値の書式指定文字列。JSON (DataFrameWriter)に適用されます。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムスタンプ形式の文字列

タイムゾーン( TimestampNTZType )列の値を含まないタイムスタンプのフォーマット文字列。

writeNonAsciiCharacterAsCodePoint

false

true, false

出力時に、非ASCII文字をリテラルUTF-8文字ではなく、Unicodeエスケープシーケンス\uXXXXとしてエンコードするかどうか。

ORC

ORC ファイルを書き込む際に、以下のオプションが適用されます。

Key

デフォルト

有効な値

説明

compression

zstd

noneuncompressedsnappyzliblzozstdlz4brotli

書き込み時に使用する圧縮コーデック。orc (データフレームライター) に適用されます。

Parquet

次のオプションは、Parquet ファイルを書き込む際に適用されます。

Key

デフォルト

有効な値

説明

compression

snappy

noneuncompressedsnappygziplzobrotlilz4lz4_rawzstd

書き込み時に使用する圧縮コーデック。適用対象:Parquet(データフレームWriter)

spark.sql.parquet.outputTimestampType

INT96

INT96TIMESTAMP_MICROSTIMESTAMP_MILLIS

タイムスタンプ列のエンコードに使用される物理タイプです。標準のタイムスタンプ型をサポートしないレガシー Parquet リーダーとの互換性のため、INT96 を使用します。

文章

次のオプションは、テキストファイルの書き込み時に適用されます。

Key

デフォルト

有効な値

説明

compression

none

nonebzip2gziplz4snappydeflatezstd

書き込み時に使用する圧縮コーデック。データフレームWriter に適用されます。

encoding

UTF-8

java.nio.charset.Charsetの名前

出力ファイルの文字エンコーディング。

lineSep

\n

文字列

レコード間で使用される行区切り文字列。テキスト(DataFrameWriter)に適用されます。

XML

次のオプションは、XML ファイルの書き込み時に適用されます。

Key

デフォルト

有効な値

説明

arrayElementName

item

任意の文字列

明示的な名前を持たない配列要素の要素名。XML (DataFrameWriter)に適用されます。

attributePrefix

_

任意の文字列

XML属性に対応するフィールド名の前に付加される接頭辞。XML (DataFrameWriter)に適用されます。

compression

none

nonebzip2gziplz4snappydeflatezstd

書き込み時に使用する圧縮コーデック。xml (データフレームWriter) に適用されます。

dateFormat

yyyy-MM-dd

日付形式文字列

日付列の値の書式指定文字列。XML (DataFrameWriter)に適用されます。

declaration

version="1.0" encoding="UTF-8" standalone="yes"

XML宣言文字列、または抑制するための空の文字列

各出力ファイルの先頭に記述されるXML宣言文字列。宣言を抑制するには、空の文字列を設定します。XML (DataFrameWriter)に適用されます。

encoding

UTF-8

java.nio.charset.Charsetの名前

出力ファイルの文字エンコーディング。XML (DataFrameWriter)に適用されます。

indent

4つのスペース

任意の文字列

出力において子要素をインデントするために使用される文字列。インデントをオフにして各行を1行に表示するには、空の文字列に設定します。

locale

en-US

java.util.Locale識別子

Javaロケール識別子。XML内でのデフォルトの日付、タイムスタンプ、小数点の書式設定に影響します。

nullValue

null

任意の文字列

null値に対して書き込まれる文字列。nullに設定すると、nullフィールドの属性と子要素は省略されます。XML (DataFrameWriter)に適用されます。

rootTag

ROWS

任意の文字列

出力内のすべての行要素を囲むルート要素タグ。XML (DataFrameWriter)に適用されます。

rowTag

ROW

任意の文字列

出力における行を表す要素タグ。XML (DataFrameWriter)に適用されます。

singleVariantColumn

なし

列名文字列

XMLファイルに書き込む単一のバリアント列の名前。XML (DataFrameWriter)に適用されます。

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

タイムスタンプ形式の文字列

タイムスタンプ列の値の書式指定文字列。XML (DataFrameWriter)に適用されます。

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

タイムスタンプ形式の文字列

タイムゾーン列の値を含まないタイムスタンプのフォーマット文字列。XML (DataFrameWriter)に適用されます。

validateName

true

true, false

列名が有効な XML 要素識別子でない場合に例外をスローするかどうか。XML (DataFrameWriter)に適用されます。

valueTag

_VALUE

任意の文字列

属性または子要素を持つXML要素内の文字データに使用されるフィールド名。XML (DataFrameWriter)に適用されます。

DataStreamWriterのオプション

ストリーミング書き込みを設定するには、これらのオプションをDataStreamWriter.option()と組み合わせて使用します。

次の例は、ストリームのチェックポイントの位置を設定するものです。

Python
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))

一般

次のオプションは、すべてのストリーミング書き込み操作に適用されます。

Key

デフォルト

有効な値

説明

checkpointLocation

なし(必須)

パス文字列

ストリーミングクエリのチェックポイントディレクトリへのパス。耐障害性と、厳密に1回のみの処理を保証するために必要です。各ストリーミングクエリは、固有のチェックポイント位置を使用する必要があります。Databricks 、チェックポイントをUnity Catalogボリュームまたはクラウド ストレージ パスに保存することをお勧めします。 構造化ストリーミングのチェックポイントを参照してください。

path

なし

パス文字列

Parquetなどのファイルベースのストリーミングシンクの出力パス。ファイルベースのフォーマットにのみ適用されます。

コンソールシンク

コンソールシンクにストリームを書き込む際に、次のオプションが適用されます。

Key

デフォルト

有効な値

説明

numRows

20

正の整数

コンソールシンクに書き込む際に、各マイクロバッチごとに表示する行数。

truncate

true

true, false

行を表示する際に、長い文字列を切り詰めるかどうか。文字列値全体を表示するには、 falseに設定してください。

Delta Lake

format("delta")を使用してストリームを Delta Lake テーブルに書き込む場合、以下のオプションが適用されます。overwriteSchemareplaceWherepartitionOverwriteModeなどの上書き専用オプションは、ストリーミング書き込みではサポートされていません。

Key

デフォルト

有効な値

説明

mergeSchema

false

true, false

ストリーミングDataFrameに新しい列が含まれている場合に、 Delta Lakeテーブル スキーマを進化させるかどうか。 追記出力モードにのみ適用されます。テーブルスキーマの更新に適用されます。

userMetadata

なし

任意の文字列

書き込み操作のコミットメタデータに追加される、ユーザー定義の文字列。DESCRIBE HISTORYの出力に表示されます。カスタムメタデータを使用してテーブルを拡張する場合に適用されます。

ファイルシンク

以下のオプションは、ストリームをファイルベースの形式(Parquet、JSON、CSV、ORC、テキスト)に書き込む場合に適用されます。フォーマット固有のオプションについては、 DataFrameWriter オプションを参照してください。

Key

デフォルト

有効な値

説明

retention

なし

7 daysのような時刻文字列、または 24 hours

耐障害性および圧縮に使用されるシンクメタデータファイルの保持期間設定されていない場合、メタデータファイルは無期限に保持されます。

Kafkaシンク

以下のオプションは、Kafka への書き込み時に適用されます。

Key

デフォルト

有効な値

説明

kafka.bootstrap.servers

なし

カンマ区切りの host:port 文字列のリスト

必須。カンマ区切りの Kafka ブローカーhost:portアドレスのリスト。

topic

なし

任意の文字列

すべての行の対象となるKafkaトピック。DataFrameにtopic列が含まれていない場合は必須です。

kafka.*

なし

任意の Kafka プロデューサー構成 の値

kafka.で始まるKafka プロデューサー構成。例えば、 kafka.compression.type

メモリシンク

以下のオプションは、メモリシンクにストリームを書き込む際に適用されます。

Key

デフォルト

有効な値

説明

queryName

なし(必須)

任意の文字列

クエリが書き込みを行うインメモリテーブルの名前。メモリシンクに必要です。.queryName()を介して設定することも可能です。

mode

exactlyonce

exactlyonce, atleastonce

メモリシンクの配信保証です。exactlyonce は、厳密に 1 回セマンティクスでマイクロバッチモードを使用します。atleastonce は「最低1回」のセマンティクスで連続モードを使用しています。

Spark関数オプション

Spark SQLの一部の組み込み関数は、解析またはシリアル化の動作を制御するoptionsマップを受け入れます。オプションをPython dictまたはScala Map[String, String]として渡します。

次の例は、JSON列を解析しながら、不正な形式のレコードを削除します。

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))

Avro

Avro関数は、対応するDataFrameオプションと同じオプションを受け入れます。

次の例は、スキーマ進化が有効になっているAvroカラムをデコードするものです。

Python
from pyspark.sql.functions import from_avro

df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))

さらに、スキーマレジストリのバリアントfrom_avroto_avroは、以下のオプションを受け入れます。

Key

デフォルト

有効な値

説明

schemaId

なし

スキーマIDの整数

jsonFormatSchemaと互換性のないスキーマでエンコードされた Avro データをデコードする際に使用する Confluent Schema Registry のスキーマ ID。from_avroのみに適用されます。

confluent.schema.registry.*

なし

任意の Confluent SR クライアントプロパティ値

Confluent Schema Registryクライアント構成プロパティ。Confluent SRクライアントのプロパティを渡す際は、このプレフィックスを使用します。例えば、基本認証情報の場合はconfluent.schema.registry.basic.auth.user.info使用します。スキーマレジストリのバリアントfrom_avroおよびto_avroに必要です。

CSV

CSV関数は、対応するDataFrameのオプションと同じオプションを受け入れます。

以下の例は、カスタム区切り文字とNULL値を含むCSVファイルを読み込む例です。

Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))

JSON

JSON関数は、対応するDataFrameオプションと同じオプションを受け入れます。

次の例では、 NULLのフィールドを無視し、整形フォーマットを有効にしたJSONを書き込みます。

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))

Protobuf

from_protobuf また、 to_protobufファイルベースのデータソースを使用しません。これらの関数を使用すると、Protobufデータは常にバイナリ列として読み書きされます。オプションはMap[String, String]として渡され、大文字と小文字が区別されます。

次の例は、PERMISSIVEモードを使用してProtobufカラムをデコードする例です。

Python
from pyspark.sql.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))

Protobuf関数は以下のオプションを使用します。

Key

デフォルト

有効な値

説明

mode

FAILFAST

FAILFAST, PERMISSIVE

破損したレコードを処理する方法FAILFASTは例外をスローします。PERMISSIVE は不正な形式のフィールドを null に設定します。from_protobuf に適用されます。

recursive.fields.max.depth

-1 (無効)

0 - 10

再帰 Protobuf フィールドの最大再帰深度。再帰フィールドのサポートをオフにするには、0に設定してください。from_protobufに適用されます。

convert.any.fields.to.json

false

true, false

Protobuf AnyフィールドをSTRUCTではなく JSON 文字列に変換するかどうか。from_protobufに適用されます。

emit.default.values

false

true, false

フィールドをゼロまたはデフォルト値で出力するかどうか(proto3セマンティクス)。falseの場合、デフォルト値を持つフィールドは出力から省略されます。from_protobufに適用されます。

enums.as.ints

false

true, false

列挙型フィールドを文字列ではなく整数値としてレンダリングするかどうか。from_protobufに適用されます。

upcast.unsigned.ints

false

true, false

整数オーバーフローを防ぐために、 uint32Longに、 uint64Decimal(20,0)にアップキャストするかどうか。from_protobufに適用されます。

unwrap.primitive.wrapper.types

false

true, false

google.protobufラッパー型 (例えばInt32ValueStringValue ) を対応するプリミティブ Spark 型に展開するかどうか。from_protobufに適用されます。

retain.empty.message.types

false

true, false

出力スキーマに空のProtobufメッセージ型を保持するかどうか(ダミー列を挿入して)。from_protobufに適用されます。

schema.registry.subject

なし

任意の文字列

スキーマレジストリのサブジェクト名。スキーマレジストリのバリアントfrom_protobufおよびto_protobufを使用する場合に必要です。

schema.registry.address

なし

host:port 文字列

スキーマレジストリのアドレス(ホスト名とポート番号)。スキーマレジストリのバリアントfrom_protobufおよびto_protobufを使用する場合に必要です。

schema.registry.protobuf.name

なし

任意の文字列

スキーマレジストリのサブジェクトに複数のメッセージが含まれている場合に、どのProtobufメッセージを使用するかを指定します。任意。

schema.registry.schema.evolution.mode

"restart"

"restart", "none"

受信レコードでより新しいスキーマIDが検出された場合のスキーマ変更の処理方法。"restart"UnknownFieldException でクエリを終了します。変更を反映させるには、失敗時にジョブが再起動するように構成してください。"none" はスキーマ ID の変更を無視し、元のスキーマで新しいレコードを解析します。

confluent.schema.registry.<option>

任意の有効な Confluent スキーマレジストリクライアントオプション値

プレフィックス "confluent.schema.registry" を使用して、任意の Confluent スキーマレジストリクライアント オプションを渡します。たとえば、"confluent.schema.registry.basic.auth.credentials.source""USER_INFO" に、"confluent.schema.registry.basic.auth.user.info""<KEY>:<SECRET>" に設定して、基本認証を構成します。

XML

XML関数は、対応するDataFrameオプションと同じオプションを受け入れます。

次の例では、カスタムのルートタグと行タグを使用してXMLを書き込みます。

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))