Trabalhar com DataFrames e tabelas em R

Este artigo descreve como usar pacotes R como SparkR, sparklyr e dplyr para trabalhar com R data.frames, Spark DataFrames e tabelas na memória.

Observe que, ao trabalhar com SparkR, sparklyr e dplyr, você pode concluir operações específicas com todos esses pacotes e pode usar o pacote com o qual se sente mais confortável. Por exemplo, para executar uma query, você pode chamar funções como SparkR::sql, sparklyr::sdf_sql e dplyr::select. Em outras ocasiões, você pode concluir uma operação com apenas um ou dois desses pacotes, e as operações escolhidas dependem do seu cenário de uso. Por exemplo, a maneira como você chama sparklyr::sdf_quantile difere ligeiramente da maneira como você chama dplyr::percentile_approx, embora ambas as funções calculem quantis.

Você pode usar SQL como uma ponte entre SparkR e sparklyr. Por exemplo, você pode usar SparkR::sql para query tabelas criadas com sparklyr. Você pode usar sparklyr::sdf_sql para query tabelas criadas com SparkR. E o código dplyr sempre é traduzido para SQL na memória antes de ser executado. Consulte também interoperabilidade de API e tradução de SQL.

Carregar SparkR, sparklyr e dplyr

Os pacotes SparkR, sparklyr e dplyr estão incluídos no Databricks Runtime instalado em clusters Databricks. Portanto, você não precisa ligar para o install.package normal antes de começar a ligar para esses pacotes. Entretanto, você ainda deve carregar esses pacotes com library primeiro. Por exemplo, de dentro de um R Notebook em um workspace do Databricks, execute o seguinte código em uma célula Notebook para carregar SparkR, sparklyr e dplyr:

library(SparkR)
library(sparklyr)
library(dplyr)

Conecte o sparklyr a um clusters

Depois de carregar o sparklyr, você deve chamar sparklyr::spark_connect para se conectar aos clusters, especificando o método de conexão databricks. Por exemplo, execute o seguinte código em uma célula Notebook para se conectar aos clusters que hospedam o Notebook:

sc <- spark_connect(method = "databricks")

Por outro lado, um Databricks Notebook já estabelece um SparkSession nos clusters para uso com SparkR, portanto, você não precisa chamar SparkR::sparkR.session antes de começar a chamar SparkR.

Carregue um arquivo de dados JSON para seu espaço de trabalho

Muitos dos exemplos de código neste artigo são baseados em dados em um local específico em seu workspace do Databricks, com nomes de colunas e tipos de dados específicos. Os dados para este exemplo de código se originam em um arquivo JSON chamado book.json de dentro do GitHub. Para obter este arquivo e upload -lo em seu workspace:

  1. Vá para os livros.JSON no GitHub e use um editor de texto para copiar seu conteúdo para um arquivo chamado books.json em algum lugar da máquina local.

  2. Na barra lateral workspace do Databricks, clique em Catálogo.

  3. Clique em Criar tabela.

  4. Na tab Upload File , solte o books.json arquivo de sua máquina local na caixa Drop files to upload . Ou selecione clique para navegar e navegue até o arquivo books.json de sua máquina local.

Por default, o Databricks upload seu arquivo books.json local para o local DBFS em seu workspace com o caminho /FileStore/tables/books.json.

Não clique em Criar tabela com interface do usuário ou Criar tabela no Notebook. Os exemplos de código neste artigo usam os dados no arquivo upload books.json neste local do DBFS.

Leia os dados JSON em um DataFrame

Use sparklyr::spark_read_json para ler o arquivo JSON upload em um DataFrame, especificando a conexão, o caminho para o arquivo JSON e um nome para a representação da tabela interna dos dados. Para este exemplo, você deve especificar que o arquivo book.json contém várias linhas. A especificação do esquema das colunas aqui é opcional. Caso contrário, sparklyr infere o esquema das colunas por default. Por exemplo, execute o seguinte código em uma célula Notebook para ler os dados do arquivo JSON de upload em um DataFrame chamado jsonDF:

jsonDF <- spark_read_json(
  sc      = sc,
  name    = "jsonTable",
  path    = "/FileStore/tables/books.json",
  options = list("multiLine" = TRUE),
  columns = c(
    author    = "character",
    country   = "character",
    imageLink = "character",
    language  = "character",
    link      = "character",
    pages     = "integer",
    title     = "character",
    year      = "integer"
  )
)

query SQL de execução e gravação e leitura de uma tabela

