チュートリアル: Apache Spark DataFrames を使用してデータを読み込み、変換する
このチュートリアルでは、Databricks で Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API、SparkR SparkDataFrame API を使用してデータを読み込み、変換する方法を説明します。
このチュートリアルの最後には、DataFrame とは何かを理解し、次のタスクに慣れることができます。
「Apache Spark PySpark APIリファレンス」も参照してください。
Apache Spark Scala APIリファレンスも参照してください。
DataFrameとは
DataFrameは、潜在的に異なるタイプの列を持つ2次元のラベル付きデータ構造です。DataFrameは、スプレッドシート、SQLテーブル、または複数のオブジェクトから成る辞書のようなものと考えることができます。Apache Spark DataFramesは、一般的なデータ分析の問題を効率的に解決できる関数(列の選択、絞り込み、結合、集計)を多数提供します。
Apache Spark DataFramesは、Resilient Distributed Datasets(RDD)の上に構築された抽象化です。Spark DataFramesとSpark SQLでは、統合された計画および最適化エンジンが使用されているため、Databricksでサポートされているすべての言語(Python、SQL、Scala、および R)でほぼ同じパフォーマンスを得ることができます。
要件
次のチュートリアルを完了するには、次の要件を満たす必要があります。
このチュートリアルの例を使用するには、ワークスペースでUnity Catalog が有効になっている必要があります。
このチュートリアルの例では、 Unity Catalogボリュームを使用してサンプルデータを保存します。 これらの例を使用するには、ボリュームを作成し、そのボリュームのカタログ名、スキーマ名、およびボリューム名を使用して、例で使用するボリュームパスを設定します。
Unity Catalog では次の権限が必要です。
READ VOLUME
このチュートリアルで使用するボリュームの場合はWRITE VOLUME
、またはALL PRIVILEGES
です。USE SCHEMA
または、このチュートリアルで使用されるスキーマの場合はALL PRIVILEGES
。USE CATALOG
または、このチュートリアルで使用されるカタログの場合はALL PRIVILEGES
。
これらの権限を設定するには、 Databricks管理者またはUnity Catalog権限とセキュリティ保護可能なオブジェクトを参照してください。
ステップ1: 変数を定義し、 CSVファイルを読み込む
このステップでは、このチュートリアルで使用する変数を定義し、CSV health.data.ny.gov からの赤ちゃんの名前データを含む ファイルを ボリュームに読み込みます。Unity Catalog
をクリックして新しいノートブックを開きますアイコン。 Databricksを操作する方法については、 Databricksインターフェイスとコントロール」を参照してください。
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。
<catalog-name>
、<schema-name>
、<volume-name>
をUnity Catalogボリュームのカタログ名、スキーマ名、ボリューム名に置き換えます。<table_name>
を任意のテーブル名に置き換えます。このチュートリアルの後半で、赤ちゃんの名前のデータをこのテーブルに読み込みます。Shift+Enter
を押すとセルが実行され、新しい空白のセルが作成されます。catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_tables = catalog + "." + schema print(path_tables) # Show the complete path print(path_volume) # Show the complete path
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val file_name = "rows.csv" val table_name = "<table_name>" val path_volume = s"/Volumes/$catalog/$schema/$volume" val path_tables = s"$catalog.$schema.$table_name" print(path_volume) // Show the complete path print(path_tables) // Show the complete path
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_tables <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_tables) # Show the complete path
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードは 、Databricks dbutuils コマンドを使用して、
rows.csv
health.data.ny.gov から ファイルを ボリュームにコピーします。Unity CatalogShift+Enter
を押すとセルが実行され、次のセルに移動します。dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
dbutils.fs.cp(download_url, s"$path_volume/$file_name")
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
ステップ2: DataFrameを作成する
このステップでは、テスト データを含むdf1
という名前の DataFrame を作成し、その内容を表示します。
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードは、テスト データを使用して Dataframe を作成し、DataFrame の内容とスキーマを表示します。
Shift+Enter
を押すとセルが実行され、次のセルに移動します。data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = c(2021), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = c(42) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
ステップ3: ファイルから にデータをロードするDataFrameCSV
このステップでは、以前に ボリュームにロードした ファイルからDataFrame df_csv
という名前のCSV Unity Catalogを作成します。spark.read.csvを参照してください。
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードは、CSV ファイルから赤ちゃんの名前データを DataFrame
df_csv
に読み込み、DataFrame の内容を表示します。Shift+Enter
を押すとセルが実行され、次のセルに移動します。df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
val df_csv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$path_volume/$file_name") display(df_csv)
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
サポートされている様々なファイル形式からデータを読み込むことができます。
ステップ4: DataFrameを表示して操作する
次の方法を使用して、赤ちゃんの名前のDataFramesを表示および操作します。
DataFrameスキーマを印刷する
Apache Spark DataFrame のスキーマを表示する方法を学習します。 Apache Spark では、DataFrame 内の列の名前とデータ型を参照するためにスキーマという用語を使用します。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、2 つのDataFramesのスキーマを表示する .printSchema()
メソッドを使用してDataFramesのスキーマを表示し、2 つのDataFramesを結合する準備をします。
df_csv.printSchema()
df1.printSchema()
df_csv.printSchema()
df1.printSchema()
printSchema(df_csv)
printSchema(df1)
注:
Databricksは、カタログに登録されたテーブルの集まりを表すのにも「スキーマ」という用語を使用します。
DataFrame の列名を変更する
DataFrame 内の列の名前を変更する方法を学びます。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、 df1_csv
DataFrame 内の列の名前を、 df1
DataFrame 内の対応する列と一致するように変更します。 このコードはApache Spark withColumnRenamed()
メソッドを使用します。
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
DataFramesを結合する
ある DataFrame の行を別の DataFrame に追加する新しい DataFrame を作成する方法を学習します。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark union()
メソッドを使用して、最初の DataFrame df
の内容を、CSV ファイルから読み込まれた赤ちゃんの名前データを含む DataFrame df_csv
と結合します。
df = df1.union(df_csv)
display(df)
val df = df1.union(df_csv_renamed)
display(df)
display(df <- union(df1, df_csv))
DataFrame内の行をフィルタリングする
Apache Spark の.filter()
または.where()
メソッドを使用して行をフィルタリングし、データ セット内で最も人気のある赤ちゃんの名前を見つけます。 フィルタリングを使用して、DataFrame で返される行または変更される行のサブセットを選択します。 次の例に示すように、パフォーマンスや構文に違いはありません。
DataFrame から列を選択し、頻度順に並べ替える
select()
メソッドを使用して、DataFrame から返す列を指定して、赤ちゃんの名前の頻度について学習します。 結果を並べ替えるには、Apache Spark のorderby
およびdesc
関数を使用します。
用のPySpark .sql モジュールは、Apache Spark SQL関数のサポートを提供します。このチュートリアルで使用する関数には、Apache Spark のorderBy()
、 desc()
、 expr()
関数があります。 これらの関数を使用できるようにするには、必要に応じてセッションにインポートします。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、 desc()
関数をインポートし、Apache Spark select()
メソッドと Apache Spark orderBy()
およびdesc()
関数を使用して、最も一般的な名前とその数を降順で表示します。
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
サブセットDataFrameを作成する
既存の DataFrame からサブセット DataFrame を作成する方法を学習します。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark filter
メソッドを使用して、年、数、性別でデータを制限する新しい DataFrame を作成します。 列を制限するために Apache Spark select()
メソッドを使用します。 また、Apache Spark のorderBy()
およびdesc()
関数を使用して、新しい DataFrame をカウントで並べ替えます。
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
ステップ5: DataFrameを保存する
DataFrame を保存する方法を学びます。 DataFrame をテーブルに保存するか、DataFrame を 1 つまたは複数のファイルに書き込むことができます。
DataFrameをテーブルに保存する
Databricks は、デフォルトですべてのテーブルに Delta Lake 形式を使用します。 DataFrame を保存するには、カタログとスキーマに対するCREATE
テーブル権限が必要です。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、このチュートリアルの冒頭で定義した変数を使用して、DataFrame の内容をテーブルに保存します。
df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")
# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")
df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")
// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")
saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")
ほとんどのApache Spark アプリケーションは、大規模なデータ セットを分散形式で処理します。 Apache Spark は、単一のファイルではなく、ファイルのディレクトリを書き出します。 Delta Lake は Parquet フォルダーとファイルを分割します。 多くのデータ・システムは、これらのファイルのディレクトリーを読み取ることができます。 Databricks では、ほとんどのアプリケーションでファイル パスよりもテーブルを使用することを推奨しています。
DataFrameをJSONファイルに保存する
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、DataFrame を JSON ファイルのディレクトリに保存します。
df.write.format("json").save("/tmp/json_data")
# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")
df.write.format("json").save("/tmp/json_data")
// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")
write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
JSONファイルからDataFrameを読み込む
Apache Sparkspark.read.format()
read.jsonメソッドを使用して、ディレクトリからDataFrame に .json データ 方法を学習します。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、前の例で保存した JSON ファイルを表示します。
display(spark.read.format("json").json("/tmp/json_data"))
display(spark.read.format("json").json("/tmp/json_data"))
display(read.json("/tmp/json_data"))
追加タスク: PySpark、Scala、R で SQL クエリを実行する
Apache Spark DataFrames は、SQL を PySpark、Scala、R と組み合わせるための次のオプションを提供します。 このチュートリアル用に作成した同じノートブックで次のコードを実行できます。
SQLクエリとして列を指定する
Apache Spark selectExpr()
メソッドの使用方法を学習します。 これは、SQL 式を受け入れて更新された DataFrame を返すselect()
メソッドのバリエーションです。 このメソッドでは、 upper
などの SQL 式を使用できます。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark selectExpr()
メソッドと SQL upper
式を使用して、文字列の列を大文字に変換します (列の名前を変更します)。
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
列にSQL構文を使用するにはexpr()
を使用します
Apache Spark expr()
関数をインポートして使用し、列が指定される任意の場所で SQL 構文を使用する方法を学習します。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、 expr()
関数をインポートし、Apache Spark expr()
関数と SQL lower
式を使用して文字列の列を小文字に変換します (列の名前を変更します)。
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
spark.sql() を使用して任意の SQL クエリを 実行する
Apache Spark spark.sql()
関数を使用して任意の SQL クエリを実行する方法を学習します。
次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark spark.sql()
関数を使用して、SQL 構文で SQL テーブルをクエリします。
display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))
display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))
display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))