メインコンテンツまでスキップ

R で データフレーム とテーブルを操作する

important

Databricks の SparkR は、Databricks Runtime 16.0 以降では 非推奨とされています 。 Databricks 代わりに Sparklyr を使用することをお勧めします。

この記事では、SparkRSparklyrdplyr などの R パッケージを使用して、R data.frameSpark データフレーム、インメモリ テーブルを操作する方法について説明します。

SparkR、Sparklyr、dplyr を操作すると、これらすべてのパッケージで特定の操作を完了でき、最も使いやすいパッケージを使用できることに気付く場合があることに注意してください。たとえば、クエリを実行するには、 SparkR::sqlsparklyr::sdf_sqldplyr::selectなどの関数を呼び出すことができます。 また、これらのパッケージの 1 つまたは 2 つだけで操作を完了できる場合もあり、選択する操作は使用シナリオによって異なります。 たとえば、 sparklyr::sdf_quantile を呼び出す方法は、 dplyr::percentile_approxを呼び出す方法とは少し異なりますが、どちらの関数も分位数を計算します。

SQL は SparkR と Sparklyr の間のブリッジとして使用できます。たとえば、 SparkR::sql を使用して、Sparklyr で作成したテーブルをクエリできます。 sparklyr::sdf_sql を使用して、SparkR で作成したテーブルをクエリできます。また dplyr コードは、実行前に常にメモリ内で SQL に変換されます。 「API の相互運用性」および「SQL 変換」も参照してください。

ロード SparkR、Sparklyr、dplyr

SparkR、Sparklyr、および dplyr パッケージは、Databricks Runtime Databricksクラスター にインストールされる に含まれています。したがって、これらのパッケージの呼び出しを開始する前に、通常の install.package を呼び出す必要はありません。 ただし、これらのパッケージは、最初に library で読み込む必要があります。 たとえば、ワークスペースの R ノートブックDatabricks 内から、ノートブック セルで次のコードを実行して、SparkR 、Sparklyr、dplyr を読み込みます。

R
library(SparkR)
library(sparklyr)
library(dplyr)

Sparklyr をクラスターに接続する

Sparklyr を読み込んだら、 sparklyr::spark_connect を呼び出してクラスターに接続し、 databricks 接続方法を指定する必要があります。 たとえば、ノートブックのセルで次のコードを実行して、ノートブックをホストするクラスターに接続します。

sc <- spark_connect(method = "databricks")

これに対し、 Databricks ノートブックでは、 SparkRで使用するためのクラスターSparkSessionが既に確立されているため、 SparkRの呼び出しを開始する前に SparkR::sparkR.session を呼び出す必要はありません。

JSON データ ファイルをワークスペースにアップロードする

この記事のコード例の多くは、Databricks ワークスペース内の特定の場所にあるデータに基づいており、特定の列名とデータ型があります。 このコード例のデータは、GitHub 内から book.json という名前の JSON ファイルに基づいています。 このファイルを取得してワークスペースにアップロードするには、次のようにします。

  1. ブックスに移動します 。JSON GitHub上のファイル 、テキスト エディタを使用して、その内容をローカルbooks.json マシン上の という名前のファイルにコピーします。
  2. Databricks ワークスペースのサイドバーで、 [カタログ] をクリックします。
  3. [ テーブルを作成 ] をクリックします。
  4. [ ファイルのアップロード ] タブで、 books.json ファイルをローカル コンピューターから [ アップロードするファイルのドロップ ] ボックスにドロップします。または、[ クリックして参照 ] を選択し、ローカルコンピューターから books.json ファイルを参照します。

デフォルトでは、Databricks はローカル books.json ファイルをワークスペース内の DBFS の場所にパス /FileStore/tables/books.jsonでアップロードします。

「UI を使用したテーブルの作成 」または「ノートブックでのテーブルの作成 」はクリックしないでください。この記事のコード例では、この DBFS の場所にアップロードされた books.json ファイルのデータを使用します。

JSON データを データフレーム に読み込みます

sparklyr::spark_read_json を使用して、アップロードされた JSON ファイルを データフレーム に読み取り、接続、JSON ファイルへのパス、およびデータの内部テーブル表現の名前を指定します。この例では、 book.json ファイルに複数の行が含まれるように指定する必要があります。 ここでの列のスキーマの指定はオプションです。 それ以外の場合、Sparklyr は列のスキーマをデフォルトによって推論します。 たとえば、ノートブックのセルで次のコードを実行して、アップロードされた JSON ファイルのデータを jsonDFという名前の データフレーム に読み込みます。

R
jsonDF <- spark_read_json(
sc = sc,
name = "jsonTable",
path = "/FileStore/tables/books.json",
options = list("multiLine" = TRUE),
columns = c(
author = "character",
country = "character",
imageLink = "character",
language = "character",
link = "character",
pages = "integer",
title = "character",
year = "integer"
)
)

データフレーム の最初の数行を出力します

