Tutorial: Carregamento e transformação de dados em PySpark DataFrames

Este tutorial mostra como carregar e transformar o uso de dados da cidade dos EUA na API Apache Spark Python (PySpark) DataFrame em Databricks.

Ao final deste tutorial, você entenderá o que é um DataFrame e estará familiarizado com a seguinte tarefa:

Consulte também a referência da API do Apache Spark PySpark.

O que é um DataFrame?

Um DataFrame é uma estrutura de dados bidimensional rotulada com colunas de tipos potencialmente diferentes. Você pode pensar em um DataFrame como uma planilha, uma tabela SQL ou um dicionário de objetos em série. O Apache Spark DataFrames fornece um conjunto repleto de funções (selecionar colunas, filtrar, unir, agregar) que permite que você resolva problemas comuns de análise de dados com eficiência.

O Apache Spark DataFrames é uma abstração construída sobre Resilient Distributed Datasets (RDDs). O Spark DataFrames e o Spark SQL usam um mecanismo unificado de planejamento e otimização, permitindo que você obtenha desempenho quase idêntico em todas as linguagens compatíveis com o Databricks (Python, SQL, Scala e R).

Requisitos

Para concluir o tutorial a seguir, você deve atender aos seguintes requisitos:

Observação

Se o senhor não tiver privilégios de controle de clusters, ainda poderá concluir a maioria dos passos a seguir, desde que tenha acesso a um cluster.

Na barra lateral da página inicial, o senhor acessa as entidades Databricks: o navegador workspace, o Catalog Explorer, o fluxo de trabalho e o compute. workspace é a pasta raiz que armazena seu Databricks ativo, como Notebook e biblioteca.

o passo 1: Crie um DataFrame com Python

Para saber como navegar no Databricks Notebook, consulte Interface e controles do Databricks Notebook .

  1. Abra um novo Notebook clicando no ícone Novo ícone.

  2. Copie e cole o código a seguir na célula vazia Notebook e, em seguida, pressione Shift+Enter para executar a célula. O exemplo de código a seguir cria um DataFrame chamado df1 com dados sobre a população da cidade e exibe seu conteúdo.

    # Create a data frame from the given data
    data = [[295, "South Bend", "Indiana", "IN", 101190, 112.9]]
    columns = ["rank", "city", "state", "code", "population", "price"]
    
    df1 = spark.createDataFrame(data, schema="rank LONG, city STRING, state STRING, code STRING, population LONG, price DOUBLE")
    display(df1)
    

o passo 2: Carregar dados em um DataFrame a partir de arquivos

Adicione mais dados de população da cidade do diretório /databricks-datasets em df2.

Para carregar dados em DataFrame df2 a partir do arquivo data_geo.csv, copie e cole o seguinte código na nova célula vazia Notebook. Pressione Shift+Enter para executar a célula.

Você pode enviar dados de vários formatos de arquivo compatíveis. O exemplo a seguir usa um dataset disponível no diretório /databricks-datasets, acessível na maioria dos workspaces. Consulte Sample datasets (amostra de datasets).

# Create second dataframe from a file
df2 = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)

o passo 3: Visualize e interaja com seu DataFrame

view e interaja com os DataFrames de população da sua cidade usando os métodos a seguir.

Combinar DataFrames

Combine o conteúdo do primeiro DataFrame df1 com o Dataframe df2 que contém o conteúdo de data_geo.csv.

No Notebook, use o código de exemplo a seguir para criar um novo DataFrame que adiciona as linhas de um DataFrame a outro usando a união operações:

# Returns a DataFrame that combines the rows of df1 and df2
df = df1.union(df2)

Visualizar o DataFrame

Para view os dados da cidade dos EUA em formato tabular, use o comando display() do Databricks em uma célula Notebook .

display(df)

Filtrar linhas em um DataFrame

Descubra as cinco cidades mais populosas do seu conjunto de dados filtrando linhas usando .filter() ou .where(). Use a filtragem para selecionar um subconjunto de linhas para retornar ou modificar em um DataFrame. Não há diferença no desempenho ou na sintaxe, como pode ser visto nos exemplos a seguir.

# Filter rows using .filter()
filtered_df = df.filter(df["rank"] < 6)
display(filtered_df)

# Filter rows using .where()
filtered_df = df.where(df["rank"] < 6)
display(filtered_df)

Selecionar colunas a partir de um DataFrame

Saiba em qual estado uma cidade está localizada com o método select() . Selecione colunas passando um ou mais nomes de colunas para .select(), como no exemplo a seguir:

# Select columns from a DataFrame
select_df = df.select("City", "State")
display(select_df)

Crie um subconjunto DataFrame

Crie um subconjunto DataFrame com as dez cidades com maior população e exiba os dados resultantes. Combine query select e filter para limitar linhas e colunas retornadas, usando o seguinte código em seu Notebook:

# Create a subset DataFrame
subset_df = df.filter(df["rank"] < 11).select("City")
display(subset_df)

o passo 4: Salvar o DataFrame

Você pode salvar seu DataFrame em uma tabela ou gravá-lo em um arquivo ou em vários arquivos.

Salve o DataFrame em uma tabela

Databricks usa o formato Delta Lake para todas as tabelas por default. Para salvar seu DataFrame, você deve ter CREATE privilégios de tabela no catálogo e no esquema. O exemplo a seguir salva o conteúdo do DataFrame em uma tabela chamada us_cities:

# Save DataFrame to a table
df.write.saveAsTable("us_cities")

A maioria das aplicações Spark funcionam em grandes conjuntos de dados e de forma distribuída. Spark grava um diretório de arquivos em vez de um único arquivo. Delta Lake divide as pastas e arquivos do Parquet. Muitos sistemas de dados podem ler esses diretórios de arquivos. A Databricks recomenda a utilização de tabelas em vez de caminhos de ficheiros para a maioria das aplicações.

Salve o DataFrame em arquivos JSON

O exemplo a seguir salva um diretório de arquivos JSON:

# Write a DataFrame to a directory of files
df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Leia o DataFrame de um arquivo JSON

# Read a DataFrame from a JSON file
df3 = spark.read.format("json").json("/tmp/json_data")
display(df3)

Tarefa adicional: execução query SQL no PySpark

Spark DataFrames fornece as seguintes opções para combinar SQL com Python. Você pode executar o código a seguir no mesmo Notebook criado para este tutorial.

Especifique uma coluna como uma consulta SQL

O método selectExpr() permite especificar cada coluna como uma query SQL, como no exemplo a seguir:

display(df.selectExpr("`rank`", "upper(city) as big_name"))

Importar expr()

Você pode importar a função expr() de pyspark.sql.functions para usar a sintaxe SQL em qualquer lugar em que uma coluna seja especificada, como no exemplo a seguir:

from pyspark.sql.functions import expr

display(df.select("rank", expr("lower(city) as little_name")))

execução de uma consulta SQL arbitrária

Você pode usar spark.sql() para executar query SQL arbitrárias, como no exemplo a seguir:

query_df = spark.sql("SELECT * FROM us_cities")
display(query_df)

Parametrizar querySQL

Você pode usar a formatação Python para parametrizar consultas SQL, como no exemplo a seguir:

# Define the table name
table_name = "us_cities"

# Query the DataFrame using the constructed SQL query
query_df = spark.sql(f"SELECT * FROM {table_name}")
display(query_df)