Ampliação de tipo
Disponível em tabelas do Delta Lake no Databricks Runtime 15.4 LTS e acima, o alargamento de tipo permite que você altere os tipos de dados da coluna para um tipo mais amplo sem reescrever arquivos de dados.
Todas as tabelas gerenciadas do Unity Catalog usam Delta Lake por default. Consulte Tabelas gerenciadas do Unity Catalog para Delta Lake e Apache Iceberg.
Habilitar a ampliação de tipo atualiza os protocolos de leitura e gravação. Isso pode afetar a compatibilidade com clientes externos do Delta Lake. Consulte a compatibilidade de recursos e protocolos do Delta Lake.
Tabelas com ampliação de tipo habilitada só podem ser lidas pelo Databricks Runtime 15.4 LTS e acima.
Tipos de alterações suportados
É possível ampliar tipos de acordo com as seguintes regras:
Tipo de origem | Tipos mais abrangentes compatíveis |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| Qualquer tipo |
Alterações de tipo são compatíveis para colunas de nível superior e campos aninhados dentro de estruturas, mapas e matrizes.
VOID para qualquer tipo não exige que a ampliação de tipo esteja habilitada na tabela. Qualquer operação que atualiza o tipo de uma coluna VOID é bem-sucedida sem configuração adicional. VOID ampliação de tipo está disponível no Databricks Runtime 18,2 ou acima.
Comportamento decimal
Por default, o Spark trunca a parte fracionária de um valor quando uma operação promove um tipo inteiro para um decimal ou double e uma ingestão downstream grava o valor de volta em uma coluna de inteiro. Para obter detalhes sobre o comportamento da política de atribuição, consulte Atribuição da loja.
Ao alterar qualquer tipo numérico para decimal, a precisão total deve ser igual ou maior que a precisão inicial. Se também se aumentar a escala, a precisão total deve aumentar em uma quantidade correspondente.
O destino mínimo para os tipos byte, short e int é decimal(10,0). O alvo mínimo para long é decimal(20,0).
Se você deseja adicionar duas casas decimais a um campo com decimal(10,1), o destino mínimo é decimal(12,3).
Habilitar ampliação de tipo
Habilitar a ampliação de tipo atualiza os protocolos de leitura e gravação. Isso pode afetar a compatibilidade com clientes externos do Delta Lake. Consulte a compatibilidade de recursos e protocolos do Delta Lake.
É possível habilitar o alargamento de tipo em uma tabela existente definindo a propriedade da tabela delta.enableTypeWidening como true:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
Você também pode ativar a ampliação de tipo durante a criação da tabela:
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
Aplicar manualmente uma alteração de tipo
Use o comando ALTER COLUMN para alterar tipos manualmente:
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
Esta operação atualiza o esquema da tabela sem reescrever os arquivos de dados subjacentes. Consulte ALTER TABLE para mais detalhes.
Ampliar tipos com evolução automática do esquema
Use a evolução do esquema com ampliação de tipo para atualizar os tipos de dados em tabelas de destino para corresponder ao tipo de dados de entrada.
Sem a ampliação de tipo habilitada, a evolução do esquema sempre tenta fazer o downcast dos dados para corresponder aos tipos de coluna na tabela de destino. Se não quiser ampliar automaticamente os tipos de dados em suas tabelas de destino, você deve desativar a ampliação de tipo antes da execução de cargas de trabalho com a evolução do esquema habilitada.
Para usar a evolução do esquema para ampliar o tipo de dado de uma coluna durante a ingestão, é preciso atender às seguintes condições:
- O comando de gravação é executado com a evolução automática do esquema habilitada.
- A tabela de destino está com a ampliação de tipo ativada.
- O tipo de coluna de origem é mais amplo que o tipo de coluna de destino.
- Ampliação de tipo suporta a mudança de tipo.
Incompatibilidades de tipo que não atendem a todas essas condições seguem as regras normais de imposição de esquema. See imposição de esquema.
Exemplo
Os exemplos a seguir demonstram como a ampliação de tipo funciona com a evolução do esquema.
- Python
- Scala
- SQL
Crie uma tabela de destino com uma coluna INT e uma tabela de origem com uma coluna BIGINT:
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
Use saveAsTable() com a evolução do esquema para alargar automaticamente a coluna INT para BIGINT durante um acréscimo:
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
Use MERGE INTO com evolução do esquema:
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Crie uma tabela de destino com uma coluna INT e uma tabela de origem com uma coluna BIGINT:
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
Use saveAsTable() com a evolução do esquema para alargar automaticamente a coluna INT para BIGINT durante um acréscimo:
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
Use MERGE INTO com evolução do esquema:
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
Crie uma tabela de destino com uma coluna INT e uma tabela de origem com uma coluna BIGINT:
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
Use INSERT INTO com a evolução do esquema para alargar automaticamente a coluna INT para BIGINT durante um acréscimo:
INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table;
Use MERGE INTO com evolução do esquema:
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Auto Loader
Visualização
O suporte para ampliação de tipo no Auto Loader está em Public Preview.
O Auto Loader oferece compatibilidade com o alargamento de tipo com a evolução automática do esquema. Quando se usa o Auto Loader para ingerir dados em uma tabela Delta Lake com a ampliação de tipo e a evolução do esquema habilitadas, os tipos de coluna são automaticamente ampliados para corresponder aos dados recebidos.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Consulte Alargamento Automático de Tipo com o Auto Loader. Além disso, a tabela de destino deve ter a ampliação de tipo habilitada. Consulte Habilitar ampliação de tipo.
Desativar o recurso de tabela de ampliação de tipo
Pode-se evitar a ampliação de tipo acidental em tabelas habilitadas ao definir a propriedade para false:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
Essa configuração impede futuras alterações de tipo na tabela, mas não remove o recurso de tabela de ampliação de tipo nem desfaz alterações de tipo anteriores.
Se precisar remover completamente os recursos de tabela de ampliação de tipo, você pode utilizar o comando DROP FEATURE como mostrado no exemplo a seguir:
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
Tabelas que habilitaram o alargamento de tipo usando o Databricks Runtime 15.4 LTS exigem que você descarte o recurso typeWidening-preview em vez disso.
Ao descartar o alargamento de tipo, o Databricks reescreve todos os arquivos de dados que não estão em conformidade com o esquema de tabela atual. Consulte Remover um recurso de tabela do Delta Lake e fazer downgrade do protocolo da tabela.
Transmissão de uma tabela Delta Lake
Suporte para ampliação de tipo em transmissão estructurada está disponível no Databricks Runtime 16.4 LTS e acima.
Ao transmitir de uma tabela Delta Lake com a expansão de tipo habilitada, é possível configurar a expansão de tipo automática para consultas de transmissão habilitando a evolução do esquema com a opção mergeSchema na tabela de destino. A tabela de destino deve ter ampliação de tipo habilitada. Consulte Habilitar a ampliação de tipo.
- Python
- Scala
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
Quando mergeSchema está ativado e a tabela de destino tem a ampliação de tipo ativada:
- As alterações de tipo são aplicadas automaticamente à tabela downstream sem a necessidade de intervenção manual.
- Novas colunas são adicionadas automaticamente ao esquema da tabela a jusante.
Sem mergeSchema ativado, os valores são tratados de acordo com a configuração spark.sql.storeAssignmentPolicy, que por default rebaixa os valores para corresponder ao tipo da coluna de destino. Para obter mais informações sobre o comportamento da política de atribuição, consulte Atribuição de armazenamento.
Lidar com alterações de tipo em uma transmissão
Ao transmitir de uma tabela Delta Lake, é possível fornecer um local de acompanhamento de esquema para acompanhar as alterações de esquema não aditivas, incluindo alterações de tipo. É obrigatório fornecer um local de acompanhamento de esquema no Databricks Runtime 18.0 e abaixo, e é opcional no Databricks Runtime 18.1 e acima.
Não é possível definir um schemaTrackingLocation usando SQL. Consulte Recursos sem suporte.
schemaTrackingLocation deve ser definido como um local dentro do mesmo caminho que o seu checkpoint de transmissão. Por exemplo:
- Python
- Scala
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
Após definir um local de acompanhamento de esquema, a transmissão evolui seu esquema acompanhado quando detecta uma mudança de tipo e, em seguida, para. Nesse momento, é preciso lidar com a mudança de tipo, como a ativação da ampliação de tipo na tabela downstream ou a atualização da consulta de transmissão.
Para retomar o processamento, defina a configuração do Spark spark.databricks.delta.streaming.allowSourceColumnTypeChange ou a opção de leitor DataFrame allowSourceColumnTypeChange, como no exemplo a seguir:
- Python
- Scala
- SQL
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
Quando a transmissão para, uma mensagem de erro exibe o ID do ponto de verificação <checkpoint_id> e a versão da tabela de origem do Delta Lake <delta_source_table_version>.
Para uma lista completa de opções de transmissão do Delta Lake, consulte Delta Lake.
Lakeflow Spark Declarative Pipelines
É possível habilitar a expansão de tipo para os Pipelines Declarativos do Lakeflow Spark em nível de pipeline ou para tabelas individuais. A ampliação de tipo permite que os tipos de coluna sejam automaticamente ampliados durante a execução do pipeline sem exigir um refresh completo das tabelas de transmissão. Alterações de tipo em visualizações materializadas sempre acionam um recalculamento completo, e quando uma alteração de tipo é aplicada a uma tabela de origem, as visualizações materializadas que dependem dessa tabela requerem um recalculamento completo para refletir os novos tipos.
Habilitar ampliação de tipo para um pipeline inteiro
Para habilitar a ampliação de tipo para todas as tabelas em um pipeline, defina a configuração do pipeline pipelines.enableTypeWidening:
- JSON
- YAML
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
configuration:
pipelines.enableTypeWidening: 'true'
Habilitar ampliação de tipo para tabelas específicas
Você também pode ativar a ampliação de tipo para tabelas individuais definindo a propriedade da tabela delta.enableTypeWidening:
- Python
- SQL
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
Compatibilidade com leitores downstream
Tabelas com ampliação de tipo habilitada só podem ser lidas no Databricks Runtime 15.4 LTS e acima. Se você quiser uma tabela com ampliação de tipo ativada em seu pipeline para ser legível por leitores no Databricks Runtime 14.3 ou abaixo, você deve:
- Desative a ampliação de tipo removendo a propriedade
delta.enableTypeWidening/pipelines.enableTypeWideningou definindo-a como false, e acione um refresh completo da tabela. - Habilitar Compatibility Mode na sua tabela.
OpenSharing
O suporte para Type Widening no OpenSharing está disponível no Databricks Runtime 16.1e acima.
O compartilhamento de uma tabela do Delta Lake com a ampliação de tipo habilitada é compatível no Databricks-to-Databricks OpenSharing. O provedor e o destinatário devem estar no Databricks Runtime 16.1 e acima.
Para ler o feed de dados de alteração de uma tabela Delta Lake com ampliação de tipo habilitada usando OpenSharing, você deve definir o formato de resposta para delta:
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
A leitura de feed de dados de alteração entre alterações de tipo não é compatível. Em vez disso, é preciso dividir a operação em duas leituras separadas, uma terminando na versão da tabela que contém a alteração de tipo, e a outra começando na versão que contém a alteração de tipo.
Limitações
Compatibilidade com Apache Iceberg
O Apache Iceberg não oferece suporte a todas as alterações de tipo cobertas pela ampliação de tipo. Consulte Evolução do Esquema do Iceberg.
As alterações de tipo não compatíveis incluem o seguinte:
byte,short,int,longadecimaloudouble- aumento de escala decimal
datetotimestampNTZ
Quando você habilita o UniForm com compatibilidade Iceberg em uma tabela Delta Lake, a aplicação de uma das alterações de tipo precedentes resulta em um erro. Consulte Ler tabelas Delta Lake com clientes Iceberg usando UniForm.
Se você aplicar uma dessas alterações de tipo não suportadas a uma tabela do Delta Lake, você tem duas opções:
-
Regenerar metadados do Iceberg: use o seguinte comando para regenerar os metadados do Iceberg sem o recurso de tabela de ampliação de tipo.
SQLALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')Isso permite manter a compatibilidade UniForm após aplicar alterações de tipo incompatíveis.
-
Desative o recurso de tabela de ampliação de tipo: Consulte Desativar o recurso de tabela de ampliação de tipo.
Funções dependentes do tipo
Algumas funções SQL retornam resultados que dependem do tipo de dado de entrada. Por exemplo, a funçãohash retorna valores de hash diferentes para o mesmo valor lógico se o tipo de argumento for diferente: hash(1::INT) retorna um resultado diferente de hash(1::BIGINT).
Outras funções dependentes do tipo são: xxhash64, bit_get, bit_reverse, typeof.
Para resultados estáveis em consultas que usam essas funções, você deve converter valores explicitamente para o tipo desejado:
- Python
- Scala
- SQL
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
.selectExpr("hash(CAST(a AS BIGINT))")
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
Recursos sem suporte
- Não é possível definir um local de acompanhamento de esquema usando SQL ao realizar a transmissão de uma tabela Delta Lake com uma alteração de tipo.
- Não é possível compartilhar uma tabela com a ampliação de tipo ativada para consumidores que não são da Databricks usando o OpenSharing.