Ampliação automática de tipos com Auto Loader
Visualização
Este recurso está em Pré-visualização Pública no Databricks Runtime 16.4 e versões superiores.
Auto Loader processa de forma incremental e eficiente novos arquivos de dados à medida que chegam ao armazenamento cloud . Isso também reduz a manutenção do pipeline, lidando automaticamente com alterações complexas de esquema. Por exemplo, você pode configurar o Auto Loader para detectar automaticamente o esquema dos dados carregados, permitindo inicializar tabelas sem declarar explicitamente o esquema dos dados. Você também pode evoluir o esquema da tabela à medida que novas colunas são introduzidas, eliminando a necessidade de rastrear e aplicar manualmente as alterações de esquema ao longo do tempo. O Auto Loader pode até mesmo recuperar dados inesperados (por exemplo, devido a tipos de dados diferentes) em uma coluna de dados recuperados, ajudando você a evitar a perda de dados.
No entanto, a coluna de dados recuperada exige que você lide manualmente com quaisquer alterações no tipo de dados.
Para lidar automaticamente com algumas dessas alterações de tipo de dados, use a ampliação de tipo no Auto Loader. O Delta Lake agora suporta alterações de ampliação de diferentes tipos de dados sem exigir a reescrita de dados ou intervenção do usuário. Veja alargamento do tipo Delta Lake. O novo modo de evolução do esquema, addNewColumnsWithTypeWidening, evolui automaticamente o esquema em alterações de tipo de dados compatíveis.
Você pode expandir tipos primitivos como int para long, float para double e muito mais. O alargamento de tipos está disponível para todos os formatos de arquivo com suporte a evolução do esquema no Auto Loader. Isso inclui formatos de texto (como JSON, CSV ou XML) e formatos binários (como Avro ou Parquet). Não há mudança no comportamento de evolução do esquema para os modos de evolução do esquema existentes (como addNewColumns, rescue, failOnNewColumns ou none).
Alterações de tipo suportadas
As seguintes alterações de tipo são suportadas:
Tipo de origem | Tipos mais amplos suportados |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Ao expandir qualquer tipo numérico para decimal, o Auto Loader expande para decimal com precisão igual ou maior que a precisão inicial. Ao aumentar a escala, a precisão total aumenta na mesma proporção.
A precisão inicial dos integer types é a seguinte:
Tipo | Precisão inicial |
|---|---|
|
|
|
|
|
|
|
|
Por exemplo, se o tipo atual de uma coluna for int e um arquivo com o tipo dessa coluna como decimal(5, 2) for lido, o Auto Loader amplia o tipo dessa coluna para decimal(12, 2).
Pré-requisitos
Para usar a ampliação de tipos com o Auto Loader, você deve atender aos seguintes requisitos:
- Use Databricks Runtime 16.4 ou acima.
- Se o destino de gravação for uma tabela Delta Lake, habilite a ampliação de tipo para a tabela Delta Lake usando um dos seguintes métodos:
-
Se estiver usando uma tabela existente:
SQLALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true') -
Se estiver criando uma nova tabela com a ampliação de tipo ativada:
SQLCREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
-
Para obter mais informações sobre a ampliação de tipos nas tabelas Delta Lake , consulte Ampliação de tipos.
Habilitar a ampliação de tipos com a evolução do esquema
Para usar o alargamento de tipo com Auto Loader, especifique addNewColumnsWithTypeWidening ao usar a evolução do esquema. O Auto Loader detecta a adição de novas colunas e alterações de tipo à medida que processa seus dados.
- Python
- Scala
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", <schemaPath>)
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load(<inputPath>)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", <checkpointPath>)
.trigger(availableNow=True)
.toTable("table_name")
)
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", <schemaPath>)
.option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
.load(<inputPath>)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", <checkpointPath>)
.trigger(Trigger.AvailableNow())
.toTable("table_name")
Quando Auto Loader detecta uma nova coluna ou uma mudança de tipo que é suportada pela ampliação de tipo, a transmissão para com um UnknownFieldException. Antes que sua transmissão apresente esse erro, Auto Loader realiza a inferência de esquema nos microlotes de dados mais recentes e atualiza a localização do esquema com o esquema mais recente, ampliando as colunas existentes ou mesclando novas colunas ao final do esquema.
Comportamento da evolução do esquema em relação às mudanças no tipo de dados
Se você importar um CSV com o seguinte conteúdo, o Auto Loader inferirá o esquema como STRUCT<id INT, name STRING, _rescued_data STRING>.
id, name
1, John
2, Mary
A tabela de destino tem a seguinte aparência:
| id | nome | _dados_resgatados | | -- | ---- | --------------- | | 1 | João | NULO | | 2 | Maria | NULO |
Agora, importe outro arquivo CSV onde os valores na coluna id sejam mais largos que o tipo INT :
id, name, age
2147483648, Bob, 25
A tabela a seguir explica o comportamento e a saída com diferentes modos de evolução do esquema no Auto Loader:
Mode | Comportamento com alteração de tipo de dados alargável suportada |
|---|---|
| O tipo de dados não evolui e a transmissão não falha devido à mudança do tipo de dados. As colunas com valores incompatíveis em relação ao tipo são definidas como |
| O esquema não evolui e as transmissões não falham devido a quaisquer alterações no esquema. As colunas com valores incompatíveis em relação ao tipo são definidas como |
| O tipo de dados não evolui e a transmissão não falha devido à mudança do tipo de dados. As colunas com valores incompatíveis em relação ao tipo são definidas como |
| Não evolui o esquema, novas colunas são ignoradas e os dados não são resgatados a menos que a opção |
| A transmissão falhou. Novas colunas foram adicionadas ao esquema e as alterações de tipo de dados suportadas foram ampliadas. Alterações de tipo de dados não suportadas (por exemplo, |
Resultados de exemplo
A tabela a seguir mostra o esquema inferido e os valores para cada modo de evolução do esquema após a ingestão do segundo arquivo CSV:
Mode | Esquema e valores inferidos | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Esquema : Valores :
| ||||||||||||||||
| Esquema : Valores :
| ||||||||||||||||
| Esquema : Valores :
| ||||||||||||||||
| Esquema : Valores :
| ||||||||||||||||
| Esquema : Valores :
|
Limitações
- A opção
prefersDecimalnão pode ser definida comofalseao usaraddNewColumnsWithTypeWidening. QuandoaddNewColumnsWithTypeWideningé especificado, o valor default deprefersDecimalétrue. dateA ampliação paratimestampNTZsó é suportada para arquivos Parquet.