Migre do SparkR para Sparklyr
O SparkR foi desenvolvido como parte do Apache Spark, e seu design é familiar para usuários de Scala e Python, mas potencialmente menos intuitivo para profissionais de R. Além disso, o SparkR foi descontinuado no Spark 4.0.
Em contrapartida, Sparklyr concentra-se em proporcionar uma experiência mais amigável para leitores iniciantes. Ele aproveita a sintaxe dplyr , que é familiar aos usuários do tidyverse com padrões como select(), filter() e mutate() para operações de DataFrame.
Sparklyr é o pacote R recomendado para trabalhar com Apache Spark. Esta página explica as diferenças entre SparkR e Sparklyr nas APIs Spark e fornece informações sobre migração de código.
Configuração do ambiente
Instalação
Se você estiver no workspace Databricks , nenhuma instalação é necessária. Carregar Sparklyr com library(sparklyr). Para instalar Sparklyr localmente fora do Databricks, consulte Obter início.
Conectando-se ao Spark
Conecte-se ao Spark com Sparklyr no workspace Databricks ou localmente usando Databricks Connect:
workspace :
library(sparklyr)
sc <- spark_connect(method = "databricks")
Databricks Connect :
sc <- spark_connect(method = "databricks_connect")
Para obter mais detalhes e um tutorial mais completo sobre Databricks Connect com Sparklyr, consulte Introdução.
Leitura e escrita de dados
Sparklyr possui uma família de funções spark_read_*() e spark_write_*() para carregar e salvar dados, diferentemente das funções genéricas read.df() e write.df() do SparkR. Existem também funções exclusivas para criar DataFrames Spark ou visualizações temporárias Spark SQL a partir de data frames do R na memória.
Tarefa | SparkR | Sparklyr |
|---|---|---|
Copiar dados para o Spark |
|
|
Criar viewtemporária |
| Use |
Escrever dados na tabela |
|
|
Escrever dados em um formato específico |
|
|
Leia os dados da tabela. |
|
|
Ler dados de um formato específico |
|
|
Carregando dados
Para converter um data frame do R em um DataFrame Spark , ou para criar uma view temporária a partir de um DataFrame para aplicar SQL a ele:
- SparkR
- sparklyr
mtcars_df <- createDataFrame(mtcars)
mtcars_tbl <- copy_to(
sc,
df = mtcars,
name = "mtcars_tmp",
overwrite = TRUE,
memory = FALSE
)
copy_to() Cria uma view temporária usando o nome especificado. Você pode usar o nome para referenciar os dados se estiver usando SQL diretamente (por exemplo, sdf_sql()). Além disso, copy_to() armazena dados em cache definindo o parâmetro memory como TRUE.
Criando visualização
Os exemplos de código a seguir mostram como as visualizações temporárias são criadas:
- SparkR
- sparklyr
createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")
spark_dataframe(mtcars_tbl) |>
invoke("createOrReplaceTempView", "mtcars_tmp_view")
Escrita de dados
Os exemplos de código a seguir mostram como os dados são gravados:
- SparkR
- sparklyr
# Save a DataFrame to Unity Catalog
saveAsTable(
mtcars_df,
tableName = "<catalog>.<schema>.<table>",
mode = "overwrite"
)
# Save a DataFrame to local filesystem using Delta format
write.df(
mtcars_df,
path = "file:/<path/to/save/delta/mtcars>",
source = "delta",
mode = "overwrite"
)
# Save tbl_spark to Unity Catalog
spark_write_table(
mtcars_tbl,
name = "<catalog>.<schema>.<table>",
mode = "overwrite"
)
# Save tbl_spark to local filesystem using Delta format
spark_write_delta(
mtcars_tbl,
path = "file:/<path/to/save/delta/mtcars>",
mode = "overwrite"
)
# Use DBI
library(DBI)
dbWriteTable(
sc,
value = mtcars_tbl,
name = "<catalog>.<schema>.<table>",
overwrite = TRUE
)
Dados de leitura
Os exemplos de código a seguir mostram como os dados são lidos:
- SparkR
- sparklyr
# Load a Unity Catalog table as a DataFrame
tableToDF("<catalog>.<schema>.<table>")
# Load csv file into a DataFrame
read.df(
path = "file:/<path/to/read/csv/data.csv>",
source = "csv",
header = TRUE,
inferSchema = TRUE
)
# Load Delta from local filesystem as a DataFrame
read.df(
path = "file:/<path/to/read/delta/mtcars>",
source = "delta"
)
# Load data from a table using SQL - Databricks recommendeds using `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")
# Load table from Unity Catalog with dplyr
tbl(sc, "<catalog>.<schema>.<table>")
# or using `in_catalog`
tbl(sc, in_catalog("<catalog>", "<schema>", "<table>"))
# Load csv from local filesystem as tbl_spark
spark_read_csv(
sc,
name = "mtcars_csv",
path = "file:/<path/to/csv/mtcars>",
header = TRUE,
infer_schema = TRUE
)
# Load delta from local filesystem as tbl_spark
spark_read_delta(
sc,
name = "mtcars_delta",
path = "file:/tmp/test/sparklyr1"
)
# Load data using SQL
sdf_sql(sc, "SELECT * FROM <catalog>.<schema>.<table>")
Processamento de dados
Selecione e filtre
- SparkR
- sparklyr
# Select specific columns
select(mtcars_df, "mpg", "cyl", "hp")
# Filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)
# Select specific columns
mtcars_tbl |>
select(mpg, cyl, hp)
# Filter rows where mpg > 20
mtcars_tbl |>
filter(mpg > 20)
Adicionar colunas
- SparkR
- sparklyr
# Add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)
# Add a new column 'power_to_weight' (hp divided by wt)
mtcars_tbl |>
mutate(power_to_weight = hp / wt)
Agrupamento e agregação
- SparkR
- sparklyr
# Calculate average mpg and hp by number of cylinders
mtcars_df |>
groupBy("cyl") |>
summarize(
avg_mpg = avg(mtcars_df$mpg),
avg_hp = avg(mtcars_df$hp)
)
# Calculate average mpg and hp by number of cylinders
mtcars_tbl |>
group_by(cyl) |>
summarize(
avg_mpg = mean(mpg),
avg_hp = mean(hp)
)
join
Suponha que temos outro dataset com rótulo de cilindro que queremos join ao mtcars.
- SparkR
- sparklyr
# Create another DataFrame with cylinder labels
cylinders <- data.frame(
cyl = c(4, 6, 8),
cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)
# Join mtcars_df with cylinders_df
join(
x = mtcars_df,
y = cylinders_df,
mtcars_df$cyl == cylinders_df$cyl,
joinType = "inner"
)
# Create another SparkDataFrame with cylinder labels
cylinders <- data.frame(
cyl = c(4, 6, 8),
cyl_label = c("Four", "Six", "Eight")
)
cylinders_tbl <- copy_to(sc, cylinders, "cylinders", overwrite = TRUE)
# join mtcars_df with cylinders_tbl
mtcars_tbl |>
inner_join(cylinders_tbl, by = join_by(cyl))
Funções definidas pelo usuário (UDFs)
Para criar uma função personalizada de categorização:
# Define the custom function
categorize_hp <- function(df)
df$hp_category <- ifelse(df$hp > 150, "High", "Low") # a real-world example would use case_when() with mutate()
df
- SparkR
- sparklyr
O SparkR exige que o esquema de saída seja definido explicitamente antes da aplicação de uma função:
# Define the schema for the output DataFrame
schema <- structType(
structField("mpg", "double"),
structField("cyl", "double"),
structField("disp", "double"),
structField("hp", "double"),
structField("drat", "double"),
structField("wt", "double"),
structField("qsec", "double"),
structField("vs", "double"),
structField("am", "double"),
structField("gear", "double"),
structField("carb", "double"),
structField("hp_category", "string")
)
# Apply the function across partitions
dapply(
mtcars_df,
func = categorize_hp,
schema = schema
)
# Apply the same function to each group of a DataFrame. Note that the schema is still required.
gapply(
mtcars_df,
cols = "hp",
func = categorize_hp,
schema = schema
)
# Load Arrow to avoid cryptic errors
library(arrow)
# Apply the function over data.
# By default this applies to each partition.
mtcars_tbl |>
spark_apply(f = categorize_hp)
# Apply the function over data
# Use `group_by` to apply data over groups
mtcars_tbl |>
spark_apply(
f = summary,
group_by = "hp" # This isn't changing the resulting output as the functions behavior is applied to rows independently.
)
spark.lapply() vs spark_apply()
No SparkR, spark.lapply() opera em listas R em vez de DataFrames. Não existe um equivalente direto no Sparklyr, mas você pode obter um comportamento semelhante com spark_apply() trabalhando com um DataFrame que inclua identificadores únicos e agrupando por esses IDs. Em alguns casos, as operações linha a linha também podem fornecer funcionalidade comparável. Para obter mais informações sobre spark_apply(), consulte Distribuindo cálculos R.
- SparkR
- sparklyr
# Define a list of integers
numbers <- list(1, 2, 3, 4, 5)
# Define a function to apply
square <- function(x)
x * x
# Apply the function over list using Spark
spark.lapply(numbers, square)
# Create a DataFrame of given length
sdf <- sdf_len(sc, 5, repartition = 1)
# Apply function to each partition of the DataFrame
# spark_apply() defaults to processing data based on number of partitions.
# In this case it will return a single row due to repartition = 1.
spark_apply(sdf, f = nrow)
# Apply function to each row (option 1)
# To force behaviour like spark.lapply() you can create a DataFrame with N rows and force grouping with group_by set to a unique row identifier. In this case it's the id column automatically generated by sdf_len()). This will return N rows.
spark_apply(sdf, f = nrow, group_by = "id")
# Apply function to each row (option 2)
# This requires writing a function that operates across rows of a data.frame, in some occasions this may be faster relative to option 1. Specifying group_by in optional for this example. This example does not require rowwise(), but is just to illustrate one method to force computations to be for every row.
row_func <- function(df)
df |>
dplyr::rowwise() |>
dplyr::mutate(x = id * 2)
spark_apply(sdf, f = row_func)
Aprendizado de máquina
Exemplos completos SparkR e Sparklyr para machine learning estão disponíveis no guiaSpark ML e na referênciaSparklyr.
Se você não estiver usando Spark MLlib, Databricks recomenda o uso de UDFs para ensinar com a biblioteca de sua escolha (por exemplo, xgboost).
Regressão linear
- SparkR
- sparklyr
# Select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")
# Fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")
# View model summary
summary(linear_model)
# Select features
training_tbl <- mtcars_tbl |>
select(mpg, hp, wt)
# Fit the model using Generalized Linear Model (GLM)
linear_model <- training_tbl |>
ml_linear_regression(response = "mpg", features = c("hp", "wt"))
# View model summary
summary(linear_model)
clusteringK-means
- SparkR
- sparklyr
# Apply KMeans clustering with 3 clusters using mpg and hp as features
kmeans_model <- spark.kmeans(mtcars_df, mpg ~ hp, k = 3)
# Get cluster predictions
predict(kmeans_model, mtcars_df)
# Use mpg and hp as features
features_tbl <- mtcars_tbl |>
select(mpg, hp)
# Assemble features into a vector column
features_vector_tbl <- features_tbl |>
ft_vector_assembler(
input_cols = c("mpg", "hp"),
output_col = "features"
)
# Apply K-Means clustering
kmeans_model <- features_vector_tbl |>
ml_kmeans(features_col = "features", k = 3)
# Get cluster predictions
ml_predict(kmeans_model, features_vector_tbl)
desempenho e otimização
Colecionando
Tanto SparkR quanto Sparklyr usam collect() para converter DataFrames Spark em DataFrames do R. Colete apenas pequenas quantidades de dados de volta para data frames do R, ou o driver Spark ficará sem memória.
Para evitar erros de falta de memória, SparkR possui otimizações integradas no Databricks Runtime que ajudam a coletar dados ou executar funções definidas pelo usuário.
Para garantir o desempenho ideal do Sparklyr na coleta de dados e UDFs em versões Databricks Runtime anteriores à 14.3 LTS, carregue o pacote arrow :
library(arrow)
Particionamento em memória
- SparkR
- sparklyr
# Repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)
# Repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)
# Coalesce the DataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)
# Get number of partitions
getNumPartitions(mtcars_df)
# Repartition the tbl_spark based on 'cyl' column
sdf_repartition(mtcars_tbl, partition_by = "cyl")
# Repartition the tbl_spark to number of partitions
sdf_repartition(mtcars_tbl, partitions = 10)
# Coalesce the tbl_spark to number of partitions
sdf_coalesce(mtcars_tbl, partitions = 1)
# Get number of partitions
sdf_num_partitions(mtcars_tbl)
Armazenamento em cache
- SparkR
- sparklyr
# Cache the DataFrame in memory
cache(mtcars_df)
# Cache the tbl_spark in memory
tbl_cache(sc, name = "mtcars_tmp")