チュートリアル: Apache Spark データフレーム を使用してデータを読み込んで変換する
このチュートリアルでは、Databricks で Apache Spark Python (PySpark) データフレーム API、Apache Spark Scala データフレーム API、および SparkR Sparkデータフレーム API を使用してデータを読み込んで変換する方法を示します。
このチュートリアルの最後には、データフレームとは何かを理解し、次のタスクに慣れることができます:
- Python
- Scala
- R
- Define variables and copy public data into a Unity Catalog volume
- Create a DataFrame with Python
- Load data into a DataFrame from CSV file
- View and interact with a DataFrame
- Save the DataFrame
- Run SQL queries in PySpark
See also Apache Spark PySpark API reference.
データフレームとは何ですか?
データフレームは、異なるタイプの列を持ちえる2次元のラベル付きデータ構造です。データフレームは、スプレッドシート、SQLテーブル、または複数のオブジェクトから成る辞書のようなものと考えることができます。Apache Sparkデータフレームは、一般的なデータ分析の問題を効率的に解決できる関数(列の選択、絞り込み、結合、集計)を多数提供します。
Apache Sparkデータフレームは、Resilient Distributed Datasets(RDD)の上に構築された抽象化です。SparkデータフレームとSpark SQLでは、統合された計画および最適化エンジンが使用されているため、Databricksでサポートされているすべての言語(Python、SQL、Scala、およびR)でほぼ同じパフォーマンスを得ることができます。
必要条件
以下のチュートリアルを完了するには、以下の条件を満たす必要があります:
-
このチュートリアルの例を使用するには、ワークスペースで Unity Catalog が有効になっている必要があります。
-
このチュートリアルの例では、Unity Catalog ボリューム を使用してサンプル データを格納します。 これらの例を使用するには、ボリュームを作成し、そのボリュームのカタログ、スキーマ、およびボリューム名を使用して、例で使用されるボリュームパスを設定します。
-
Unity Catalog には、次のアクセス許可が必要です。
READ VOLUME
とWRITE VOLUME
、またはこのチュートリアルで使用するボリュームのALL PRIVILEGES
。USE SCHEMA
または、このチュートリアルで使用するスキーマのALL PRIVILEGES
。USE CATALOG
または、このチュートリアルで使用するカタログのALL PRIVILEGES
です。
これらのアクセス許可を設定するには、Databricks 管理者または Unity Catalog の特権とセキュリティ保護可能なオブジェクトを参照してください。
この記事の完成したノートブックについては、「データフレーム チュートリアル ノートブック」を参照してください。
ステップ1:変数を定義してCSVファイルをロードする
この手順では、このチュートリアルで使用する変数を定義し、 health.data.ny.gov から赤ちゃんの名前データを含む CSV ファイルを Unity Catalog ボリュームに読み込みます。
-
新しいノートブックを開くには、
アイコンをクリックします。 ノートブック Databricks ナビゲートする方法については、「 ノートブックの外観をカスタマイズする」を参照してください。
-
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。
<catalog-name>
、<schema-name>
、および<volume-name>
を、Unity Catalog ボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。<table_name>
を任意のテーブル名に置き換えます。このテーブルには、このチュートリアルの後半で赤ちゃんの名前データを読み込みます。
- Python
- Scala
- R
catalog = "<catalog_name>"
schema = "<schema_name>"
volume = "<volume_name>"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name = "rows.csv"
table_name = "<table_name>"
path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
path_table = catalog + "." + schema
print(path_table) # Show the complete path
print(path_volume) # Show the complete path
val catalog = "<catalog_name>"
val schema = "<schema_name>"
val volume = "<volume_name>"
val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
val fileName = "rows.csv"
val tableName = "<table_name>"
val pathVolume = s"/Volumes/$catalog/$schema/$volume"
val pathTable = s"$catalog.$schema"
print(pathVolume) // Show the complete path
print(pathTable) // Show the complete path
catalog <- "<catalog_name>"
schema <- "<schema_name>"
volume <- "<volume_name>"
download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name <- "rows.csv"
table_name <- "<table_name>"
path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
path_table <- paste(catalog, ".", schema, sep = "")
print(path_volume) # Show the complete path
print(path_table) # Show the complete path
-
Shift+Enter
を押すとセルが実行され、新しい空白のセルが作成されます。 -
次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードは、
rows.csv
Databricks dbutuilsコマンドを使用して、 health.data.ny.gov から Unity Catalog ボリュームに ファイルをコピーします。
- Python
- Scala
- R
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Shift+Enter
を押してセルを実行し、次のセルに移動します。
ステップ 2: データフレーム を作成する
このステップでは、テストデータを含むdf1
という名前のデータフレームを作成し、その内容を表示します。
- 次のコードをコピーして、新しい空のノートブックセルに貼り付けます。このコードは、テストデータを使用してデータフレームを作成し、データフレームの内容とスキーマを表示します。
- Python
- Scala
- R
data = [[2021, "test", "Albany", "M", 42]]
columns = ["Year", "First_Name", "County", "Sex", "Count"]
df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
# df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
val data = Seq((2021, "test", "Albany", "M", 42))
val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
val df1 = data.toDF(columns: _*)
display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
// df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
# Load the SparkR package that is already preinstalled on the cluster.
library(SparkR)
data <- data.frame(
Year = as.integer(c(2021)),
First_Name = c("test"),
County = c("Albany"),
Sex = c("M"),
Count = as.integer(c(42))
)
df1 <- createDataFrame(data)
display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
# head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Shift+Enter
を押してセルを実行し、次のセルに移動します。
ステップ 3: CSV ファイルから データフレーム にデータを読み込む
このステップでは、以前にUnity Catalog ボリュームにロードしたCSVファイルからdf_csv
という名前のデータフレームを作成します。spark.read.csvを参照してください。
- 次のコードをコピーして、新しい空のノートブックセルに貼り付けます。このコードは、CSVファイルから赤ちゃんの名前データをデータフレーム
df_csv
に読み込み、データフレームの内容を表示します。
- Python
- Scala
- R
df_csv = spark.read.csv(f"{path_volume}/{file_name}",
header=True,
inferSchema=True,
sep=",")
display(df_csv)
val dfCsv = spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ",")
.csv(s"$pathVolume/$fileName")
display(dfCsv)
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
source="csv",
header = TRUE,
inferSchema = TRUE,
delimiter = ",")
display(df_csv)
Shift+Enter
を押してセルを実行し、次のセルに移動します。
サポートされている多くのファイル形式からデータをロードできます。
ステップ 4: データフレーム を表示して操作する
次の方法を使用して、赤ちゃんの名前のデータフレームを表示および操作します。
データフレーム スキーマを印刷する
Apache Sparkデータフレームのスキーマを表示する方法を学習します。Apache Sparkでは、 スキーマ という用語を使用して、データフレーム内の列の名前とデータ型を示します。
Databricksは、カタログに登録されたテーブルの集まりを表すのにも「スキーマ」という用語を使用します。
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、
.printSchema()
メソッドを使用してデータフレームのスキーマを示しています。2つのデータフレームのスキーマを表示して、2つのデータフレームを結合する準備をします。
- Python
- Scala
- R
df_csv.printSchema()
df1.printSchema()
dfCsv.printSchema()
df1.printSchema()
printSchema(df_csv)
printSchema(df1)
Shift+Enter
を押してセルを実行し、次のセルに移動します。
データフレーム の列の名前を変更する
データフレームの列の名前を変更する方法を学びましょう。
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、
df1_csv
データフレーム内の列の名前を、df1
データフレーム内の対応する列と一致するように変更します。このコードはApache SparkwithColumnRenamed()
メソッドを使用します。
- Python
- Scala
- R
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
dfCsvRenamed.printSchema()
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
Shift+Enter
を押してセルを実行し、次のセルに移動します。
データフレームを組み合わせる
あるデータフレームの行を別のデータフレームに追加して新しいデータフレームを作成する方法を学習します。
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードでは、Apache Sparkの
union()
メソッドを使用して、最初のデータフレームdf
の内容を、CSVファイルから読み込んだ赤ちゃんの名前データを含むデータフレームdf_csv
と結合します。
- Python
- Scala
- R
df = df1.union(df_csv)
display(df)
val df = df1.union(dfCsvRenamed)
display(df)
display(df <- union(df1, df_csv))
Shift+Enter
を押してセルを実行し、次のセルに移動します。
データフレーム の行をフィルター処理する
Apache Spark .filter()
または.where()
メソッドを使用して行をフィルタリングして、データセット内で最も人気のある赤ちゃんの名前を見つけてください。フィルタリングを使用して、データフレーム内で返したり変更したりする行のサブセットを選択します。以下の例のように、パフォーマンスや構文に違いはありません。
.filter() の使用 方式
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、Apache Spark
.filter()
メソッドを使用して、データフレーム内の行数が50を超える行を表示します。
- Python
- Scala
- R
display(df.filter(df["Count"] > 50))
display(df.filter(df("Count") > 50))
display(filteredDF <- filter(df, df$Count > 50))
Shift+Enter
を押してセルを実行し、次のセルに移動します。
.where() の使用 方式
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、Apache Spark
.where()
メソッドを使用して、データフレーム内の行数が50を超える行を表示します。
- Python
- Scala
- R
display(df.where(df["Count"] > 50))
display(df.where(df("Count") > 50))
display(filtered_df <- where(df, df$Count > 50))
Shift+Enter
を押してセルを実行し、次のセルに移動します。
データフレーム から列を選択し、頻度で並べ替える
select()
メソッドを使用して、データフレームから返す列を指定することで、赤ちゃんの名前の頻度について学習します。結果を並べ替えるには、Apache Sparkのorderby
およびdesc
関数を使用します。
PySparkの.sql モジュールは、Apache Spark SQL関数をサポートします。このチュートリアルで使用するこれらの関数には、Apache Spark orderBy()
、 desc()
、および expr()
関数があります。 これらの関数の使用を有効にするには、必要に応じてセッションにインポートします。
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、
desc()
関数をインポートし、Apache Sparkselect()
メソッドとApache SparkorderBy()
およびdesc()
関数を使用して、最も一般的な名前とその数を降順で表示します。
- Python
- Scala
- R
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Shift+Enter
を押してセルを実行し、次のセルに移動します。
サブセットの データフレーム を作成する
既存のデータフレームからサブセットデータフレームを作成する方法を学びましょう。
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードでは、Apache Sparkの
filter
メソッドを使用して、年、数、性別でデータを制限した新しいデータフレームを作成します。列を制限するためにApache Sparkselect()
メソッドを使用します。また、Apache SparkのorderBy()
およびdesc()
関数を使用して、新しい データフレームをカウントで並べ替えます。
- Python
- Scala
- R
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
Shift+Enter
を押してセルを実行し、次のセルに移動します。
ステップ 5: データフレーム を保存する
データフレームを保存する方法について説明します。 データフレームをテーブルに保存するか、データフレームを 1 つまたは複数のファイルに書き込むことができます。
データフレーム をテーブルに保存します
Databricksでは、デフォルトですべてのテーブル向けにDelta Lakeを使用しています。データフレームを保存するには、カタログとスキーマに対する CREATE
テーブル権限が必要です。
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、このチュートリアルの最初に定義した変数を使用して、データフレームの内容を表に保存します。
- Python
- Scala
- R
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Shift+Enter
を押してセルを実行し、次のセルに移動します。
Apache Sparkアプリケーションのほとんどは、大規模なデータセットを分散処理します。Apache Sparkは単一のファイルではなく、ファイルのディレクトリを書き出します。Delta LakeはParquetフォルダとファイルを分割します。多くのデータシステムは、これらのファイルのディレクトリを読むことができます。Databricksでは、ほとんどのアプリケーションでファイルパスよりもテーブルを優先して使用することを推奨しています。
データフレーム を JSON ファイルに保存します
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、データフレームを JSON ファイルのディレクトリに保存します。
- Python
- Scala
- R
df.write.format("json").mode("overwrite").save("/tmp/json_data")
df.write.format("json").mode("overwrite").save("/tmp/json_data")
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Shift+Enter
を押してセルを実行し、次のセルに移動します。
JSON ファイルから データフレーム を読み取る
Apache Sparkspark.read.format()
read.jsonメソッドを使用して、ディレクトリからデータフレーム に .json データを読み込む方法を学習します。
- 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、前の例で保存した JSON ファイルを表示します。
- Python
- Scala
- R
display(spark.read.format("json").json("/tmp/json_data"))
display(spark.read.format("json").json("/tmp/json_data"))
display(read.json("/tmp/json_data"))
Shift+Enter
を押してセルを実行し、次のセルに移動します。
その他のタスク: PySpark、Scala、R での SQL クエリの実行
Apache Sparkデータフレームには、SQLをPySpark、Scala、Rと組み合わせるための次のオプションが用意されています。次のコードは、このチュートリアル用に作成したものと同じノートブックで実行できます。
列を SQL クエリとして指定する
Apache Spark selectExpr()
メソッドの使用方法を学習します。これは、SQL 式を受け入れ、更新されたデータフレームを返すselect()
メソッドのバリアントです。この方法では、upper
などのSQL式を使用できます。
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、Apache Spark
selectExpr()
メソッドとSQLupper
式を使用して、文字列の列を大文字に変換します(列の名前を変更します)。
- Python
- Scala
- R
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Shift+Enter
を押してセルを実行し、次のセルに移動します。
expr()
を使用して、列の SQL 構文を使用します
Apache Spark expr()
関数をインポートして使用し、列が指定される任意の場所でSQL構文を使用する方法を学習します。
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは
expr()
関数をインポートし、Apache Sparkexpr()
関数とSQLlower
式を使用して文字列を小文字に変換します(そして列の名前を変更します)。
- Python
- Scala
- R
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
import org.apache.spark.sql.functions.{col, expr}
// Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Shift+Enter
を押してセルを実行し、次のセルに移動します。
spark.sql() を使用して任意の SQL クエリを実行する 機能
Apache Spark spark.sql()
関数を使用して任意のSQLクエリを実行する方法を学習します。
- 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードでは、Apache Spark
spark.sql()
関数を使用してSQL構文を使用してSQLテーブルにクエリを実行します。
- Python
- Scala
- R
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Shift+Enter
を押してセルを実行し、次のセルに移動します。
データフレーム チュートリアル ノートブック
次のノートブックには、このチュートリアルからのクエリーの例が含まれています。
- Python
- Scala
- R