COPY INTO を使用してデータを読み込む
COPY INTO
SQL コマンドを使用すると、ファイルの場所から Delta テーブルにデータを読み込むことができます。これは、再試行可能でべき等な操作です。既に読み込まれているソースの場所のファイルはスキップされます。
COPY INTO
は、次の機能を提供します。
S3、ADLS Gen2、APFS、GCS、Unity Catalog ボリュームなどのクラウド ストレージから簡単に構成できるファイルまたはディレクトリ フィルター。
複数のソース ファイル形式のサポート: CSV、JSON、XML、 Avro 、ORC 、 Parquet 、テキスト、バイナリ ファイル
デフォルトによる Exactly-once (べき等) ファイル処理
ターゲット表スキーマの推論、マッピング、マージ、および進化
注
よりスケーラブルで堅牢なファイル インジェスト エクスペリエンスを実現するために、Databricks では、SQL ユーザーがストリーミング テーブルを活用することをお勧めします。 Databricks SQL でのストリーミング テーブルを使用したデータの読み込みに関するページを参照してください。
警告
COPY INTO
削除ベクトルのワークスペース設定を尊重します。 有効にすると、 Databricks Runtime 14.0 以上を実行している SQL ウェアハウスまたはコンピュート上で COPY INTO
を実行するときに、ターゲット テーブルでベクトル削除が有効になります。 削除ベクトルを有効にすると、Databricks Runtime 11.3 LTS 以下のテーブルに対するクエリがブロックされます。 削除ベクトルとは何かを参照してください。 削除ベクトルを自動で有効にします。
要件
アカウント管理者は、ユーザーが COPY INTO
を使用してデータを読み込む前に、「 インジェスト用のデータ アクセスの構成 」の手順に従ってクラウド オブジェクト ストレージ内のデータへのアクセスを構成する必要があります。
例: スキーマレス Delta Lake テーブル へのデータのロード
注
この機能は、Databricks Runtime 11.3 LTS 以降で利用できます。
空のプレースホルダー Delta テーブルを作成して、COPY_OPTIONS
で mergeSchema
を true
に設定することで、後で COPY INTO
コマンド中にスキーマが推論されるようにすることができます。
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
上記の SQL ステートメントはべき等であり、Delta テーブルに一度だけデータを取り込むようにスケジュールできます。
注
空の Delta テーブルは、 COPY INTO
以外では使用できません。 INSERT INTO
と MERGE INTO
は、スキーマレス Delta テーブルにデータを書き込むためにサポートされていません。 COPY INTO
を使用してテーブルにデータが挿入されると、テーブルはクエリ可能になります。
COPY INTO のターゲット表の作成を参照してください。
例: スキーマを設定し、Delta Lake テーブルに データを読み込む
次の例は、 Deltaテーブルを作成し、COPY INTO
SQL コマンドを使用してDatabricks データセットからテーブルにサンプル データを読み込む方法を示しています。 Databricks クラスター に接続された ノートブック からサンプルの Python、R、Scala、または SQL コードを実行できます。の Databricks SQLSQLウェアハウス に関連付けられた クエリ から SQL コードを実行することもできます。
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
クリーンアップするには、次のコードを実行してテーブルを削除します。
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
参考
Databricks Runtime 7.x 以降: COPY INTO
関連リソース
同じ Delta テーブルに対する複数の
COPY INTO
操作の例など、一般的な使用パターンについては、「 COPY INTO を使用した一般的なデータ読み込みパターン」を参照してください。