メインコンテンツまでスキップ

チュートリアル: Spark SQL を使用した COPY INTO

Databricks では、数千のファイルを含むデータソースの増分データ読み込みと一括データ読み込みには、COPY INTO コマンドを使用することをお勧めします。Databricks では、高度なユースケースには Auto Loader を使用することをお勧めします。

このチュートリアルでは、 COPY INTO コマンドを使用して、クラウドオブジェクトストレージから Databricks ワークスペースのテーブルにデータを読み込みます。

必要条件

  1. Databricks アカウントと、アカウント内の Databricks ワークスペース。 これらを作成するには、「 Databricks の概要」を参照してください。
  2. 11.3 以降を実行しているワークスペース 汎用クラスターDatabricks RuntimeLTS 。All-Purposeクラスターを作成するには、 コンピュート構成リファレンスを参照してください。
  3. Databricks ワークスペースのユーザー インターフェイスに精通している。 「ワークスペースの移動」を参照してください。
  4. Databricks ノートブックの操作に精通している。
  5. データを書き込むことができる場所。このデモでは、例として DBFSルートを使用しますが、 Databricks では、 Unity Catalogで構成された外部ストレージの場所をお勧めします。

ステップ1.環境を構成し、データ ジェネレーターを作成する

このチュートリアルは、 Databricks とデフォルト ワークスペース構成に関する基本的な知識があることを前提としています。 提供されたコードを実行できない場合は、ワークスペース管理者に問い合わせて、コンピュート リソースにアクセスできることと、データを書き込むことができる場所があることを確認してください。

提供されているコードでは、 source パラメーターを使用して、 COPY INTO データソースとして構成する場所を指定していることに注意してください。 記述されているように、このコードはルート上の場所を指し DBFS。 外部オブジェクトストレージの場所に対する書き込み権限がある場合は、ソース文字列の dbfs:/ 部分をオブジェクトストレージへのパスに置き換えます。 このコード ブロックでは、このデモをリセットするために再帰的な削除も行われるため、これを本番運用データに指定しないようにし、既存のデータの上書きや削除を避けるために、 /user/{username}/copy-into-demo ネストされたディレクトリを保持してください。

  1. 新しいSQL ノートブックを作成し Databricks RuntimeLTS、 11.3 以降を実行している クラスターにアタッチします 。

  2. 次のコードをコピーして実行し、このチュートリアルで使用するストレージの場所とデータベースをリセットします。

    Python
    %python
    # Set parameters for isolation in workspace and reset demo

    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"

    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")

    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")

    dbutils.fs.rm(source, True)
  3. 次のコードをコピーして実行し、データをランダムに生成するために使用されるいくつかのテーブルと関数を構成します。

    SQL
    -- Configure random data generator

    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};

    CREATE TABLE user_ids (user_id STRING);

    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");

    CREATE FUNCTION get_ping()
    RETURNS INT
    RETURN int(rand() * 250);

    CREATE FUNCTION is_active()
    RETURNS BOOLEAN
    RETURN CASE
    WHEN rand() > .25 THEN true
    ELSE false
    END;

ステップ 2: サンプル データをクラウド ストレージに書き込む

Delta Lake 以外のデータ形式への書き込みは、Databricks ではまれです。 ここで提供されるコードは JSON に書き込み、別のシステムからオブジェクトをストレージにダンプする可能性のある外部システムをシミュレートします。

  1. 次のコードをコピーして実行し、生の JSON データのバッチを書き込みます。

    SQL
    -- Write a new batch of data to the data source

    INSERT INTO user_ping_raw
    SELECT *,
    get_ping() ping,
    current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;

ステップ3:COPY INTOを使用してJSONデータをべき等にロードする

ターゲットの Delta Lake テーブルは、 COPY INTOを使用する前に作成する必要があります。 Databricks Runtime 11.3 LTS 以降では、 CREATE TABLE ステートメントにテーブル名以外のものを指定する必要はありません。 以前のバージョンの Databricks Runtime では、空のテーブルを作成するときにスキーマを指定する必要があります。

  1. 次のコードをコピーして実行し、ターゲットの Delta テーブルを作成し、ソースからデータを読み込みます。

    SQL
    -- Create target table and load data

    CREATE TABLE IF NOT EXISTS user_ping_target;

    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")

このアクションはべき等であるため、複数回実行できますが、データは 1 回しか読み込まれません。

ステップ 4: テーブルの内容をプレビューする

単純な SQL クエリを実行して、このテーブルの内容を手動で確認できます。

  1. 次のコードをコピーして実行し、テーブルをプレビューします。

    SQL
    -- Review updated table

    SELECT * FROM user_ping_target

ステップ5:さらにデータを読み込んで結果をプレビューする

手順 2 から 4 を何度も再実行して、ランダムな生 JSON データの新しいバッチをソースに配置し、それらを COPY INTOを使用して Delta Lake にべき等に読み込み、結果をプレビューできます。 これらの手順を順不同または複数回実行して、新しいデータが到着せずに生データの複数のバッチが書き込まれるか、 COPY INTO 複数回実行されることをシミュレートしてみてください。

ステップ 6: クリーンアップのチュートリアル

このチュートリアルを終了したら、関連するリソースを保持する必要がなくなった場合は、それらをクリーンアップできます。

  1. 次のコードをコピーして実行し、データベースとテーブルを削除し、すべてのデータを削除します。

    Python
    %python
    # Drop database and tables and remove data

    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
  2. コンピュート リソースを停止するには、 クラスター タブに移動し、 クラスターを終了する .

追加のリソース