SparkR::headSparkR::show、または sparklyr::collect を使用して、データフレーム の最初の行を印刷できます。デフォルトでは、 head はデフォルトで最初の 6 行を印刷します。 showcollect は、最初の 10 行を印刷します。 たとえば、ノートブックのセルで次のコードを実行して、 jsonDFという名前の データフレーム の最初の行を出力します。

R
head(jsonDF)

# Source: spark<?> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Akk… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid Em… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/Ir… images… Arabic "htt… 288 One … 1200
# … with abbreviated variable names ¹​imageLink, ²​language

show(jsonDF)

# Source: spark<jsonTable> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

collect(jsonDF)

# A tibble: 100 × 8
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

SQL クエリを実行し、テーブルに対して書き込みと読み取りを行う

dplyr 関数を使用して、データフレーム に対して SQL クエリを実行できます。 たとえば、ノートブックのセルで次のコードを実行して、 dplyr::group_bydployr::count を使用して、 jsonDFという名前の データフレーム から作成者別のカウントを取得します。 dplyr::arrangedplyr::desc を使用して、結果をカウントの降順で並べ替えます。次に、デフォルトで最初の 10 行を印刷します。

R
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n))

# Source: spark<?> [?? x 2]
# Ordered by: desc(n)
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Gustave Flaubert 2
# 8 Homer 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows

その後、 sparklyr::spark_write_table を使用して、Databricks のテーブルに結果を書き込むことができます。 たとえば、ノートブックのセルで次のコードを実行してクエリを再実行し、結果を json_books_aggという名前のテーブルに書き込みます。

R
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n)) %>%
spark_write_table(
name = "json_books_agg",
mode = "overwrite"
)

テーブルが作成されたことを確認するには、 sparklyr::sdf_sqlSparkR::showDF を使用してテーブルのデータを表示できます。 たとえば、ノートブックのセルで次のコードを実行して、テーブルを データフレーム にクエリし、sparklyr::collectを使用して データフレーム の最初の 10 行をデフォルト別に出力します。

R
collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))

# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

sparklyr::spark_read_tableを使用して同様のことを行うこともできます。たとえば、ノートブック セルで次のコードを実行して、jsonDF という名前の前の データフレーム を データフレーム にクエリし、sparklyr::collect を使用して データフレーム の最初の 10 行をデフォルト別に出力します。

R
fromTable <- spark_read_table(
sc = sc,
name = "json_books_agg"
)

collect(fromTable)

# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

データフレームに列とコンピュート列の値を追加する

dplyr 関数を使用して、 データフレーム にカラムを追加したり、カラムの値を計算することができます。

たとえば、ノートブックのセルで次のコードを実行して、 jsonDFという名前の データフレーム の内容を取得します。 dplyr::mutate を使用して todayという名前の列を追加し、この新しい列に現在のタイムスタンプを入力します。次に、これらの内容を withDate という名前の新しい データフレーム に書き込み、dplyr::collect を使用して新しい データフレームの最初の 10 行をデフォルト別に印刷します。

注記

dplyr::mutate Hiveの組み込み関数 (UDF とも呼ばれます) と組み込み集約関数 (UDAFsとも呼ばれます) に準拠する引数のみを受け入れます。一般的な情報については、「 Hive 関数」を参照してください。 このセクションの日付関連の関数に関する情報については、「 日付関数」を参照してください。

R
withDate <- jsonDF %>%
mutate(today = current_timestamp())

collect(withDate)

# A tibble: 100 × 9
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:32:59
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:32:59
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:32:59
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:32:59
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:32:59
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:32:59
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:32:59
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:32:59
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

次に、 dplyr::mutate を使用して、 withDate データフレーム の内容にさらに 2 つの列を追加します。 新しい month 列と year 列には、 today 列の月と年の数値が含まれています。 次に、これらの内容を withMMyyyyという名前の新しい データフレーム に書き込み、 dplyr::selectdplyr::collect を使用して、新しい データフレーム の最初の 10 行の authortitlemonth 、および year 列をデフォルトで出力します。

R
withMMyyyy <- withDate %>%
mutate(month = month(today),
year = year(today))

collect(select(withMMyyyy, c("author", "title", "month", "year")))

# A tibble: 100 × 4
# author title month year
# <chr> <chr> <int> <int>
# 1 Chinua Achebe Things Fall Apart 9 2022
# 2 Hans Christian Andersen Fairy tales 9 2022
# 3 Dante Alighieri The Divine Comedy 9 2022
# 4 Unknown The Epic Of Gilgamesh 9 2022
# 5 Unknown The Book Of Job 9 2022
# 6 Unknown One Thousand and One Nights 9 2022
# 7 Unknown Njál's Saga 9 2022
# 8 Jane Austen Pride and Prejudice 9 2022
# 9 Honoré de Balzac Le Père Goriot 9 2022
# 10 Samuel Beckett Molloy, Malone Dies, The Unnamable, the … 9 2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

