Noções básicas de PySpark

Este artigo apresenta exemplos simples para ilustrar o uso do PySpark. Ele pressupõe que o senhor compreenda os conceitos fundamentais doApache Spark e esteja executando o comando em um Databricks Notebook conectado a compute. O senhor cria DataFrames usando dados de amostra, realiza transformações básicas, incluindo operações de linha e coluna nesses dados, combina vários DataFrames e agrega esses dados, visualiza esses dados e os salva em uma tabela ou arquivo.

Carregar dados

Alguns exemplos neste artigo usam dados de amostra fornecidos pelo site Databrickspara demonstrar o uso do site DataFrames para carregar, transformar e salvar dados. Se quiser usar seus próprios dados que ainda não estão em Databricks, o senhor pode upload primeiro e criar um DataFrame a partir deles. Consulte Criar ou modificar uma tabela usando o arquivo upload e arquivosupload em um volume Unity Catalog .

Sobre os dados de amostra da Databricks

A Databricks fornece dados de amostra no catálogo samples e no diretório /databricks-datasets.

  • Para acessar os dados de amostra no catálogo samples, use o formato samples.<schema-name>.<table-name>. Este artigo usa tabelas no esquema samples.tpch, que contém dados de uma empresa fictícia. A tabela customer contém informações sobre os clientes e a orders contém informações sobre os pedidos feitos por esses clientes.

  • Use dbutils.fs.ls para explorar os dados em /databricks-datasets. Use o Spark SQL ou DataFrames para consultar dados nesse local usando caminhos de arquivo. Para saber mais sobre os dados de amostra fornecidos pelo site Databricks, consulte Conjunto de dados de amostra.

Importar tipos de dados

Muitas operações do PySpark exigem que o senhor use funções SQL ou interaja com tipos nativos do Spark. O senhor pode importar diretamente apenas as funções e os tipos de que precisa ou pode importar o módulo inteiro.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Como algumas funções importadas podem substituir as funções integradas do Python, alguns usuários optam por importar esses módulos usando um alias. Os exemplos a seguir mostram um alias comum usado em exemplos de código do Apache Spark:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Para obter uma lista abrangente de tipos de dados, consulte Tipos de dados do Spark.

Para obter uma lista abrangente das funções SQL do PySpark, consulte Spark Functions.

Criar um DataFrame

Há várias maneiras de criar um DataFrame. Normalmente, o senhor define um DataFrame em relação a uma fonte de dados, como uma tabela ou uma coleção de arquivos. Em seguida, conforme descrito na seção de conceitos fundamentais do Apache Spark, use uma ação, como display, para acionar a execução das transformações. O método display gera DataFrames.

Criar um DataFrame com os valores especificados

Para criar um DataFrame com valores especificados, use o método createDataFrame, em que as linhas são expressas como uma lista de tuplas:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Observe na saída que os tipos de dados das colunas de df_children são automaticamente inferidos. Como alternativa, o senhor pode especificar os tipos adicionando um esquema. Os esquemas são definidos usando StructType, que é composto de StructFields que especificam o nome, o tipo de dados e um sinalizador Boolean que indica se eles contêm um valor nulo ou não. O senhor deve importar tipos de dados de pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Criar um DataFrame a partir de uma tabela no Unity Catalog

Para criar um DataFrame a partir de uma tabela no Unity Catalog, use o método table que identifica a tabela usando o formato <catalog-name>.<schema-name>.<table-name>. Clique em Catalog na barra de navegação à esquerda para usar o Catalog Explorer e navegar até sua tabela. Clique nele e selecione Copiar caminho da tabela para inserir o caminho da tabela no site Notebook.

O exemplo a seguir carrega a tabela samples.tpch.customer, mas o senhor pode, alternativamente, fornecer o caminho para sua própria tabela.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Criar um DataFrame a partir de um arquivo de upload

Para criar um DataFrame a partir de um arquivo que o senhor carrega em volumes Unity Catalog, use a propriedade read. Esse método retorna um DataFrameReader, que o senhor pode usar para ler o formato apropriado. Clique na opção de catálogo na pequena barra lateral à esquerda e use o navegador de catálogo para localizar seu arquivo. Selecione-o e clique em Copy volume file path (Copiar caminho do arquivo de volume).

O exemplo abaixo lê um arquivo *.csv, mas o DataFrameReader suporta o upload de arquivos em muitos outros formatos. Consulte os métodos do DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Para obter mais informações sobre os volumes Unity Catalog, consulte O que são volumes Unity Catalog?

Criar um DataFrame a partir de uma resposta JSON

Para criar um DataFrame a partir de uma carga útil de resposta JSON retornada por uma API REST, use o pacote Python requests para consultar e analisar a resposta. O senhor deve importar o pacote para usá-lo. Este exemplo usa dados do banco de dados de pedidos de medicamentos da United States Food and Drug Administration.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Para obter informações sobre como trabalhar com JSON e outros dados semiestruturados em Databricks, consulte Modelar dados semiestruturados.

Selecione um campo ou objeto JSON

Para selecionar um campo ou objeto específico do JSON convertido, use a notação []. Por exemplo, para selecionar o campo products que, por sua vez, é uma matriz de produto:

