Pular para o conteúdo principal

Padrões comuns de carregamento de dados

Auto Loader simplifica uma série de tarefas comuns de ingestão de dados. Essa referência rápida fornece exemplos de vários padrões populares.

Filtrando diretórios ou arquivos usando padrões globais

Os padrões Glob podem ser usados para filtrar diretórios e arquivos quando fornecidos no caminho.

Padrão

Descrição

?

Corresponde a qualquer caractere único

*

Corresponde a zero ou mais caracteres

[abc]

Corresponde a um único caractere do conjunto de caracteres {a, b, c}.

[a-z]

Corresponde a um único caractere do intervalo de caracteres {a... z}.

[^a]

Corresponde a um único caractere que não é do conjunto de caracteres ou do intervalo {a}. Observe que o caractere ^ deve ocorrer imediatamente à direita do colchete de abertura.

{ab,cd}

Corresponde a uma cadeia de caracteres do conjunto de cadeias de caracteres {ab, cd}.

{ab,c{de, fh}}

Corresponde a uma cadeia de caracteres do conjunto de cadeias de caracteres {ab, cde, cfh}.

Use o path para fornecer padrões de prefixo, por exemplo:

Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
important

Você precisa usar a opção pathGlobFilter para fornecer explicitamente padrões de sufixo. O path fornece apenas um filtro de prefixo.

Por exemplo, se você quiser analisar somente arquivos png em um diretório que contém arquivos com sufixos diferentes, você pode fazer:

Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
nota

O comportamento de globbing do default do Auto Loader é diferente do comportamento do default de outras fontes de arquivos do Spark. Adicione .option("cloudFiles.useStrictGlobber", "true") à sua leitura para usar o globbing que corresponde ao comportamento de default Spark em relação às fontes de arquivos. Consulte a tabela a seguir para obter mais informações sobre globbing:

Padrão

Caminho do arquivo

padrão globber

Rigoroso globber

/a/b

/a/b/c/file.txt

Sim

Sim

/a/b

/a/b_dir/c/file.txt

Não

Não

/a/b

/a/b.txt

Não

Não

/a/b/

/a/b.txt

Não

Não

/a/ */c/

/a/b/c/file.txt

Sim

Sim

/a/ */c/

/a/b/c/d/file.txt

Sim

Sim

/a/ */c/

/a/b/x/y/c/file.txt

Sim

Não

/a/ */c

/a/b/c_file.txt

Sim

Não

/a/ */c/

/a/b/c_file.txt

Sim

Não

/a/ */c/

/a/ */cookie/file.txt

Sim

Não

/a/b *

/a/b.txt

Sim

Sim

/a/b *

/a/b/file.txt

Sim

Sim

/a/ {0.txt ,1.txt}

/a/0.txt

Sim

Sim

/a/ */ {0.txt ,1.txt}

/a/0.txt

Não

Não

/a/b/ [cde-h] /i/

/a/b/c/i/file.txt

Sim

Sim

Permitir ETL fácil

Uma maneira fácil de colocar seus dados no Delta Lake sem perder nenhum dado é usar o seguinte padrão e ativar a inferência de esquema com o Auto Loader. Databricks recomenda a execução do código a seguir em um trabalho Databricks para que ele reinicie automaticamente a transmissão quando o esquema dos dados de origem for alterado. Em default, o esquema é inferido como tipos de cadeia de caracteres, todos os erros de análise (não deve haver nenhum se tudo permanecer como cadeia de caracteres) irão para _rescued_data, e todas as novas colunas falharão na transmissão e evoluirão o esquema.

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")

Evite a perda de dados em dados bem estruturados

Quando o senhor conhece o esquema, mas quer saber sempre que receber dados inesperados, a Databricks recomenda o uso do rescuedDataColumn.

Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")

Se quiser que a transmissão interrompa o processamento se for introduzido um novo campo que não corresponda ao seu esquema, o senhor pode adicionar:

Python
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Permitir um pipeline de dados semiestruturado e flexível

Quando o senhor recebe dados de um fornecedor que introduz novas colunas nas informações que ele fornece, pode não saber exatamente quando ele faz isso ou pode não ter a largura de banda necessária para atualizar seu pipeline de dados. Agora, o senhor pode aproveitar a evolução do esquema para reiniciar a transmissão e permitir que o site Auto Loader atualize o esquema inferido automaticamente. Você também pode aproveitar schemaHints para alguns dos campos “sem esquema” que o fornecedor pode estar fornecendo.

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")

Transformar dados JSON aninhados

Como o site Auto Loader infere as colunas JSON de nível superior como strings, o senhor pode ficar com objetos JSON aninhados que exigem transformações adicionais. O senhor pode usar as APIs de acesso a dados semiestruturados para transformar ainda mais o conteúdo JSON complexo.

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)

Inferir dados JSON aninhados

Quando você tem dados aninhados, você pode usar a opção cloudFiles.inferColumnTypes para inferir a estrutura aninhada de seus dados e outros tipos de coluna.

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")

Carregar arquivos CSV sem cabeçalhos

Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)

Aplicar um esquema em arquivos CSV com cabeçalhos

Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)

Ingerir dados binários ou de imagem no Delta Lake para ML

Depois que os dados são armazenados em Delta Lake, o senhor pode executar a inferência distribuída nos dados. Consulte Realizar inferência distribuída usando Pandas UDF .

Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")

Sintaxe do Auto Loader para DLT

O DLT fornece uma sintaxe Python ligeiramente modificada para o Auto Loader e adiciona suporte SQL para o Auto Loader.

Os exemplos a seguir utilizam o Auto Loader para criar conjuntos de dados a partir de arquivos CSV e JSON:

Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)

@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)

O senhor pode usar as opções de formato compatíveis com o Auto Loader. As opções para read_files são par key-value. Para obter detalhes sobre formatos e opções compatíveis, consulte Opções.

Por exemplo:

SQL
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
option-key => option-value,
...
)

O exemplo a seguir lê dados de arquivos CSV delimitados por tabulação com um cabeçalho:

SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv",
delimiter => "\t",
header => "true"
)

Você pode usar o schema para especificar o formato manualmente; você deve especificar o schema para formatos que não suportam inferência de esquema:

Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
nota

O DLT configura e gerencia automaticamente o esquema e os diretórios de ponto de verificação ao usar o site Auto Loader para ler arquivos. Entretanto, se o senhor configurar manualmente qualquer um desses diretórios, a execução de um refresh completo não afetará o conteúdo dos diretórios configurados. A Databricks recomenda usar os diretórios configurados automaticamente para evitar efeitos colaterais inesperados durante o processamento.