Pular para o conteúdo principal

Ler e gravar arquivos CSV

CSV (valores separados por vírgula) é um formato tabular de texto simples amplamente utilizado para troca de dados, pipelines de ETL e armazenamento de dados de uso geral. O Databricks oferece suporte a CSV para leitura e gravação com o Apache Spark, incluindo inferência de esquema, compactação, tratamento de registros malformados e dados resgatados.

nota

A Databricks recomenda a funçãoread_files table-valued para que os usuários de SQL leiam arquivos CSV. read_files está disponível em Databricks Runtime 13.3 LTS e acima.

Você também pode usar uma exibição temporária. Se você usar SQL para ler dados CSV diretamente sem usar exibições temporárias ou read_files, as seguintes limitações se aplicam:

Pré-requisitos

Databricks não exige configuração adicional para usar arquivos CSV. No entanto, para transmitir arquivos CSV, é necessário ter Auto Loader.

Opções

Use os métodos .option() e .options() de DataFrameReader e DataFrameWriter para configurar a fonte de dados CSV . Para obter uma lista completa das opções suportadas, consulte DataFrameReader Opções CSV e DataFrameWriter Opções CSV.

Uso

Os exemplos a seguir demonstram a leitura e gravação de arquivos CSV, a especificação de esquemas e o tratamento de registros malformados.

Ler arquivos CSV

O exemplo a seguir usa o Wanderbricks dataset de exemplo. Grava dados de revisões em CSV e depois os lê de volta.

Python
# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

# Read the CSV file into a DataFrame
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()

Ler arquivos CSV usando SQL

O exemplo de SQL a seguir faz a leitura de um arquivo CSV usando read_files.

SQL
-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
's3://<bucket>/<path>/<file>.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')

Especificar um esquema

Quando o esquema do arquivo CSV é conhecido, você pode especificar o esquema desejado para o leitor de CSV com a opção schema.

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()

Ler um subconjunto de colunas

O comportamento do analisador de CSV depende de quais colunas forem lidas. Se o esquema especificado não corresponder à disposição do arquivo, os resultados podem diferir consideravelmente, dependendo de quais colunas forem acessadas. O CSV não possui metadados de nome de coluna, portanto, o Spark mapeia os campos do esquema para as colunas por posição — um esquema incompatível desloca os valores para os campos errados.

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Read only a subset of columns by specifying a partial schema
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)

Lidar com registros CSV malformados

Na leitura de arquivos CSV com um esquema especificado, é possível que os dados dos arquivos não correspondam ao esquema. Por exemplo, um campo contendo o nome da cidade não será analisado como um número inteiro. As consequências dependem do modo em que o analisador for executado:

  • PERMISSIVE (padrão): são inseridos valores "null" em campos que não puderam ser analisados corretamente
  • DROPMALFORMED: descarta linhas que contêm campos que não puderam ser analisados
  • FAILFAST: aborta a leitura se for encontrado algum dado malformado

Para definir o modo, use a opção mode.

Python
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)

No modo PERMISSIVE é possível inspecionar as linhas que não puderam ser analisadas corretamente com um dos seguintes métodos:

  • Você pode informar um caminho personalizado para a opção badRecordsPath para registrar registros corrompidos em um arquivo.
  • Você pode adicionar a coluna _corrupt_record ao esquema informado ao DataFrameReader para revisar registros corrompidos no DataFrame resultante.
nota

A opção badRecordsPath tem precedência sobre _corrupt_record, o que significa que as linhas malformadas gravadas no caminho informado não aparecem no DataFrame resultante.

o comportamento padrão para registros malformados muda ao usar a coluna de dados resgatados.

Para inspecionar linhas malformadas usando _corrupt_record, adicione-o ao esquema e filtre os valores não NULOS:

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])

df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))

Ativar a coluna de dados resgatados

nota

Este recurso é compatível com Databricks Runtime 8.3 e versões superiores.

Ao usar o modo PERMISSIVE, você pode ativar a coluna de dados resgatados para capturar qualquer dado que não tenha sido analisado porque um ou mais campos em um registro apresentam um dos seguintes problemas:

  • Ausente do esquema informado.
  • Não corresponde ao tipo de dados do esquema informado.
  • Tem incompatibilidade de maiúsculas e minúsculas nos nomes de campos no esquema informado.

A coluna de dados resgatados é retornada como um documento JSON contendo as colunas que foram resgatadas e o caminho do arquivo de origem do registro.

Para ativar a coluna de dados resgatados, defina a opção rescuedDataColumn para um nome de coluna na leitura:

Python
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

Para remover o caminho do arquivo de origem da coluna de dados resgatados, defina:

Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

O analisador CSV suporta três modos ao analisar registros: PERMISSIVE, DROPMALFORMEDe FAILFAST. Quando utilizado junto com rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registros sejam eliminados no modo DROPMALFORMED nem geram um erro no modo FAILFAST. Somente registros corrompidos, ou seja, CSV incompleto ou malformado, são descartados ou geram erros.

Quando rescuedDataColumn é usado no modo PERMISSIVE, as seguintes regras se aplicam a registros corrompidos:

  • A primeira linha do arquivo (uma linha de cabeçalho ou uma linha de dados) define o comprimento de linha esperado.
  • Uma linha com um número diferente de colunas é considerada incompleta.
  • Incompatibilidades de tipo de dados não são consideradas registros corrompidos.
  • Apenas registros CSV incompletos e malformados são considerados corrompidos e registrados na coluna _corrupt_record ou badRecordsPath.

Recursos adicionais

  • Ler e gravar arquivos Parquet: Se sua carga de trabalho exigir melhor desempenho de consulta ou armazenamento mais eficiente, a disposição em coluna do Parquet oferece vantagens significativas em relação ao formato de texto simples do CSV.