チュートリアル: Spark SQL を使用した COPY INTO
Databricks では、数千のファイルを含むデータソースの増分データ読み込みと一括データ読み込みには、COPY INTO コマンドを使用することをお勧めします。Databricks では、高度なユースケースには Auto Loader を使用することをお勧めします。
このチュートリアルでは、 COPY INTO
コマンドを使用して、クラウドオブジェクトストレージから Databricks ワークスペースのテーブルにデータを読み込みます。
必要条件
- Databricks アカウントと、アカウント内の Databricks ワークスペース。 これらを作成するには、「 Databricks の概要」を参照してください。
- 11.3 以降を実行しているワークスペース 汎用クラスターDatabricks RuntimeLTS 。All-Purposeクラスターを作成するには、 コンピュート構成リファレンスを参照してください。
- Databricks ワークスペースのユーザー インターフェイスに精通している。 「ワークスペースの移動」を参照してください。
- Databricks ノートブックの操作に精通している。
- データを書き込むことができる場所。このデモでは、例として DBFSルートを使用しますが、 Databricks では、 Unity Catalogで構成された外部ストレージの場所をお勧めします。
ステップ1.環境を構成し、データ ジェネレーターを作成する
このチュートリアルは、 Databricks とデフォルト ワークスペース構成に関する基本的な知識があることを前提としています。 提供されたコードを実行できない場合は、ワークスペース管理者に問い合わせて、コンピュート リソースにアクセスできることと、データを書き込むことができる場所があることを確認してください。
提供されているコードでは、 source
パラメーターを使用して、 COPY INTO
データソースとして構成する場所を指定していることに注意してください。 記述されているように、このコードはルート上の場所を指し DBFS。 外部オブジェクトストレージの場所に対する書き込み権限がある場合は、ソース文字列の dbfs:/
部分をオブジェクトストレージへのパスに置き換えます。 このコード ブロックでは、このデモをリセットするために再帰的な削除も行われるため、これを本番運用データに指定しないようにし、既存のデータの上書きや削除を避けるために、 /user/{username}/copy-into-demo
ネストされたディレクトリを保持してください。
-
新しいSQL ノートブックを作成し Databricks RuntimeLTS、 11.3 以降を実行している クラスターにアタッチします 。
-
次のコードをコピーして実行し、このチュートリアルで使用するストレージの場所とデータベースをリセットします。
Python%python
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"copyinto_{username}_db"
source = f"dbfs:/user/{username}/copy-into-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
dbutils.fs.rm(source, True) -
次のコードをコピーして実行し、データをランダムに生成するために使用されるいくつかのテーブルと関数を構成します。
SQL-- Configure random data generator
CREATE TABLE user_ping_raw
(user_id STRING, ping INTEGER, time TIMESTAMP)
USING json
LOCATION ${c.source};
CREATE TABLE user_ids (user_id STRING);
INSERT INTO user_ids VALUES
("potato_luver"),
("beanbag_lyfe"),
("default_username"),
("the_king"),
("n00b"),
("frodo"),
("data_the_kid"),
("el_matador"),
("the_wiz");
CREATE FUNCTION get_ping()
RETURNS INT
RETURN int(rand() * 250);
CREATE FUNCTION is_active()
RETURNS BOOLEAN
RETURN CASE
WHEN rand() > .25 THEN true
ELSE false
END;
ステップ 2: サンプル データをクラウド ストレージに書き込む
Delta Lake 以外のデータ形式への書き込みは、Databricks ではまれです。 ここで提供されるコードは JSON に書き込み、別のシステムからオブジェクトをストレージにダンプする可能性のある外部システムをシミュレートします。
-
次のコードをコピーして実行し、生の JSON データのバッチを書き込みます。
SQL-- Write a new batch of data to the data source
INSERT INTO user_ping_raw
SELECT *,
get_ping() ping,
current_timestamp() time
FROM user_ids
WHERE is_active()=true;
ステップ3:COPY INTOを使用してJSONデータをべき等にロードする
ターゲットの Delta Lake テーブルは、 COPY INTO
を使用する前に作成する必要があります。 Databricks Runtime 11.3 LTS 以降では、 CREATE TABLE
ステートメントにテーブル名以外のものを指定する必要はありません。 以前のバージョンの Databricks Runtime では、空のテーブルを作成するときにスキーマを指定する必要があります。
-
次のコードをコピーして実行し、ターゲットの Delta テーブルを作成し、ソースからデータを読み込みます。
SQL-- Create target table and load data
CREATE TABLE IF NOT EXISTS user_ping_target;
COPY INTO user_ping_target
FROM ${c.source}
FILEFORMAT = JSON
FORMAT_OPTIONS ("mergeSchema" = "true")
COPY_OPTIONS ("mergeSchema" = "true")
このアクションはべき等であるため、複数回実行できますが、データは 1 回しか読み込まれません。
ステップ 4: テーブルの内容をプレビューする
単純な SQL クエリを実行して、このテーブルの内容を手動で確認できます。
-
次のコードをコピーして実行し、テーブルをプレビューします。
SQL-- Review updated table
SELECT * FROM user_ping_target
ステップ5:さらにデータを読み込んで結果をプレビューする
手順 2 から 4 を何度も再実行して、ランダムな生 JSON データの新しいバッチをソースに配置し、それらを COPY INTO
を使用して Delta Lake にべき等に読み込み、結果をプレビューできます。 これらの手順を順不同または複数回実行して、新しいデータが到着せずに生データの複数のバッチが書き込まれるか、 COPY INTO
複数回実行されることをシミュレートしてみてください。
ステップ 6: クリーンアップのチュートリアル
このチュートリアルを終了したら、関連するリソースを保持する必要がなくなった場合は、それらをクリーンアップできます。
-
次のコードをコピーして実行し、データベースとテーブルを削除し、すべてのデータを削除します。
Python%python
# Drop database and tables and remove data
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True) -
コンピュート リソースを停止するには、 クラスター タブに移動し、 クラスターを終了する .
追加のリソース
- COPY INTOの参照記事