メインコンテンツまでスキップ

SparkRからSparklyrへの移行

SparkRはApache Sparkの一部として開発され、その設計はScalaやPythonのユーザーには馴染み深いものですが、Rの実践者にとっては直感的ではない可能性があります。さらに、SparkRはSpark 4.0で非推奨となりました。

それに対し、 SparklyrはよりR言語に配慮したユーザー体験の提供に重点を置いている。 これは、 dplyr構文を活用しており、これは、DataFrame操作のためのselect()filter()mutate()などのパターンでtidyverseのユーザーには馴染みのあるものです。

Sparklyrは、 Apache Sparkを扱う際に推奨されるRパッケージです。 このページでは、 Spark APIs全体におけるSparkRと Sparklyr の違いについて説明し、コード移行に関する情報を提供します。

環境設定

インストール

Databricksワークスペースにいる場合、インストールは必要ありません。 Sparklyrをlibrary(sparklyr)でロードします。 SparklyrをDatabricks以外のローカル環境にインストールするには、 「はじめに」を参照してください。

Sparkに接続中

Databricksワークスペースで Sparklyr を使用するか、ローカルでDatabricks Connect使用してSparkに接続します。

ワークスペース :

R
library(sparklyr)
sc <- spark_connect(method = "databricks")

Databricks Connect :

R
sc <- spark_connect(method = "databricks_connect")

Databricks Connect with Sparklyr の詳細と拡張チュートリアルについては、 「 はじめに 」を参照してください。

データの読み書き

Sparklyrには、 SparkRの汎用的なread.df()およびwrite.df()関数とは異なり、データの読み込みと保存を行うためのspark_read_*()およびspark_write_*()関数群があります。 また、メモリ上のRデータフレームからSpark DataFramesやSpark SQL一時ビューを作成するための独自の機能も用意されています。

タスク

SparkR

sparklyr

データをSparkにコピーする

createDataFrame()

copy_to()

CREATE TEMPORARY VIEW

createOrReplaceTempView()

invoke()メソッドで直接使用してください

テーブルにデータを書き込む

saveAsTable()

spark_write_table()

指定された形式でデータを書き込む

write.df()

spark_write_<format>()

テーブルからデータを読み込む

tableToDF()

tbl() または spark_read_table()

指定された形式のデータを読み込む

read.df()

spark_read_<format>()

データ読み込み中

RデータフレームをSpark DataFrameに変換する場合、またはDataFrameから一時的なビューを作成してSQLを適用する場合:

R
mtcars_df <- createDataFrame(mtcars)

copy_to() 指定された名前を使用して一時的なビューを作成します。SQLを直接使用している場合は、名前を使用してデータを参照できます(例: sdf_sql() )。また、 copy_to()memory問題をTRUEに設定することでデータをキャッシュします。

ビューを作成する

以下のコード例は、一時ビューを作成する方法を示しています。

R
createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")

データの書き込み

以下のコード例は、データの書き込み方法を示しています。

R
# Save a DataFrame to Unity Catalog
saveAsTable(
mtcars_df,
tableName = "<catalog>.<schema>.<table>",
mode = "overwrite"
)

# Save a DataFrame to local filesystem using Delta format
write.df(
mtcars_df,
path = "file:/<path/to/save/delta/mtcars>",
source = "delta",
mode = "overwrite"
)

データを読み込んでいます

以下のコード例は、データの読み取り方法を示しています。

R
# Load a Unity Catalog table as a DataFrame
tableToDF("<catalog>.<schema>.<table>")

# Load csv file into a DataFrame
read.df(
path = "file:/<path/to/read/csv/data.csv>",
source = "csv",
header = TRUE,
inferSchema = TRUE
)

# Load Delta from local filesystem as a DataFrame
read.df(
path = "file:/<path/to/read/delta/mtcars>",
source = "delta"
)

# Load data from a table using SQL - Databricks recommendeds using `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")

データ処理

選択してフィルタリングする

