チュートリアル: Spark SQL を使用した COPY INTO
Databricksでは、数千のファイルを含むデータソースの増分データロードおよび一括データロードにはCOPY INTOコマンドを使用することを推奨しています。
このチュートリアルでは、 COPY INTOコマンドを使用して、 Unity CatalogボリュームからDatabricksワークスペースのDeltaテーブルにJSONデータを読み込みます。 データソースとして、 Wanderbricksのサンプルデータセットを使用します。より高度なデータ取り込みのユースケースについては、 Auto Loaderとは?」を参照してください。
必要条件
- コンピュート リソースへのアクセス。 「コンピュート」を参照。
- Unityカタログが有効になっているワークスペースで、カタログ内にスキーマとボリュームを作成する権限が付与されているもの。Unity Catalog を使用してクラウドオブジェクトストレージに接続する方法については、こちらをご覧ください。
ステップ 1: 環境を構成する
このチュートリアルのコードは、 Unity Catalogボリュームを使用してJSONソース ファイルを保存します。 <catalog> CREATE SCHEMAとCREATE VOLUME権限を持つカタログに置き換えてください。コードを実行できない場合は、ワークスペース管理者に連絡してください。
ノートブックを作成し、コンピュート リソースに添付します。 次に、以下のコードを実行して、このチュートリアル用のスキーマとボリュームを設定します。
- Python
- SQL
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
ステップ 2: サンプルデータをJSONとしてボリュームに書き込みます
COPY INTOコマンドは、ファイルベースのソースからデータを読み込みます。Wanderbricks bookingsサンプルテーブルからデータを読み込み、外部システムからデータが到着する様子をシミュレートして、レコードのバッチを JSON ファイルとしてボリュームに書き込みます。
- Python
- SQL
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
ボリュームにファイルを書き込むにはPythonが必要です。実際のワークフローでは、このデータは外部システムから送られてくる。
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
ステップ3:COPY INTOを使用してJSONデータをべき等にロードする
COPY INTOを使用する前に、ターゲット Delta テーブルを作成してください。CREATE TABLEステートメントでは、テーブル名以外に何も指定する必要はありません。この操作は冪等性を持つため、コードを複数回実行しても、Databricksはデータを一度だけ読み込みます。
- Python
- SQL
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
ステップ 4: テーブルの内容をプレビューする
テーブルにWanderbricksの予約データの最初の行から20行が含まれていること、およびスキーマがJSONソースファイルから正しく推論されていることを確認してください。
- Python
- SQL
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
ステップ5:さらにデータを読み込んで結果をプレビューする
外部システムから追加データが届くことをシミュレートするには、別のレコードのバッチを書き込み、 COPY INTO再度実行します。次のコードを実行して、データの 2 番目のバッチを書き込みます。
- Python
- SQL
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
ボリュームにファイルを書き込むにはPythonが必要です。実際のワークフローでは、このデータは外部システムから送られてくる。
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
次に、ステップ 3 からCOPY INTOコマンドを再度実行し、テーブルをプレビューして新しいレコードを確認します。 新しいファイルのみが読み込まれます。
- Python
- SQL
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
ステップ 6: クリーンアップのチュートリアル
このチュートリアルが完了したら、不要になった関連リソースは削除できます。スキーマ、テーブル、ボリュームを削除し、すべてのデータを削除します。
- Python
- SQL
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;