display(df_drugs.select(df_drugs["products"]))

O senhor também pode encadear chamadas de método para percorrer vários campos. Por exemplo, para gerar o nome da marca do primeiro produto em um pedido de medicamento:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Criar um DataFrame a partir de um arquivo

Para demonstrar a criação de um DataFrame a partir de um arquivo, este exemplo carrega dados CSV no diretório /databricks-datasets.

Para navegar até o conjunto de dados de amostra, o senhor pode usar o comando do sistema de arquivos Databricks Utilties. O exemplo a seguir usa dbutils para listar o conjunto de dados disponível em /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Como alternativa, o senhor pode usar %fs para acessar Databricks CLI file system comando, conforme mostrado no exemplo a seguir:

%fs ls '/databricks-datasets'

Para criar um DataFrame a partir de um arquivo ou diretório de arquivos, especifique o caminho no método load:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

transformação de dados com DataFrames

DataFrames facilitam a transformação de dados usando métodos integrados para classificar, filtrar e agregar dados. Muitas transformações não são especificadas como métodos em DataFrames, mas são fornecidas no pacote spark.sql.functions. Consulte Databricks Spark SQL Funções.

Coluna operações

O Spark fornece muitas operações básicas de coluna:

Dica

Para gerar todas as colunas em um DataFrame, use columns, por exemplo, df_customer.columns.

Selecionar colunas

O senhor pode selecionar colunas específicas usando select e col. A função col está no submódulo pyspark.sql.functions.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

O senhor também pode se referir a uma coluna usando expr, que usa uma expressão definida como uma cadeia de caracteres:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

O senhor também pode usar selectExpr, que aceita expressões SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Para selecionar colunas usando um literal de cadeia de caracteres, faça o seguinte:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Para selecionar explicitamente uma coluna de um DataFrame específico, o senhor pode usar o operador [] ou o operador .. (O operador . não pode ser usado para selecionar colunas que comecem com um número inteiro ou que contenham um espaço ou caractere especial). Isso pode ser especialmente útil quando o senhor estiver unindo DataFrames em que algumas colunas têm o mesmo nome.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Criar colunas

Para criar uma nova coluna, use o método withColumn. O exemplo a seguir cria uma nova coluna que contém um valor Boolean com base no fato de o cliente account ter um saldo c_acctbal superior a 1000:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Renomear colunas

Para renomear uma coluna, use o método withColumnRenamed, que aceita os nomes da coluna existente e da nova:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

O método alias é especialmente útil quando o senhor deseja renomear as colunas como parte das agregações:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Tipos de colunas fundidas

Em alguns casos, o senhor pode querer alterar o tipo de dados de uma ou mais colunas do DataFrame. Para fazer isso, use o método cast para converter entre os tipos de dados de coluna. O exemplo a seguir mostra como converter uma coluna de um tipo inteiro para um tipo de cadeia de caracteres, usando o método col para fazer referência a uma coluna:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Remover colunas

Para remover colunas, o senhor pode omitir colunas durante uma seleção ou select(*) except ou pode usar o método drop:

df_customer_flag_renamed.drop("balance_flag_renamed")

O senhor também pode soltar várias colunas de uma vez:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Operações em fila

O Spark fornece muitas operações básicas de linha:

Filtrar linhas

Para filtrar as linhas, use o método filter ou where em um DataFrame para retornar apenas determinadas linhas. Para identificar uma coluna para filtrar, use o método col ou uma expressão que seja avaliada como uma coluna.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Para filtrar em várias condições, use operadores lógicos. Por exemplo, & e | permitem que o senhor faça AND e OR condições, respectivamente. O exemplo a seguir filtra as linhas em que c_nationkey é igual a 20 e c_acctbal é maior que 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Remover linhas duplicadas

Para eliminar a duplicação de linhas, use distinct, que retorna apenas as linhas exclusivas.

df_unique = df_customer.distinct()

Manipular valores nulos

Para lidar com valores nulos, elimine as linhas que contêm valores nulos usando o método na.drop. Esse método permite que o senhor especifique se deseja descartar as linhas que contêm any valores nulos ou all valores nulos.

Para eliminar qualquer valor nulo, use um dos exemplos a seguir.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Se, em vez disso, o senhor quiser filtrar apenas as linhas que contêm todos os valores nulos, use o seguinte:

df_customer_no_nulls = df_customer.na.drop("all")

O senhor pode aplicar isso a um subconjunto de colunas especificando isso, conforme mostrado abaixo:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Para preencher os valores ausentes, use o método fill. O senhor pode optar por aplicar isso a todas as colunas ou a um subconjunto de colunas. No exemplo abaixo account, os saldos que têm um valor nulo para seu saldo account c_acctbal são preenchidos com 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Para substituir strings por outros valores, use o método replace. No exemplo abaixo, qualquer endereço vazio strings é substituído pela palavra UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Anexar linhas

Para anexar linhas, o senhor precisa usar o método union para criar um novo DataFrame. No exemplo a seguir, o DataFrame df_that_one_customer criado anteriormente e df_filtered_customer são combinados, o que retorna um DataFrame com três clientes:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Observação

