Ampliação de tipo
Tabelas com ampliação de tipo habilitada permitem alterar os tipos de dados da coluna para um tipo mais amplo sem reescrever os arquivos de dados subjacentes. Pode-se alterar os tipos de coluna manualmente ou usar a evolução do esquema para evoluir os tipos de coluna.
Alargamento de tipo está disponível no Databricks Runtime 15.4 LTS e acima. Tabelas com ampliação de tipo ativada só podem ser lidas no Databricks Runtime 15.4 LTS e acima.
A ampliação de tipo requer o Delta Lake. Todas as tabelas gerenciadas do Unity Catalog usam Delta Lake por padrão.
Tipos de alterações suportados
É possível ampliar tipos de acordo com as seguintes regras:
Tipo de origem | Tipos mais abrangentes compatíveis |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| Qualquer tipo |
Alterações de tipo são compatíveis para colunas de nível superior e campos aninhados dentro de estruturas, mapas e matrizes.
VOID para qualquer tipo não exige que a ampliação de tipo esteja habilitada na tabela. Qualquer operação que atualiza o tipo de uma coluna VOID é bem-sucedida sem configuração adicional. VOID ampliação de tipo está disponível no Databricks Runtime 18,2 ou acima.
Por default, o Spark trunca a parte fracionária de um valor quando uma operação promove um tipo inteiro para um decimal ou double e uma ingestão downstream grava o valor de volta em uma coluna de inteiro. Para obter detalhes sobre o comportamento da política de atribuição, consulte Atribuição da loja.
Ao alterar qualquer tipo numérico para decimal, a precisão total deve ser igual ou maior que a precisão inicial. Se também se aumentar a escala, a precisão total deve aumentar em uma quantidade correspondente.
O destino mínimo para os tipos byte, short e int é decimal(10,0). O alvo mínimo para long é decimal(20,0).
Se você deseja adicionar duas casas decimais a um campo com decimal(10,1), o destino mínimo é decimal(12,3).
Habilitar ampliação de tipo
É possível habilitar o alargamento de tipo em uma tabela existente definindo a propriedade da tabela delta.enableTypeWidening como true:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
Você também pode ativar a ampliação de tipo durante a criação da tabela:
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
Quando você habilita a ampliação de tipo, ele define o recurso de tabela typeWidening, que atualiza os protocolos de leitura e gravação. Você deve usar o Databricks Runtime 15.4 ou acima para interagir com tabelas com o alargamento de tipo habilitado. Se clientes externos também interagirem com a tabela, verifique se eles suportam esse recurso de tabela. Consulte compatibilidade de recursos do Delta Lake e protocolos.
Aplicar manualmente uma alteração de tipo
Use o comando ALTER COLUMN para alterar tipos manualmente:
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
Esta operação atualiza o esquema da tabela sem reescrever os arquivos de dados subjacentes. Consulte ALTER TABLE para mais detalhes.
Ampliar tipos com evolução automática do esquema
A evolução do esquema atua com a ampliação de tipo para atualizar os tipos de dados em tabelas de destino para corresponder ao tipo de dados de entrada.
Sem a ampliação de tipo habilitada, a evolução do esquema sempre tenta fazer o downcast dos dados para corresponder aos tipos de coluna na tabela de destino. Caso não se queira ampliar automaticamente os tipos de dados nas tabelas de destino, desabilite a ampliação de tipos antes de executar cargas de trabalho com a evolução do esquema ativada.
Para usar a evolução do esquema para ampliar o tipo de dado de uma coluna durante a ingestão, é preciso atender às seguintes condições:
- O comando de gravação é executado com a evolução automática do esquema habilitada.
- A tabela de destino está com a ampliação de tipo ativada.
- O tipo de coluna de origem é mais amplo que o tipo de coluna de destino.
- Ampliação de tipo suporta a mudança de tipo.
Incompatibilidades de tipo que não atendem a todas essas condições seguem as regras normais de imposição de esquema. See imposição de esquema.
Os exemplos a seguir demonstram como o alargamento de tipo funciona com a evolução do esquema para operações de gravação comuns.
- Python
- Scala
- SQL
# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
-- Example 1: Automatic type widening in INSERT INTO
---- Insert data with BIGINT value column - automatically widens INT to BIGINT
INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table;
-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Auto Loader
Visualização
O suporte para ampliação de tipo no Auto Loader está em Public Preview.
O Auto Loader oferece compatibilidade com o alargamento de tipo com a evolução automática do esquema. Quando se usa o Auto Loader para ingerir dados em uma tabela Delta Lake com a ampliação de tipo e a evolução do esquema habilitadas, os tipos de coluna são automaticamente ampliados para corresponder aos dados recebidos.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Consulte Alargamento Automático de Tipo com o Auto Loader. Além disso, a tabela de destino deve ter a ampliação de tipo habilitada. Consulte Habilitar ampliação de tipo.
Desativar o recurso de tabela de ampliação de tipo
Pode-se evitar a ampliação de tipo acidental em tabelas habilitadas ao definir a propriedade para false:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
Essa configuração impede futuras alterações de tipo na tabela, mas não remove o recurso de tabela de ampliação de tipo nem desfaz alterações de tipo anteriores.
Se precisar remover completamente os recursos de tabela de ampliação de tipo, você pode utilizar o comando DROP FEATURE como mostrado no exemplo a seguir:
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
As tabelas que habilitaram o alargamento de tipo usando o Databricks Runtime 15.4 LTS requerem a exclusão do recurso typeWidening-preview em vez disso.
Ao descartar o alargamento de tipo, o Databricks reescreve todos os arquivos de dados que não estão em conformidade com o esquema de tabela atual. Consulte Remover um recurso de tabela do Delta Lake e fazer downgrade do protocolo da tabela.
Transmissão de uma tabela Delta Lake
Suporte para ampliação de tipo em transmissão estructurada está disponível no Databricks Runtime 16.4 LTS e acima.
Ao transmitir de uma tabela Delta Lake com a expansão de tipo habilitada, é possível configurar a expansão de tipo automática para consultas de transmissão habilitando a evolução do esquema com a opção mergeSchema na tabela de destino. A tabela de destino deve ter ampliação de tipo habilitada. Consulte Habilitar a ampliação de tipo.
- Python
- Scala
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
Quando mergeSchema está ativado e a tabela de destino tem a ampliação de tipo ativada:
- As alterações de tipo são aplicadas automaticamente à tabela downstream sem a necessidade de intervenção manual.
- Novas colunas são adicionadas automaticamente ao esquema da tabela a jusante.
Sem mergeSchema ativado, os valores são tratados de acordo com a configuração spark.sql.storeAssignmentPolicy, que por default rebaixa os valores para corresponder ao tipo da coluna de destino. Para obter mais informações sobre o comportamento da política de atribuição, consulte Atribuição de armazenamento.
Confirmação de alteração de tipo manual
Ao transmitir de uma tabela Delta Lake, é possível fornecer um local de acompanhamento de esquema para acompanhar as alterações de esquema não aditivas, incluindo alterações de tipo. É obrigatório fornecer um local de acompanhamento de esquema no Databricks Runtime 18.0 e abaixo, e é opcional no Databricks Runtime 18.1 e acima.
- Python
- Scala
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
Após fornecer um local para o acompanhamento do esquema, a transmissão evolui seu esquema acompanhado quando detecta uma alteração de tipo e, então, para. Nesse momento, pode-se tomar qualquer ação necessária para lidar com a alteração de tipo, como ativar a ampliação de tipo na tabela downstream ou atualizar a consulta de transmissão.
Para retomar o processamento, defina a configuração do Spark spark.databricks.delta.streaming.allowSourceColumnTypeChange ou a opção de leitor do DataFrame allowSourceColumnTypeChange:
- Python
- Scala
- SQL
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
O ID do ponto de verificação <checkpoint_id> e a versão <delta_source_table_version> da tabela de origem do Delta Lake são exibidos na mensagem de erro quando a transmissão para.
Para uma lista completa de opções de transmissão do Delta Lake, consulte Delta Lake.
Lakeflow Spark Declarative Pipelines
É possível habilitar a expansão de tipo para os Pipelines Declarativos do Lakeflow Spark em nível de pipeline ou para tabelas individuais. A ampliação de tipo permite que os tipos de coluna sejam automaticamente ampliados durante a execução do pipeline sem exigir um refresh completo das tabelas de transmissão. Alterações de tipo em visualizações materializadas sempre acionam um recalculamento completo, e quando uma alteração de tipo é aplicada a uma tabela de origem, as visualizações materializadas que dependem dessa tabela requerem um recalculamento completo para refletir os novos tipos.
Habilitar ampliação de tipo para um pipeline inteiro
Para habilitar a ampliação de tipo para todas as tabelas em um pipeline, defina a configuração do pipeline pipelines.enableTypeWidening:
- JSON
- YAML
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
configuration:
pipelines.enableTypeWidening: 'true'
Habilitar ampliação de tipo para tabelas específicas
Você também pode ativar a ampliação de tipo para tabelas individuais definindo a propriedade da tabela delta.enableTypeWidening:
- Python
- SQL
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
Compatibilidade com leitores downstream
Tabelas com ampliação de tipo habilitada só podem ser lidas no Databricks Runtime 15.4 LTS e acima. Se você quiser uma tabela com ampliação de tipo ativada em seu pipeline para ser legível por leitores no Databricks Runtime 14.3 ou abaixo, você deve:
- Desabilitar o alargamento de tipo removendo a propriedade
delta.enableTypeWidening/pipelines.enableTypeWideningou definindo-a como falsa, e acionar uma atualização completa da tabela. - Habilitar Compatibility Mode na sua tabela.
OpenSharing
O suporte para Type Widening no OpenSharing está disponível no Databricks Runtime 16.1e acima.
O compartilhamento de uma tabela do Delta Lake com a ampliação de tipo habilitada é compatível no Databricks-to-Databricks OpenSharing. O provedor e o destinatário devem estar no Databricks Runtime 16.1 e acima.
Para ler o feed de dados de alteração de uma tabela Delta Lake com alargamento de tipo ativado usando OpenSharing, é necessário definir o formato de resposta como delta:
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
A leitura de feed de dados de alterações entre alterações de tipo não é compatível. Em vez disso, é preciso dividir a operação em duas leituras separadas, uma terminando na versão da tabela que contém a alteração de tipo, e a outra começando na versão que contém a alteração de tipo.
Limitações
Compatibilidade com Apache Iceberg
Apache Iceberg não oferece suporte a todas as alterações de tipo abrangidas pela ampliação de tipo, consulte Evolução do Esquema do Iceberg. Em particular, o Databricks não oferece suporte às seguintes alterações de tipo:
byte,short,int,longadecimaloudouble- aumento de escala decimal
datetotimestampNTZ
Ao habilitar o UniForm com compatibilidade Iceberg em uma tabela Delta Lake, a aplicação de uma destas alterações de tipo resulta em um erro. Consulte Leitura de tabelas do Delta Lake com clientes Iceberg.
Se você aplicar uma dessas alterações de tipo não suportadas a uma tabela do Delta Lake, você tem duas opções:
-
Regenerar metadados do Iceberg: use o seguinte comando para regenerar os metadados do Iceberg sem o recurso de tabela de ampliação de tipo.
SQLALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')Isso permite manter a compatibilidade UniForm após aplicar alterações de tipo incompatíveis.
-
Desative o recurso de tabela de ampliação de tipo: Consulte Desativar o recurso de tabela de ampliação de tipo.
Funções dependentes do tipo
Algumas funções SQL retornam resultados que dependem do tipo de dado de entrada. Por exemplo, a funçãohash retorna valores de hash diferentes para o mesmo valor lógico se o tipo de argumento for diferente: hash(1::INT) retorna um resultado diferente de hash(1::BIGINT).
Outras funções dependentes do tipo são: xxhash64, bit_get, bit_reverse, typeof.
Para resultados estáveis em consultas que usam estas funções, converta explicitamente os valores para o tipo desejado:
- Python
- Scala
- SQL
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
.selectExpr("hash(CAST(a AS BIGINT))")
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
Outras Limitações
- Não é possível fornecer um local de acompanhamento de esquema usando SQL ao realizar transmissão de uma tabela Delta Lake com uma alteração de tipo.
- Não é possível compartilhar uma tabela com a ampliação de tipo ativada para consumidores que não são da Databricks usando o OpenSharing.