Lidar com registros e arquivos incorretos
O Databricks oferece várias opções para lidar com arquivos que contêm registros incorretos. Exemplos de dados incorretos incluem:
- Registros incompletos ou corrompidos : Observado principalmente em formatos de arquivo baseados em texto, como JSON e CSV. Por exemplo, um registro JSON que não tenha uma chave de fechamento ou um registro CSV que não tenha tantas colunas quanto o cabeçalho ou o primeiro registro do arquivo CSV.
- Tipos de dados incompatíveis : quando o valor de uma coluna não tem o tipo de dados especificado ou inferido.
- Nomes de campo incorretos : podem ocorrer em todos os formatos de arquivo, quando o nome da coluna especificado no arquivo ou registro tem maiúsculas e minúsculas diferentes do esquema especificado ou inferido.
- Arquivos corrompidos : Quando um arquivo não pode ser lido, o que pode ser devido à corrupção de metadados ou dados em tipos de arquivos binários, como Avro, Parquet e ORC. Em raras ocasiões, pode ser causado por falhas transitórias de longa duração no sistema de armazenamento subjacente.
- Arquivos ausentes : Um arquivo que foi descoberto durante o tempo de análise da consulta e não existe mais no momento do processamento.
Uso badRecordsPath
Quando você define badRecordsPath
, o caminho especificado registra exceções para registros ou arquivos inválidos encontrados durante o carregamento de dados.
Além de registros e arquivos corrompidos, os erros que indicam arquivos excluídos, exceção de conexão de rede, exceção de IO e assim por diante são ignorados e registrados no badRecordsPath
.
O uso da opção badRecordsPath
em uma fonte de dados baseada em arquivo tem algumas limitações importantes:
- Não é transacional e pode levar a resultados inconsistentes.
- Erros transitórios são tratados como falhas.
Não foi possível encontrar o arquivo de entrada
val df = spark.read
.option("badRecordsPath", "/tmp/badRecordsPath")
.format("parquet").load("/input/parquetFile")
// Delete the input parquet file '/input/parquetFile'
dbutils.fs.rm("/input/parquetFile")
df.show()
No exemplo acima, como o df.show()
não consegue encontrar o arquivo de entrada, o Spark cria um arquivo de exceção no formato JSON para registrar o erro. Por exemplo, /tmp/badRecordsPath/20170724T101153/bad_files/xyz
é o caminho do arquivo de exceção. Esse arquivo está no diretório badRecordsPath
especificado, /tmp/badRecordsPath
. 20170724T101153
é a hora de criação deste DataFrameReader
. bad_files
é o tipo de exceção. xyz
é um arquivo que contém um registro JSON, que tem o caminho do arquivo inválido e a mensagem de exceção/razão.
O arquivo de entrada contém um registro incorreto
// Creates a json file containing both parsable and corrupted records
Seq("""{"a": 1, "b": 2}""", """{bad-record""").toDF().write.format("text").save("/tmp/input/jsonFile")
val df = spark.read
.option("badRecordsPath", "/tmp/badRecordsPath")
.schema("a int, b int")
.format("json")
.load("/tmp/input/jsonFile")
df.show()
Neste exemplo, o DataFrame contém apenas o primeiro registro analisável ({"a": 1, "b": 2}
). O segundo registro incorreto ({bad-record
) é registrado no arquivo de exceção, que é um arquivo JSON localizado em /tmp/badRecordsPath/20170724T114715/bad_records/xyz
. O arquivo de exceção contém o registro inválido, o caminho do arquivo que contém o registro e a mensagem de exceção/motivo. Depois de localizar os arquivos de exceção, o senhor pode usar um leitor de JSON para processá-los.