はじめに:追加データを取り込んで挿入する

この入門記事では、 Databricksダッシュボードを使用して、追加の赤ちゃんの名前データを含むCSVファイルをUnity Catalogボリュームに取り込み、 Python 、 Scala 、R を使用して新しい赤ちゃんの名前データを既存のテーブルにインポートする方法について説明します。

重要

この入門記事は、入門: ノートブックから CSV データをインポートして視覚化するに基づいています。 この記事を完了するには、その記事のステップを完了する必要があります。 入門記事の完全なノートブックについては、 データのインポートと視覚化ノートブックを参照してください。

要件

この記事のタスクを完了するには、次の要件を満たす必要があります。

  • ワークスペースでUnity Catalog が有効になっている必要があります。 Unity Catalogの使用開始に関する情報については、 Unity Catalogのセットアップと管理を参照してください。

  • ボリュームに対する WRITE VOLUME 権限、親スキーマに対する USE SCHEMA 権限、および親カタログに対する USE CATALOG 権限が必要です。

  • 既存のコンピュート リソースを使用するか、新しいコンピュート リソースを作成するには、アクセス許可が必要です。 Databricks の使用を開始するを参照するか、Databricks 管理者に問い合わせてください。

ヒント

この記事の完成したノートブックについては、 追加データの取り込みノートブックを参照してください。

ステップ1:新しいノートブックを作成する

ワークスペースにノートブックを作成するには、サイドバーで 新しいアイコン新規作成をクリックし、ノートブックをクリックします。空白のノートブックがワークスペースで開きます。

ノートブックの作成と管理の詳細については、ノートブックの管理を参照してください。

ステップ2:変数を定義する

このステップでは、この記事で作成するノートブックの例で使用する変数を定義します。

  1. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 <catalog-name><schema-name>、および <volume-name> を、Unity Catalog ボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。必要に応じて、 table_name 値を任意のテーブル名に置き換えます。 赤ちゃんの名前のデータは、この記事の後半でこのテーブルに保存します。

  2. Shift+Enterを押すとセルが実行され、新しい空白のセルが作成されます。

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    file_name = "new_baby_names.csv"
    table_name = "baby_names"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    
    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val fileName = "new_baby_names.csv"
    val tableName = "baby_names"
    val pathVolume = s"/Volumes/${catalog}/${schema}/${volume}"
    val pathTable = s"${catalog}.${schema}"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    
    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    file_name <- "new_baby_names.csv"
    table_name <- "baby_names"
    path_volume <- paste0("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste0(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    

ステップ3:Unity Catalogボリュームに新しいCSVファイルデータを追加する

このステップでは、2022年の新しい赤ちゃんの名前で「df」というDataFrameを作成し、そのデータをUnity Catalogボリューム内の新しいCSVファイルに保存します。

注:

このステップでは、以前の年のロード済みの既存データに新しい年のデータを追加するシミュレーションを行います。本番運用環境では、この増分データはクラウドストレージに保存されます。

  1. 次のコードをコピーして、新しい空のノートブックセルに貼り付けます。このコードは、赤ちゃんの名前データを追加したDataFrameを作成し、そのデータをUnity CatalogボリュームのCSVファイルに書き込みます。

    data = [[2022, "CARL", "Albany", "M", 42]]
    
    df = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    # display(df)
    (df.coalesce(1)
        .write
        .option("header", "true")
        .mode("overwrite")
        .csv(f"{path_volume}/{file_name}"))
    
    val data = Seq((2022, "CARL", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df = data.toDF(columns: _*)
    
    // display(df)
    df.coalesce(1)
        .write
        .option("header", "true")
        .mode("overwrite")
        .csv(f"{pathVolume}/{fileName}")
    
    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(Year = 2022,
        First_Name = "CARL",
        County = "Albany",
        Sex = "M",
        Count = 42)
    
    df <- createDataFrame(data)
    # display(df)
    write.df(df, path = paste0(path_volume, "/", file_name),
        source = "csv",
        mode = "overwrite",
        header = "true")
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

ステップ4:CSVファイルからDataFrameにデータをロードする

注:

このステップでは、クラウドストレージからデータをロードするシミュレーションを行います。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、新しい赤ちゃんの名前データをCSVファイルから新しいDataFrameにロードします。

    df1 = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df1)
    
    val df1 = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    display(df1)
    
    df1 <- read.df(paste0(path_volume, "/", file_name),
        source = "csv",
        header = TRUE,
        inferSchema = TRUE)
    display(df1)
    
  2. Shift+Enterを押してセルを実行し、次のセルに移動します。

ステップ5:既存のテーブルに挿入する

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、新しい赤ちゃんの名前データをDataFrameからの既存のテーブルに追加します。

    df.write.mode("append").insertInto(f"{path_table}.{table_name}")
    display(spark.sql(f"SELECT * FROM {path_table}.{table_name} WHERE Year = 2022"))
    
    df1.write.mode("append").insertInto(s"${pathTable}.${tableName}")
    display(spark.sql(s"SELECT * FROM ${pathTable}.${tableName} WHERE Year = 2022"))
    
    # The write.df function in R, as provided by the SparkR package, does not directly support writing to Unity Catalog.
    # In this example, you write the DataFrame into a temporary view and then use the SQL command to insert data from the temporary view to the Unity Catalog table
    createOrReplaceTempView(df1, "temp_view")
    sql(paste0("INSERT INTO ", path_table, ".", table_name, " SELECT * FROM temp_view"))
    display(sql(paste0("SELECT * FROM ", path_table, ".", table_name, " WHERE Year = 2022")))
    
  2. Ctrl+Enterを押してセルを実行します。

追加のデータノートブックを取り込む

次のいずれかのノートブックを使用して、この記事のステップを実行します。 <catalog-name><schema-name>、および <volume-name> を、Unity Catalog ボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。必要に応じて、 table_name 値を任意のテーブル名に置き換えます。

Pythonを使用して追加データを取り込んで挿入する

ノートブックを新しいタブで開く

Scalaを使用して追加データを取り込んで挿入する

ノートブックを新しいタブで開く

Rを使用して追加のデータを取り込んで挿入する

ノートブックを新しいタブで開く

次のステップ

データのクレンジングと拡張の詳細については、はじめに:データの拡張とクレンジングを参照してください。