R
# Select specific columns
select(mtcars_df, "mpg", "cyl", "hp")

# Filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)

列を追加する

R
# Add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)

グループ化と集計

R
# Calculate average mpg and hp by number of cylinders
mtcars_df |>
groupBy("cyl") |>
summarize(
avg_mpg = avg(mtcars_df$mpg),
avg_hp = avg(mtcars_df$hp)
)

参加する

mtcarsに結合したいシリンダーラベルを含む別のデータセットがあるとします。

R
# Create another DataFrame with cylinder labels
cylinders <- data.frame(
cyl = c(4, 6, 8),
cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)

# Join mtcars_df with cylinders_df
join(
x = mtcars_df,
y = cylinders_df,
mtcars_df$cyl == cylinders_df$cyl,
joinType = "inner"
)

ユーザー定義関数(UDF)

分類用のカスタム関数を作成するには:

R
# Define the custom function
categorize_hp <- function(df)
df$hp_category <- ifelse(df$hp > 150, "High", "Low") # a real-world example would use case_when() with mutate()
df

SparkRでは、関数を適用する前に出力スキーマを明示的に定義する必要があります。

R
# Define the schema for the output DataFrame
schema <- structType(
structField("mpg", "double"),
structField("cyl", "double"),
structField("disp", "double"),
structField("hp", "double"),
structField("drat", "double"),
structField("wt", "double"),
structField("qsec", "double"),
structField("vs", "double"),
structField("am", "double"),
structField("gear", "double"),
structField("carb", "double"),
structField("hp_category", "string")
)

# Apply the function across partitions
dapply(
mtcars_df,
func = categorize_hp,
schema = schema
)

# Apply the same function to each group of a DataFrame. Note that the schema is still required.
gapply(
mtcars_df,
cols = "hp",
func = categorize_hp,
schema = schema
)

spark.lapply() と spark_apply() の比較

SparkRでは、 spark.lapply() DataFramesではなくRリストに対して操作を行います。 Sparklyrには直接同等の機能はありませんが、一意の識別子を含むDataFrameを使用し、それらのIDでグループ化することで、 spark_apply()と同様の動作を実現できます。 場合によっては、行単位の操作でも同等の機能を提供できる。spark_apply()詳細については、 「R 計算の分散」を参照してください。

R
# Define a list of integers
numbers <- list(1, 2, 3, 4, 5)

# Define a function to apply
square <- function(x)
x * x

# Apply the function over list using Spark
spark.lapply(numbers, square)

機械学習

機械学習用の完全なSparkRおよび Sparklyr の例は、 Spark MLガイドおよびSparklyr リファレンスにあります。

注記

Spark MLlibを使用していない場合は、DatabricksはUDFを使用して任意のライブラリでトレーニングすることを推奨します(例: xgboost )。

線形回帰

R
# Select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")

# Fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")

# View model summary
summary(linear_model)

K平均クラスタリング

R
# Apply KMeans clustering with 3 clusters using mpg and hp as features
kmeans_model <- spark.kmeans(mtcars_df, mpg ~ hp, k = 3)

# Get cluster predictions
predict(kmeans_model, mtcars_df)

パフォーマンスと最適化

収集

SparkRとSparklyrはどちらもcollect()を使用してSpark DataFrames Rデータフレームに変換します。 Rデータフレームに収集するデータは少量にしてください。そうしないと、 Sparkドライバがメモリ不足になります。

メモリ不足エラーを防ぐため、 SparkRはDatabricks Runtimeに、データの収集やユーザー定義関数の実行を支援する最適化機能があります。

Databricks Runtimeバージョン 14.3 LTS未満で Sparklyr を使用してデータ収集および UDF の最適なパフォーマンスを確保するには、 arrowパッケージをロードしてください。

R
library(arrow)

インメモリパーティショニング

R
# Repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)

# Repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)

# Coalesce the DataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)

# Get number of partitions
getNumPartitions(mtcars_df)

キャッシング

R
# Cache the DataFrame in memory
cache(mtcars_df)