COPY INTOを使用してデータをロードする開始
COPY INTO SQLコマンドを使用すると、ファイルからDeltaテーブルにデータをロードできます。 COPY INTOは再試行可能で冪等性があります。ソースの場所にある既に読み込まれたファイルは、以降の実行ではスキップされます。
COPY INTO 以下の機能を提供します:
- S3、ADLS、ABFS、GCS、Unity Catalog ボリュームなどのクラウド ストレージからのファイルまたはフォルダー フィルターを簡単に構成できます。
- 複数のソース ファイル形式 (CSV、JSON、XML、 Avro 、 ORC 、 Parquet 、テキスト、バイナリ ファイル) をサポートします。
- デフォルトでは、正確に 1 回 (べき等) のファイル処理が行われます。
- ターゲット テーブル スキーマの推論、マッピング、マージ、および進化。
COPY INTO 削除ベクトルのワークスペース設定を尊重します。有効にすると、 Databricks Runtime 14.0 以降を実行しているSQLウェアハウスまたはコンピュートでCOPY INTO実行するときにターゲット テーブルで削除が有効になります。 削除ベクトルを有効にすると、Databricks Runtime 11.3 LTS 以下のテーブルに対するクエリがブロックされます。Databricksの削除」と「自動有効化」を参照してください。
始める前に
ユーザーがCOPY INTOを使用してデータをロードする前に、アカウント管理者はクラウドオブジェクトストレージ内のデータへのアクセスを設定する必要があります。
スキーマレスな Delta Lake テーブルにデータをロードします
Databricks Runtime 11.3 LTS 以降では、 COPY_OPTIONSでmergeSchemaをtrueに設定することにより、 COPY INTOコマンド中にスキーマが推論されるように、空のプレースホルダー Delta テーブルを作成できます。以下の例では、 Wanderbricksデータセットを使用しています。<catalog> 、 <schema> 、 <volume>を、 CREATE TABLE権限を持つカタログ、スキーマ、ボリュームに置き換えてください。
- SQL
- Python
- R
- Scala
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
このSQL文は冪等性を持つ。つまり、これを繰り返し実行するようにスケジュール設定すれば、 Deltaテーブルには新しいデータのみがロードされるということです。
空の Delta テーブルはCOPY INTO外部では使用できません。INSERT INTOおよびMERGE INTO 、スキーマレス Delta テーブルへのデータの書き込みはサポートされていません。COPY INTOを使用してテーブルにデータが挿入されると、テーブルはクエリ可能になります。
COPY INTOのターゲットテーブルの作成を参照してください。
Delta Lakeテーブルにスキーマを設定し、データをロードします。
次の例では、Delta テーブルを作成し、 COPY INTO SQL コマンドを使用してWanderbricksデータセットからサンプルデータをテーブルにロードします。これらのファイルは、 Unity Catalogボリュームに保存されているJSONファイルです。 Databricksクラスターに接続されたノートブックからサンプルのPython 、R、 Scala 、またはSQLコードを実行できます。 Databricks SQLでは、SQLウェアハウスに関連付けられたクエリからSQLコードを実行することもできます。<catalog> 、 <schema> 、 <volume>を、 CREATE TABLE権限を持つカタログ、スキーマ、ボリュームに置き換えてください。
- SQL
- Python
- R
- Scala
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
クリーンアップするには、次のコードを実行してサンプルテーブルを削除してください。
- SQL
- Python
- R
- Scala
DROP TABLE <catalog>.<schema>.booking_updates_upload
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
メタデータファイルをクリーンアップします
vacuumを実行して、Databricks Runtime 15.2 以降の COPY INTO によって作成された参照されていないメタデータ ファイルをクリーンアップできます。