メインコンテンツまでスキップ

チュートリアル: Spark SQL を使用した COPY INTO

Databricksでは、数千のファイルを含むデータソースの増分データロードおよび一括データロードにはCOPY INTOコマンドを使用することを推奨しています。

このチュートリアルでは、 COPY INTOコマンドを使用して、 Unity CatalogボリュームからDatabricksワークスペースのDeltaテーブルにJSONデータを読み込みます。 データソースとして、 Wanderbricksのサンプルデータセットを使用します。より高度なデータ取り込みのユースケースについては、 Auto Loaderとは?」を参照してください。

必要条件

ステップ 1: 環境を構成する

このチュートリアルのコードは、 Unity Catalogボリュームを使用してJSONソース ファイルを保存します。 <catalog> CREATE SCHEMACREATE VOLUME権限を持つカタログに置き換えてください。コードを実行できない場合は、ワークスペース管理者に連絡してください。

ノートブックを作成し、コンピュート リソースに添付します。 次に、以下のコードを実行して、このチュートリアル用のスキーマとボリュームを設定します。

Python
# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

ステップ 2: サンプルデータをJSONとしてボリュームに書き込みます

COPY INTOコマンドは、ファイルベースのソースからデータを読み込みます。Wanderbricks bookingsサンプルテーブルからデータを読み込み、外部システムからデータが到着する様子をシミュレートして、レコードのバッチを JSON ファイルとしてボリュームに書き込みます。

Python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

ステップ3:COPY INTOを使用してJSONデータをべき等にロードする

COPY INTOを使用する前に、ターゲット Delta テーブルを作成してください。CREATE TABLEステートメントでは、テーブル名以外に何も指定する必要はありません。この操作は冪等性を持つため、コードを複数回実行しても、Databricksはデータを一度だけ読み込みます。

Python
# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")

ステップ 4: テーブルの内容をプレビューする

テーブルにWanderbricksの予約データの最初の行から20行が含まれていること、およびスキーマがJSONソースファイルから正しく推論されていることを確認してください。

Python
# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

ステップ5:さらにデータを読み込んで結果をプレビューする

外部システムから追加データが届くことをシミュレートするには、別のレコードのバッチを書き込み、 COPY INTO再度実行します。次のコードを実行して、データの 2 番目のバッチを書き込みます。

Python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

次に、ステップ 3 からCOPY INTOコマンドを再度実行し、テーブルをプレビューして新しいレコードを確認します。 新しいファイルのみが読み込まれます。

Python
# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

ステップ 6: クリーンアップのチュートリアル

このチュートリアルが完了したら、不要になった関連リソースは削除できます。スキーマ、テーブル、ボリュームを削除し、すべてのデータを削除します。

Python
# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

追加のリソース