Você pode usar as funções dplyr para executar query SQL em um DataFrame. Por exemplo, execute o código a seguir em uma célula Notebook para usar dplyr::group_by e dployr::count para obter contagens por autor do DataFrame chamado jsonDF. Use dplyr::arrange e dplyr::desc para classificar o resultado em ordem decrescente por contagem. Em seguida, imprima as primeiras 10 linhas por default.

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n))

# Source:     spark<?> [?? x 2]
# Ordered by: desc(n)
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Gustave Flaubert           2
#  8 Homer                      2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows

Você pode usar sparklyr::spark_write_table para gravar o resultado em uma tabela no Databricks. Por exemplo, execute o seguinte código em uma célula Notebook para reexecução da query e, em seguida, grave o resultado em uma tabela chamada json_books_agg:

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n)) %>%
  spark_write_table(
    name = "json_books_agg",
    mode = "overwrite"
  )

Para verificar se a tabela foi criada, você pode usar sparklyr::sdf_sql junto com SparkR::showDF para exibir os dados da tabela. Por exemplo, execute o seguinte código em uma célula Notebook para query a tabela em um DataFrame e, em seguida, use sparklyr::collect para imprimir as primeiras 10 linhas do DataFrame por default:

collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

Você também pode usar sparklyr::spark_read_table para fazer algo semelhante. Por exemplo, execute o seguinte código em uma célula Notebook para query o DataFrame anterior chamado jsonDF em um DataFrame e, em seguida, use sparklyr::collect para imprimir as primeiras 10 linhas do DataFrame por default:

fromTable <- spark_read_table(
  sc   = sc,
  name = "json_books_agg"
)

collect(fromTable)

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

Adicionar colunas e calcular valores de coluna em um DataFrame

Você pode usar funções dplyr para adicionar colunas a DataFrames e compute os valores das colunas.

Por exemplo, execute o código a seguir em uma célula Notebook para obter o conteúdo do DataFrame chamado jsonDF. Use dplyr::mutate para adicionar uma coluna denominada today e preencha essa nova coluna com o carimbo de data/hora atual. Em seguida, grave esse conteúdo em um novo DataFrame chamado withDate e use dplyr::collect para imprimir as primeiras 10 linhas do novo DataFrame por default.

Observação

dplyr::mutate aceita apenas argumentos que estejam em conformidade com as funções integradas do Hive (também conhecidas como UDFs) e as funções agregadas integradas (também conhecidas como UDAFs). Para obter informações gerais, consulte Hive Functions. Para obter informações sobre as funções relacionadas à data nesta seção, consulte Funções de data.

withDate <- jsonDF %>%
  mutate(today = current_timestamp())

collect(withDate)

# A tibble: 100 × 9
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:32:59
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:32:59
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:32:59
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:32:59
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:32:59
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:32:59
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:32:59
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:32:59
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

Agora use dplyr::mutate para adicionar mais duas colunas ao conteúdo do DataFrame withDate . As novas colunas month e year contêm o mês numérico e o ano da coluna today . Em seguida, grave esse conteúdo em um novo DataFrame chamado withMMyyyy e use dplyr::select junto com dplyr::collect para imprimir as colunas author, title, month e year das dez primeiras linhas do novo DataFrame por default:

withMMyyyy <- withDate %>%
  mutate(month = month(today),
         year  = year(today))

collect(select(withMMyyyy, c("author", "title", "month", "year")))

# A tibble: 100 × 4
#    author                  title                                     month  year
#    <chr>                   <chr>                                     <int> <int>
#  1 Chinua Achebe           Things Fall Apart                             9  2022
#  2 Hans Christian Andersen Fairy tales                                   9  2022
#  3 Dante Alighieri         The Divine Comedy                             9  2022
#  4 Unknown                 The Epic Of Gilgamesh                         9  2022
#  5 Unknown                 The Book Of Job                               9  2022
#  6 Unknown                 One Thousand and One Nights                   9  2022
#  7 Unknown                 Njál's Saga                                   9  2022
#  8 Jane Austen             Pride and Prejudice                           9  2022
#  9 Honoré de Balzac        Le Père Goriot                                9  2022
# 10 Samuel Beckett          Molloy, Malone Dies, The Unnamable, the …     9  2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Agora use dplyr::mutate para adicionar mais duas colunas ao conteúdo do DataFrame withMMyyyy . As novas colunas formatted_date contêm a parte yyyy-MM-dd da coluna today , enquanto a nova coluna day contém o dia numérico da nova coluna formatted_date . Em seguida, grave esse conteúdo em um novo DataFrame chamado withUnixTimestamp e use dplyr::select junto com dplyr::collect para imprimir as colunas title, formatted_date e day das dez primeiras linhas do novo DataFrame por default:

withUnixTimestamp <- withMMyyyy %>%
  mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
         day            = dayofmonth(formatted_date))

collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))

