SparkR の概要

SparkR は、R の Apache Spark を使用するための軽量フロントエンドを提供する R パッケージであり、SparkR は MLlib を使用した分散機械学習もサポートしています。

SparkR 関数リファレンス

最新の SparkR 関数リファレンスについては、 spark.apache.org を参照してください。

SparkR パッケージをインポートした後、R ノートブックまたは RStudio で関数のヘルプを表示することもできます。

埋め込み R ドキュメント

ノートブックの SparkR

  • Spark 2.0 以降では、すべての関数呼び出しに sqlContext オブジェクトを明示的に渡す必要はありません。

  • Spark 2.2 以降では、SparkR 関数が他の一般的なパッケージの同様の名前の関数と競合していたため、ノートブックは SparkR をデフォルトでインポートしなくなりました。 SparkR を使用するには、ノートブックで library(SparkR) 呼び出します。 SparkR セッションは既に構成されており、すべての SparkR 関数は、既存のセッションを使用してアタッチされたクラスターと通信します。

spark-submit ジョブの SparkR

Databricks で SparkR を使用するスクリプトは、コードを少し変更するだけで、spark-submit ジョブとして実行できます。

SparkR DataFrames を作成する

DataFrame は、ローカル R data.frame、データソース、または Spark SQL クエリーを使用して作成できます。

ローカルR data.frame から

DataFrame を作成する最も簡単な方法は、ローカル R data.frameSparkDataFrameに変換することです。 具体的には、 createDataFrame を使用し、ローカルの R data.frame を渡して SparkDataFrameを作成できます。 他のほとんどの SparkR 関数と同様に、 createDataFrame構文は Spark 2.0 で変更されました。 この例は、以下のコードスニペットで確認できます。 その他の例については、「 createDataFrame」を参照してください。

library(SparkR)
df <- createDataFrame(faithful)

# Displays the content of the DataFrame to stdout
head(df)

データソース API の使用

データソースから DataFrame を作成する一般的な方法は read.dfです。 このメソッドは、ロードするファイルのパスと Data の種類を受け取ります。 SparkR では、CSV、JSON、テキスト、および Parquet ファイルのネイティブ読み取りがサポートされています。

library(SparkR)
diamondsDF <- read.df("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", source = "csv", header="true", inferSchema = "true")
head(diamondsDF)

SparkR は、CSV ファイルからスキーマを自動的に推測します。

Spark パッケージを使用したデータソースコネクタの追加

Spark パッケージでは、Avro などの一般的なファイル形式用のデータソース コネクタを見つけることができます。 たとえば、 spark-avro パッケージ を使用して Avro ファイルを読み込みます。 spark-avro パッケージを使用できるかどうかは、クラスター のバージョンによって異なります。 「Avro ファイル」を参照してください。

まず、既存の data.frameを取得し、Spark DataFrameに変換して、Avro ファイルとして保存します。

require(SparkR)
irisDF <- createDataFrame(iris)
write.df(irisDF, source = "com.databricks.spark.avro", path = "dbfs:/tmp/iris.avro", mode = "overwrite")

Avro ファイルが保存されたことを確認するには:

%fs ls /tmp/iris.avro

ここで、spark-avro パッケージを再度使用して、データを読み取ります。

irisDF2 <- read.df(path = "/tmp/iris.avro", source = "com.databricks.spark.avro")
head(irisDF2)

データソースAPIを使用して、 DataFrames を複数のファイル形式に保存することもできます。 たとえば、 write.dfを使用して、前の例の DataFrame を Parquet ファイルに保存できます。

write.df(irisDF2, path="dbfs:/tmp/iris.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/iris.parquet

Spark SQL クエリーから

Spark SQL クエリーを使用して SparkR DataFrames を作成することもできます。

# Register earlier df as temp view
createOrReplaceTempView(irisDF2, "irisTemp")
# Create a df consisting of only the 'species' column using a Spark SQL query
species <- sql("SELECT species FROM irisTemp")

species は SparkDataFrame です。

DataFrame 操作

Spark DataFrames は、構造化データ処理を行うための多くの関数をサポートしています。 ここにいくつかの基本的な例があります。 完全なリストは APIドキュメントにあります。

行と列の選択

# Import SparkR package if this is a new notebook
require(SparkR)

# Create DataFrame
df <- createDataFrame(faithful)
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

グループ化と集計

SparkDataFrames では、グループ化後にデータを集計するために一般的に使用される関数が多数サポートされています。 たとえば、忠実なデータセットに各待機時間が表示される回数をカウントできます。

head(count(groupBy(df, df$waiting)))
# You can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

列の操作

SparkR には、データ処理と集計のために列に直接適用できる関数が多数用意されています。 次の例は、基本的な算術関数の使用方法を示しています。

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

機械学習

SparkR は、 MLlib アルゴリズムのほとんどを公開しています。 内部的には、SparkR は MLlib を使用してモデルをトレーニングします。

次の例は、SparkR を使用してガウス GLM モデルを構築する方法を示しています。 線形回帰を実行するには、family を "gaussian"に設定します。 ロジスティック回帰を実行するには、family を "binomial"に設定します。 SparkML GLM を使用すると、SparkR はカテゴリ特徴のワンホット エンコードを自動的に実行するため、手動で行う必要はありません。 文字列型と倍精度浮動小数点型の特徴に加えて、他の MLlib コンポーネントとの互換性のために、MLlib ベクターの特徴の上に収めることもできます。

# Create the DataFrame
df <- createDataFrame(iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

チュートリアルについては、「 チュートリアル: glm を使用したデータの分析」を参照してください。

その他の例については、「 R での DataFrames とテーブルの操作」を参照してください。