O senhor também pode combinar DataFrames gravando-os em uma tabela e, em seguida, acrescentando novas linhas. Para cargas de trabalho de produção, o processamento incremental de fontes de dados em uma tabela de destino pode reduzir drasticamente a latência e os custos compute à medida que os dados aumentam de tamanho. Consulte Ingerir dados em um lakehouse da Databricks.

Ordenar linhas

Importante

A classificação pode ser cara em escala e, se o senhor armazenar dados classificados e recarregar os dados com o Spark, a ordem não será garantida. Certifique-se de que o senhor seja intencional no uso da classificação.

Para classificar as linhas por uma ou mais colunas, use o método sort ou orderBy. Em default, esses métodos são classificados em ordem crescente:

df_customer.orderBy(col("c_acctbal"))

Para filtrar em ordem decrescente, use desc:

df_customer.sort(col("c_custkey").desc())

O exemplo a seguir mostra como classificar em duas colunas:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Para limitar o número de linhas a serem retornadas depois que o DataFrame for classificado, use o método limit. O exemplo a seguir exibe apenas os principais resultados 10:

display(df_sorted.limit(10))

Unir DataFrames

Para join dois ou mais DataFrames, use o método join. O senhor pode especificar como gostaria que o DataFrames fosse unido nos parâmetros how (o tipo join ) e on (em quais colunas basear o join). Os tipos comuns de join incluem:

  • inner: Este é o tipo join default, que retorna um DataFrame que mantém apenas as linhas em que há uma correspondência para o parâmetro on no DataFrames.

  • left: Mantém todas as linhas do primeiro DataFrame especificado e somente as linhas do segundo DataFrame especificado que tenham uma correspondência com o primeiro.

  • outer: Um join externo mantém todas as linhas de ambos os DataFrames independentemente da correspondência.

Para obter informações detalhadas sobre o join, consulte Work with join em Databricks. Para obter uma lista de junções compatíveis em PySpark, consulte DataFrame join.

O exemplo a seguir retorna um único DataFrame em que cada linha do orders DataFrame é unida à linha correspondente do customers DataFrame. DataFrame. É usado um join interno, pois a expectativa é que cada pedido corresponda exatamente a um cliente.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Para join em várias condições, use os operadores Boolean, como & e |, para especificar AND e OR, respectivamente. O exemplo a seguir acrescenta uma condição adicional, filtrando apenas as linhas que têm o_totalprice maior que 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Dados agregados

Para agregar dados em um DataFrame, semelhante a um GROUP BY no SQL, use o método groupBy para especificar as colunas a serem agrupadas e o método agg para especificar as agregações. Importar agregações comuns, incluindo avg, sum, max e min de pyspark.sql.functions. O exemplo a seguir mostra o saldo médio do cliente por segmento de mercado:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Algumas agregações são ações, o que significa que elas acionam cálculos. Nesse caso, o senhor não precisa usar outras ações para gerar resultados.

Para contar as linhas em um DataFrame, use o método count:

df_customer.count()

Encadeamento de chamadas

Os métodos que transformam DataFrames retornam DataFrames, e o Spark não atua nas transformações até que as ações sejam chamadas. Essa avaliação preguiçosa significa que o senhor pode encadear vários métodos para maior conveniência e legibilidade. O exemplo a seguir mostra como encadear a filtragem, a agregação e a ordenação:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Visualize seu DataFrame

Para visualizar um DataFrame em um Notebook, clique no sinal + ao lado da tabela na parte superior esquerda do DataFrame e selecione Visualization (Visualização ) para adicionar um ou mais gráficos com base em seu DataFrame. Para obter detalhes sobre visualizações, consulte Visualizações em Databricks Notebook.

display(df_order)

Para realizar visualizações adicionais, Databricks recomenda usar Pandas API para Spark. O .pandas_api() permite que o senhor faça o cast para o site correspondente Pandas API para um Spark DataFrame. Para obter mais informações, consulte Pandas API em Spark.

Salve seus dados

Depois de transformar os dados, o senhor pode salvá-los usando os métodos DataFrameWriter. Uma lista completa desses métodos pode ser encontrada em DataFrameWriter. As seções a seguir mostram como salvar o DataFrame como uma tabela e como uma coleção de arquivos de dados.

Salvar seu DataFrame como uma tabela

Para salvar o DataFrame como uma tabela no Unity Catalog, use o método write.saveAsTable e especifique o caminho no formato <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Escreva seu DataFrame como CSV

Para gravar seu DataFrame no formato *.csv, use o método write.csv, especificando o formato e as opções. Em default, se os dados existirem no caminho especificado, as operações de gravação falharão. O senhor pode especificar um dos seguintes modos para realizar uma ação diferente:

  • overwrite sobrescreve todos os dados existentes no caminho de destino com o conteúdo do DataFrame.

  • append anexa o conteúdo do DataFrame aos dados no caminho de destino.

  • ignore falha silenciosamente a gravação se houver dados no caminho de destino.

O exemplo a seguir demonstra a substituição de dados com o conteúdo do DataFrame como arquivos CSV:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)