Ler e gravar arquivos CSV
CSV (valores separados por vírgula) é um formato tabular de texto simples amplamente utilizado para troca de dados, pipelines de ETL e armazenamento de dados de uso geral. O Databricks oferece suporte a CSV para leitura e gravação com o Apache Spark, incluindo inferência de esquema, compactação, tratamento de registros malformados e dados resgatados.
A Databricks recomenda a funçãoread_files table-valued para que os usuários de SQL leiam arquivos CSV. read_files está disponível em Databricks Runtime 13.3 LTS e acima.
Você também pode usar uma exibição temporária. Se você usar SQL para ler dados CSV diretamente sem usar exibições temporárias ou read_files, as seguintes limitações se aplicam:
- O senhor não pode especificar opções de fonte de dados.
- Você não pode especificar o esquema para os dados.
Pré-requisitos
Databricks não exige configuração adicional para usar arquivos CSV. No entanto, para transmitir arquivos CSV, é necessário ter Auto Loader.
Opções
Use os métodos .option() e .options() de DataFrameReader e DataFrameWriter para configurar a fonte de dados CSV . Para obter uma lista completa das opções suportadas, consulte DataFrameReader Opções CSV e DataFrameWriter Opções CSV.
Uso
Os exemplos a seguir demonstram a leitura e gravação de arquivos CSV, a especificação de esquemas e o tratamento de registros malformados.
Ler arquivos CSV
O exemplo a seguir usa o Wanderbricks dataset de exemplo. Grava dados de revisões em CSV e depois os lê de volta.
- Python
- Scala
- R
# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
# Read the CSV file into a DataFrame
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()
// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
// Read the CSV file into a DataFrame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()
df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)
Ler arquivos CSV usando SQL
O exemplo de SQL a seguir faz a leitura de um arquivo CSV usando read_files.
-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
's3://<bucket>/<path>/<file>.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
Especificar um esquema
Quando o esquema do arquivo CSV é conhecido, você pode especificar o esquema desejado para o leitor de CSV com a opção schema.
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int, comment string'
)
Ler um subconjunto de colunas
O comportamento do analisador de CSV depende de quais colunas forem lidas. Se o esquema especificado não corresponder à disposição do arquivo, os resultados podem diferir consideravelmente, dependendo de quais colunas forem acessadas. O CSV não possui metadados de nome de coluna, portanto, o Spark mapeia os campos do esquema para as colunas por posição — um esquema incompatível desloca os valores para os campos errados.
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Read only a subset of columns by specifying a partial schema
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int'
)
Lidar com registros CSV malformados
Na leitura de arquivos CSV com um esquema especificado, é possível que os dados dos arquivos não correspondam ao esquema. Por exemplo, um campo contendo o nome da cidade não será analisado como um número inteiro. As consequências dependem do modo em que o analisador for executado:
PERMISSIVE(padrão): são inseridos valores "null" em campos que não puderam ser analisados corretamenteDROPMALFORMED: descarta linhas que contêm campos que não puderam ser analisadosFAILFAST: aborta a leitura se for encontrado algum dado malformado
Para definir o modo, use a opção mode.
- Python
- Scala
- SQL
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE'
)
No modo PERMISSIVE é possível inspecionar as linhas que não puderam ser analisadas corretamente com um dos seguintes métodos:
- Você pode informar um caminho personalizado para a opção
badRecordsPathpara registrar registros corrompidos em um arquivo. - Você pode adicionar a coluna
_corrupt_recordao esquema informado ao DataFrameReader para revisar registros corrompidos no DataFrame resultante.
A opção badRecordsPath tem precedência sobre _corrupt_record, o que significa que as linhas malformadas gravadas no caminho informado não aparecem no DataFrame resultante.
o comportamento padrão para registros malformados muda ao usar a coluna de dados resgatados.
Para inspecionar linhas malformadas usando _corrupt_record, adicione-o ao esquema e filtre os valores não NULOS:
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true),
StructField("_corrupt_record", StringType, nullable = true)
))
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.filter(df("_corrupt_record").isNotNull).show()
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL
Ativar a coluna de dados resgatados
Este recurso é compatível com Databricks Runtime 8.3 e versões superiores.
Ao usar o modo PERMISSIVE, você pode ativar a coluna de dados resgatados para capturar qualquer dado que não tenha sido analisado porque um ou mais campos em um registro apresentam um dos seguintes problemas:
- Ausente do esquema informado.
- Não corresponde ao tipo de dados do esquema informado.
- Tem incompatibilidade de maiúsculas e minúsculas nos nomes de campos no esquema informado.
A coluna de dados resgatados é retornada como um documento JSON contendo as colunas que foram resgatadas e o caminho do arquivo de origem do registro.
Para ativar a coluna de dados resgatados, defina a opção rescuedDataColumn para um nome de coluna na leitura:
- Python
- Scala
- SQL
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
rescuedDataColumn => '_rescued_data'
)
Para remover o caminho do arquivo de origem da coluna de dados resgatados, defina:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
O analisador CSV suporta três modos ao analisar registros: PERMISSIVE, DROPMALFORMEDe FAILFAST. Quando utilizado junto com rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registros sejam eliminados no modo DROPMALFORMED nem geram um erro no modo FAILFAST. Somente registros corrompidos, ou seja, CSV incompleto ou malformado, são descartados ou geram erros.
Quando rescuedDataColumn é usado no modo PERMISSIVE, as seguintes regras se aplicam a registros corrompidos:
- A primeira linha do arquivo (uma linha de cabeçalho ou uma linha de dados) define o comprimento de linha esperado.
- Uma linha com um número diferente de colunas é considerada incompleta.
- Incompatibilidades de tipo de dados não são consideradas registros corrompidos.
- Apenas registros CSV incompletos e malformados são considerados corrompidos e registrados na coluna
_corrupt_recordoubadRecordsPath.
Recursos adicionais
- Ler e gravar arquivos Parquet: Se sua carga de trabalho exigir melhor desempenho de consulta ou armazenamento mais eficiente, a disposição em coluna do Parquet oferece vantagens significativas em relação ao formato de texto simples do CSV.