Spark APIオプションのリファレンス
このページでは、データの読み書きを行うSpark APIsで利用可能な入力および出力オプションを一覧表示します。
DataFrameReaderのオプション
これらのオプションをDataFrameReader.option() 、 DataFrameReader.options() 、 read_files 、 COPY INTO 、およびAuto Loaderと組み合わせて使用すると、Databricksがデータファイルを読み込む方法を制御できます。
例
次の例では、JSON ファイルの読み込み時にmultiLineをTrueに設定します。
- Python
- Scala
- SQL
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
一般
次のオプションは、すべてのファイル形式に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| 破損したファイルを無視するかどうか。もしそうであれば、 Spark破損したファイルに遭遇しても実行を続け、読み取られた内容は引き続き返されます。 |
|
|
| 欠落したファイルを無視するかどうか。true の場合、ファイルが見つからない場合でもSparkジョブは実行を続行し、コンテンツは引き続き返されます。 Databricks Runtime 11.3 LTS以降で利用可能です。 |
| なし | タイムスタンプ文字列 | 指定されたタイムスタンプより後の変更タイムスタンプを持つファイルのみを、フィルターとして取り込むためのオプションのタイムスタンプ。 |
| なし | タイムスタンプ文字列 | 指定されたタイムスタンプより前の変更タイムスタンプを持つファイルのみをフィルタリングして取り込むためのオプションのタイムスタンプ。 |
| なし | glob パターン文字列 | ファイルを選択するための潜在的なglobパターンです。 |
|
|
|
|
Avro
以下のオプションは、Avroファイルを読み取る際に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | Avroスキーマの文字列 | ユーザーがAvroフォーマットで指定するオプションのスキーマAvroを読み取る際、このオプションは、互換性はあるが実際のAvroスキーマとは異なる、進化したスキーマに設定することができます。逆シリアル化スキーマは、進化したスキーマと一致します。たとえば、デフォルト値を持つ1つの追加列を含む進化したスキーマを設定すると、読み取り結果には新しい列も含まれます。 |
|
|
| スキーマレジストリ利用時のスキーマ進化の対処方法 |
|
|
| ユリウス暦と先発グレゴリオ暦の間でのDATE値とTIMESTAMP値のリベースを制御します。 |
|
|
| Avro Union型に安定したフィールド名を使用するかどうか。有効にすると、共用型フィールド名は、その型名を小文字にしたものから派生します(例: |
|
|
| 複数のファイルにまたがるスキーマを推測し、各ファイルのスキーマをマージするかどうか。 Avroの |
|
|
| 不正な形式のレコードの処理に関するパーサーモード。 |
|
|
|
|
| なし |
| 再帰的なAvroフィールドの最大深度すべての再帰フィールドを切り捨てるには |
| なし | 列名文字列 | データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。
詳細については、 「救出されたデータ列とは何ですか?」を参照してください。 |
|
| 任意の文字列 |
|
CSV
CSVファイルを読み取る際に、次のオプションが適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | パス文字列 | 不正なCSVレコードに関する情報を記録するファイルを保存するパス。 |
|
| 1文字 | 引用符のエスケープに使用される文字をエスケープするために使用される文字。たとえば、次のレコードの場合:
|
|
| 列名文字列 | Auto Loaderに対応しています。 |
|
| 1文字 | テキスト行の先頭にある場合に、行コメントを表す文字を定義します。コメントのスキップを無効にするには、 |
|
| 日付形式文字列 | 日付文字列を解析するための形式。 |
| 空の文字列 | 任意の文字列 | 空の値の文字列形式。 |
|
|
| 指定された形式で値を解析できない場合に、従来の日付およびタイムスタンプの解析動作に戻すかどうか。 |
|
|
| CSVファイルのエンコード名。オプションのリストについては、 |
|
|
| 指定したスキーマまたは推論されたスキーマを CSV ファイルに強制的に適用するかどうか。 このオプションを有効にすると、CSV ファイルのヘッダーは無視されます。 このオプションは、 Auto Loader を使用してデータをレスキューし、スキーマ進化を許可する場合、デフォルトで無視されます。 |
|
| 1文字 | データの解析時に使用するエスケープ文字。 |
|
| ファイル拡張子文字列 | リードの想定されるファイル拡張子です。この拡張子がないファイルは除外されます。 |
|
|
| CSVレコードにスキーマに存在しない列が含まれている場合に、エラーとして処理するかどうか。 |
|
|
| フィールド値が、拡張せずに宣言されたスキーマ型として解析できない場合に、エラーとするかどうか。 |
|
|
| CSVファイルにヘッダーが含まれているかどうか。Auto Loaderは、スキーマを推論するときにファイルにヘッダーがあると想定します。 |
|
|
| 解析された各値の先頭の空白を無視するかどうか。 |
|
|
| 解析された各値の末尾の空白を無視するかどうか。 |
|
|
| 解析されたCSVレコードのデータ型を推測するか、すべての列が |
|
| 正の整数 | CSVパーサーのバッファサイズ(バイト単位)。大規模なCSVファイルを解析する際のメモリ使用量の調整に役立ちます。 |
| なし、これは | 文字列 | 連続する2つのCSVレコード間の文字列。 |
|
|
| Javaロケール識別子。CSV内でのデフォルトの日付、タイムスタンプ、および小数点の解析に影響を与えます。 |
|
| 正の整数、または( | 解析する値から予想される最大文字数。メモリエラーを回避するために使用できます。デフォルトは |
|
| 正の整数 | レコードに含めることができる列数のハードリミット。 |
|
|
| 複数のファイルにまたがるスキーマを推測し、各ファイルのスキーマをマージするかどうか。 スキーマを推論するときに Auto Loader するためにデフォルトで有効になります。 |
|
|
| 不正な形式のレコードの処理に関するパーサーモード。 |
|
|
| CSVレコードが複数行にまたがるかどうか。 |
|
| 任意の文字列 |
|
|
| 任意の文字列 |
|
| 空の文字列 | 任意の文字列 | null値の文字列形式。 |
|
|
| ファイルの読み取り中に、ヘッダーで宣言された列をスキーマの大文字と小文字を区別して配置するかどうか。Auto Loaderのデフォルトでは、これは |
|
| 任意の文字列 |
|
|
|
| 可能な場合は、文字列をタイムスタンプではなく日付として推測しようとします。また |
|
| 1文字 | フィールド区切り文字が値の一部である場合に、値をエスケープするために使用される文字。 |
|
|
|
|
| なし | 列名文字列 | データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。
|
|
| 文字列 | 列間の区切り記号文字列。 |
| なし | 列名文字列 | 列名を指定すると、各フィールドを個別の列に解析するのではなく、CSVレコード全体をその名前の単一の |
|
| 正の整数または | CSVファイルの先頭から無視すべき行数(コメント行や空行を含む)。 |
|
| 時刻形式文字列 |
|
|
| タイムスタンプ形式の文字列 | タイムスタンプ文字列を解析するための形式。 |
|
| タイムスタンプ形式の文字列 | タイムゾーン( |
| なし |
| タイムスタンプと日付を解析するときに使用する |
|
|
| エスケープされていない引用符を処理するための戦略。各許可オプションの動作は次のとおりです:
|
Excel
次のオプションは、Excelファイルを読み込む場合に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | セル範囲またはシート名の文字列 | Excel構文で読み込むセル範囲。省略した場合、最初のシートから有効なセルをすべて読み取ります。指定したシートから範囲を読み取るには |
|
|
| 列名ヘッダーとして使用する先頭行数 |
|
|
|
|
|
|
| XLSXファイルを読み込む際に、セル文字列値にピンインやふりがななどの発音表記を連結して含めるかどうか。 |
|
|
| Excelワークブック上で実行する操作。 |
|
| タイムスタンプ形式の文字列 | Excelに文字列として保存される、タイムゾーンなしのタイムスタンプ値のカスタム形式文字列。 カスタム日付形式は、Datetime パターンの形式に従います。 |
|
| 日付形式文字列 | 文字列値のカスタムフォーマット文字列は |
JSON
次のオプションは、JSONファイルを読み取る際に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| バックスラッシュの後に続く文字のエスケープを許可するかどうか。有効にしない場合、JSON仕様で明示的にリストされている文字のみをエスケープできます。 |
|
|
| 構文解析されたコンテンツ内で、Java、C、C++スタイルのコメント( |
|
|
| 数値でない( |
|
|
| 整数が追加の(無視できる)ゼロ(例えば、 |
|
|
| 文字列(名前と文字列値)の引用符付けに一重引用符(アポストロフィ、文字 |
|
|
| JSON文字列に、エスケープされていない制御文字(タブ文字や改行文字を含む、値が32未満のASCII文字)を含めることを許可するかどうか。 |
|
|
| JavaScriptでは許可されているが、JSON仕様では許可されていない、引用符なしのフィールド名の使用を許可するかどうか。 |
| なし |
| バリアント値に使用されるエンコードは、 JSONの形式で記述されます。 インラインJSONとして保存される代わりにBase85エンコードされたVariant値をデコードするには、 |
| なし | パス文字列 | 不正なJSONレコードに関する情報を記録するためのファイルを保存するパス。 ファイルベースのデータソースで
|
|
| 列名文字列 | 形式が正しくなく、解析できないレコードを格納するための列。解析用の |
|
| 日付形式文字列 | 日付文字列を解析するための形式。 |
|
|
| スキーマ推論中に、すべてのNULL値の列、または空の配列および構造体を無視するかどうか。 |
|
|
| JSONファイルのエンコーディングの名前。オプションのリストについては、 |
|
|
| タイムスタンプ文字列を |
| なし、これは | 文字列 | 連続する2つのJSONレコード間の文字列。 |
|
|
| Javaロケール識別子は、JSON内のデフォルトの日付、タイムスタンプ、小数の解析に影響します。 |
|
| 正の整数 | JSONオブジェクトと配列の最大ネスト深度です。深く入れ子になったドキュメントに対して、この値を増やしてください。 |
|
| 正の整数 | JSON入力における数値トークンの最大長。大きい数値リテラルを持つJSONの場合、この値を増やしてください。 |
| 無制限 | 正の整数 | JSON入力の文字列の最大長です。長い文字列を含むJSONの解析時に、メモリ使用量を制限するように設定します。 |
|
|
| 不正な形式のレコードの処理に関するパーサーモード。 |
|
|
| JSONレコードが複数行にまたがるかどうか。 |
|
|
| 可能な場合は、float型またはdouble型ではなく、文字列を |
|
|
| 数値やブール値などのプリミティブ型を |
|
|
|
|
| なし | 列名文字列 | データ型の不一致またはスキーマの不一致(列の大文字小文字の区別を含む)により解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。
|
| なし | 列名文字列 | JSONドキュメント全体を指定された文字列を列名とする単一のバリアント列に解析して取り込むかどうか。設定されていない場合、JSONフィールドは個別の列として取り込まれます。 |
|
| タイムスタンプ形式の文字列 | タイムスタンプ文字列を解析するための形式。 |
|
| タイムスタンプ形式の文字列 | タイムゾーン( |
| なし |
| タイムスタンプと日付を解析するときに使用する |
|
|
| 型アップグレード例外(例えば、値を宣言された列型に拡張できない場合など)を例外をスローするのではなく、不良レコードとして扱うかどうか。 |
Kafka
Kafkaリーダーのオプションの全リストについては、 「DataStreamReader Kafkaオプション」を参照してください。以下のオプションは、 spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| どこで読み取りを停止するか。JSON 文字列では、 |
| なし | JSONタイムスタンプ文字列 | パーティションごとの終了オフセットは、ミリ秒単位のタイムスタンプとして指定されます。例: |
| なし | 正の整数または | すべてのパーティションに適用される、ミリ秒単位のグローバル終了タイムスタンプ。 |
ORC
以下のオプションは、ORCファイルを読み込む際に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| 複数ファイルからスキーマを推定し、各ファイルのスキーマをマージするかどうか。 |
Parquet
Parquet ファイルを読み取る際に、以下のオプションが適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| ユリウス暦と先発グレゴリオ暦の間でのDATE値とTIMESTAMP値のリベースを制御します。 |
|
|
| INT96のタイムスタンプ値をユリウス暦と先発グレゴリオ暦の間でリベースすることを制御します。 |
|
|
| 複数ファイルからスキーマを推定し、各ファイルのスキーマをマージするかどうか。 |
|
|
|
|
| なし | 列名文字列 | データ型の不一致、スキーマの不一致(列の大文字小文字の区別を含む)などが原因で解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。
|
状態ストア
これらのオプションをspark.read.format("statestore")またはread_statestoreテーブル値関数と組み合わせて使用すると、構造化ストリーミング状態データを読み込むことができます。構造化ストリーミングの状態情報の読み取りを参照してください。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| 最新のバッチID | 正の整数または | 読み取り元のターゲットバッチです。以前のクエリの状態を照会します。バッチはコミットされている必要がありますが、まだクリーンアップされていません。 |
|
| 正の整数または | 読み取り元のターゲットオペレーターです。クエリに複数のステートフル演算子がある場合に使用します。 |
|
| 任意の文字列 | 読み取り元のターゲットの状態ストア名。ステートフル演算子に複数の状態ストアインスタンスがある場合に使用します。ストリーム-ストリームJOINには、 |
| なし |
| ストリーム-ストリーム結合における読み取り元のターゲット側。ストリーム-ストリームJOINには、 |
| なし | 正の整数または | 状態を読み取る際の開始点として使用するスナップショットのバッチ ID です。リーダーは、このスナップショットからの変更を |
| なし | 正の整数または | 指定されている場合、クエリはこのパーティションのみを読み込みます。 |
|
|
|
詳細については、 「構造化ストリーミングの状態変化の読み取り」を参照してください。 |
| なし | 正の整数または | 変更フィード範囲の開始バッチ ID。 |
| 最新のバッチID | 正の整数または | チェンジフィード範囲の最終バッチIDです。 |
| なし | 任意の文字列 | 読み込む状態変数名です。状態変数名は、 |
|
|
|
|
|
|
|
|
文章
次のオプションは、テキストファイルの読み取り時に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| TEXTファイルのエンコーディングの行区切り文字の名前。ファイルの内容は、このオプションの影響を受けず、そのまま読み込まれます。 |
| なし、これは | 文字列 | 連続する2つのTEXTレコード間の文字列。 |
|
|
| ファイルを単一レコードとして読み取るかどうか。 |
XML
次のオプションは、XML ファイルの読み取り時に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | 行として扱うXMLファイルの行タグ。例の XML |
|
|
| スキーマ推論に使用される行の割合を定義します。XML 組み込み関数は、このオプションを無視します。 |
|
|
| 要素内の属性を除外するかどうか。 |
| なし |
| 解析中に破損したレコードを処理するモード
|
|
|
|
|
|
| 列名文字列 |
|
| なし | 任意の文字列 | 属性を要素と区別するための、属性の接頭辞。これはフィールド名の接頭辞になります。デフォルトは |
|
| 任意の文字列 | 属性または子要素を持つ要素内の文字データに使用されるタグ。ユーザーはスキーマ内で |
|
|
| 読み取り時には、指定されたエンコード方式でXMLファイルをデコードします。書き込み時に、保存されるXMLファイルのエンコーディング(文字セット)を指定します。XMLの組み込み関数はこのオプションを無視します。DataFrameWriter の XML オプションにも適用されます。 |
|
|
| 値の周囲の空白をスキップする必要があるかどうか。空白文字のみのデータは無視されます。 |
| なし | ファイルパス文字列 | 各行の XML を個別に検証するために使用される、オプションの XSD ファイルへのパス。検証に失敗した行は、解析エラーのように扱われます。それ以外の場合、XSD は、指定または推論されたスキーマに影響を与えません。 |
|
|
|
|
|
| タイムスタンプ形式の文字列 | datetime パターン形式に従うカスタムタイムスタンプ形式文字列。これは |
|
| タイムスタンプ形式の文字列 | タイムゾーンを含まないタイムスタンプのカスタムフォーマット文字列。datetime パターンフォーマットに従います。これはTimestampNTZType型に適用されます。DataFrameWriter の XML オプションにも適用されます。 |
|
| 日付形式文字列 | datetime パターン形式に従うカスタム日付形式文字列。これは日付型に適用されます。DataFrameWriter の XML オプションにも適用されます。 |
|
| IETF BCP 47形式の言語タグ | IETF BCP 47形式の言語タグとしてロケールを設定します。例えば、 |
| string | 任意の文字列 | null値の文字列表現を設定します。これが |
|
|
| rescuedDataColumn が有効になっている場合の大文字小文字の区別動作を指定します。もしそうであれば、スキーマから大文字小文字が異なる名前のデータ列を復元します。偽の場合、大文字小文字を区別せずにデータを読み込む。 |
| なし | 列名文字列 | データ型の不一致やスキーマの不一致(列の大文字小文字の区別を含む)により解析できないすべてのデータを、別の列に収集するかどうか。この列は、 Auto Loader使用するときに自動的に組み込まれます。 詳細については、 「救出されたデータ列とは何ですか?」を参照してください。 |
|
| 列名文字列 | 単一のバリアント列名を指定します。このオプションが読み込み用に指定されている場合、指定されたオプション文字列の値を列名として、XMLレコード全体を単一のバリアント列に解析します。書き込み用にこのオプションが指定されている場合、単一バリアント列の値をXMLファイルに書き込みます。データフレームWriter の XML オプションにも適用されます。 |
|
|
| 従来のXMLパーサーを使用するかどうか。従来のパーサーは、不正なコンテンツに対する検証が緩い反面、メモリ効率が劣ります。より厳格なデフォルトパーサーを有効にするには、 |
|
| 列名文字列 | ワイルドカード( |
DataStreamReaderのオプション
これらのオプションをDataStreamReader.option()と組み合わせて使用すると、Delta Lakeテーブルやその他のファイルベースのソースからのストリーミング読み取りを設定できます。
ファイル形式のオプション(JSON、CSV、Parquetなど)については、 DataFrameReaderのオプションを参照してください。
Auto Loader ( cloudFiles.* )のオプションについては、 Auto Loader参照してください。
例
次の例では、 Delta Lakeテーブル ストリームのmaxFilesPerTriggerを10に設定します。
- Python
- Scala
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
一般
以下のオプションは、Delta Lakeテーブルおよびその他のファイルベースのストリーミングソースに適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| ストリームで処理されたソースファイルの取り扱い方法。 |
|
|
| 既に処理済みのファイルを、フルパスではなくファイル名のみで識別するかどうか。 |
|
|
| 各マイクロバッチ内で、最も最近変更されたファイルを最初に処理するかどうか。最新のデータをできるだけ迅速に処理したい場合に役立ちます。 |
| なし | 正の整数 | 各マイクロバッチで処理されるデータの量の実質的な上限。最小の入力単位が制限を超える場合、バッチは制限を超える量を処理する可能性があります。 Auto Loaderの場合は、代わりに |
|
| 正の整数または | 後続のマイクロバッチ用にキャッシュする未処理ファイルの最大数 |
|
| 期間を表す文字列。例: | 現在のシステム時刻ではなく、最後に変更されたファイルのタイムスタンプを基準とした、処理対象ファイルの最大期間。このしきい値より古いファイルは無視されます。 |
|
| 正の整数 | 各マイクロバッチで処理される新しいファイルの数の上限 Auto Loaderの場合は、代わりに |
| なし | パス文字列 |
|
Auto Loader
これらのオプションをcloudFilesソースと組み合わせて使用すると、クラウドストレージからのストリーミング取り込み用にAuto Loaderを設定できます。cloudFilesソースに固有のオプションには、他の構造化ストリーミングソースオプションとは別の名前空間に保持するために、 cloudFilesという接頭辞が付けられます。
一般
次のオプションは、すべてのAuto Loader構成に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| 入力ディレクトリファイルの変更が既存のデータを上書きすることを許可するかどうか。 設定上の注意点については、 「ファイルが追加または上書きされた場合、Auto Loader はファイルを再度処理しますか?」を参照してください。 |
| なし | 期間を表す文字列。例: | Auto Loader は、特定の間隔で非同期バックフィルをトリガーできます。詳細については、「cloudFiles.backfillInterval を使用して定期的なバックフィルをトリガーする」を参照してください。
|
|
|
| 入力ディレクトリから処理されたファイルを自動的に削除または移動するかどうか。
ファイルは、
Databricks Runtime 16.4以降で利用可能です。 |
|
| CalendarInterval のような文字列( | 処理されたファイルがアーカイブ候補になるまでの待機時間( Databricks Runtime 16.4以降で利用可能です。 |
| なし | クラウドストレージまたはUnity Catalogボリュームパス |
移転先は以下の条件を満たす必要があります。
Auto Loaderには、このディレクトリへの書き込み権限が必要です。 Databricks Runtime 16.4以降で利用可能です。 |
| なし(必須オプション) |
| ソースパス内のデータファイル形式。有効な値は次のとおりです。 |
|
|
| ストリーム処理入力パスに既存のファイルを含めるか、初期セットアップ後に到着する新しいファイルのみを処理するかどうか。このオプションは、初めてストリームを開始するときにのみ評価されます。ストリームの再開後にこのオプションを変更しても効果はありません。 |
|
|
| スキーマ推論を利用する際に、正確な列型を推論するかどうか。安心により、 JSONやCSVデータセットを推論する際、列は文字列として推論されます。 詳細については、スキーマ推論を参照してください。 |
| なし | バイトを表す文字列。例: | 各トリガーで処理される新しいバイトの最大数。これはソフトマキシマムです。3GBのファイルがある場合、Databricksはマイクロバッチで12GBを処理します。 Databricks Runtime 18.0以降では、このオプションは動的に構成されるため、手動で設定する必要はありません。 |
| なし | 期間を表す文字列 | ファイルイベントが重複排除の目的で追跡される期間。Databricks 、1 時間あたり数百万ファイル程度のデータを取り込んでいる場合を除き、この設定を調整することを推奨しません。 詳細については、 「ファイルイベント追跡」のセクションを参照してください。
|
|
| 正の整数 | 各トリガーで処理される新しいファイルの最大数。 Databricks Runtime 18.0以降では、このオプションは動的に構成されるため、手動で設定する必要はありません。 |
| なし | 列名のコンマ区切りリスト | ファイルのディレクトリ構造から推測したいHiveスタイルのパーティション列のコンマ区切りのリスト。Hiveスタイルのパーティション列は、
|
|
|
| データ内に新しい列が発見された際に、スキーマを進化させるためのモード。安心により、 JSONデータセットを推論する際、列は文字列として推論されます。 詳細については、 「スキーマの進化」を参照してください。 |
| なし | スキーマ文字列 | スキーマ推論中にAuto Loaderに提供するスキーマ情報。詳細については、スキーマのヒントを参照してください。 |
| なし(スキーマ推論に必須) | パス文字列 | 推論されたスキーマとその後の変更を保存する場所。詳細については、スキーマ推論を参照してください。 |
|
|
| Apache Sparkの他のファイル ソースの完全なグロビング動作と一致する厳密なglobberを使用するかどうか。 詳細については、 「一般的なデータ読み込みパターン」を参照してください。Databricks Runtime 12.2 LTS以降で利用可能です。 |
|
|
| Auto Loaderオプションを検証し、不明なオプションまたは一貫性のないオプションに対してエラーを返すかどうか。 |
ディレクトリ一覧
以下のオプションは、ディレクトリリストモードを使用する際に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| この機能は廃止されました。Databricksは、 ディレクトリ一覧表示モードで、完全な一覧表示ではなく、増分一覧表示を使用するかどうか。デフォルトでは、Auto Loader は指定されたディレクトリが増分リストの対象であるかどうかを自動的に検出するよう最大限の努力を払います。明示的に増分リストを使用するか、ディレクトリ全体のリストを使用するかは、それぞれ 辞書順に並んでいないディレクトリでインクリメンタルリストを誤って有効にすると、 Auto Loader新しいファイルを検出できなくなります。 Azureデータレイク ストレージ ( Databricks Runtime 9.1 LTS 以降で利用可能です。 |
ファイル通知
必要なクラウド権限、セットアップ手順、認証方法など、ファイル通知モードの構成については、 「 ファイル通知モードでのAuto Loaderストリームの構成 」を参照してください。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
| 正の整数 | キューイングサービスからメッセージを取得するときに使用するスレッドの数。
|
| なし | JSON マップ文字列 | 複数のS3バケットからファイル通知を受け取る
|
| なし | キーと値タグ文字列 | 関連リソースの関連付けと識別に役立つ一連のキーと値のタグのペア。次に例を示します。
詳細については、「クラウドプロバイダーのリソースタグ」を参照してください。 |
|
|
|
ファイルイベントは、ファイル検出において通知レベルのパフォーマンスを提供します。これは、Auto Loaderが前回の実行後に新しいファイルを検出できるためです。ディレクトリ一覧表示とは異なり、この処理ではディレクトリ内のすべてのファイルを一覧表示する必要はありません。 ファイルイベントオプションが有効になっている場合でも、Auto Loaderがディレクトリ一覧表示を使用する状況がいくつかあります。
ファイルイベントを使用するAuto Loader 、いつディレクトリ一覧を使用しますか?を参照してください。 このオプションを使用して Auto Loader がディレクトリ一覧を表示する状況の包括的なリストについては、こちらをご覧ください。 Databricks Runtime 14.3 LTS以降で利用可能です。 |
|
|
|
|
|
|
| 新しいファイルが存在することを通知するために、ファイル通知モードを使用するかどうか。
|
クラウド プロバイダー リソース タグ
Auto Loaderは、デフォルトではベストエフォート方式で次のキーと値のタグのペアを追加します。
vendor:Databrickspath: データが読み込まれる場所。ラベル付けの制限のため、GCPでは使用できません。checkpointLocation:ストリームのチェックポイントの位置。表示上の制限により、GCP(医薬品臨床試験の実施に関する基準)では利用できません。streamId: ストリームのグローバル一意識別子。
Databricksはこれらのキー名を予約しており、その値を上書きすることはできません。
GCPの詳細については、「ラベルを使用した使用状況のレポート」を参照してください。
クラウド固有の
Auto Loader には、ファイル通知モード向けにクラウドインフラを構成するためのオプションが用意されています。必要なクラウド権限とセットアップ手順については、「ファイル通知モードでの Auto Loader ストリームの構成」を参照してください。
AWS
cloudFiles.useNotifications = trueを選択し、Auto Loaderで通知サービスを設定する場合にのみ、次のオプションを指定します。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| EC2インスタンスのリージョン | AWSリージョン文字列 | ソースS3バケットが存在し、 AWS SNS および SQS サービスを作成するリージョン。 |
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| SNSトピックと同じアカウント内のAWS S3バケットからのイベント通知のみを許可する。この設定が有効な場合、Auto Loader は SNS トピックと同じアカウント内の AWS S3 バケットからのイベント通知のみを受け入れます。
Databricks Runtime 17.2以降で利用可能です。 |
cloudFiles.useNotifications = trueを選択し、すでに設定したキューをAuto Loaderで使用する場合にのみ、次のオプションを指定します。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | URL 文字列 | SQSキューのURL。指定された場合、Auto Loaderは独自のAWS SNSとSQSサービスをセットアップする代わりに、このキューから直接イベントを消費します。 |
AWS認証オプション
Databricksサービス資格情報を使用するには、次の認証オプションを指定してください。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。 |
Databricksサービス資格情報またはIAMロールが利用できない場合、代わりに、次の認証オプションを指定できます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | ユーザーのAWSアクセスキーID。 |
| なし | 任意の文字列 | ユーザーのAWSシークレットアクセスキー。 |
| なし | ARN文字列 | ARNIAM必要に応じて引き受ける ロールの 。このロールは、クラスターのインスタンスプロファイルから、または |
| なし | 任意の文字列 |
|
| なし | 任意の文字列 |
|
| なし | URL 文字列 |
|
Azure
cloudFiles.useNotifications = trueを指定し、Auto Loaderに通知サービスを設定させる場合は、次のすべてのオプションに値を指定する必要があります。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | ストレージ アカウントが作成されるAzureリソース グループ。 |
| なし | 任意の文字列 | リソースグループが作成されたAzureサブスクリプションID。 |
| なし | 任意の文字列 | Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。 |
Databricks サービスの認証情報が利用できない場合、次の認証オプションを指定できます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | Databricksサービスプリンシパルのクライアント ID またはアプリケーション ID。 |
| なし | 任意の文字列 | Databricksサービスプリンシパルのクライアント シークレット。 |
| なし | 接続文字列 | アカウントアクセスキーあるいは共有アクセス署名(SAS)に基づく、ストレージアカウントの接続文字列。 |
| なし | 任意の文字列 | Databricksサービスプリンシパルが作成されるAzureテナントID。 |
cloudFiles.useNotifications = trueを選択し、Auto Loaderで既存のキューを使用する場合にのみ、次のオプションを指定します。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | Azureキューの名前。指定されている場合、クラウドファイルソースは、独自のAzure Event Gridサービスとキューストレージサービスを設定する代わりに、このキューからイベントを直接消費します。その場合、 |
GCP
Auto Loader Databricksサービスの資格情報を利用して、通知サービスを自動的にセットアップできます。 Databricks サービス認証情報を使用して作成されたサービス アカウントには、ファイル通知モードでの Auto Loader ストリームの構成で指定された権限が必要です。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | GCSバケットが存在するプロジェクトのID。Google クラウド Pub/Sub サブスクリプションもこのプロジェクト内で作成されます。 |
| なし | 任意の文字列 | Databricksサービスの認証情報の名前。Databricks Runtime 16.1以降で利用可能です。 |
Databricks サービスの認証情報を利用できない場合は、Google サービス アカウントを直接使用できます。Google サービスのセットアップに従ってクラスターをサービス アカウントとして構成するか、次の認証オプションを指定できます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | GoogleサービスアカウントのクライアントID。 |
| なし | メールアドレス文字列 | Googleサービスアカウントのメールアドレス。 |
| なし | プライベートキー文字列 | Google サービス アカウント用に生成される秘密キー。 |
| なし | 任意の文字列 | Google サービス アカウント用に生成された秘密キーの ID。 |
cloudFiles.useNotifications = trueを選択し、すでに設定したキューをAuto Loaderで使用する場合にのみ、次のオプションを指定します。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | Google Cloud Pub/Subサブスクリプションの名前。指定されている場合、クラウドファイルソースは独自のGCS通知サービスとGoogle Cloud Pub/Subサービスを設定する代わりに、このキューからのイベントを消費します。 |
Delta Lake
spark.readStreamを使用して Delta Lake テーブルから読み込む場合、以下のオプションが適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | バージョン番号または | Deltaテーブルのバージョン番号または |
| なし | バージョン番号または | Deltaテーブルのバージョン番号または |
| なし | バージョン番号または | Deltaテーブルのバージョン番号または |
| なし | Javaの正規表現文字列 | 正規表現パターン。パスがパターンに一致するファイルは、ストリーミング読み取りから除外されます。想定される命名規則に準拠していないファイルを除外するのに役立ちます。 |
|
|
| ログ保持( |
|
|
| Databricks Runtime 11.3 LTS以前のバージョンで利用可能です。 |
|
|
| パーティション境界でデータを削除するトランザクション(パーティション全体の削除のみ)は無視します。パーティション以外の削除、更新、その他の変更には対応していません。代わりに |
|
|
| ストリーミングクエリのチェンジデータフィードの読み取りを有効にするかどうか。有効にすると、ストリームは、追加のメタデータ列を伴う行レベルの変更(挿入、更新、および削除)を出力します。Databricksでのチェンジデータフィードの使用 を参照してください。 |
| なし | パス文字列 | Delta Lakeがストリーミング読み取りのためのスキーマ変更を追跡するディレクトリへのパス。列マッピングが有効になっているテーブルからストリーミングし、スキーマ進化を処理するために |
|
|
| 既存のレコードを削除または変更するトランザクションは無視し、追加のみを処理します。Databricks 、変更データフィードを使用しないほとんどのワークロードにこのオプションを推奨します。 Databricks Runtime 12.2 LTS以降で利用可能です。 |
| 最新の情報 | タイムスタンプ文字列(例: | 読み取りを開始するタイムスタンプ。このストリームは、指定されたタイムスタンプ以降にコミットされたすべてのテーブル変更を読み取ります。タイムスタンプがすべての利用可能なテーブル コミットよりも前にある場合、ストリームは利用可能な最も早いコミットから開始されます。 |
| 最新の情報 | 正の整数、 | Deltaテーブルのバージョンを読み取り開始します。 このストリームは、指定されたバージョン以降にコミットされたすべての変更を読み取ります。最新の変更のみから開始するには、 |
|
|
| 初期テーブルスナップショットをイベント時間バケットに分割することで、レコードが誤って遅延イベントとしてマークされ、ウォーターマーク付きのステートフルクエリで削除されるのを防ぎます。初期スナップショット処理が開始された後は、チェックポイントを削除しない限り変更できません。Databricks Runtime 11.3 LTS以降で利用可能です。データを削除せずに初期スナップショットを処理する方法については、「データを削除せずに処理する」を参照してください。 |
Kafka
これらのオプションはspark.readStream.format("kafka")またはspark.read.format("kafka")のいずれかと組み合わせて使用してください。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | JSON文字列など | コンシュームする特定のパーティションです。 |
|
|
| データが失われた可能性がある場合、例えば、削除されたトピックやオフセットの切り捨てが原因で、クエリーを失敗させるかどうか。 Databricksは、データが失われた可能性について控えめな見積もりを行っています。しかし、これは誤報を引き起こす可能性がある。 |
|
| 正の整数または | Kafka オフセットの取得に失敗した場合の再試行回数。 |
|
| 正の整数または | オフセットフェッチの再試行間のミリ秒単位の間隔。 |
|
| 任意の文字列 | 自動生成された Kafka コンシューマーグループの ID に使用するカスタマイズされたプレフィックス。 |
| なし | 任意の文字列 | 読み取りに使用する Kafka コンシューマーグループの ID。注意:同じグループ ID を共有するクエリは相互に干渉し、データの一部のみを読み取る可能性があります。これは、並列バッチおよびストリーミングワークロードを実行している場合、またはクエリを素早く再起動している場合に発生する可能性があります。設定されている場合、 |
|
|
| 出力に Kafka メッセージヘッダーを列として含めるかどうか。 |
| なし | 正の整数 | Kafka コンシューマー |
| なし | カンマ区切りの | ホストのカンマ区切りリスト Kafkaブローカーのアドレス。Kafkaクライアントの Kafkaからデータが取得できない場合は、このブローカーアドレスリストを確認して、アドレスに誤りがないか確認してください。ブローカーのアドレスリストが間違っている場合、エラーが発生しない可能性があります。Kafkaクライアントは、ブローカーがいずれ利用可能になると想定し、ネットワークエラーが発生した場合は永久に再試行します。 |
| なし | 正の整数 | 各Sparkパーティションの最大レコード数です。設定されている場合、コネクタはKafkaパーティションを分割し、各Sparkパーティションが読み込むレコードの最大数がこの値になるようにします。 このオプションは |
| なし | 正の整数 | Kafka から読み取る Spark パーティションの最小数設定すると、コネクタは大きな Kafka パーティションを分割して並列処理を向上させます。設定されていない場合、Spark は各 Kafka トピックパーティションに対して 1 つのパーティションを作成します。データスキューまたはピーク負荷の処理に役立ちます。 このオプションは、トリガーごとにKafkaコンシューマーを再初期化するため、SSL使用時のパフォーマンスに影響を与える可能性があります。 |
|
|
| クエリが読み取りを開始するオフセット。JSON 文字列では、 ストリーミングクエリの場合、このオプションは新しいクエリが開始されたときにのみ適用されます。再開されたクエリは常にチェックポイントを使用します。クエリ実行中、新しいパーティションは最も早いオフセットから読み取りを開始します。 バッチクエリの場合、 |
| なし | JSON タイムスタンプ文字列のような | ミリ秒単位のタイムスタンプとして指定される、各パーティションの開始オフセットのリストです。タイムスタンプにオフセットが存在しない場合、クエリの動作は ストリーミングクエリの場合、このオプションは新しいクエリが開始されたときにのみ適用されます。再開されたクエリは常にチェックポイントを使用します。クエリ実行中、新しいパーティションは最も早いオフセットから読み取りを開始します。 |
|
|
|
|
| なし | 正の整数または | すべてのパーティションに適用されるグローバルな開始タイムスタンプ (ミリ秒単位)。タイムスタンプにオフセットが存在しない場合、動作は |
| なし | トピック名のコンマ区切りリスト | サブスクライブするトピックです。 |
| なし | Javaの正規表現文字列 | トピックをサブスクライブするのに使われるパターンです。 |
以下のオプションは、 spark.readStream.format("kafka")のストリーミング読み取りにのみ適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
| 期間を表す文字列( |
|
| なし | 正の整数 | トリガー間隔ごとに処理されるオフセットの最大数。オフセットはトピックパーティション間で比例して分配されます。 |
|
| 期間を表す文字列( |
|
| なし | 正の整数 | マイクロバッチをトリガーするまでに蓄積する最小オフセット数 |
spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されるオフセットオプションについては、 DataFrameReader Kafka オプションを参照してください。
認証
Databricks は、クラウドマネージド Kafka サービス(AWS MSK、Azure Event Hubs、または Google Cloud Managed Kafka)への認証に Unity Catalog サービス資格情報を使用することを推奨します。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | クラウド管理のKafkaサービスを認証するための Unity Catalog サービス資格情報の名前Databricks Runtime 16.1 以降で利用可能です。 |
| なし | 任意の文字列 | サービス認証情報のOAuthスコープ。Databricks がお客様の Kafka サービスに対するスコープを自動的に推論できない場合にのみ、これを設定してください。 |
サービス資格情報が利用できない場合、SASL/SSLオプションを使用します(kafka.*プロパティを介して渡されます)。サービス認証情報を使用する場合、kafka.sasl.mechanism、kafka.sasl.jaas.config、またはkafka.security.protocolを指定する必要はありません。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | セキュリティプロトコル文字列。例: | ブローカー通信のセキュリティプロトコル。 |
| なし | SASLメカニズム文字列、例えば | SASLメカニズムです。 |
| なし | JAAS 構成文字列 | JAASログイン設定文字列です。 |
| なし | 完全修飾クラス名 | SASL認証用のログインコールバックハンドラーの完全修飾クラス名です。 |
| なし | 完全修飾クラス名 | SASL認証用のクライアントコールバックハンドラーの完全修飾クラス名です。 |
| なし | ファイルパス文字列 | SSLトラストストアファイルへのパス。 |
| なし | 任意の文字列 | SSLトラストストアファイルのパスワードです。 |
| なし | ファイルパス文字列 | SSLキーストアファイルのパス。 |
| なし | 任意の文字列 | SSLキーストアファイルのパスワード。 |
完全な認証設定手順については、認証を参照してください。
パブ/サブ
spark.readStream.format("pubsub") と共にこれらのオプションを使用して、Google Pub/Sub を購読します。オプションsubscriptionId、topicId、およびprojectIdが必要です。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 任意の文字列 | 必須。Pub/Sub サブスクリプション ID。コネクタは、サブスクリプションが存在しない場合は作成します。 |
| なし | 任意の文字列 | 必須。パブ/サブ トピックID。 |
| なし | 任意の文字列 | 必須。Google Cloud プロジェクト ID です。 |
| ストリーム初期化時に利用できるエグゼキューターの半分 | 正の整数 | サブスクリプションから行を取得する並列Sparkタスクの数。 |
| なし | 正の整数 | マイクロバッチごとに処理されるバイト数のソフトリミット。 |
|
| 正の整数 | 処理する前にタスクごとに取得する行数。 |
|
| 期間を表す文字列。例: | 各タスクが行を処理する前に取得にかかる時間。Databricksはデフォルト値を使用することをお勧めします。 |
|
|
|
|
| なし | 任意の文字列 | Pub/Subへの認証に使用するDatabricks サービス認証情報の名前。Databricks Runtime 16.1 以降で利用可能です。 |
| なし | メールアドレス文字列 | GoogleサービスアカウントのEメールアドレス。サービス認証情報を使用しない場合、必要です。 |
| なし | 任意の文字列 | Google サービス アカウントのクライアントID。サービス認証情報を使用しない場合、必要です。 |
| なし | プライベートキー文字列 | Google サービスアカウントの秘密鍵。サービス認証情報を使用しない場合、必要です。 |
| なし | 任意の文字列 | Googleサービスアカウントの秘密鍵のID。サービス認証情報を使用しない場合、必要です。 |
Pub/Subに関する詳細については、Google Pub/Sub をサブスクライブするをご覧ください。
Pulsar
spark.readStream.format("pulsar")でこれらのオプションを使用して、Apache Pulsarからストリームします。Databricks Runtime 14.1 以降で利用可能です。
以下のオプションは必須です。topic、topics、またはtopicsPatternのいずれか1つを指定する必要があります。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | PulsarサービスのURL文字列 | Pulsarサービス用のPulsar |
| なし | 任意の文字列 | コンシュームする単一のトピック名です。 |
| なし | トピック名のコンマ区切りリスト | 消費対象のトピック名のコンマ区切りリスト |
| なし | Javaの正規表現文字列 | トピック名に一致するJavaの正規表現文字列です。 |
次のオプションもサポートされています。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | URL 文字列 | Pulsar管理サービスのHTTP URL。 |
|
|
| 複数の異なるスキーマを持つトピックが読み込まれる場合、自動スキーマに基づくトピック値の逆シリアル化を無効にするには、このオプションを使用してください。これが |
|
|
| データが失われた場合にクエリーを失敗させるかどうか。例えば、保持ポリシーによってトピックが削除されたり、メッセージの有効期限が切れたりすると、データが失われる可能性があります。 |
| なし | 正の整数 | マイクロバッチごとに処理されるバイト数のソフトリミット。 |
|
| 正の整数 | Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位) |
| なし | 任意の文字列 | コネクタがSparkアプリケーションの進捗状況を追跡するために使用する事前定義されたサブスクリプション名 |
|
|
| どこから読み取りを開始するか。 |
| なし | 任意の文字列 | Sparkアプリケーションの進捗を追跡するためのランダムなサブスクリプションを生成する際に、コネクタによって使用されるプレフィックスです。 |
|
|
| コネクターが目的のトピックが作成されるまで待機するかどうか。 |
以下のオプションパターンを使用して、追加のPulsarクライアント、管理、およびリーダー構成を指定できます。
パターン | 構成オプション |
|---|---|
| |
|
|
|
Pulsarクライアントおよび管理者認証オプションの詳細については、「認証」を参照してください。
認証
DatabricksはPulsarへのトラストストアとキーストア認証をサポートしています。Databricks では、シークレットを使用して認証情報を格納することを推奨しています。「 シークレットの管理」を参照してください。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 完全修飾クラス名 | 認証プラグインの完全修飾クラス名。たとえば、 |
| なし | 認証情報文字列 | 認証プラグインに認証資格情報が文字列として渡されます。たとえば、 |
|
|
|
|
| なし | 任意の文字列 | TLSトラストストアファイルの形式です。たとえば、 |
| なし | ファイルパス文字列 | 信頼されたCA証明書を含むTLSトラストストアファイルのパス。 |
| なし | 任意の文字列 | TLSトラストストアファイルのパスワードです。 |
ストリームがPulsarAdminを使用している場合、次のオプションを設定することもできます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | 完全修飾クラス名 | Pulsar管理クライアント用の認証プラグインの完全修飾クラス名 |
| なし | 認証情報文字列 | Pulsar管理クライアント認証プラグインの認証資格情報 |
| なし |
| Pulsar管理クライアント接続にTLSを使用するかどうかの設定。 |
| なし |
| Pulsar管理クライアントに対し、安全でないTLS接続を許可するかどうか。 |
| なし | ファイルパス文字列 | Pulsar管理クライアント用の信頼されたTLS証明書ファイルのパス。 |
| なし |
| Pulsar管理クライアントにKeyStoreベースのTLSを使用するかどうか。 |
| なし | 任意の文字列 | Pulsar adminクライアントのTLSトラストストアの形式。たとえば、 |
| なし | ファイルパス文字列 | Pulsar管理クライアント用のTLSトラストストアファイルへのパス。 |
| なし | 任意の文字列 | Pulsar管理クライアントのTLSトラストストアのパスワード。 |
認証の例については、Pulsarへの認証を参照してください。
DataFrameWriterのオプション
これらのオプションをDataFrameWriter.option()とDataFrameWriterV2.option()と組み合わせて使用すると、Databricksがデータを書き込む方法を制御できます。
例
次の例では、Delta Lake テーブルを書き込む際にmergeSchemaをTrueに設定します。
- Python
- Scala
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
Avro
Avroファイルを書き込む場合、以下のオプションが適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | JSONスキーマの文字列 | 完全な Avro スキーマを JSON 文字列として。このオプションを使用して、Spark SQL の型を特定の Avro 型に変換してください。Avro ファイルの読み取りと書き込みに適用されます。 |
| なし | URL 文字列 | AvroスキーマファイルへのURL。外部に保存されているスキーマの場合、 |
|
|
| 書き込み時に使用する圧縮コーデック。Avro ファイルの読み取りと書き込みに適用されます。 |
|
| 任意の文字列 | 出力 Avro スキーマにおけるトップレベルのレコード名です。Avro ファイルの読み取りと書き込みに適用されます。 |
|
|
| SparkスキーマとAvroスキーマの間で、名前ではなくフィールド位置で列を一致させるかどうか。Avro ファイルの読み取りと書き込みに適用されます。 |
| 空の文字列 | 任意の文字列 | 出力 Avro スキーマ内のトップレベルレコードのネームスペースです。Avro ファイルの読み取りと書き込みに適用されます。 |
Delta LakeとApache Iceberg
Delta LakeおよびApache Icebergテーブルを書き込む場合、以下のオプションが適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| クエリパターンに基づいてDatabricksがクラスタリング列を選択する自動リキッドクラスタリングを有効にするかどうか。 |
| なし |
| 書き込み操作においてスキーマ進化を有効にするかどうか。生成されたDataFrameの新しい列が、ターゲットテーブルのスキーマに追加されます。 バッチ追記とストリーミング追記の両方に適用されます。テーブルスキーマの更新に適用されます。 |
| なし |
| 上書き時にテーブルスキーマとパーティショニングを置き換えるかどうか。 |
| なし |
| パーティション上書きモード。これを |
| なし | ブーリアン式文字列 | 対象テーブルの行を、パッケージクエリからの行で置き換えるブール式。 対象テーブルとソースクエリの両方の列を参照できます。ターゲット内の行のうち、ソースの行と一致する行は削除され、置き換えられます。ソースが空の場合、削除は行われません。列参照の曖昧さを解消するには、 |
| なし | 列名のコンマ区切りリスト | 対象テーブルとソースクエリ間の行を照合するために使用される、カンマ区切りの列名のリスト。ターゲットとソースの両方に、リストされているすべての列が含まれている必要があります。対象レコード内の行のうち、ソースレコードの行と等価比較で一致する行は削除され、置き換えられます。 |
| なし | 述語式文字列 | 述語式。述語に一致するレコードのみをアトミックに上書きします。Delta Lake を使用してデータを選択的に上書きする場合に適用されます。 |
| なし | 任意の文字列 | 対象テーブルの文字列エイリアス。条件が対象テーブルとソースクエリの両方の列を参照する場合、 |
| なし | 任意の文字列 |
|
| なし | 単調増加の整数 |
|
| なし |
| この書き込み操作で自動最適化書き込みを有効にするかどうか。 |
| なし | 任意の文字列 | 書き込み操作のコミットメタデータに追加される、ユーザー定義の文字列。 |
CSV
次のオプションは、CSVファイルの書き込み時に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
| 1文字 | エスケープ文字が引用文字と異なる場合に、エスケープ文字をエスケープするために使用される文字。csv (DataFrameWriter)に適用されます。 |
|
|
| 書き込み時に使用する圧縮コーデック。csv (データフレームWriter) に適用されます。 |
|
| 日付形式文字列 | 日付列の値の書式指定文字列。csv (DataFrameWriter)に適用されます。 |
| 空の文字列 | 任意の文字列 | 空の値(null以外の値)に対して書き込まれる文字列。csv (DataFrameWriter)に適用されます。 |
|
|
| 出力ファイルの文字エンコーディング。csv (DataFrameWriter)に適用されます。 |
|
| 1文字 | 引用符で囲まれた値をエスケープするために使用される文字。csv (DataFrameWriter)に適用されます。 |
|
|
| 引用符で囲まれたフィールド値内の引用符文字をエスケープするかどうか。csv (DataFrameWriter)に適用されます。 |
|
|
| 出力の最初の行に列名を表示するかどうか。csv (DataFrameWriter)に適用されます。 |
|
|
| 値を書き込む際に、先頭の空白文字を削除するかどうか。csv (DataFrameWriter)に適用されます。 |
|
|
| 値を書き込む際に、末尾の空白文字を削除するかどうか。csv (DataFrameWriter)に適用されます。 |
|
| 文字列 | レコード間で使用される行区切り文字列。csv (DataFrameWriter)に適用されます。 |
|
|
|
|
| 空の文字列 | 任意の文字列 | null値に対して書き込まれた文字列。csv (DataFrameWriter)に適用されます。 |
|
| 1文字 | 区切り文字を含むフィールド値を引用するために使用される文字。csv (DataFrameWriter)に適用されます。 |
|
|
| 内容に関わらず、すべてのフィールド値を引用符で囲むかどうか。csv (DataFrameWriter)に適用されます。 |
|
| 文字列 | フィールド区切り文字。csv (DataFrameWriter)に適用されます。 |
|
| タイムスタンプ形式の文字列 | タイムスタンプ列の値の書式指定文字列。csv (DataFrameWriter)に適用されます。 |
|
| タイムスタンプ形式の文字列 | タイムゾーン( |
Excel
次のオプションはExcelファイルの書き込み時に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | シート名またはセル参照文字列 | 書き込みを開始するシート名または開始セル。省略した場合、 |
|
| Excelの日付形式文字列 |
|
|
|
| 列名を最初の行として書き込むかどうか。 |
|
| Excel タイムスタンプ書式文字列 |
|
|
|
| 書き込む Excel ファイル形式のバージョン。 |
JSON
JSON ファイルを書き込む際に、次のオプションが適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| 書き込み時に使用する圧縮コーデック。JSON(データフレームWriter) に適用されます。 |
|
| 日付形式文字列 | 日付列の値の書式指定文字列。JSON (DataFrameWriter)に適用されます。 |
|
|
| 出力ファイルの文字エンコーディング。JSON (DataFrameWriter)に適用されます。 |
| 値 |
| JSON出力からnull値を持つフィールドを除外するかどうか。JSON (DataFrameWriter)に適用されます。 |
|
| 文字列 | レコード間で使用される行区切り文字列。JSON (DataFrameWriter)に適用されます。 |
|
|
| Javaロケール識別子は、JSON内のデフォルトの日付、タイムスタンプ、小数の解析に影響します。 |
|
|
| 整形された(インデント付き、複数行)JSON出力を有効にするかどうか。 |
|
|
| 出力時にJSONオブジェクトのキーをアルファベット順に並べ替えるかどうか。決定論的な出力を生成するのに役立ちます。 |
|
| タイムスタンプ形式の文字列 | タイムスタンプ列の値の書式指定文字列。JSON (DataFrameWriter)に適用されます。 |
|
| タイムスタンプ形式の文字列 | タイムゾーン( |
|
|
| 出力時に、非ASCII文字をリテラルUTF-8文字ではなく、Unicodeエスケープシーケンス |
ORC
ORC ファイルを書き込む際に、以下のオプションが適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| 書き込み時に使用する圧縮コーデック。orc (データフレームライター) に適用されます。 |
Parquet
次のオプションは、Parquet ファイルを書き込む際に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| 書き込み時に使用する圧縮コーデック。適用対象:Parquet(データフレームWriter) |
|
|
| タイムスタンプ列のエンコードに使用される物理タイプです。標準のタイムスタンプ型をサポートしないレガシー Parquet リーダーとの互換性のため、 |
文章
次のオプションは、テキストファイルの書き込み時に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| 書き込み時に使用する圧縮コーデック。データフレームWriter に適用されます。 |
|
|
| 出力ファイルの文字エンコーディング。 |
|
| 文字列 | レコード間で使用される行区切り文字列。テキスト(DataFrameWriter)に適用されます。 |
XML
次のオプションは、XML ファイルの書き込み時に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
| 任意の文字列 | 明示的な名前を持たない配列要素の要素名。XML (DataFrameWriter)に適用されます。 |
|
| 任意の文字列 | XML属性に対応するフィールド名の前に付加される接頭辞。XML (DataFrameWriter)に適用されます。 |
|
|
| 書き込み時に使用する圧縮コーデック。xml (データフレームWriter) に適用されます。 |
|
| 日付形式文字列 | 日付列の値の書式指定文字列。XML (DataFrameWriter)に適用されます。 |
|
| XML宣言文字列、または抑制するための空の文字列 | 各出力ファイルの先頭に記述されるXML宣言文字列。宣言を抑制するには、空の文字列を設定します。XML (DataFrameWriter)に適用されます。 |
|
|
| 出力ファイルの文字エンコーディング。XML (DataFrameWriter)に適用されます。 |
| 4つのスペース | 任意の文字列 | 出力において子要素をインデントするために使用される文字列。インデントをオフにして各行を1行に表示するには、空の文字列に設定します。 |
|
|
| Javaロケール識別子。XML内でのデフォルトの日付、タイムスタンプ、小数点の書式設定に影響します。 |
|
| 任意の文字列 | null値に対して書き込まれる文字列。 |
|
| 任意の文字列 | 出力内のすべての行要素を囲むルート要素タグ。XML (DataFrameWriter)に適用されます。 |
|
| 任意の文字列 | 出力における行を表す要素タグ。XML (DataFrameWriter)に適用されます。 |
| なし | 列名文字列 | XMLファイルに書き込む単一のバリアント列の名前。XML (DataFrameWriter)に適用されます。 |
|
| タイムスタンプ形式の文字列 | タイムスタンプ列の値の書式指定文字列。XML (DataFrameWriter)に適用されます。 |
|
| タイムスタンプ形式の文字列 | タイムゾーン列の値を含まないタイムスタンプのフォーマット文字列。XML (DataFrameWriter)に適用されます。 |
|
|
| 列名が有効な XML 要素識別子でない場合に例外をスローするかどうか。XML (DataFrameWriter)に適用されます。 |
|
| 任意の文字列 | 属性または子要素を持つXML要素内の文字データに使用されるフィールド名。XML (DataFrameWriter)に適用されます。 |
DataStreamWriterのオプション
ストリーミング書き込みを設定するには、これらのオプションをDataStreamWriter.option()と組み合わせて使用します。
例
次の例は、ストリームのチェックポイントの位置を設定するものです。
- Python
- Scala
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
一般
次のオプションは、すべてのストリーミング書き込み操作に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし(必須) | パス文字列 | ストリーミングクエリのチェックポイントディレクトリへのパス。耐障害性と、厳密に1回のみの処理を保証するために必要です。各ストリーミングクエリは、固有のチェックポイント位置を使用する必要があります。Databricks 、チェックポイントをUnity Catalogボリュームまたはクラウド ストレージ パスに保存することをお勧めします。 構造化ストリーミングのチェックポイントを参照してください。 |
| なし | パス文字列 | Parquetなどのファイルベースのストリーミングシンクの出力パス。ファイルベースのフォーマットにのみ適用されます。 |
コンソールシンク
コンソールシンクにストリームを書き込む際に、次のオプションが適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
| 正の整数 | コンソールシンクに書き込む際に、各マイクロバッチごとに表示する行数。 |
|
|
| 行を表示する際に、長い文字列を切り詰めるかどうか。文字列値全体を表示するには、 |
Delta Lake
format("delta")を使用してストリームを Delta Lake テーブルに書き込む場合、以下のオプションが適用されます。overwriteSchema 、 replaceWhere 、 partitionOverwriteModeなどの上書き専用オプションは、ストリーミング書き込みではサポートされていません。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| ストリーミングDataFrameに新しい列が含まれている場合に、 Delta Lakeテーブル スキーマを進化させるかどうか。 追記出力モードにのみ適用されます。テーブルスキーマの更新に適用されます。 |
| なし | 任意の文字列 | 書き込み操作のコミットメタデータに追加される、ユーザー定義の文字列。 |
ファイルシンク
以下のオプションは、ストリームをファイルベースの形式(Parquet、JSON、CSV、ORC、テキスト)に書き込む場合に適用されます。フォーマット固有のオプションについては、 DataFrameWriter オプションを参照してください。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし |
| 耐障害性および圧縮に使用されるシンクメタデータファイルの保持期間設定されていない場合、メタデータファイルは無期限に保持されます。 |
Kafkaシンク
以下のオプションは、Kafka への書き込み時に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | カンマ区切りの | 必須。カンマ区切りの Kafka ブローカー |
| なし | 任意の文字列 | すべての行の対象となるKafkaトピック。DataFrameに |
| なし | 任意の Kafka プロデューサー構成 の値 |
|
メモリシンク
以下のオプションは、メモリシンクにストリームを書き込む際に適用されます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし(必須) | 任意の文字列 | クエリが書き込みを行うインメモリテーブルの名前。メモリシンクに必要です。 |
|
|
| メモリシンクの配信保証です。 |
Spark関数オプション
Spark SQLの一部の組み込み関数は、解析またはシリアル化の動作を制御するoptionsマップを受け入れます。オプションをPython dictまたはScala Map[String, String]として渡します。
例
次の例は、JSON列を解析しながら、不正な形式のレコードを削除します。
- Python
- Scala
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
Avro
Avro関数は、対応するDataFrameオプションと同じオプションを受け入れます。
from_avroそしてschema_of_avroDataFrameReader Avroオプションを使用します。to_avroDataFrameWriter Avro オプションを使用します。
例
次の例は、スキーマ進化が有効になっているAvroカラムをデコードするものです。
- Python
- Scala
from pyspark.sql.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
import org.apache.spark.sql.avro.functions.from_avro
val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))
さらに、スキーマレジストリのバリアントfrom_avroとto_avroは、以下のオプションを受け入れます。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
| なし | スキーマIDの整数 |
|
| なし | 任意の Confluent SR クライアントプロパティ値 | Confluent Schema Registryクライアント構成プロパティ。Confluent SRクライアントのプロパティを渡す際は、このプレフィックスを使用します。例えば、基本認証情報の場合は |
CSV
CSV関数は、対応するDataFrameのオプションと同じオプションを受け入れます。
from_csvschema_of_csvはDataFrameReader の CSV オプションを使用します。to_csvDataFrameWriterのCSVオプションを使用します。
例
以下の例は、カスタム区切り文字とNULL値を含むCSVファイルを読み込む例です。
- Python
- Scala
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
JSON
JSON関数は、対応するDataFrameオプションと同じオプションを受け入れます。
from_jsonそしてschema_of_jsonDataFrameReaderのJSONオプションを使用します。to_jsonDataFrameWriterのJSONオプションを使用します。
例
次の例では、 NULLのフィールドを無視し、整形フォーマットを有効にしたJSONを書き込みます。
- Python
- Scala
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
import org.apache.spark.sql.functions.to_json
val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf また、 to_protobufファイルベースのデータソースを使用しません。これらの関数を使用すると、Protobufデータは常にバイナリ列として読み書きされます。オプションはMap[String, String]として渡され、大文字と小文字が区別されます。
例
次の例は、PERMISSIVEモードを使用してProtobufカラムをデコードする例です。
- Python
- Scala
from pyspark.sql.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
import org.apache.spark.sql.protobuf.functions.from_protobuf
val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))
Protobuf関数は以下のオプションを使用します。
Key | デフォルト | 有効な値 | 説明 |
|---|---|---|---|
|
|
| 破損したレコードを処理する方法 |
|
|
| 再帰 Protobuf フィールドの最大再帰深度。再帰フィールドのサポートをオフにするには、 |
|
|
| Protobuf |
|
|
| フィールドをゼロまたはデフォルト値で出力するかどうか(proto3セマンティクス)。 |
|
|
| 列挙型フィールドを文字列ではなく整数値としてレンダリングするかどうか。 |
|
|
| 整数オーバーフローを防ぐために、 |
|
|
|
|
|
|
| 出力スキーマに空のProtobufメッセージ型を保持するかどうか(ダミー列を挿入して)。 |
| なし | 任意の文字列 | スキーマレジストリのサブジェクト名。スキーマレジストリのバリアント |
| なし |
| スキーマレジストリのアドレス(ホスト名とポート番号)。スキーマレジストリのバリアント |
| なし | 任意の文字列 | スキーマレジストリのサブジェクトに複数のメッセージが含まれている場合に、どのProtobufメッセージを使用するかを指定します。任意。 |
|
|
| 受信レコードでより新しいスキーマIDが検出された場合のスキーマ変更の処理方法。 |
| — | 任意の有効な Confluent スキーマレジストリクライアントオプション値 | プレフィックス |
XML
XML関数は、対応するDataFrameオプションと同じオプションを受け入れます。
from_xmlそしてschema_of_xmlDataFrameReaderのXMLオプションを使用します。to_xmlDataFrameWriterのXMLオプションを使用します。
例
次の例では、カスタムのルートタグと行タグを使用してXMLを書き込みます。
- Python
- Scala
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
import org.apache.spark.sql.functions.to_xml
val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))