チュートリアル: Apache Spark DataFrames を使用してデータを読み込み、変換する

このチュートリアルでは、Databricks で Apache Spark Python (PySpark) DataFrame API、Apache Spark Scala DataFrame API、SparkR SparkDataFrame API を使用してデータを読み込み、変換する方法を説明します。

このチュートリアルの最後には、DataFrame とは何かを理解し、次のタスクに慣れることができます。

DataFrameとは

DataFrameは、潜在的に異なるタイプの列を持つ2次元のラベル付きデータ構造です。DataFrameは、スプレッドシート、SQLテーブル、または複数のオブジェクトから成る辞書のようなものと考えることができます。Apache Spark DataFramesは、一般的なデータ分析の問題を効率的に解決できる関数(列の選択、絞り込み、結合、集計)を多数提供します。

Apache Spark DataFramesは、Resilient Distributed Datasets(RDD)の上に構築された抽象化です。Spark DataFramesとSpark SQLでは、統合された計画および最適化エンジンが使用されているため、Databricksでサポートされているすべての言語(Python、SQL、Scala、および R)でほぼ同じパフォーマンスを得ることができます。

要件

次のチュートリアルを完了するには、次の要件を満たす必要があります。

  • このチュートリアルの例を使用するには、ワークスペースでUnity Catalog が有効になっている必要があります。

  • このチュートリアルの例では、 Unity Catalogボリュームを使用してサンプルデータを保存します。 これらの例を使用するには、ボリュームを作成し、そのボリュームのカタログ名、スキーマ名、およびボリューム名を使用して、例で使用するボリュームパスを設定します。

  • Unity Catalog では次の権限が必要です。

    • READ VOLUME このチュートリアルで使用するボリュームの場合はWRITE VOLUME 、またはALL PRIVILEGESです。

    • USE SCHEMA または、このチュートリアルで使用されるスキーマの場合はALL PRIVILEGES

    • USE CATALOG または、このチュートリアルで使用されるカタログの場合はALL PRIVILEGES

    これらの権限を設定するには、 Databricks管理者またはUnity Catalog権限とセキュリティ保護可能なオブジェクトを参照してください。

ステップ1: 変数を定義し、 CSVファイルを読み込む

このステップでは、このチュートリアルで使用する変数を定義し、CSV health.data.ny.gov からの赤ちゃんの名前データを含む ファイルを ボリュームに読み込みます。Unity Catalog

  1. をクリックして新しいノートブックを開きます新しいアイコンアイコン。 Databricksを操作する方法については、 Databricksインターフェイスとコントロール」を参照してください。

  2. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 <catalog-name><schema-name><volume-name> をUnity Catalogボリュームのカタログ名、スキーマ名、ボリューム名に置き換えます。 <table_name> を任意のテーブル名に置き換えます。このチュートリアルの後半で、赤ちゃんの名前のデータをこのテーブルに読み込みます。

  3. Shift+Enterを押すとセルが実行され、新しい空白のセルが作成されます。

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    
    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    
    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードは 、Databricks dbutuils コマンドを使用して、 rows.csvhealth.data.ny.gov から ファイルを ボリュームにコピーします。Unity Catalog

  5. Shift+Enterを押すとセルが実行され、次のセルに移動します。

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    
    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    
    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

ステップ2: DataFrameを作成する

このステップでは、テスト データを含むdf1という名前の DataFrame を作成し、その内容を表示します。

  1. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードは、テスト データを使用して Dataframe を作成し、DataFrame の内容とスキーマを表示します。

  2. Shift+Enterを押すとセルが実行され、次のセルに移動します。

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    
    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    
    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

ステップ3: ファイルから にデータをロードするDataFrameCSV

このステップでは、以前に ボリュームにロードした ファイルからDataFrame df_csvという名前のCSV Unity Catalogを作成します。spark.read.csvを参照してください。

  1. 次のコードをコピーして、新しい空のノートブック セルに貼り付けます。 このコードは、CSV ファイルから赤ちゃんの名前データを DataFrame df_csvに読み込み、DataFrame の内容を表示します。

  2. Shift+Enterを押すとセルが実行され、次のセルに移動します。

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    
    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    
    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

サポートされている様々なファイル形式からデータを読み込むことができます。

ステップ4: DataFrameを表示して操作する

次の方法を使用して、赤ちゃんの名前のDataFramesを表示および操作します。

DataFrame の列名を変更する

DataFrame 内の列の名前を変更する方法を学びます。

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、 df1_csv DataFrame 内の列の名前を、 df1 DataFrame 内の対応する列と一致するように変更します。 このコードはApache Spark withColumnRenamed()メソッドを使用します。

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

DataFramesを結合する

ある DataFrame の行を別の DataFrame に追加する新しい DataFrame を作成する方法を学習します。

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark union()メソッドを使用して、最初の DataFrame dfの内容を、CSV ファイルから読み込まれた赤ちゃんの名前データを含む DataFrame df_csvと結合します。

df = df1.union(df_csv)
display(df)
val df = df1.union(df_csv_renamed)
display(df)
display(df <- union(df1, df_csv))

DataFrame内の行をフィルタリングする

