Tutorial: Carregamento e transformação de dados usando Apache Spark DataFrames
Este tutorial mostra aos senhores como carregar e transformar dados usando o Apache Spark Python (PySpark) DataFrame API, o Apache Spark Scala DataFrame API e o SparkR SparkDataFrame API em Databricks.
Se você estiver usando a versão gratuitaDatabricks, selecione a tab Python para ver todos os exemplos de código neste tutorial. A versão gratuita não oferece suporte a R ou Scala. Além disso, a Edição Gratuita restringe o acesso à internet de saída, portanto, você deve upload o arquivo CSV usando a interface do usuário workspace em vez de baixá-lo com código. Consulte o passo 1 para obter instruções detalhadas.
Ao final deste tutorial, você entenderá o que é um DataFrame e conhecerá as seguintes tarefas:
- Python
- Scala
- R
- Definir variáveis e copiar dados públicos para um volume do Unity Catalog
- Criar um DataFrame com o Python
- Carregar dados em um DataFrame de um arquivo CSV
- Ver e interagir com um DataFrame
- Salvar o DataFrame
- Executar queries SQL no PySpark
Consulte também a referência da API do Apache Spark PySpark.
- Definir variáveis e copiar dados públicos para um volume do Unity Catalog
- Criar um DataFrame com Scala
- Carregar dados em um DataFrame de um arquivo CSV
- Visualizar e interagir com um DataFrame
- Salvar o DataFrame
- Executar consultas SQL no Apache Spark
Consulte também a referência da API Apache Spark Scala.
O que é um DataFrame?
DataFrame é uma estrutura de dados bidimensional rotulada com colunas de tipos variados. Imagine o DataFrame como uma planilha, uma tabela SQL ou um dicionário de objetos em série. Os DataFrames do Apache Spark oferecem um conjunto abrangente de funções (selecionar, filtrar, unir, agregar colunas) que permitem que você resolva problemas comuns de análise de dados de forma simples.
Os DataFrames do Apache Spark são uma abstração criada sobre os Resilient Distributed Datasets (RDDs). Os DataFrames do Spark e o Spark SQL utilizam um mecanismo de planejamento e otimização unificado, permitindo que você tenha um desempenho quase idêntico em todas as linguagens compatíveis com o o Databricks (Python, SQL, Scala e R).
Requisitos
Para concluir o tutorial a seguir, você precisa atender aos seguintes requisitos:
-
Para usar os exemplos deste tutorial, Unity Catalog deve estar ativado no seu workspace . A edição gratuita Databricks e o espaço de trabalho de avaliação gratuita têm Unity Catalog ativado por default.
-
Os exemplos neste tutorial usam um volume Unity Catalog para armazenar dados de amostra. Para usar esses exemplos, crie um volume e use o catálogo, o esquema e os nomes de volume desse volume para definir o caminho do volume usado pelos exemplos. Os usuários da Edição Gratuita têm acesso ao catálogo workspace e ao esquema
defaultpor default. -
O senhor deve ter as seguintes permissões no Unity Catalog:
READ VOLUMEeWRITE VOLUMEpara o volume usado neste tutorial.USE SCHEMApara o esquema usado neste tutorialUSE CATALOGpara o catálogo usado neste tutorial
Para definir essas permissões, consulte os privilégios de administrador Databricks ou Unity Catalog e os objetos protegíveis. Os usuários da Edição Gratuita têm esses privilégios no catálogo workspace e no esquema
defaultpor default.
Para obter um Notebook completo para este artigo, consulte DataFrame tutorial Notebook.
Etapa 1: Definir variáveis e carregar o arquivo CSV
Este passo define variáveis para uso neste tutorial e, em seguida, carrega um arquivo CSV contendo dados de nomes de bebês do site health.data.ny.gov em seu volume Unity Catalog . Você precisa dos nomes do catálogo, do esquema e do volume do Unity Catalog.
Se você não souber os nomes do seu catálogo e esquema, clique aqui. Catálogo na barra lateral. O catálogo workspace compartilha o mesmo nome que o seu workspace e está listado no painel de catálogo. Expanda para ver os esquemas disponíveis. Os usuários da Edição Gratuita e da versão de avaliação gratuita podem usar o catálogo workspace e o esquema
default .
Se você não tiver um volume, crie um executando o seguinte comando em uma célula do Notebook (substitua <catalog_name> e <schema_name> pelos seus valores):
CREATE VOLUME IF NOT EXISTS <catalog_name>.<schema_name>.my_volume
-
Abra um novo Notebook clicando no ícone
. Para saber como navegar pelo Databricks Notebook, consulte Personalizar a aparência do Notebook.
-
Copie e cole o seguinte código em uma nova célula vazia do Notebook. Substitua
<catalog-name>,<schema-name>e<volume-name>pelos nomes de catálogo, esquema e volume para um volume Unity Catalog . Substitua<table_name>pelo nome da tabela de sua escolha. Você irá inserir os dados com os nomes dos bebês nesta tabela mais adiante neste tutorial.
- Python
- Scala
- R
catalog = "<catalog_name>"
schema = "<schema_name>"
volume = "<volume_name>"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name = "rows.csv"
table_name = "<table_name>"
path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
path_table = catalog + "." + schema
print(path_table) # Show the complete path
print(path_volume) # Show the complete path
val catalog = "<catalog_name>"
val schema = "<schema_name>"
val volume = "<volume_name>"
val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
val fileName = "rows.csv"
val tableName = "<table_name>"
val pathVolume = s"/Volumes/$catalog/$schema/$volume"
val pathTable = s"$catalog.$schema"
print(pathVolume) // Show the complete path
print(pathTable) // Show the complete path
catalog <- "<catalog_name>"
schema <- "<schema_name>"
volume <- "<volume_name>"
download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name <- "rows.csv"
table_name <- "<table_name>"
path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
path_table <- paste(catalog, ".", schema, sep = "")
print(path_volume) # Show the complete path
print(path_table) # Show the complete path
-
Pressione
Shift+Enterpara executar a célula e criar uma nova célula em branco. -
Carregue o arquivo CSV em seu volume. Escolha um dos seguintes métodos:
- Carregar usando a interface do usuário workspace — Use este método se estiver usando a Edição GratuitaDatabricks ou se o download do código na opção B falhar devido a um erro de rede. A versão gratuita e outros ambientes compute serverless restringem o acesso à internet, portanto, você deve upload o arquivo do seu computador local.
- Baixar usando código — Use este método se o seu ambiente compute tiver acesso à internet de saída.
Opção A: fazer o upload usando a interface do usuário workspace
- Em seu computador, abra o arquivo health.data.ny.gov/api/views/jxy9-yhdk/rows.csv em seu navegador. O arquivo downloads para o seu computador como
rows.csv, que corresponde à variávelfile_namedefinida anteriormente. - Volte ao seu workspace Databricks . Na barra lateral, clique
Novo > Adicionar ou upload dados .
- Clique em " Carregar arquivos para um volume" .
- Clique em procurar e selecione o arquivo
rows.csvou arraste e solte-o na área de upload. - Em Volume de destino , selecione o volume que você especificou acima.
- Após a conclusão upload , retorne ao seu Notebook e continue com o passo 2.
Para obter mais detalhes sobre como carregar arquivos, consulte Carregar arquivos para um volume Unity Catalog.
Opção B: baixar usando o código
Copie e cole o seguinte código em uma nova célula vazia do Notebook. Este código copia o arquivo
rows.csvde health.data.ny.gov para o seu volume Unity Catalog usando o comando dbutilsDatabricks . PressioneShift+Enterpara executar a célula e depois passe para a próxima célula.
- Python
- Scala
- R
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Etapa 2: Criar um DataFrame
Esta etapa cria um DataFrame chamado df1 com dados de teste e exibe o conteúdo dele.
- Copie e cole o código a seguir na nova célula vazia do notebook. Esse código cria o DataFrame com dados de teste e, em seguida, exibe o conteúdo e o esquema do DataFrame.
- Python
- Scala
- R
data = [[2021, "test", "Albany", "M", 42]]
columns = ["Year", "First_Name", "County", "Sex", "Count"]
df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
# df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
val data = Seq((2021, "test", "Albany", "M", 42))
val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
val df1 = data.toDF(columns: _*)
display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
// df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
# Load the SparkR package that is already preinstalled on the cluster.
library(SparkR)
data <- data.frame(
Year = as.integer(c(2021)),
First_Name = c("test"),
County = c("Albany"),
Sex = c("M"),
Count = as.integer(c(42))
)
df1 <- createDataFrame(data)
display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
# head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Etapa 3: Carregar dados em um DataFrame a partir de um arquivo CSV
Esta etapa cria um DataFrame chamado df_csv do arquivo CSV que você carregou anteriormente em seu volume do Unity Catalog. Consulte spark.read.csv.
- Copie e cole o código a seguir na nova célula vazia do notebook. Esse código carrega dados de nomes de bebês no DataFrame
df_csvdo arquivo CSV e exibe o conteúdo do DataFrame.
- Python
- Scala
- R
df_csv = spark.read.csv(f"{path_volume}/{file_name}",
header=True,
inferSchema=True,
sep=",")
display(df_csv)
val dfCsv = spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ",")
.csv(s"$pathVolume/$fileName")
display(dfCsv)
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
source="csv",
header = TRUE,
inferSchema = TRUE,
delimiter = ",")
display(df_csv)
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Você pode carregar dados de vários formatos de arquivo compatíveis.
Etapa 4: visualize e interaja com seu DataFrame
Veja e interaja com os DataFrames de nomes de bebês usando os seguintes métodos.
Imprimir o esquema do DataFrame
Aprenda como exibir o esquema de um DataFrame do Apache Spark. O Apache Spark utiliza o termo esquema para se referir aos nomes e tipos de dados das colunas no DataFrame.
Databricks usa também o termo esquema para descrever uma coleção de tabelas registradas em um catálogo.
- Copie e cole o código a seguir em uma célula vazia do notebook. Esse código exibe o esquema dos seus DataFrames com o método
.printSchema()para ver os esquemas dos dois DataFrames, para preparação para unir os dois DataFrames.
- Python
- Scala
- R
df_csv.printSchema()
df1.printSchema()
dfCsv.printSchema()
df1.printSchema()
printSchema(df_csv)
printSchema(df1)
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Renomear coluna no DataFrame
Aprenda a renomear uma coluna em um DataFrame.
- Copie e cole o código a seguir em uma célula vazia do notebook. Esse código renomeia uma coluna no DataFrame
df1_csvpara corresponder à respectiva coluna no DataFramedf1. Esse código usa o métodowithColumnRenamed()do Apache Spark.
- Python
- Scala
- R
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema()
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
dfCsvRenamed.printSchema()
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Combinar DataFrames
Saiba como criar um novo DataFrame que adiciona as linhas de um DataFrame a outro.
- Copie e cole o código a seguir em uma célula vazia do notebook. Este código usa o método
union()do Apache Spark para combinar o conteúdo do seu primeiro DataFramedfcom o DataFramedf_csvcontendo os dados de nomes de bebês carregados do arquivo CSV.
- Python
- Scala
- R
df = df1.union(df_csv)
display(df)
val df = df1.union(dfCsvRenamed)
display(df)
display(df <- union(df1, df_csv))
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Filtrar linhas em um DataFrame
Descubra os nomes de bebês mais populares em seu conjunto de dados filtrando as linhas com os métodos .filter() ou .where() do Apache Spark. Use a filtragem para selecionar um subconjunto de linhas a serem retornadas ou modificadas em um DataFrame. Não há diferença no desempenho ou sintaxe, como visto nos exemplos a seguir.
Usando .filter () método
- Copie e cole o seguinte código em uma célula vazia do Notebook. Este código usa o método
.filter()do Apache Spark para exibir as linhas no DataFrame com uma contagem maior que 50.
- Python
- Scala
- R
display(df.filter(df["Count"] > 50))
display(df.filter(df("Count") > 50))
display(filteredDF <- filter(df, df$Count > 50))
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Usando .where () método
- Copie e cole o seguinte código em uma célula vazia do Notebook. Este código usa o método
.where()do Apache Spark para exibir as linhas no DataFrame com uma contagem maior que 50.
- Python
- Scala
- R
display(df.where(df["Count"] > 50))
display(df.where(df("Count") > 50))
display(filtered_df <- where(df, df$Count > 50))
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Selecionar colunas de um DataFrame e ordená-las por frequência
Aprenda a frequência de nomes de bebês com o método select() para especificar as colunas do DataFrame a serem retornadas. Use as funções orderby e desc do Apache Spark para ordenar os resultados.
O módulo PySpark.sql para Apache Spark oferece suporte a funções SQL . Entre essas funções que usamos neste tutorial estão as funções orderBy(), desc() e expr() do Apache Spark. Você habilita o uso dessas funções importando-as para sua sessão conforme necessário.
- Copie e cole o código a seguir em uma célula vazia do notebook. Esse código importa a função
desc()e, em seguida, usa o métodoselect()do Apache Spark e as funçõesorderBy()edesc()do Apache Spark para exibir os nomes mais comuns e suas contagens em ordem decrescente.
- Python
- Scala
- R
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Criar um subconjunto DataFrame
Aprenda como criar um DataFrame de subconjunto com um DataFrame existente.
- Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método
filterdo Apache Spark para criar um novo DataFrame restringindo os dados por ano, contagem e sexo. Ele usa o métodoselect()do Apache Spark para limitar as colunas. Usa também as funçõesorderBy()edesc()do Apache Spark para classificar o novo DataFrame por contagem.
- Python
- Scala
- R
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Etapa 5: Salvar o DataFrame
Aprenda a salvar um DataFrame,. O senhor pode salvar seu DataFrame em uma tabela ou gravar o DataFrame em um arquivo ou em vários arquivos.
Salvar o DataFrame em uma tabela
O Databricks usa o formato Delta Lake para todas as tabelas por padrão. Para salvar seu DataFrame, é necessário ter privilégio para CREATE tabela no catálogo e no esquema.
- Copie e cole o código a seguir em uma célula vazia do notebook. Este código salva o conteúdo do DataFrame em uma tabela usando a variável que você definiu no início deste tutorial.
- Python
- Scala
- R
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
A maioria dos aplicativos do Apache Spark trabalha com grandes conjuntos de dados e de forma distribuída. O Apache Spark grava um diretório de arquivos em vez de um único arquivo. O Delta Lake divide as pastas e arquivos do Parquet. Muitos sistemas de dados conseguem ler esses diretórios de arquivos. A Databricks recomenda o uso de tabelas em vez de caminhos de arquivo para a maioria das aplicações.
Salvar o DataFrame em arquivos JSON
- Copie e cole o código a seguir em uma célula vazia do Notebook. Esse código salva o DataFrame em um diretório de arquivos JSON.
- Python
- Scala
- R
df.write.format("json").mode("overwrite").save("/tmp/json_data")
df.write.format("json").mode("overwrite").save("/tmp/json_data")
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Ler o DataFrame de um arquivo JSON
Saiba como usar o método Apache Spark spark.read.format() para read.json dados de um diretório em um DataFrame.
- Copie e cole o código a seguir em uma célula vazia do Notebook. Esse código exibe os arquivos JSON que o senhor salvou no exemplo anterior.
- Python
- Scala
- R
display(spark.read.format("json").json("/tmp/json_data"))
display(spark.read.format("json").json("/tmp/json_data"))
display(read.json("/tmp/json_data"))
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Tarefa adicional: execução SQL consultas em PySpark, Scala, e R
O DataFrames do Apache Spark oferece as seguintes opções para combinar SQL com PySpark, Scala e R. Você pode executar o código a seguir no mesmo notebook que criou para este tutorial.
Especificar uma coluna como uma consulta SQL
Aprenda como usar o método selectExpr() do Apache Spark. Essa é uma variante do método select() que aceita expressões SQL e retorna um DataFrame atualizado. Este método permite usar uma expressão SQL, como upper.
- Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa o método
selectExpr()do Apache Spark e a expressãoupperdo SQL para converter uma coluna de strings em maiúsculas (e renomear a coluna).
- Python
- Scala
- R
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
Use expr() para usar a sintaxe SQL para uma coluna
Saiba como importar e usar a função expr() do Apache Spark para usar a sintaxe SQL em qualquer local onde haveria especificação de uma coluna.
- Copie e cole o código a seguir em uma célula vazia do notebook. Esse código importa a função
expr()e, em seguida, usa a funçãoexpr()do Apache Spark e a expressãolowerdo SQL para converter uma coluna de strings em minúsculas (e renomear a coluna).
- Python
- Scala
- R
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
import org.apache.spark.sql.functions.{col, expr}
// Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
execução de uma consulta arbitrária SQL usando spark.sql() função
Saiba como usar a função spark.sql() do Apache Spark para executar consultas SQL arbitrárias.
- Copie e cole o código a seguir em uma célula vazia do notebook. Esse código usa a função
spark.sql()do Apache Spark para consultar uma tabela SQL usando a sintaxe SQL.
- Python
- Scala
- R
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
- Pressione
Shift+Enterpara executar a célula e depois passar para a próxima célula.
DataFrame tutorial Caderno de anotações
Os notebooks a seguir incluem os exemplos de consultas deste tutorial.
- Python
- Scala
- R