チュートリアル: Spark SQL を使用したCOPY INTO

Databricks では、何千ものファイルを含む DATA の増分および一括データ読み込みに COPY INTO コマンドを使用することをお勧めします。 Databricks では、高度なユース ケースには Auto Loader を使用することをお勧めします。

このチュートリアルでは、 COPY INTO コマンドを使用して、クラウド オブジェクト ストレージから Databricks ワークスペースのテーブルにデータを読み込みます。

要件

  1. Databricks アカウント、およびアカウント内の Databricks ワークスペース。 これらを作成するには、「 はじめに: アカウントとワークスペースの設定」を参照してください。

  2. Databricks Runtime 11.3 LTS 以上を実行しているワークスペース内の多目的クラスター。 多目的クラスターを作成するには、コンピュート構成リファレンスを参照してください。

  3. Databricks ワークスペースのユーザー インターフェイスに関する知識。 「ワークスペースのナビゲート」を参照してください。

  4. Databricks ノートブックに関する知識。

  5. データを書き込むことができる場所。このデモでは例として DBFSroot を使用しますが、Databricks では Unity Catalogで構成された外部ストレージの場所をお勧めします。

ステップ1.環境を構成し、データ ジェネレーター を作成する

このチュートリアルは、Databricks と既定のワークスペース構成に関する基本的な知識があることを前提としています。 提供されたコードを実行できない場合は、ワークスペース管理者に問い合わせて、コンピュート リソースとデータを書き込むことができる場所にアクセスできることを確認してください。

提供されているコードでは、 source パラメーターを使用して、 COPY INTO データソースとして構成する場所を指定していることに注意してください。 記述されているように、このコードは DBFSルート上の場所を指します。 外部オブジェクト・ストレージの場所に対する書き込み権限がある場合は、ソース文字列の dbfs:/ 部分をオブジェクト・ストレージへのパスに置き換えます。 このコード ブロックでは、このデモをリセットするために再帰的な削除も行われるため、これを運用データに向けないようにし、既存のデータの上書きまたは削除を避けるために、 /user/{username}/copy-into-demo 入れ子になったディレクトリを保持するようにしてください。

  1. 新しい SQL ノートブックを作成し、Databricks Runtime 11.3 LTS 以上を実行しているクラスターに接続します

  2. 次のコードをコピーして実行し、このチュートリアルで使用するストレージの場所とデータベースをリセットします。

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 次のコードをコピーして実行し、データをランダムに生成するために使用されるいくつかのテーブルと関数を構成します。

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

ステップ 2: サンプルデータをクラウドストレージ に書き込む

Delta Lake 以外のデータ形式への書き込みは、Databricks ではまれです。ここで提供されるコードはJSONに書き込み、別のシステムからの結果をオブジェクトストレージにダンプする可能性のある外部システムをシミュレートします。

  1. 次のコードをコピーして実行し、生の JSON データのバッチを書き込みます。

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

ステップ 3: COPY INTO を使用して JSON データをべき等に ロードする

COPY INTOを使用する前に、ターゲット Delta Lake テーブルを作成する必要があります。 Databricks Runtime 11.3 LTS 以降では、 CREATE TABLEステートメントでテーブル名以外のものを指定する必要はありません。 以前のバージョンの Databricks Runtime では、空のテーブルを作成するときにスキーマを指定する必要があります。

  1. 次のコードをコピーして実行し、ターゲット Delta テーブルを作成し、ソースからデータを読み込みます。

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

このアクションはべき等であるため、複数回実行できますが、データは一度だけ読み込まれます。

ステップ 4: 表 の内容をプレビューする

単純な SQL クエリーを実行して、このテーブルの内容を手動で確認できます。

  1. 次のコードをコピーして実行し、テーブルをプレビューします。

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

ステップ 5: より多くのデータを読み込み、結果を プレビューする

ステップ 2-4 を何度も再実行して、ランダムな生の JSON データの新しいバッチをソースにランディングし、 COPY INTOを使用して Delta Lake にロードし、結果をプレビューできます。 これらのステップ を順不同または複数回実行して、新しいデータが到着せずに COPY INTO 複数回書き込まれたり実行されたりする生データの複数のバッチをシミュレートしてみてください。

ステップ 6: チュートリアルのクリーンアップ

このチュートリアルを完了したら、関連するリソースを保持する必要がなくなった場合は、それらをクリーンアップできます。

  1. 次のコードをコピーして実行し、データベースとテーブルを削除し、すべてのデータを削除します。

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. コンピュート リソースを停止するには、[ クラスター ] タブに移動し、 クラスターを終了します

関連リソース