Pular para o conteúdo principal

Migre do SparkR para Sparklyr

O SparkR foi desenvolvido como parte do Apache Spark, e seu design é familiar para usuários de Scala e Python, mas potencialmente menos intuitivo para profissionais de R. Além disso, o SparkR foi descontinuado no Spark 4.0.

Em contrapartida, Sparklyr concentra-se em proporcionar uma experiência mais amigável para leitores iniciantes. Ele aproveita a sintaxe dplyr , que é familiar aos usuários do tidyverse com padrões como select(), filter() e mutate() para operações de DataFrame.

Sparklyr é o pacote R recomendado para trabalhar com Apache Spark. Esta página explica as diferenças entre SparkR e Sparklyr nas APIs Spark e fornece informações sobre migração de código.

Configuração do ambiente

Instalação

Se você estiver no workspace Databricks , nenhuma instalação é necessária. Carregar Sparklyr com library(sparklyr). Para instalar Sparklyr localmente fora do Databricks, consulte Obter início.

Conectando-se ao Spark

Conecte-se ao Spark com Sparklyr no workspace Databricks ou localmente usando Databricks Connect:

workspace :

R
library(sparklyr)
sc <- spark_connect(method = "databricks")

Databricks Connect :

R
sc <- spark_connect(method = "databricks_connect")

Para obter mais detalhes e um tutorial mais completo sobre Databricks Connect com Sparklyr, consulte Introdução.

Leitura e escrita de dados

Sparklyr possui uma família de funções spark_read_*() e spark_write_*() para carregar e salvar dados, diferentemente das funções genéricas read.df() e write.df() do SparkR. Existem também funções exclusivas para criar DataFrames Spark ou visualizações temporárias Spark SQL a partir de data frames do R na memória.

Tarefa

SparkR

Sparklyr

Copiar dados para o Spark

createDataFrame()

copy_to()

Criar viewtemporária

createOrReplaceTempView()

Use invoke() com o método diretamente

Escrever dados na tabela

saveAsTable()

spark_write_table()

Escrever dados em um formato específico

write.df()

spark_write_<format>()

Leia os dados da tabela.

tableToDF()

tbl() ou spark_read_table()

Ler dados de um formato específico

read.df()

spark_read_<format>()

Carregando dados

Para converter um data frame do R em um DataFrame Spark , ou para criar uma view temporária a partir de um DataFrame para aplicar SQL a ele:

R
mtcars_df <- createDataFrame(mtcars)

copy_to() Cria uma view temporária usando o nome especificado. Você pode usar o nome para referenciar os dados se estiver usando SQL diretamente (por exemplo, sdf_sql()). Além disso, copy_to() armazena dados em cache definindo o parâmetro memory como TRUE.

Criando visualização

Os exemplos de código a seguir mostram como as visualizações temporárias são criadas:

R
createOrReplaceTempView(mtcars_df, "mtcars_tmp_view")

Escrita de dados

Os exemplos de código a seguir mostram como os dados são gravados:

R
# Save a DataFrame to Unity Catalog
saveAsTable(
mtcars_df,
tableName = "<catalog>.<schema>.<table>",
mode = "overwrite"
)

# Save a DataFrame to local filesystem using Delta format
write.df(
mtcars_df,
path = "file:/<path/to/save/delta/mtcars>",
source = "delta",
mode = "overwrite"
)

Dados de leitura

Os exemplos de código a seguir mostram como os dados são lidos:

R
# Load a Unity Catalog table as a DataFrame
tableToDF("<catalog>.<schema>.<table>")

# Load csv file into a DataFrame
read.df(
path = "file:/<path/to/read/csv/data.csv>",
source = "csv",
header = TRUE,
inferSchema = TRUE
)

# Load Delta from local filesystem as a DataFrame
read.df(
path = "file:/<path/to/read/delta/mtcars>",
source = "delta"
)

# Load data from a table using SQL - Databricks recommendeds using `tableToDF`
sql("SELECT * FROM <catalog>.<schema>.<table>")

Processamento de dados

Selecione e filtre

R
# Select specific columns
select(mtcars_df, "mpg", "cyl", "hp")

# Filter rows where mpg > 20
filter(mtcars_df, mtcars_df$mpg > 20)

Adicionar colunas

R
# Add a new column 'power_to_weight' (hp divided by wt)
withColumn(mtcars_df, "power_to_weight", mtcars_df$hp / mtcars_df$wt)

Agrupamento e agregação

