Spark APIオプションのリファレンス
このページでは、データの読み書きを行うSpark APIsで利用可能な入力および出力オプションを一覧表示します。
DataFrameReaderのオプション
これらのオプションをDataFrameReader.option() 、 DataFrameReader.options() 、 read_files 、 COPY INTO 、およびAuto Loaderと組み合わせて使用すると、Databricksがデータファイルを読み込む方法を制御できます。
例
次の例では、JSON ファイルの読み込み時にmultiLineをTrueに設定します。
- Python
- Scala
- SQL
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
一般
次のオプションは、すべてのファイル形式に適用されます。
Key | デフォルト | 説明 |
|---|---|---|
|
| 破損したファイルを無視するかどうか。もしそうであれば、 Spark破損したファイルに遭遇しても実行を続け、読み取られた内容は引き続き返されます。 |
|
| 欠落したファイルを無視するかどうか。true の場合、ファイルが見つからない場合でもSparkジョブは実行を続行し、コンテンツは引き続き返されます。 Databricks Runtime 11.3 LTS以降で利用可能です。 |
| なし | オプションのタイムスタンプをフィルターとして使用することで、指定されたタイムスタンプ以降の更新タイムスタンプを持つファイルのみを取り込むことができます。 |
| なし | オプションのタイムスタンプをフィルターとして使用することで、指定されたタイムスタンプより前の更新タイムスタンプを持つファイルのみを取り込むことができます。 |
| なし | ファイルを選択するための潜在的なグロブパターン。 |
|
|
|
Avro
Key | デフォルト | 説明 |
|---|---|---|
| なし | ユーザーがAvro形式で提供するオプションのスキーマ。Avroを読み込む際、このオプションは、実際のAvroスキーマとは互換性があるものの異なる、進化したスキーマに設定できます。逆シリアル化スキーマは、進化後のスキーマと一致している。例えば、デフォルト値を持つ追加の列を含む進化したスキーマを設定した場合、読み取り結果にはその新しい列も含まれます。 |
|
| スキーマ レジストリを使用する場合のスキーマ進化の処理方法。 有効な値: |
|
| ユリウス暦と開始グレゴリオ暦の間での DATE 値と TIMESTAMP 値のリベースを制御します。 有効な値: |
|
| Avro Union型に安定したフィールド名を使用するかどうか。有効にすると、共用型フィールド名は、その型名を小文字にしたものから派生します(例: |
|
| 複数のファイルにまたがるスキーマを推測し、各ファイルのスキーマをマージするかどうか。 Avroの |
|
| 破損したレコードを処理するためのパーサーモード。有効な値: |
|
|
|
| なし | 再帰的なAvroフィールドの最大再帰深度。すべての再帰フィールドを切り捨てるには |
| なし | データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。
詳細については、 「救出されたデータ列とは何ですか?」を参照してください。 |
|
|
|
CSV
Key | デフォルト | 説明 |
|---|---|---|
| なし | 不正なCSVレコードに関する情報を記録するファイルを保存するパス。 |
|
| 引用符のエスケープに使用される文字をエスケープするために使用される文字。たとえば、次のレコードの場合:
|
|
| Auto Loaderに対応しています。 |
|
| テキスト行の先頭にある場合に、行コメントを表す文字を定義します。コメントのスキップを無効にするには、 |
|
| 日付文字列を解析するための形式。 |
| 空の文字列 | 空の値の文字列形式。 |
|
| 指定された形式で値を解析できない場合に、従来の日付およびタイムスタンプの解析動作に戻すかどうか。 |
|
| CSVファイルのエンコード名。オプションのリストについては、 |
|
| 指定したスキーマまたは推論されたスキーマを CSV ファイルに強制的に適用するかどうか。 このオプションを有効にすると、CSV ファイルのヘッダーは無視されます。 このオプションは、 Auto Loader を使用してデータをレスキューし、スキーマ進化を許可する場合、デフォルトで無視されます。 |
|
| データの解析時に使用するエスケープ文字。 |
|
| 想定されるファイル名拡張子。この拡張子を持たないファイルは、読み込み時に除外されます。 |
|
| CSVレコードにスキーマに存在しない列が含まれている場合に、エラーとして処理するかどうか。 |
|
| フィールド値が、拡張せずに宣言されたスキーマ型として解析できない場合に、エラーとするかどうか。 |
|
| CSVファイルにヘッダーが含まれているかどうか。Auto Loaderは、スキーマを推論するときにファイルにヘッダーがあると想定します。 |
|
| 解析された各値の先頭の空白を無視するかどうか。 |
|
| 解析された各値の末尾の空白を無視するかどうか。 |
|
| 解析されたCSVレコードのデータ型を推測するか、すべての列が |
|
| CSVパーサーのバッファサイズ(バイト単位)。大きなCSVファイルを解析する際のメモリ使用量を調整するのに役立ちます。有効な値:正の整数。 |
| なし、これは | 連続する2つのCSVレコード間の文字列。 |
|
|
|
|
| 解析対象となる値から想定される最大文字数。メモリエラーを回避するために使用できます。デフォルト値は |
|
| レコードが持つことができる列数の上限。有効な値:正の整数。 |
|
| 複数のファイルにまたがるスキーマを推測し、各ファイルのスキーマをマージするかどうか。 スキーマを推論するときに Auto Loader するためにデフォルトで有効になります。 |
|
| 不正な形式のレコードを処理するためのパーサーモード。有効な値: |
|
| CSVレコードが複数行にまたがるかどうか。 |
|
|
|
|
|
|
| 空の文字列 | null値の文字列形式。 |
|
| ファイルの読み取り中に、ヘッダーで宣言された列をスキーマの大文字と小文字を区別して配置するかどうか。Auto Loaderのデフォルトでは、これは |
|
|
|
|
| 可能な場合は、文字列をタイムスタンプではなく日付として推測しようとします。また |
|
| フィールド区切り文字が値の一部である場合に、値をエスケープするために使用される文字。 |
|
|
|
| なし | データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。
|
|
| 列間の区切り記号文字列。 |
| なし | 列名を指定すると、各フィールドを個別の列に解析するのではなく、CSVレコード全体をその名前の単一の |
|
| CSVファイルの先頭から無視する行数(コメント行と空行を含む)。 |
|
|
|
|
| タイムスタンプ文字列を解析するための形式。 |
|
| タイムゾーン( |
| なし | タイムスタンプと日付を解析するときに使用する |
|
| エスケープされていない引用符を処理するための戦略。許可されるオプション:
|
Excel
Key | デフォルト | 説明 |
|---|---|---|
| なし | Excel構文で読み込むセル範囲。省略した場合、最初のシートから有効なセルをすべて読み取ります。指定したシートから範囲を読み取るには |
|
| 列名ヘッダーとして使用する初期行数。 |
|
| Excelワークブックに対して実行する操作。有効な値: |
|
| Excelに文字列として保存される、タイムゾーンなしのタイムスタンプ値のカスタム形式文字列。 カスタム日付形式は、Datetime パターンの形式に従います。 |
|
| 文字列値のカスタムフォーマット文字列は |
JSON
Key | デフォルト | 説明 |
|---|---|---|
|
| バックスラッシュの後に続く文字のエスケープを許可するかどうか。有効にしない場合、JSON仕様で明示的にリストされている文字のみをエスケープできます。 |
|
| 構文解析されたコンテンツ内で、Java、C、C++スタイルのコメント( |
|
| 数値でない( |
|
| 整数が追加の(無視できる)ゼロ(例えば、 |
|
| 文字列(名前と文字列値)の引用符付けに一重引用符(アポストロフィ、文字 |
|
| JSON文字列に、エスケープされていない制御文字(タブ文字や改行文字を含む、値が32未満のASCII文字)を含めることを許可するかどうか。 |
|
| JavaScriptでは許可されているが、JSON仕様では許可されていない、引用符なしのフィールド名の使用を許可するかどうか。 |
| なし | バリアント値に使用されるエンコードは、 JSONの形式で記述されます。 インラインJSONとして保存される代わりにBase85エンコードされたVariant値をデコードするには、 |
| なし | 不正なJSONレコードに関する情報を記録するためのファイルを保存するパス。 ファイルベースのデータソースで
|
|
| 形式が正しくなく、解析できないレコードを格納するための列。解析用の |
|
| 日付文字列を解析するための形式。 |
|
| スキーマ推論中に、すべてのNULL値の列、または空の配列および構造体を無視するかどうか。 |
|
| JSONファイルのエンコーディングの名前。オプションのリストについては、 |
|
| タイムスタンプ文字列を |
| なし、これは | 連続する2つのJSONレコード間の文字列。 |
|
|
|
|
| JSONオブジェクトおよび配列の最大許容ネスト深度。深くネストされたドキュメントの場合は、この値を増やしてください。有効な値:正の整数。 |
|
| JSON入力における数値トークンの最大長。大きな数値リテラルを含むJSONの場合は、この値を増やしてください。有効な値:正の整数。 |
| 無制限 | JSON入力における文字列値の最大長。大きな文字列を含むJSONを解析する際のメモリ使用量を制限するように設定します。有効な値:正の整数。 |
|
| 不正な形式のレコードを処理するためのパーサーモード。有効な値: |
|
| JSONレコードが複数行にまたがるかどうか。 |
|
| 可能な場合は、float型またはdouble型ではなく、文字列を |
|
| 数値やブール値などのプリミティブ型を |
|
|
|
| なし | データ型の不一致またはスキーマの不一致(列の大文字小文字の区別を含む)により解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。
|
| なし | JSONドキュメント全体を取り込み、指定された文字列を列名とする単一のVariant列に解析するかどうか。設定されていない場合、JSONフィールドはそれぞれ独自の列に取り込まれます。有効な値:任意の文字列。 |
|
| タイムスタンプ文字列を解析するための形式。 |
|
| タイムゾーン( |
| なし | タイムスタンプと日付を解析するときに使用する |
|
| 型アップグレード例外(例えば、値を宣言された列型に拡張できない場合など)を例外をスローするのではなく、不良レコードとして扱うかどうか。 |
ORC
Key | デフォルト | 説明 |
|---|---|---|
|
| 複数ファイルからスキーマを推定し、各ファイルのスキーマをマージするかどうか。 |
Parquet
Key | デフォルト | 説明 |
|---|---|---|
|
| ユリウス暦と開始グレゴリオ暦の間での DATE 値と TIMESTAMP 値のリベースを制御します。 有効な値: |
|
| ユリウス暦と開始グレゴリオ暦の間での INT96 タイムスタンプ値のリベースを制御します。 有効な値: |
|
| 複数ファイルからスキーマを推定し、各ファイルのスキーマをマージするかどうか。 |
|
|
|
| なし | データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。
|
文章
Key | デフォルト | 説明 |
|---|---|---|
|
| TEXTファイルの行区切り文字のエンコーディング名。 オプションの一覧については、 |
| なし、これは | 連続する2つのTEXTレコード間の文字列。 |
|
| ファイルを単一レコードとして読み取るかどうか。 |
XML
Key | デフォルト | 説明 |
|---|---|---|
| なし | 行として扱うXMLファイルの行タグ。例の XML |
|
| スキーマ推論に使用する行の割合を定義します。XMLの組み込み関数はこのオプションを無視します。有効な値: |
|
| 要素内の属性を除外するかどうか。 |
| なし | 解析中に破損したレコードを処理するためのMode 。 |
|
|
|
|
|
|
| なし | 属性を要素と区別するための、属性の接頭辞。これはフィールド名の接頭辞になります。デフォルトは |
|
| 属性または子要素を持つ要素内の文字データに使用されるタグ。ユーザーはスキーマ内で |
|
| 読み取り時には、指定されたエンコード方式でXMLファイルをデコードします。書き込み時に、保存されるXMLファイルのエンコーディング(文字セット)を指定します。XMLの組み込み関数はこのオプションを無視します。DataFrameWriter の XML オプションにも適用されます。 |
|
| 値の周囲の空白をスキップする必要があるかどうか。空白文字のみのデータは無視されます。 |
| なし | 各行のXMLを個別に検証するために使用される、オプションのXSDファイルへのパス。検証に失敗した行は、構文解析エラーとして扱われます。XSDは、提供されたスキーマであろうと推論されたスキーマであろうと、それ以外のスキーマには影響を与えません。 |
|
|
|
|
| datetime パターン形式に従うカスタムタイムスタンプ形式文字列。これは |
|
| タイムゾーンを含まないタイムスタンプのカスタムフォーマット文字列。datetime パターンフォーマットに従います。これはTimestampNTZType型に適用されます。DataFrameWriter の XML オプションにも適用されます。 |
|
| datetime パターン形式に従うカスタム日付形式文字列。これは日付型に適用されます。DataFrameWriter の XML オプションにも適用されます。 |
|
| IETF BCP 47形式の言語タグとしてロケールを設定します。例えば、 |
| string | null値の文字列表現を設定します。これが |
|
| rescuedDataColumn が有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。 |
| なし | データ型の不一致やスキーマの不一致(列の大文字小文字の区別を含む)により解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。 |
|
| 単一バリアント列の名前を指定します。このオプションが読み取り用に指定されている場合、XMLレコード全体を解析して、指定されたオプション文字列値を列名とする単一のVariant列を作成します。このオプションが書き込み用に提供されている場合、単一のVariant列の値をXMLファイルに書き込みます。DataFrameWriter の XML オプションにも適用されます。 |
|
| 従来のXMLパーサーを使用するかどうか。従来のパーサーは、不正なコンテンツに対する検証が緩い反面、メモリ効率が劣ります。より厳格なデフォルトパーサーを有効にするには、 |
|
| ワイルドカード( |
DataStreamReaderのオプション
これらのオプションをDataStreamReader.option()と組み合わせて使用すると、Delta Lakeテーブルやその他のファイルベースのソースからのストリーミング読み取りを設定できます。
ファイル形式のオプション(JSON、CSV、Parquetなど)については、 DataFrameReaderのオプションを参照してください。
Auto Loader ( cloudFiles.* )のオプションについては、 Auto Loader参照してください。
例
次の例では、 Delta Lakeテーブル ストリームのmaxFilesPerTriggerを10に設定します。
- Python
- Scala
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
一般
以下のオプションは、Delta Lakeテーブルおよびその他のファイルベースのストリーミングソースに適用されます。
Key | デフォルト | 説明 |
|---|---|---|
|
| ストリーム処理後のソースファイルの取り扱い方法。有効な値: |
|
| 既に処理済みのファイルを、フルパスではなくファイル名のみで識別するかどうか。 |
|
| 各マイクロバッチ内で、最も最近変更されたファイルを最初に処理するかどうか。最新のデータをできるだけ迅速に処理したい場合に役立ちます。 |
| なし | マイクロバッチごとに処理されるデータ量のソフト最大値。最小入力単位が制限値を超えた場合、バッチ処理で制限値を超える処理が行われる可能性があります。 Auto Loaderの場合は、代わりに |
|
| 後続のマイクロバッチ処理のためにキャッシュする未処理ファイルの最大数。キャッシュを無効にするには、 |
|
| 処理対象となるファイルの最大経過時間は、現在のシステム時刻ではなく、最後に変更されたファイルのタイムスタンプを基準とします。このしきい値よりも古いファイルは無視されます。 |
|
| 各マイクロバッチで処理される新規ファイル数の上限。 Auto Loaderの場合は、代わりに |
| なし |
|
Auto Loader
これらのオプションをcloudFilesソースと組み合わせて使用すると、クラウドストレージからのストリーミング取り込み用にAuto Loaderを設定できます。cloudFilesソースに固有のオプションには、他の構造化ストリーミングソースオプションとは別の名前空間に保持するために、 cloudFilesという接頭辞が付けられます。
一般
Key | デフォルト | 説明 |
|---|---|---|
|
| 入力ディレクトリファイルの変更が既存のデータを上書きすることを許可するかどうか。 設定上の注意点については、 「ファイルが追加または上書きされた場合、Auto Loader はファイルを再度処理しますか?」を参照してください。 |
| なし | Auto Loader指定された間隔で非同期のバックフィルを実行できます。 例えば、
|
|
| 処理済みのファイルを入力ディレクトリから自動的に削除するかどうか。
ファイルは、
Databricks Runtime 16.4以降で利用可能です。 |
|
| 処理されたファイルがアーカイブ候補になるまでの待機時間( 値はCalendarInterval型の文字列です。例えば、 Databricks Runtime 16.4以降で利用可能です。 |
| なし |
移転先は以下の条件を満たす必要があります。
Auto Loaderには、このディレクトリへの書き込み権限が必要です。 Databricks Runtime 16.4以降で利用可能です。 |
| なし(必須オプション) | ソースパス内のデータファイル形式。有効な値は次のとおりです。 |
|
| ストリーム処理入力パスに既存のファイルを含めるか、初期セットアップ後に到着する新しいファイルのみを処理するかどうか。このオプションは、初めてストリームを開始するときにのみ評価されます。ストリームの再開後にこのオプションを変更しても効果はありません。 |
|
| スキーマ推論を利用する際に、正確な列型を推論するかどうか。安心により、 JSONやCSVデータセットを推論する際、列は文字列として推論されます。 詳細については、スキーマ推論を参照してください。 |
| なし | 各トリガーで処理される新規バイトの最大数。各マイクロバッチのデータ量を10GBに制限するには、 Databricks Runtime 18.0以降では、このオプションは動的に構成されるため、手動で設定する必要はありません。 |
| なし | ファイルイベントが重複排除の目的で追跡される期間。Databricks 、1 時間あたり数百万ファイル程度のデータを取り込んでいる場合を除き、この設定を調整することを推奨しません。 詳細については、 「ファイルイベント追跡」のセクションを参照してください。
|
|
| 各トリガーで処理される新しいファイルの最大数。 Databricks Runtime 18.0以降では、このオプションは動的に構成されるため、手動で設定する必要はありません。 |
| なし | ファイルのディレクトリ構造から推測したい、Hiveスタイルのパーティション列をカンマ区切りで指定します。Hive スタイルのパーティション列は、
|
|
| データ内に新しい列が発見された際に、スキーマを進化させるためのモード。安心により、 JSONデータセットを推論する際、列は文字列として推論されます。 詳細については、 「スキーマの進化」を参照してください。 |
| なし | スキーマ推論時にAuto Loaderに提供するスキーマ情報。 詳細については、スキーマヒントを参照してください。 |
| なし(スキーマ推論に必須) | 推論されたスキーマとその後の変更を保存する場所。詳細については、スキーマ推論を参照してください。 |
|
| Apache Sparkの他のファイル ソースの完全なグロビング動作と一致する厳密なglobberを使用するかどうか。 詳細については、 「一般的なデータ読み込みパターン」を参照してください。Databricks Runtime 12.2 LTS以降で利用可能です。 |
|
| Auto Loaderオプションを検証し、不明なオプションまたは一貫性のないオプションに対してエラーを返すかどうか。 |
ディレクトリ一覧
Key | デフォルト | 説明 |
|---|---|---|
|
| この機能は廃止されました。Databricksは、 ディレクトリ一覧表示モードで、完全な一覧表示ではなく、増分一覧表示を使用するかどうか。デフォルトでは、Auto Loader は指定されたディレクトリが増分リストの対象であるかどうかを自動的に検出するよう最大限の努力を払います。明示的に増分リストを使用するか、ディレクトリ全体のリストを使用するかは、それぞれ 辞書順に並んでいないディレクトリでインクリメンタルリストを誤って有効にすると、 Auto Loader新しいファイルを検出できなくなります。 Azureデータレイク ストレージ ( Databricks Runtime 9.1 LTS 以降で利用可能です。
使用可能な値: |
ファイル通知
必要なクラウド権限、セットアップ手順、認証方法など、ファイル通知モードの構成については、 「 ファイル通知モードでのAuto Loaderストリームの構成 」を参照してください。
Key | デフォルト | 説明 |
|---|---|---|
|
| キューイングサービスからメッセージを取得するときに使用するスレッドの数。
|
| なし | 複数のS3バケットからファイル通知を受け取る
|
| なし | 関連リソースの関連付けと識別に役立つ一連のキーと値のタグのペア。次に例を示します。
AWSの詳細については、 Amazon SQS コスト割り当てタグ」および「 Amazon SNS トピックのタグの設定」を参照してください。 (1) Azureに関する詳細情報については、 「キューとメタデータの名前付け」および「イベント サブスクリプション」の GCPに関する詳細情報については、 「ラベルを使用した使用状況の報告」を参照してください。 (1)
|
|
|
ファイルイベントは、ファイル検出において通知レベルのパフォーマンスを提供します。これは、Auto Loaderが前回の実行後に新しいファイルを検出できるためです。ディレクトリ一覧表示とは異なり、この処理ではディレクトリ内のすべてのファイルを一覧表示する必要はありません。 ファイルイベントオプションが有効になっている場合でも、Auto Loaderがディレクトリ一覧表示を使用する状況がいくつかあります。
ファイルイベントを使用するAuto Loader 、いつディレクトリ一覧を使用しますか?を参照してください。 このオプションを使用して Auto Loader がディレクトリ一覧を表示する状況の包括的なリストについては、こちらをご覧ください。 Databricks Runtime 14.3 LTS以降で利用可能です。 |
|
|
|
|
| 新しいファイルが存在することを通知するために、ファイル通知モードを使用するかどうか。
|
(1) Auto Loaderは、デフォルトではベストエフォートベースで次のキーと値のタグのペアを追加します。
vendor:Databrickspath: データが読み込まれる場所。ラベル付けの制限のため、GCPでは使用できません。checkpointLocation:ストリームのチェックポイントの位置。表示上の制限により、GCP(医薬品臨床試験の実施に関する基準)では利用できません。streamId: ストリームのグローバル一意識別子。
Databricksはこれらのキー名を予約しており、その値を上書きすることはできません。
クラウド固有の
Auto Loaderは、ファイル通知モード向けにクラウドインフラストラクチャを構成するためのオプションを提供します。必要なクラウド権限とセットアップ手順については、 「ファイル通知モードでのAuto Loaderストリームの構成」を参照してください。
AWS
cloudFiles.useNotifications = trueを選択し、Auto Loaderに通知サービスを設定させたい場合にのみ、以下のオプションを指定してください。
Key | デフォルト | 説明 |
|---|---|---|
| EC2インスタンスのリージョン | ソースS3バケットが存在し、 AWS SNS および SQS サービスを作成するリージョン。 |
Key | デフォルト | 説明 |
|---|---|---|
|
| SNSトピックと同じアカウント内のAWS S3バケットからのイベント通知のみを許可する。この設定が有効な場合、Auto Loader は SNS トピックと同じアカウント内の AWS S3 バケットからのイベント通知のみを受け入れます。
Databricks Runtime 17.2以降で利用可能です。 |
cloudFiles.useNotifications = trueを選択し、すでに設定したキューをAuto Loaderで使用する場合にのみ、次のオプションを指定します。
Key | デフォルト | 説明 |
|---|---|---|
| なし | SQSキューのURL。提供された場合、Auto Loaderは独自のAWS SNSとSQSサービスをセットアップする代わりに、このキューから直接イベントを消費します。 |
AWS認証オプション
Databricks資格情報を使用するには、次の認証オプションを指定します。
Key | デフォルト | 説明 |
|---|---|---|
| なし | Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。 |
Databricksのサービス認証情報またはIAMロールが利用できない場合は、代わりに以下の認証オプションを指定できます。
Key | デフォルト | 説明 |
|---|---|---|
| なし | ユーザーのAWSアクセスキーID。 |
| なし | ユーザーのAWSシークレットアクセスキー。 |
| なし | ARNIAM必要に応じて引き受ける ロールの 。このロールは、クラスターのインスタンスプロファイルから、または |
| なし |
|
| なし |
|
| なし |
|
Azure
cloudFiles.useNotifications = trueを指定し、Auto Loaderに通知サービスを設定させる場合は、次のすべてのオプションに値を指定する必要があります。
Key | デフォルト | 説明 |
|---|---|---|
| なし | ストレージ アカウントが作成されるAzureリソース グループ。 |
| なし | リソースグループが作成されたAzureサブスクリプションID。 |
| なし | Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。 |
Databricksサービスの認証情報が利用できない場合は、代わりに以下の認証オプションを指定できます。
Key | デフォルト | 説明 |
|---|---|---|
| なし | Databricksサービスプリンシパルのクライアント ID またはアプリケーション ID。 |
| なし | Databricksサービスプリンシパルのクライアント シークレット。 |
| なし | アカウントアクセスキーあるいは共有アクセス署名(SAS)に基づく、ストレージアカウントの接続文字列。 |
| なし | Databricksサービスプリンシパルが作成されるAzureテナントID。 |
cloudFiles.useNotifications = trueに設定し、Auto Loader に既存のキューを使用させたい場合にのみ、以下のオプションを指定してください。
Key | デフォルト | 説明 |
|---|---|---|
| なし | Azureキューの名前。提供されている場合、クラウド ファイル ソースは、独自のAzure Event Grid およびキュー ストレージ サービスを設定する代わりに、このキューからイベントを直接消費します。 その場合、 |
GCP
Auto Loader Databricksサービスの資格情報を利用して、通知サービスを自動的にセットアップできます。 Databricks サービス認証情報を使用して作成されたサービス アカウントには、ファイル通知モードでの Auto Loader ストリームの構成で指定された権限が必要です。
Key | デフォルト | 説明 |
|---|---|---|
| なし | GCSバケットが存在するプロジェクトのID。Google クラウド Pub/Sub サブスクリプションもこのプロジェクト内で作成されます。 |
| なし | Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。 |
Databricks サービスの認証情報を利用できない場合は、Google サービス アカウントを直接使用できます。Google サービスのセットアップに従って、クラスターをサービス アカウントとして構成するか、次の認証オプションを直接提供することができます。
Key | デフォルト | 説明 |
|---|---|---|
| なし | GoogleサービスアカウントのクライアントID。 |
| なし | Googleサービスアカウントのメールアドレス。 |
| なし | Google サービス アカウント用に生成される秘密キー。 |
| なし | Google サービス アカウント用に生成された秘密キーの ID。 |
cloudFiles.useNotifications = trueを選択し、すでに設定したキューをAuto Loaderで使用する場合にのみ、次のオプションを指定します。
Key | デフォルト | 説明 |
|---|---|---|
| なし | Google Cloud Pub/Subサブスクリプションの名前。指定されている場合、クラウドファイルソースは独自のGCS通知サービスとGoogle Cloud Pub/Subサービスを設定する代わりに、このキューからのイベントを消費します。 |
Delta Lake
spark.readStreamを使用して Delta Lake テーブルから読み込む場合、以下のオプションが適用されます。
Key | デフォルト | 説明 |
|---|---|---|
| なし | Deltaテーブルのバージョン番号または |
| なし | Deltaテーブルのバージョン番号または |
| なし | Deltaテーブルのバージョン番号または |
| なし | 正規表現パターン。パスがパターンに一致するファイルは、ストリーミング読み取りから除外されます。想定される命名規則に準拠していないファイルを除外するのに役立ちます。 |
|
| ログ保持( |
|
| Databricks Runtime 11.3 LTS以前のバージョンで利用可能です。 |
|
| パーティション境界でデータを削除するトランザクション(パーティション全体の削除のみ)は無視します。パーティション以外の削除、更新、その他の変更には対応していません。代わりに |
|
| ストリーミング クエリの変更データフィードの読み取りを有効にするかどうか。 有効にすると、ストリームは行レベルの変更(挿入、更新、削除)に加えて、追加のメタデータ列を出力します。DatabricksでのDelta Lake変更データフィードの使用」を参照してください。 |
| なし | Delta Lakeがストリーミング読み取りのためのスキーマ変更を追跡するディレクトリへのパス。列マッピングが有効になっているテーブルからストリーミングし、スキーマ進化を処理するために |
|
| 既存のレコードを削除または変更するトランザクションは無視し、追加のみを処理します。Databricks 、変更データフィードを使用しないほとんどのワークロードにこのオプションを推奨します。 Databricks Runtime 12.2 LTS以降で利用可能です。 |
| 最新の情報 | 読み取りを開始するタイムスタンプ。このストリームは、指定されたタイムスタンプ以降にコミットされたすべてのテーブル変更を読み取ります。タイムスタンプがすべての利用可能なテーブル コミットよりも前にある場合、ストリームは利用可能な最も早いコミットから開始されます。
|
| 最新の情報 | Deltaテーブルのバージョンを読み取り開始します。 このストリームは、指定されたバージョン以降にコミットされたすべての変更を読み取ります。最新の変更のみから開始するには、 |
|
| 初期テーブルスナップショットをイベント時間バケットに分割することで、レコードが誤って遅延イベントとしてマークされ、ウォーターマーク付きのステートフルクエリで削除されるのを防ぎます。初期スナップショット処理が開始された後は、チェックポイントを削除しない限り変更できません。Databricks Runtime 11.3 LTS以降で利用可能です。データを削除せずに初期スナップショットを処理する方法については、「データを削除せずに処理する」を参照してください。 |
DataFrameWriterのオプション
これらのオプションをDataFrameWriter.option()とDataFrameWriterV2.option()と組み合わせて使用すると、Databricksがデータを書き込む方法を制御できます。
例
次の例では、Delta Lake テーブルを書き込む際にmergeSchemaをTrueに設定します。
- Python
- Scala
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
Avro
Key | デフォルト | 説明 |
|---|---|---|
| なし | JSON文字列としての完全なAvroスキーマ。 このオプションを使用して、Spark SQLの型を特定のAvro型に変換します。Avroファイルに適用されます。 |
| なし | Avroスキーマファイルを指すURL。スキーマが外部に保存されている場合は、 |
|
| 書き込み時に使用する圧縮コーデック。有効な値: |
|
| 出力されるAvroスキーマにおける最上位レコード名。Avroファイルに適用されます。 |
|
| SparkスキーマとAvroスキーマ間の列を、名前ではなくフィールド位置で照合するかどうか。Avroファイルに適用されます。 |
| 空の文字列 | 出力Avroスキーマにおける最上位レコードの名前空間。Avroファイルに適用されます。 |
Delta LakeとApache Iceberg
Key | デフォルト | 説明 |
|---|---|---|
|
| クエリパターンに基づいてDatabricksがクラスタリング列を選択する自動リキッドクラスタリングを有効にするかどうか。 |
| なし | 書き込み操作においてスキーマ進化を有効にするかどうか。生成されたDataFrameの新しい列が、ターゲットテーブルのスキーマに追加されます。 バッチ追記とストリーミング追記の両方に適用されます。テーブルスキーマの更新に適用されます。 |
| なし | 上書き時にテーブルスキーマとパーティショニングを置き換えるかどうか。 |
| なし | パーティションの上書きモード。新しいデータを含むパーティションのみを上書きし、その他のパーティションは変更しないよう、この値を |
| なし | 対象テーブルの行を、パッケージクエリからの行で置き換えるブール式。 対象テーブルとソースクエリの両方の列を参照できます。ターゲット内の行のうち、ソースの行と一致する行は削除され、置き換えられます。ソースが空の場合、削除は行われません。列参照の曖昧さを解消するには、 |
| なし | 対象テーブルとソースクエリ間の行を照合するために使用される、カンマ区切りの列名のリスト。ターゲットとソースの両方に、リストされているすべての列が含まれている必要があります。対象レコード内の行のうち、ソースレコードの行と等価比較で一致する行は削除され、置き換えられます。 |
| なし | 述語式。述語に一致するレコードのみをアトミックに上書きします。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。 |
| なし | 対象テーブルの文字列エイリアス。条件が対象テーブルとソースクエリの両方の列を参照する場合、 |
| なし |
|
| なし |
|
| なし | この書き込み操作で自動最適化書き込みを有効にするかどうか。 |
| なし | 書き込み操作のコミットメタデータに追加される、ユーザー定義の文字列。 |
CSV
Key | デフォルト | 説明 |
|---|---|---|
|
| エスケープ文字が引用文字と異なる場合に、エスケープ文字をエスケープするために使用される文字。csv (DataFrameWriter)に適用されます。 |
|
| 書き込み時に使用する圧縮コーデック。有効な値: |
|
| 日付列の値の書式指定文字列。csv (DataFrameWriter)に適用されます。 |
| 空の文字列 | 空の値(null以外の値)に対して書き込まれる文字列。csv (DataFrameWriter)に適用されます。 |
|
| 出力ファイルの文字エンコーディング。csv (DataFrameWriter)に適用されます。 |
|
| 引用符で囲まれた値をエスケープするために使用される文字。csv (DataFrameWriter)に適用されます。 |
|
| 引用符で囲まれたフィールド値内の引用符文字をエスケープするかどうか。csv (DataFrameWriter)に適用されます。 |
|
| 出力の最初の行に列名を表示するかどうか。csv (DataFrameWriter)に適用されます。 |
|
| 値を書き込む際に、先頭の空白文字を削除するかどうか。csv (DataFrameWriter)に適用されます。 |
|
| 値を書き込む際に、末尾の空白文字を削除するかどうか。csv (DataFrameWriter)に適用されます。 |
|
| レコード間で使用される行区切り文字列。csv (DataFrameWriter)に適用されます。 |
|
|
|
| 空の文字列 | null値に対して書き込まれた文字列。csv (DataFrameWriter)に適用されます。 |
|
| 区切り文字を含むフィールド値を引用するために使用される文字。csv (DataFrameWriter)に適用されます。 |
|
| 内容に関わらず、すべてのフィールド値を引用符で囲むかどうか。csv (DataFrameWriter)に適用されます。 |
|
| フィールド区切り文字。csv (DataFrameWriter)に適用されます。 |
|
| タイムスタンプ列の値の書式指定文字列。csv (DataFrameWriter)に適用されます。 |
|
| タイムゾーン( |
Excel
Key | デフォルト | 説明 |
|---|---|---|
| なし | 書き込みを開始するシート名または開始セル。省略した場合、 |
|
|
|
|
| 列名を最初の行に表示するかどうか。有効な値: |
|
|
|
|
| 書き込むExcelファイルの形式バージョン。有効な値: |
JSON
Key | デフォルト | 説明 |
|---|---|---|
|
| 書き込み時に使用する圧縮コーデック。有効な値: |
|
| 日付列の値の書式指定文字列。JSON (DataFrameWriter)に適用されます。 |
|
| 出力ファイルの文字エンコーディング。JSON (DataFrameWriter)に適用されます。 |
| 値 | JSON出力からnull値を持つフィールドを除外するかどうか。JSON (DataFrameWriter)に適用されます。 |
|
| レコード間で使用される行区切り文字列。JSON (DataFrameWriter)に適用されます。 |
|
|
|
|
| 整形された(インデント付き、複数行)JSON出力を有効にするかどうか。 |
|
| 出力時にJSONオブジェクトのキーをアルファベット順に並べ替えるかどうか。決定論的な出力を生成するのに役立ちます。 |
|
| タイムスタンプ列の値の書式指定文字列。JSON (DataFrameWriter)に適用されます。 |
|
| タイムゾーン( |
|
| 出力時に、非ASCII文字をリテラルUTF-8文字ではなく、Unicodeエスケープシーケンス |
ORC
Key | デフォルト | 説明 |
|---|---|---|
|
| 書き込み時に使用する圧縮コーデック。有効な値: |
Parquet
Key | デフォルト | 説明 |
|---|---|---|
|
| 書き込み時に使用する圧縮コーデック。有効な値: |
|
| タイムスタンプ列をエンコードするために使用される物理型。有効な値: |
文章
Key | デフォルト | 説明 |
|---|---|---|
|
| 書き込み時に使用する圧縮コーデック。有効な値: |
|
| 出力ファイルの文字エンコーディング。 |
|
| レコード間で使用される行区切り文字列。テキスト(DataFrameWriter)に適用されます。 |
XML
Key | デフォルト | 説明 |
|---|---|---|
|
| 明示的な名前を持たない配列要素の要素名。XML (DataFrameWriter)に適用されます。 |
|
| XML属性に対応するフィールド名の前に付加される接頭辞。XML (DataFrameWriter)に適用されます。 |
|
| 書き込み時に使用する圧縮コーデック。有効な値: |
|
| 日付列の値の書式指定文字列。XML (DataFrameWriter)に適用されます。 |
|
| 各出力ファイルの先頭に記述されるXML宣言文字列。宣言を抑制するには、空の文字列を設定します。XML (DataFrameWriter)に適用されます。 |
|
| 出力ファイルの文字エンコーディング。XML (DataFrameWriter)に適用されます。 |
| 4つのスペース | 出力において子要素をインデントするために使用される文字列。インデントをオフにして各行を1行に表示するには、空の文字列に設定します。 |
|
|
|
|
| null値に対して書き込まれる文字列。 |
|
| 出力内のすべての行要素を囲むルート要素タグ。XML (DataFrameWriter)に適用されます。 |
|
| 出力における行を表す要素タグ。XML (DataFrameWriter)に適用されます。 |
| なし | XMLファイルに書き込む単一のバリアント列の名前。XML (DataFrameWriter)に適用されます。 |
|
| タイムスタンプ列の値の書式指定文字列。XML (DataFrameWriter)に適用されます。 |
|
| タイムゾーン列の値を含まないタイムスタンプのフォーマット文字列。XML (DataFrameWriter)に適用されます。 |
|
| 列名が有効な XML 要素識別子でない場合に例外をスローするかどうか。XML (DataFrameWriter)に適用されます。 |
|
| 属性または子要素を持つXML要素内の文字データに使用されるフィールド名。XML (DataFrameWriter)に適用されます。 |
DataStreamWriterのオプション
ストリーミング書き込みを設定するには、これらのオプションをDataStreamWriter.option()と組み合わせて使用します。
例
次の例は、ストリームのチェックポイントの位置を設定するものです。
- Python
- Scala
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
一般
Key | デフォルト | 説明 |
|---|---|---|
| なし(必須) | ストリーミングクエリのチェックポイントディレクトリへのパス。耐障害性と、厳密に1回のみの処理を保証するために必要です。各ストリーミングクエリは、固有のチェックポイント位置を使用する必要があります。Databricks 、チェックポイントをUnity Catalogボリュームまたはクラウド ストレージ パスに保存することをお勧めします。 構造化ストリーミングのチェックポイントを参照してください。 |
| なし | Parquetなどのファイルベースのストリーミングシンクの出力パス。ファイルベースのフォーマットにのみ適用されます。 |
コンソールシンク
Key | デフォルト | 説明 |
|---|---|---|
|
| コンソールシンクに書き込む際に、マイクロバッチごとに表示する行数。 |
|
| 行を表示する際に、長い文字列を切り詰めるかどうか。文字列値全体を表示するには、 |
Delta Lake
format("delta")を使用してストリームを Delta Lake テーブルに書き込む場合、以下のオプションが適用されます。overwriteSchema 、 replaceWhere 、 partitionOverwriteModeなどの上書き専用オプションは、ストリーミング書き込みではサポートされていません。
Key | デフォルト | 説明 |
|---|---|---|
|
| ストリーミングDataFrameに新しい列が含まれている場合に、 Delta Lakeテーブル スキーマを進化させるかどうか。 追記出力モードにのみ適用されます。テーブルスキーマの更新に適用されます。 |
| なし | 書き込み操作のコミットメタデータに追加される、ユーザー定義の文字列。 |
ファイルシンク
以下のオプションは、ストリームをファイルベースの形式(Parquet、JSON、CSV、ORC、テキスト)に書き込む場合に適用されます。フォーマット固有のオプションについては、 DataFrameWriter オプションを参照してください。
Key | デフォルト | 説明 |
|---|---|---|
| なし | 耐障害性および圧縮に使用されるシンクメタデータファイルをどのくらいの期間保持するか。 |
Kafkaシンク
Kafka にストリームを書き込むためのオプションの完全なリストについては、 「オプション」を参照してください。
Key | デフォルト | 説明 |
|---|---|---|
| なし | 必須。カンマ区切りの Kafka ブローカー |
| なし | すべての行の対象となるKafkaトピック。DataFrameに |
| なし |
|
メモリシンク
Key | デフォルト | 説明 |
|---|---|---|
| なし(必須) | クエリが書き込みを行うインメモリテーブルの名前。メモリシンクに必要です。 |
|
| メモリシンクの配送保証。 |
Spark関数オプション
Spark SQLの一部の組み込み関数は、解析またはシリアル化の動作を制御するoptionsマップを受け入れます。オプションをPython dictまたはScala Map[String, String]として渡します。
例
次の例は、JSON列を解析しながら、不正な形式のレコードを削除します。
- Python
- Scala
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
Avro
Avro関数は、対応するDataFrameオプションと同じオプションを受け入れます。
from_avroそしてschema_of_avroDataFrameReader Avroオプションを使用します。to_avroDataFrameWriter Avro オプションを使用します。
例
次の例は、スキーマ進化が有効になっているAvroカラムをデコードするものです。
- Python
- Scala
from pyspark.sql.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
import org.apache.spark.sql.avro.functions.from_avro
val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))
さらに、スキーマレジストリのバリアントfrom_avroとto_avroは、以下のオプションを受け入れます。
Key | デフォルト | 説明 |
|---|---|---|
| なし |
|
| なし | Confluent Schema Registryクライアント構成プロパティ。Confluent SRクライアントのプロパティを渡す際は、このプレフィックスを使用します。例えば、基本認証情報の場合は |
CSV
CSV関数は、対応するDataFrameのオプションと同じオプションを受け入れます。
from_csvschema_of_csvはDataFrameReader の CSV オプションを使用します。to_csvDataFrameWriterのCSVオプションを使用します。
例
以下の例は、カスタム区切り文字とNULL値を含むCSVファイルを読み込む例です。
- Python
- Scala
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
JSON
JSON関数は、対応するDataFrameオプションと同じオプションを受け入れます。
from_jsonそしてschema_of_jsonDataFrameReaderのJSONオプションを使用します。to_jsonDataFrameWriterのJSONオプションを使用します。
例
次の例では、 NULLのフィールドを無視し、整形フォーマットを有効にしたJSONを書き込みます。
- Python
- Scala
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
import org.apache.spark.sql.functions.to_json
val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf また、 to_protobufファイルベースのデータソースを使用しません。これらの関数を使用すると、Protobufデータは常にバイナリ列として読み書きされます。オプションはMap[String, String]として渡され、大文字と小文字が区別されます。
例
次の例は、PERMISSIVEモードを使用してProtobufカラムをデコードする例です。
- Python
- Scala
from pyspark.sql.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
import org.apache.spark.sql.protobuf.functions.from_protobuf
val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))
Protobuf関数は以下のオプションを使用します。
Key | デフォルト | 説明 |
|---|---|---|
|
| 破損した記録の対処方法。 |
|
| 再帰的なProtobufフィールドの最大再帰深度。再帰フィールドのサポートを無効にするには、 |
|
| Protobuf |
|
| フィールドをゼロまたはデフォルト値で出力するかどうか(proto3セマンティクス)。 |
|
| 列挙型フィールドを文字列ではなく整数値としてレンダリングするかどうか。 |
|
| 整数オーバーフローを防ぐために、 |
|
|
|
|
| 出力スキーマに空のProtobufメッセージ型を保持するかどうか(ダミー列を挿入して)。 |
| なし | スキーマレジストリのサブジェクト名。スキーマレジストリのバリアント |
| なし | スキーマレジストリのアドレス(ホスト名とポート番号)。スキーマレジストリのバリアント |
| なし | スキーマレジストリのサブジェクトに複数のメッセージが含まれている場合に、どのProtobufメッセージを使用するかを指定します。任意。 |
XML
XML関数は、対応するDataFrameオプションと同じオプションを受け入れます。
from_xmlそしてschema_of_xmlDataFrameReaderのXMLオプションを使用します。to_xmlDataFrameWriterのXMLオプションを使用します。
例
次の例では、カスタムのルートタグと行タグを使用してXMLを書き込みます。
- Python
- Scala
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
import org.apache.spark.sql.functions.to_xml
val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))