Apache Spark の.filter()または.where()メソッドを使用して行をフィルタリングし、データ セット内で最も人気のある赤ちゃんの名前を見つけます。 フィルタリングを使用して、DataFrame で返される行または変更される行のサブセットを選択します。 次の例に示すように、パフォーマンスや構文に違いはありません。

.filter() の使用 方式

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark .filter()メソッドを使用して、DataFrame 内の行数が 50 を超える行を表示します。

display(df.filter(df["Count"] > 50))
display(df.filter(df("Count") > 50))
display(filteredDF <- filter(df, df$Count > 50))

.where() の使用 方式

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark .where()メソッドを使用して、DataFrame 内の行数が 50 を超える行を表示します。

display(df.where(df["Count"] > 50))
display(df.where(df("Count") > 50))
display(filtered_df <- where(df, df$Count > 50))

DataFrame から列を選択し、頻度順に並べ替える

select()メソッドを使用して、DataFrame から返す列を指定して、赤ちゃんの名前の頻度について学習します。 結果を並べ替えるには、Apache Spark のorderbyおよびdesc関数を使用します。

用のPySpark .sql モジュールは、Apache Spark SQL関数のサポートを提供します。このチュートリアルで使用する関数には、Apache Spark のorderBy()desc()expr()関数があります。 これらの関数を使用できるようにするには、必要に応じてセッションにインポートします。

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、 desc()関数をインポートし、Apache Spark select()メソッドと Apache Spark orderBy()およびdesc()関数を使用して、最も一般的な名前とその数を降順で表示します。

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

サブセットDataFrameを作成する

既存の DataFrame からサブセット DataFrame を作成する方法を学習します。

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark filterメソッドを使用して、年、数、性別でデータを制限する新しい DataFrame を作成します。 列を制限するために Apache Spark select()メソッドを使用します。 また、Apache Spark のorderBy()およびdesc()関数を使用して、新しい DataFrame をカウントで並べ替えます。

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

ステップ5: DataFrameを保存する

DataFrame を保存する方法を学びます。 DataFrame をテーブルに保存するか、DataFrame を 1 つまたは複数のファイルに書き込むことができます。

DataFrameをテーブルに保存する

Databricks は、デフォルトですべてのテーブルに Delta Lake 形式を使用します。 DataFrame を保存するには、カタログとスキーマに対するCREATEテーブル権限が必要です。

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、このチュートリアルの冒頭で定義した変数を使用して、DataFrame の内容をテーブルに保存します。

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")
df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")
saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

ほとんどのApache Spark アプリケーションは、大規模なデータ セットを分散形式で処理します。 Apache Spark は、単一のファイルではなく、ファイルのディレクトリを書き出します。 Delta Lake は Parquet フォルダーとファイルを分割します。 多くのデータ・システムは、これらのファイルのディレクトリーを読み取ることができます。 Databricks では、ほとんどのアプリケーションでファイル パスよりもテーブルを使用することを推奨しています。

DataFrameをJSONファイルに保存する

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、DataFrame を JSON ファイルのディレクトリに保存します。

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")
df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")
write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

JSONファイルからDataFrameを読み込む

Apache Sparkspark.read.format()read.jsonメソッドを使用して、ディレクトリからDataFrame に .json データ 方法を学習します。

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、前の例で保存した JSON ファイルを表示します。

display(spark.read.format("json").json("/tmp/json_data"))
display(spark.read.format("json").json("/tmp/json_data"))
display(read.json("/tmp/json_data"))

追加タスク: PySpark、Scala、R で SQL クエリを実行する

Apache Spark DataFrames は、SQL を PySpark、Scala、R と組み合わせるための次のオプションを提供します。 このチュートリアル用に作成した同じノートブックで次のコードを実行できます。

SQLクエリとして列を指定する

Apache Spark selectExpr()メソッドの使用方法を学習します。 これは、SQL 式を受け入れて更新された DataFrame を返すselect()メソッドのバリエーションです。 このメソッドでは、 upperなどの SQL 式を使用できます。

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark selectExpr()メソッドと SQL upper式を使用して、文字列の列を大文字に変換します (列の名前を変更します)。

display(df.selectExpr("Count", "upper(County) as big_name"))
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

列にSQL構文を使用するにはexpr()を使用します

Apache Spark expr()関数をインポートして使用し、列が指定される任意の場所で SQL 構文を使用する方法を学習します。

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、 expr()関数をインポートし、Apache Spark expr()関数と SQL lower式を使用して文字列の列を小文字に変換します (列の名前を変更します)。

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

spark.sql() を使用して任意の SQL クエリを 実行する

Apache Spark spark.sql()関数を使用して任意の SQL クエリを実行する方法を学習します。

次のコードをコピーして、空のノートブックのセルに貼り付けます。 このコードは、Apache Spark spark.sql()関数を使用して、SQL 構文で SQL テーブルをクエリします。

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))
display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))
display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

DataFrame チュートリアル ノートブック

次のノートブックには、このチュートリアルのサンプルクエリが含まれています。

DataFrames チュートリアル ノートブック

ノートブックを新しいタブで開く

DataFrames チュートリアル ノートブック

ノートブックを新しいタブで開く

DataFrames チュートリアル ノートブック

ノートブックを新しいタブで開く