# A tibble: 100 × 3
#    title                                           formatted_date   day
#    <chr>                                           <chr>          <int>
#  1 Things Fall Apart                               2022-09-27        27
#  2 Fairy tales                                     2022-09-27        27
#  3 The Divine Comedy                               2022-09-27        27
#  4 The Epic Of Gilgamesh                           2022-09-27        27
#  5 The Book Of Job                                 2022-09-27        27
#  6 One Thousand and One Nights                     2022-09-27        27
#  7 Njál's Saga                                     2022-09-27        27
#  8 Pride and Prejudice                             2022-09-27        27
#  9 Le Père Goriot                                  2022-09-27        27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27        27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Criar uma exibição temporária

Você pode criar view temporária nomeada na memória baseada em DataFrames existentes. Por exemplo, execute o código a seguir em uma célula Notebook para usar SparkR::createOrReplaceTempView para obter o conteúdo do DataFrame anterior chamado jsonTable e criar uma view temporária dele chamada timestampTable. Em seguida, use sparklyr::spark_read_table para ler o conteúdo da viewtemporária. Use sparklyr::collect para imprimir as primeiras 10 linhas da tabela temporária por default:

createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")

spark_read_table(
  sc = sc,
  name = "timestampTable"
) %>% collect()

# A tibble: 100 × 10
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:11:56
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:11:56
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:11:56
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:11:56
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:11:56
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:11:56
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:11:56
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:11:56
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
#   names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names

Realizar análise estatística em um DataFrame

Você pode usar sparklyr junto com dplyr para análises estatísticas.

Por exemplo, crie um DataFrame para as estatísticas de execução. Para fazer isso, execute o código a seguir em uma célula Notebook para usar sparklyr::sdf_copy_to para gravar o conteúdo do dataset iris que está integrado ao R em um DataFrame chamado iris. Use sparklyr::sdf_collect para imprimir as primeiras 10 linhas da tabela temporária por default:

irisDF <- sdf_copy_to(
  sc        = sc,
  x         = iris,
  name      = "iris",
  overwrite = TRUE
)

sdf_collect(irisDF, "row-wise")

# A tibble: 150 × 5
#    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
#           <dbl>       <dbl>        <dbl>       <dbl> <chr>
#  1          5.1         3.5          1.4         0.2 setosa
#  2          4.9         3            1.4         0.2 setosa
#  3          4.7         3.2          1.3         0.2 setosa
#  4          4.6         3.1          1.5         0.2 setosa
#  5          5           3.6          1.4         0.2 setosa
#  6          5.4         3.9          1.7         0.4 setosa
#  7          4.6         3.4          1.4         0.3 setosa
#  8          5           3.4          1.5         0.2 setosa
#  9          4.4         2.9          1.4         0.2 setosa
# 10          4.9         3.1          1.5         0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows

Agora use dplyr::group_by para agrupar linhas pela coluna Species . Use dplyr::summarize juntamente com dplyr::percentile_approx para calcular estatísticas de resumo pelos 25º, 50º, 75º e 100º quantis da coluna Sepal_Length por Species. Use sparklyr::collect para imprimir os resultados:

Observação

dplyr::summarize aceita apenas argumentos que estejam em conformidade com as funções integradas do Hive (também conhecidas como UDFs) e as funções agregadas integradas (também conhecidas como UDAFs). Para obter informações gerais, consulte Hive Functions. Para obter informações sobre percentile_approx, consulte Funções agregadas integradas (UDAF)).

quantileDF <- irisDF %>%
  group_by(Species) %>%
  summarize(
    quantile_25th = percentile_approx(
      Sepal_Length,
      0.25
    ),
    quantile_50th = percentile_approx(
      Sepal_Length,
      0.50
    ),
    quantile_75th = percentile_approx(
      Sepal_Length,
      0.75
    ),
    quantile_100th = percentile_approx(
      Sepal_Length,
      1.0
    )
  )

collect(quantileDF)

# A tibble: 3 × 5
#   Species    quantile_25th quantile_50th quantile_75th quantile_100th
#   <chr>              <dbl>         <dbl>         <dbl>          <dbl>
# 1 virginica            6.2           6.5           6.9            7.9
# 2 versicolor           5.6           5.9           6.3            7
# 3 setosa               4.8           5             5.2            5.8

Resultados semelhantes podem ser calculados, por exemplo, usando sparklyr::sdf_quantile:

print(sdf_quantile(
  x = irisDF %>%
    filter(Species == "virginica"),
  column = "Sepal_Length",
  probabilities = c(0.25, 0.5, 0.75, 1.0)
))

# 25%  50%  75% 100%
# 6.2  6.5  6.9  7.9