はじめに:追加データを取り込んで挿入する
この入門記事では、 Databricksダッシュボードを使用して、追加の赤ちゃんの名前データを含むCSVファイルをUnity Catalogボリュームに取り込み、 Python 、 Scala 、R を使用して新しい赤ちゃんの名前データを既存のテーブルにインポートする方法について説明します。
重要
この入門記事は、入門: ノートブックから CSV データをインポートして視覚化するに基づいています。 この記事を完了するには、その記事のステップを完了する必要があります。 入門記事の完全なノートブックについては、 データのインポートと視覚化ノートブックを参照してください。
要件
この記事のタスクを完了するには、次の要件を満たす必要があります。
ワークスペースでUnity Catalog が有効になっている必要があります。 Unity Catalogの使用開始に関する情報については、 Unity Catalogのセットアップと管理を参照してください。
ボリュームに対する
WRITE VOLUME
権限、親スキーマに対するUSE SCHEMA
権限、および親カタログに対するUSE CATALOG
権限が必要です。既存のコンピュート リソースを使用するか、新しいコンピュート リソースを作成するには、アクセス許可が必要です。 Databricks の使用を開始するを参照するか、Databricks 管理者に問い合わせてください。
ヒント
この記事の完成したノートブックについては、 追加データの取り込みノートブックを参照してください。
ステップ1:新しいノートブックを作成する
ワークスペースにノートブックを作成するには、サイドバーで 新規作成をクリックし、ノートブックをクリックします。空白のノートブックがワークスペースで開きます。
ノートブックの作成と管理の詳細については、ノートブックの管理を参照してください。
ステップ2:変数を定義する
このステップでは、この記事で作成するノートブックの例で使用する変数を定義します。
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。
<catalog-name>
、<schema-name>
、および<volume-name>
を、Unity Catalog ボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。必要に応じて、table_name
値を任意のテーブル名に置き換えます。 赤ちゃんの名前のデータは、この記事の後半でこのテーブルに保存します。Shift+Enter
を押すとセルが実行され、新しい空白のセルが作成されます。catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" file_name = "new_baby_names.csv" table_name = "baby_names" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val fileName = "new_baby_names.csv" val tableName = "baby_names" val pathVolume = s"/Volumes/${catalog}/${schema}/${volume}" val pathTable = s"${catalog}.${schema}" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" file_name <- "new_baby_names.csv" table_name <- "baby_names" path_volume <- paste0("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste0(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
ステップ3:Unity Catalogボリュームに新しいCSVファイルデータを追加する
このステップでは、2022年の新しい赤ちゃんの名前で「df
」というDataFrameを作成し、そのデータをUnity Catalogボリューム内の新しいCSVファイルに保存します。
注:
このステップでは、以前の年のロード済みの既存データに新しい年のデータを追加するシミュレーションを行います。本番運用環境では、この増分データはクラウドストレージに保存されます。
次のコードをコピーして、新しい空のノートブックセルに貼り付けます。このコードは、赤ちゃんの名前データを追加したDataFrameを作成し、そのデータをUnity CatalogボリュームのCSVファイルに書き込みます。
data = [[2022, "CARL", "Albany", "M", 42]] df = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") # display(df) (df.coalesce(1) .write .option("header", "true") .mode("overwrite") .csv(f"{path_volume}/{file_name}"))
val data = Seq((2022, "CARL", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df = data.toDF(columns: _*) // display(df) df.coalesce(1) .write .option("header", "true") .mode("overwrite") .csv(f"{pathVolume}/{fileName}")
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame(Year = 2022, First_Name = "CARL", County = "Albany", Sex = "M", Count = 42) df <- createDataFrame(data) # display(df) write.df(df, path = paste0(path_volume, "/", file_name), source = "csv", mode = "overwrite", header = "true")
Shift+Enter
を押してセルを実行し、次のセルに移動します。
ステップ4:CSVファイルからDataFrameにデータをロードする
注:
このステップでは、クラウドストレージからデータをロードするシミュレーションを行います。
次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、新しい赤ちゃんの名前データをCSVファイルから新しいDataFrameにロードします。
df1 = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df1)
val df1 = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(df1)
df1 <- read.df(paste0(path_volume, "/", file_name), source = "csv", header = TRUE, inferSchema = TRUE) display(df1)
Shift+Enter
を押してセルを実行し、次のセルに移動します。
ステップ5:既存のテーブルに挿入する
次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、新しい赤ちゃんの名前データをDataFrameからの既存のテーブルに追加します。
df.write.mode("append").insertInto(f"{path_table}.{table_name}") display(spark.sql(f"SELECT * FROM {path_table}.{table_name} WHERE Year = 2022"))
df1.write.mode("append").insertInto(s"${pathTable}.${tableName}") display(spark.sql(s"SELECT * FROM ${pathTable}.${tableName} WHERE Year = 2022"))
# The write.df function in R, as provided by the SparkR package, does not directly support writing to Unity Catalog. # In this example, you write the DataFrame into a temporary view and then use the SQL command to insert data from the temporary view to the Unity Catalog table createOrReplaceTempView(df1, "temp_view") sql(paste0("INSERT INTO ", path_table, ".", table_name, " SELECT * FROM temp_view")) display(sql(paste0("SELECT * FROM ", path_table, ".", table_name, " WHERE Year = 2022")))
Ctrl+Enter
を押してセルを実行します。
追加のデータノートブックを取り込む
次のいずれかのノートブックを使用して、この記事のステップを実行します。 <catalog-name>
、<schema-name>
、および <volume-name>
を、Unity Catalog ボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。必要に応じて、 table_name
値を任意のテーブル名に置き換えます。
次のステップ
データのクレンジングと拡張の詳細については、はじめに:データの拡張とクレンジングを参照してください。