次に、 dplyr::mutate を使用して、 withMMyyyy データフレーム の内容にさらに 2 つの列を追加します。 新しい formatted_date 列には today 列の yyyy-MM-dd 部分が含まれ、新しい day 列には新しい formatted_date 列の数値の日が含まれます。次に、これらの内容を withUnixTimestampという名前の新しい データフレーム に書き込み、 dplyr::selectdplyr::collect と共に使用して、新しい データフレーム の最初の 10 行の titleformatted_date、および day 列をデフォルトで出力します。

R
withUnixTimestamp <- withMMyyyy %>%
mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
day = dayofmonth(formatted_date))

collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))

# A tibble: 100 × 3
# title formatted_date day
# <chr> <chr> <int>
# 1 Things Fall Apart 2022-09-27 27
# 2 Fairy tales 2022-09-27 27
# 3 The Divine Comedy 2022-09-27 27
# 4 The Epic Of Gilgamesh 2022-09-27 27
# 5 The Book Of Job 2022-09-27 27
# 6 One Thousand and One Nights 2022-09-27 27
# 7 Njál's Saga 2022-09-27 27
# 8 Pride and Prejudice 2022-09-27 27
# 9 Le Père Goriot 2022-09-27 27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27 27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

一時的なビューを作成する

既存の データフレームに基づく名前付きテンポラリ ビューをメモリに作成できます。 たとえば、ノートブックのセルで次のコードを実行して、 SparkR::createOrReplaceTempView を使用して jsonTable という名前の前の データフレーム の内容を取得し、そこから ID という名前の一時的なビューを作成します timestampTable。 次に、 sparklyr::spark_read_table を使用して一時ビューの内容を読み取ります。 デフォルトでは、 sparklyr::collect を使用して一時テーブルの最初の 10 行を出力します。

R
createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")

spark_read_table(
sc = sc,
name = "timestampTable"
) %>% collect()

# A tibble: 100 × 10
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:11:56
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:11:56
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:11:56
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:11:56
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:11:56
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:11:56
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:11:56
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:11:56
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
# names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names

データフレーム で統計分析を実行する

統計分析には、dplyrと一緒に sparklyr を使用できます。

たとえば、統計を実行する データフレーム を作成します。 これを行うには、ノートブックのセルで次のコードを実行して、 sparklyr::sdf_copy_to を使用して、R に組み込まれている iris データセットの内容を irisという名前の データフレーム に書き込みます。 デフォルトでは、 sparklyr::sdf_collect を使用して一時テーブルの最初の 10 行を出力します。

R
irisDF <- sdf_copy_to(
sc = sc,
x = iris,
name = "iris",
overwrite = TRUE
)

sdf_collect(irisDF, "row-wise")

# A tibble: 150 × 5
# Sepal_Length Sepal_Width Petal_Length Petal_Width Species
# <dbl> <dbl> <dbl> <dbl> <chr>
# 1 5.1 3.5 1.4 0.2 setosa
# 2 4.9 3 1.4 0.2 setosa
# 3 4.7 3.2 1.3 0.2 setosa
# 4 4.6 3.1 1.5 0.2 setosa
# 5 5 3.6 1.4 0.2 setosa
# 6 5.4 3.9 1.7 0.4 setosa
# 7 4.6 3.4 1.4 0.3 setosa
# 8 5 3.4 1.5 0.2 setosa
# 9 4.4 2.9 1.4 0.2 setosa
# 10 4.9 3.1 1.5 0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows

次に、 dplyr::group_by を使用して、 Species 列で行をグループ化します。 dplyr::summarizedplyr::percentile_approx を使用して、Sepal_Length 列の 25 番目、50 番目、75 番目、および 100 番目の分位数で要約統計量を Speciesで計算します。結果を印刷 sparklyr::collect 使用します。

注記

dplyr::summarize Hiveの組み込み関数 (UDF とも呼ばれます) と組み込み集約関数 (UDAFsとも呼ばれます) に準拠する引数のみを受け入れます。一般的な情報については、「 Hive 関数」を参照してください。 percentile_approxに関する情報については、組み込み Aggregate Functions(UDAF) を参照してください。

R
quantileDF <- irisDF %>%
group_by(Species) %>%
summarize(
quantile_25th = percentile_approx(
Sepal_Length,
1.25
),
quantile_50th = percentile_approx(
Sepal_Length,
1.50
),
quantile_75th = percentile_approx(
Sepal_Length,
1.75
),
quantile_100th = percentile_approx(
Sepal_Length,
1.0
)
)

collect(quantileDF)

# A tibble: 3 × 5
# Species quantile_25th quantile_50th quantile_75th quantile_100th
# <chr> <dbl> <dbl> <dbl> <dbl>
# 1 virginica 6.2 6.5 6.9 7.9
# 2 versicolor 5.6 5.9 6.3 7
# 3 setosa 4.8 5 5.2 5.8

同様の結果は、たとえば、次の sparklyr::sdf_quantileを使用して計算できます。

R
print(sdf_quantile(
x = irisDF %>%
filter(Species == "virginica"),
column = "Sepal_Length",
probabilities = c(0.25, 0.5, 0.75, 1.0)
))

# 25% 50% 75% 100%
# 6.2 6.5 6.9 7.9