Padrões comuns de carregamento de dados
Auto Loader simplifica uma série de tarefas comuns de ingestão de dados. Essa referência rápida fornece exemplos de vários padrões populares.
Ingira dados do armazenamento de objetos na nuvem como variante
O Auto Loader pode carregar todos os dados das fontes de arquivos compatíveis como uma única coluna VARIANT em uma tabela de destino. Como o site VARIANT é flexível para alterações de esquema e tipo e mantém a sensibilidade a maiúsculas e minúsculas e os valores NULL presentes na fonte de dados, esse padrão é robusto para a maioria dos cenários de ingestão. Para obter detalhes, consulte Ingerir dados do armazenamento de objetos na nuvem como variante.
Filtrando diretórios ou arquivos usando padrões globais
Os padrões Glob podem ser usados para filtrar diretórios e arquivos quando fornecidos no caminho.
Padrão | Descrição |
|---|---|
| Corresponde a qualquer caractere único |
| Corresponde a zero ou mais caracteres |
| Corresponde a um único caractere do conjunto de caracteres {a, b, c}. |
| Corresponde a um único caractere do intervalo de caracteres {a... z}. |
| Corresponde a um único caractere que não é do conjunto de caracteres ou do intervalo {a}. Observe que o caractere |
| Corresponde a uma cadeia de caracteres do conjunto de cadeias de caracteres {ab, cd}. |
| Corresponde a uma cadeia de caracteres do conjunto de cadeias de caracteres {ab, cde, cfh}. |
Use o path para fornecer padrões de prefixo, por exemplo:
- Python
- Scala
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Você deve usar a opção pathGlobFilter para fornecer explicitamente padrões de sufixo. O path fornece apenas um filtro de prefixo. Por exemplo, se você quiser analisar apenas png arquivos em um diretório que contém arquivos com sufixos diferentes, você pode fazer o seguinte:
- Python
- Scala
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
O comportamento de globbing do default do Auto Loader é diferente do comportamento do default de outras fontes de arquivos do Spark. Adicione .option("cloudFiles.useStrictGlobber", "true") à sua leitura para usar o globbing que corresponde ao comportamento de default Spark em relação às fontes de arquivos. Consulte a tabela a seguir para obter mais informações sobre globbing:
Padrão | Caminho do arquivo | padrão globber | Rigoroso globber |
|---|---|---|---|
/a/b | /a/b/c/file.txt | Sim | Sim |
/a/b | /a/b_dir/c/file.txt | Não | Não |
/a/b | /a/b.txt | Não | Não |
/a/b/ | /a/b.txt | Não | Não |
/a/ */c/ | /a/b/c/file.txt | Sim | Sim |
/a/ */c/ | /a/b/c/d/file.txt | Sim | Sim |
/a/ */c/ | /a/b/x/y/c/file.txt | Sim | Não |
/a/ */c | /a/b/c_file.txt | Sim | Não |
/a/ */c/ | /a/b/c_file.txt | Sim | Não |
/a/ */c/ | /a/ */cookie/file.txt | Sim | Não |
/a/b * | /a/b.txt | Sim | Sim |
/a/b * | /a/b/file.txt | Sim | Sim |
/a/ {0.txt ,1.txt} | /a/0.txt | Sim | Sim |
/a/ */ {0.txt ,1.txt} | /a/0.txt | Não | Não |
/a/b/ [cde-h] /i/ | /a/b/c/i/file.txt | Sim | Sim |
Permitir ETL fácil
Uma maneira fácil de colocar seus dados no Delta Lake sem perder nenhum dado é usar o seguinte padrão e ativar a inferência de esquema com o Auto Loader. Databricks recomenda a execução do código a seguir em um trabalho Databricks para que ele reinicie automaticamente a transmissão quando o esquema dos dados de origem for alterado. Em default, o esquema é inferido como tipos de cadeia de caracteres, todos os erros de análise (não deve haver nenhum se tudo permanecer como cadeia de caracteres) irão para _rescued_data, e todas as novas colunas falharão na transmissão e evoluirão o esquema.
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Evite a perda de dados em dados bem estruturados
Quando você conhece seu esquema, mas deseja capturar dados inesperados, o Databricks recomenda usar o rescuedDataColumn.
- Python
- Scala
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Se quiser que a transmissão interrompa o processamento se for introduzido um novo campo que não corresponda ao seu esquema, o senhor pode adicionar:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Permitir um pipeline de dados semiestruturado e flexível
Ao receber dados de um fornecedor que introduz novas colunas às informações fornecidas, você pode não estar ciente exatamente de quando isso ocorre, ou pode não ter a capacidade necessária para atualizar seu pipeline de dados. Agora você pode aproveitar a evolução do esquema para reiniciar a transmissão e deixar Auto Loader atualizar o esquema inferido automaticamente. Você também pode usar schemaHints para alguns dos campos "sem esquema" que o fornecedor pode estar fornecendo.
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Transformar dados JSON aninhados
Como o site Auto Loader infere as colunas JSON de nível superior como strings, o senhor pode ficar com objetos JSON aninhados que exigem transformações adicionais. O senhor pode usar as APIs de acesso a dados semiestruturados para transformar ainda mais o conteúdo JSON complexo.
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...
"tags:page.id::int", # extracts {"tags":{"page":{"id":... and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...
"tags:page.id::int", // extracts {"tags":{"page":{"id":... and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Inferir dados JSON aninhados
Quando você tem dados aninhados, você pode usar a opção cloudFiles.inferColumnTypes para inferir a estrutura aninhada de seus dados e outros tipos de coluna.
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Carregar arquivos CSV sem cabeçalhos
O exemplo a seguir mostra como carregar arquivos CSV sem cabeçalhos usando o Auto Loader. Use rescuedDataColumn para capturar quaisquer dados que não correspondam ao esquema fornecido.
- Python
- Scala
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # ensure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Aplicar um esquema em arquivos CSV com cabeçalhos
O exemplo a seguir mostra como impor um esquema em arquivos CSV que incluem cabeçalhos. Use rescuedDataColumn para capturar quaisquer dados que não correspondam ao esquema fornecido.
- Python
- Scala
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Ingerir dados binários ou de imagem no Delta Lake para ML
Depois que os dados são armazenados em Delta Lake, o senhor pode executar a inferência distribuída nos dados. Consulte Realizar inferência distribuída usando Pandas UDF .
- Python
- Scala
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
SintaxeAuto Loader para o pipeline declarativo LakeFlow Spark
O pipeline declarativo LakeFlow Spark fornece uma sintaxe Python ligeiramente modificada para Auto Loader e adiciona suporte SQL para Auto Loader. Os exemplos a seguir utilizam Auto Loader para criar conjuntos de dados a partir de arquivos JSON usando o dataset de exemplo de reservas de viagens da Wanderbricks :
- Python
- SQL
@dp.table
def booking_updates():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("multiLine", "true")
.load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
)
@dp.table
def reviews():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("multiLine", "true")
.load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews")
)
CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
format => "json",
multiLine => true
)
CREATE OR REFRESH STREAMING TABLE reviews
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/wanderbricks/reviews",
format => "json",
multiLine => true
)
O senhor pode usar as opções de formato compatíveis com o Auto Loader. As opções para read_files são par key-value. Para obter detalhes sobre formatos e opções compatíveis, consulte Opções.
CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_volume/path/to/files/*",
option-key => option-value,
...
)
O exemplo a seguir lê arquivos JSON com várias linhas e com a inferência de tipo de coluna ativada:
CREATE OR REFRESH STREAMING TABLE booking_updates
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
format => "json",
multiLine => true,
inferColumnTypes => true
)
Você pode usar o schema para especificar o formato manualmente; você deve especificar o schema para formatos que não suportam inferência de esquema:
- Python
- SQL
@dp.table
def booking_updates_raw():
return (
spark.readStream.format("cloudFiles")
.schema("booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP")
.option("cloudFiles.format", "json")
.option("multiLine", "true")
.load("/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates")
)
CREATE OR REFRESH STREAMING TABLE booking_updates_raw
AS SELECT *
FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/wanderbricks/booking_updates",
format => "json",
multiLine => true,
schema => "booking_id LONG, booking_update_id LONG, user_id LONG, property_id LONG, status STRING, guests_count INT, total_amount DOUBLE, check_in DATE, check_out DATE, created_at TIMESTAMP, updated_at TIMESTAMP"
)
O pipeline declarativo LakeFlow Spark configura e gerencia automaticamente os diretórios de esquema e de ponto de verificação ao usar Auto Loader para ler arquivos. No entanto, se você configurar manualmente qualquer um desses diretórios, realizar uma refresh completa não afetará o conteúdo dos diretórios configurados. A Databricks recomenda o uso dos diretórios configurados automaticamente para evitar efeitos colaterais inesperados durante o processamento.