R
# Calculate average mpg and hp by number of cylinders
mtcars_df |>
groupBy("cyl") |>
summarize(
avg_mpg = avg(mtcars_df$mpg),
avg_hp = avg(mtcars_df$hp)
)

join

Suponha que temos outro dataset com rótulo de cilindro que queremos join ao mtcars.

R
# Create another DataFrame with cylinder labels
cylinders <- data.frame(
cyl = c(4, 6, 8),
cyl_label = c("Four", "Six", "Eight")
)
cylinders_df <- createDataFrame(cylinders)

# Join mtcars_df with cylinders_df
join(
x = mtcars_df,
y = cylinders_df,
mtcars_df$cyl == cylinders_df$cyl,
joinType = "inner"
)

Funções definidas pelo usuário (UDFs)

Para criar uma função personalizada de categorização:

R
# Define the custom function
categorize_hp <- function(df)
df$hp_category <- ifelse(df$hp > 150, "High", "Low") # a real-world example would use case_when() with mutate()
df

O SparkR exige que o esquema de saída seja definido explicitamente antes da aplicação de uma função:

R
# Define the schema for the output DataFrame
schema <- structType(
structField("mpg", "double"),
structField("cyl", "double"),
structField("disp", "double"),
structField("hp", "double"),
structField("drat", "double"),
structField("wt", "double"),
structField("qsec", "double"),
structField("vs", "double"),
structField("am", "double"),
structField("gear", "double"),
structField("carb", "double"),
structField("hp_category", "string")
)

# Apply the function across partitions
dapply(
mtcars_df,
func = categorize_hp,
schema = schema
)

# Apply the same function to each group of a DataFrame. Note that the schema is still required.
gapply(
mtcars_df,
cols = "hp",
func = categorize_hp,
schema = schema
)

spark.lapply() vs spark_apply()

No SparkR, spark.lapply() opera em listas R em vez de DataFrames. Não existe um equivalente direto no Sparklyr, mas você pode obter um comportamento semelhante com spark_apply() trabalhando com um DataFrame que inclua identificadores únicos e agrupando por esses IDs. Em alguns casos, as operações linha a linha também podem fornecer funcionalidade comparável. Para obter mais informações sobre spark_apply(), consulte Distribuindo cálculos R.

R
# Define a list of integers
numbers <- list(1, 2, 3, 4, 5)

# Define a function to apply
square <- function(x)
x * x

# Apply the function over list using Spark
spark.lapply(numbers, square)

Aprendizado de máquina

Exemplos completos SparkR e Sparklyr para machine learning estão disponíveis no guiaSpark ML e na referênciaSparklyr.

nota

Se você não estiver usando Spark MLlib, Databricks recomenda o uso de UDFs para ensinar com a biblioteca de sua escolha (por exemplo, xgboost).

Regressão linear

R
# Select features
training_df <- select(mtcars_df, "mpg", "hp", "wt")

# Fit the model using Generalized Linear Model (GLM)
linear_model <- spark.glm(training_df, mpg ~ hp + wt, family = "gaussian")

# View model summary
summary(linear_model)

clusteringK-means

R
# Apply KMeans clustering with 3 clusters using mpg and hp as features
kmeans_model <- spark.kmeans(mtcars_df, mpg ~ hp, k = 3)

# Get cluster predictions
predict(kmeans_model, mtcars_df)

desempenho e otimização

Colecionando

Tanto SparkR quanto Sparklyr usam collect() para converter DataFrames Spark em DataFrames do R. Colete apenas pequenas quantidades de dados de volta para data frames do R, ou o driver Spark ficará sem memória.

Para evitar erros de falta de memória, SparkR possui otimizações integradas no Databricks Runtime que ajudam a coletar dados ou executar funções definidas pelo usuário.

Para garantir o desempenho ideal do Sparklyr na coleta de dados e UDFs em versões Databricks Runtime anteriores à 14.3 LTS, carregue o pacote arrow :

R
library(arrow)

Particionamento em memória

R
# Repartition the SparkDataFrame based on 'cyl' column
repartition(mtcars_df, col = mtcars_df$cyl)

# Repartition the SparkDataFrame to number of partitions
repartition(mtcars_df, numPartitions = 10)

# Coalesce the DataFrame to number of partitions
coalesce(mtcars_df, numPartitions = 1)

# Get number of partitions
getNumPartitions(mtcars_df)

Armazenamento em cache

R
# Cache the DataFrame in memory
cache(mtcars_df)