tutorial: Carga e transformação de dados usando Apache Spark DataFrames
Este tutorial mostra aos senhores como carregar e transformar dados usando o Apache Spark Python (PySpark) DataFrame API, o Apache Spark Scala DataFrame API e o SparkR SparkDataFrame API em Databricks.
Ao final deste tutorial, o senhor entenderá o que é um DataFrame e estará familiarizado com a seguinte tarefa:
Consulte também a referência da API do Apache Spark PySpark.
Veja também a referência da API Apache Spark Scala.
O que é um DataFrame?
Um DataFrame é uma estrutura de dados bidimensional com rótulos e colunas de tipos variados.Você pode pensar em um DataFrame como uma planilha, uma tabela SQL ou um dicionário de objetos em série. Os DataFrames do Apache Spark oferecem um conjunto abrangente de funções (selecionar colunas, filtrar, unir, agregar) que permitem a você resolver eficientemente problemas comuns de análise de dados.
Os DataFrames do Apache Spark são uma abstração criada sobre os Resilient Distributed Datasets (RDDs).Os DataFrames do Spark e o Spark SQL utilizam um mecanismo de planejamento e otimização unificado, permitindo que você obtenha desempenho quase idêntico em todas as linguagens compatíveis com o o Databricks (Python, SQL, Scala e R).
Requisitos
Para concluir o tutorial a seguir, o senhor deve atender aos seguintes requisitos:
Para usar os exemplos neste tutorial, seu workspace deve ter o Unity Catalog habilitado.
Os exemplos deste tutorial usam um volume do Unity Catalog para armazenar dados de amostra. Para usar esses exemplos, crie um volume e use o catálogo, o esquema e os nomes de volume desse volume para definir o caminho do volume usado pelos exemplos.
O senhor deve ter as seguintes permissões no Unity Catalog:
READ VOLUME
eWRITE VOLUME
, ouALL PRIVILEGES
para o volume usado neste tutorial.USE SCHEMA
ouALL PRIVILEGES
para o esquema usado neste tutorial.USE CATALOG
ouALL PRIVILEGES
para o catálogo usado neste tutorial.
Para definir essas permissões, consulte o administrador do Databricks ou os privilégios e objetos protegidos do Unity Catalog.
Dica
Para obter um Notebook completo para este artigo, consulte DataFrame tutorial Notebook.
o passo 1: Definir variáveis e carregar o arquivo CSV
Este passo define as variáveis a serem usadas neste tutorial e, em seguida, carrega um arquivo CSV contendo dados de nomes de bebês do health.data.ny.gov no volume Unity Catalog.
Abra um novo Notebook clicando no ícone . Para saber como navegar pelo Databricks Notebook, consulte Databricks Notebook interface e controles.
Copie e cole o código a seguir na nova célula vazia do site Notebook. Substitua
<catalog-name>
,<schema-name>
e<volume-name>
pelos nomes de catálogo, esquema e volume de um volume do Unity Catalog. Substitua<table_name>
por um nome de tabela de sua escolha. O senhor carregará os dados do nome do bebê nessa tabela mais adiante neste tutorial.catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_tables = catalog + "." + schema print(path_tables) # Show the complete path print(path_volume) # Show the complete path
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val file_name = "rows.csv" val table_name = "<table_name>" val path_volume = s"/Volumes/$catalog/$schema/$volume" val path_tables = s"$catalog.$schema.$table_name" print(path_volume) // Show the complete path print(path_tables) // Show the complete path
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_tables <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_tables) # Show the complete path
Pressione
Shift+Enter
para executar a célula e criar uma nova célula em branco.Copie e cole o código a seguir na nova célula vazia do site Notebook. Esse código copia o arquivo
rows.csv
de health.data.ny.gov para o volume do Unity Catalog usando o comando Databricks dbutuils.dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
dbutils.fs.cp(download_url, s"$path_volume/$file_name")
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
o passo 2: Criar um DataFrame
Este passo cria um DataFrame chamado df1
com dados de teste e, em seguida, exibe seu conteúdo.
Copie e cole o código a seguir na nova célula vazia do site Notebook. Esse código cria o DataFrame com dados de teste e, em seguida, exibe o conteúdo e o esquema do DataFrame.
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = c(2021), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = c(42) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
o passo 3: Carregar dados em um DataFrame a partir do arquivo CSV
Esse passo cria um DataFrame chamado df_csv
a partir do arquivo CSV que o senhor carregou anteriormente no volume Unity Catalog. Consulte spark.read.csv.
Copie e cole o código a seguir na nova célula vazia do site Notebook. Esse código carrega os dados do nome do bebê no DataFrame
df_csv
do arquivo CSV e, em seguida, exibe o conteúdo do DataFrame.df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
val df_csv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$path_volume/$file_name") display(df_csv)
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Você pode enviar dados de vários formatos de arquivo compatíveis.
o passo 4: view e interagir com o senhor DataFrame
view e interagir com os nomes dos bebês DataFrames usando os seguintes métodos.
Imprimir o esquema do DataFrame
Saiba como exibir o esquema de um Apache Spark DataFrame. O Apache Spark usa o termo esquema para se referir aos nomes e tipos de dados das colunas no DataFrame.
Observação
Databricks também usa o termo esquema para descrever uma coleção de tabelas registradas em um catálogo.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Este código mostra o esquema de seu DataFrames com o método
.printSchema()
para view os esquemas dos dois DataFrames - para preparar a união dos dois DataFrames.df_csv.printSchema() df1.printSchema()
df_csv.printSchema() df1.printSchema()
printSchema(df_csv) printSchema(df1)
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Renomear coluna no DataFrame
Saiba como renomear uma coluna em um DataFrame.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código renomeia uma coluna no
df1_csv
DataFrame para corresponder à respectiva coluna nodf1
DataFrame. DataFrame. Esse código usa o método Apache SparkwithColumnRenamed()
.df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable df_csv_renamed.printSchema()
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Combinar DataFrames
Saiba como criar um novo DataFrame que adiciona as linhas de um DataFrame a outro.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código usa o método Apache Spark
union()
para combinar o conteúdo do primeiro DataFramedf
com o DataFramedf_csv
que contém os dados dos nomes dos bebês carregados do arquivo CSV.df = df1.union(df_csv) display(df)
val df = df1.union(df_csv_renamed) display(df)
display(df <- union(df1, df_csv))
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Filtrar linhas em um DataFrame
Descubra os nomes de bebês mais populares em seu conjunto de dados filtrando as linhas, usando os métodos .filter()
ou .where()
do Apache Spark. Use a filtragem para selecionar um subconjunto de linhas para retornar ou modificar em um DataFrame. Não há diferença no desempenho ou na sintaxe, como visto nos exemplos a seguir.
Usando .filter() método
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código usa o método
.filter()
do Apache Spark para exibir as linhas no DataFrame com uma contagem superior a 50.display(df.filter(df["Count"] > 50))
display(df.filter(df("Count") > 50))
display(filteredDF <- filter(df, df$Count > 50))
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Usando .where() método
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código usa o método
.where()
do Apache Spark para exibir as linhas no DataFrame com uma contagem superior a 50.display(df.where(df["Count"] > 50))
display(df.where(df("Count") > 50))
display(filtered_df <- where(df, df$Count > 50))
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Selecionar colunas de um DataFrame e ordená-las por frequência
Saiba qual é a frequência do nome do bebê com o método select()
para especificar as colunas do DataFrame a serem retornadas. Use as funções orderby
e desc
do Apache Spark para ordenar os resultados.
O módulo PySpark.sql para Apache Spark oferece suporte às funções de SQL. Entre essas funções que usamos neste tutorial estão as funções orderBy()
, desc()
e expr()
do Apache Spark. O senhor habilita o uso dessas funções importando-as para sua sessão conforme necessário.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código importa a função
desc()
e, em seguida, usa o métodoselect()
do Apache Spark e as funçõesorderBy()
edesc()
do Apache Spark para exibir os nomes mais comuns e suas contagens em ordem decrescente.from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Criar um DataFrame de subconjunto
Saiba como criar um DataFrame de subconjunto a partir de um DataFrame existente.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código usa o método
filter
do Apache Spark para criar um novo DataFrame restringindo os dados por ano, contagem e sexo. Ele usa o método Apache Sparkselect()
para limitar as colunas. Ele também usa as funçõesorderBy()
edesc()
do Apache Spark para classificar o novo DataFrame por contagem.subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
o passo 5: Salvar o DataFrame
Saiba como salvar um DataFrame,. O senhor pode salvar seu DataFrame em uma tabela ou gravar o DataFrame em um arquivo ou em vários arquivos.
Salvar o DataFrame em uma tabela
Databricks usa o formato Delta Lake para todas as tabelas por default. Para salvar o DataFrame, o senhor deve ter privilégios de tabela CREATE
no catálogo e no esquema.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código salva o conteúdo do site DataFrame em uma tabela usando a variável que o senhor definiu no início deste site tutorial.
df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}") # To overwrite an existing table, use the following code: # df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")
df.write.saveAsTable(s"$path_tables" + "." + s"$table_name") // To overwrite an existing table, use the following code: // df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")
saveAsTable(df, paste(path_tables, ".", table_name)) # To overwrite an existing table, use the following code: # saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
A maioria dos Apache aplicativos Spark trabalha com grandes conjuntos de dados e de forma distribuída. O Apache Spark grava um diretório de arquivos em vez de um único arquivo. O Delta Lake divide as pastas e os arquivos do Parquet. Muitos sistemas de dados podem ler esses diretórios de arquivos. A Databricks recomenda o uso de tabelas em vez de caminhos de arquivos para a maioria dos aplicativos.
Salvar o DataFrame em arquivos JSON
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código salva o DataFrame em um diretório de arquivos JSON.
df.write.format("json").save("/tmp/json_data") # To overwrite an existing file, use the following code: # df.write.format("json").mode("overwrite").save("/tmp/json_data")
df.write.format("json").save("/tmp/json_data") // To overwrite an existing file, use the following code: // df.write.format("json").mode("overwrite").save("/tmp/json_data")
write.df(df, path = "/tmp/json_data", source = "json") # To overwrite an existing file, use the following code: # write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Ler o DataFrame de um arquivo JSON
Saiba como usar o método Apache Spark spark.read.format()
para read.json dados de um diretório em um DataFrame.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código exibe os arquivos JSON que o senhor salvou no exemplo anterior.
display(spark.read.format("json").json("/tmp/json_data"))
display(spark.read.format("json").json("/tmp/json_data"))
display(read.json("/tmp/json_data"))
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Tarefa adicional: execução SQL consultas em PySpark, Scala, e R
Os DataFrames do Apache Spark oferecem as seguintes opções para combinar SQL com PySpark, Scala e R. O senhor pode executar o seguinte código no mesmo Notebook que criou para este tutorial.
Especificar uma coluna como uma consulta SQL
Saiba como usar o método Apache Spark selectExpr()
. Essa é uma variante do método select()
que aceita expressões SQL e retorna um DataFrame atualizado. Esse método permite que o senhor use uma expressão SQL, como upper
.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código usa o método Apache Spark
selectExpr()
e a expressão SQLupper
para converter uma coluna de strings em maiúsculas (e renomear a coluna).display(df.selectExpr("Count", "upper(County) as big_name"))
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
Use expr()
para usar a sintaxe SQL para uma coluna
Saiba como importar e usar a função expr()
do Apache Spark para usar a sintaxe SQL em qualquer lugar onde uma coluna seria especificada.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código importa a função
expr()
e, em seguida, usa a função Apache Sparkexpr()
e a expressão SQLlower
para converter uma coluna de strings em minúsculas (e renomear a coluna).from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.
execução de uma consulta arbitrária SQL usando spark.sql() função
Saiba como usar a função Apache Spark spark.sql()
para executar consultas SQL arbitrárias.
Copie e cole o código a seguir em uma célula vazia do site Notebook. Esse código usa a função
spark.sql()
do Apache Spark para consultar uma tabela SQL usando a sintaxe SQL.display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))
display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))
display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))
Pressione
Shift+Enter
para executar a célula e depois passe para a próxima célula.