COPY INTOを使用してデータをロードする開始
COPY INTO SQL コマンドを使用すると、ファイルの場所から Delta テーブルにデータを読み込むことができます。これは再試行可能かつべき等な操作です。ソースの場所にある既にロードされているファイルはスキップされます。
COPY INTO 以下の機能を提供します:
- S3、ADLS、ABFS、GCS、Unity Catalog ボリュームなどのクラウド ストレージからのファイルまたはフォルダー フィルターを簡単に構成できます。
- 複数のソース ファイル形式 (CSV、JSON、XML、 Avro 、 ORC 、 Parquet 、テキスト、バイナリ ファイル) をサポートします。
- デフォルトでは、正確に 1 回 (べき等) のファイル処理が行われます。
- ターゲット テーブル スキーマの推論、マッピング、マージ、および進化。
よりスケーラブルで堅牢なファイル取り込みエクスペリエンスを実現するために、 DatabricksではSQLユーザーにストリーミング テーブルを使用することをお勧めします。 詳細については、ストリーミングテーブルを参照してください。
COPY INTO 削除ベクトルのワークスペース設定を尊重します。有効にすると、 Databricks Runtime 14.0 以降を実行しているSQLウェアハウスまたはコンピュートでCOPY INTO実行するときにターゲット テーブルで削除が有効になります。 削除ベクトルを有効にすると、Databricks Runtime 11.3 LTS 以下のテーブルに対するクエリがブロックされます。Databricksの削除」と「自動有効化」を参照してください。
始める前に
アカウント管理者は、ユーザーが COPY INTOを使用してデータを読み込む前に、 取り込み用のデータアクセスの設定 の手順に従って、クラウドオブジェクトストレージ内のデータへのアクセスを設定する必要があります。
例: スキーマレス Delta Lake テーブルにデータをロードする
この機能には、Databricks Runtime 11.3 LTS 以上が必要です。
COPY_OPTIONSでmergeSchemaをtrueに設定することで、 COPY INTOコマンドの実行中にスキーマが推論されるように、空のプレースホルダー Delta テーブルを作成できます。
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
SQL ステートメントはべき等であり、Delta テーブルにデータを 1 回だけ取り込むように実行するようにスケジュールできます。
空の Delta テーブルはCOPY INTO外部では使用できません。INSERT INTOおよびMERGE INTO 、スキーマレス Delta テーブルへのデータの書き込みはサポートされていません。COPY INTOを使用してテーブルにデータが挿入されると、テーブルはクエリ可能になります。
COPY INTOのターゲットテーブルの作成を参照してください。
例: スキーマを設定し、Delta Lake テーブルにデータをロードする
次の例では、Delta テーブルを作成し、 COPY INTO SQL コマンドを使用してDatabricks データセットからテーブルにサンプル データを読み込みます。Databricks クラスターに接続された ノートブックから、サンプルの Python、R、Scala、または SQL コードを実行できます。の Databricks SQLSQLウェアハウス に関連付けられたクエリ から コードを実行することもできます。SQL
- SQL
- Python
- R
- Scala
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
クリーンアップするには、テーブルを削除する次のコードを実行します。
- Python
- R
- Scala
- SQL
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
メタデータファイルをクリーンアップします
vacuumを実行して、Databricks Runtime 15.2 以降の COPY INTO によって作成された参照されていないメタデータ ファイルをクリーンアップできます。
参考
- Databricks Runtime 7.x 以降:
COPY INTO
もっと詳しく知る
-
同じ Delta テーブルに対する複数の
COPY INTO操作の例など、一般的な使用パターンについては、「COPY INTOを使用した一般的なデータ読み込みパターン」を参照してください。 -
VARIANT 型を使用して半構造化データを取り込むには、 「variant で
COPY INTOを使用する」を参照してください。