Tipo de ampliação
Visualização
Esse recurso está em Public Preview em Databricks Runtime 15.4 LTS e acima.
Tabelas com ampliação de tipo ativada permitem que você altere os tipos de dados da coluna para um tipo mais amplo sem reescrever os arquivos de dados subjacentes. O senhor pode alterar os tipos de coluna manualmente ou usar a evolução do esquema para desenvolver os tipos de coluna.
A ampliação de tipos está disponível em Databricks Runtime 15.4 LTS e acima. As tabelas com ampliação de tipo ativada só podem ser lidas em Databricks Runtime 15.4 LTS e acima.
O alargamento do tipo requer o Delta Lake. Todas as tabelas gerenciar Unity Catalog usam Delta Lake por default.
Alterações de tipo suportadas
Você pode ampliar os tipos de acordo com as seguintes regras:
Tipo de origem | Tipos mais amplos suportados |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
As alterações de tipo são suportadas para colunas e campos de nível superior aninhados em estruturas, mapas e matrizes.
Por default Spark trunca a parte fracionária de um valor quando uma operação promove um tipo inteiro para decimal ou double e uma ingestão subsequente grava o valor de volta em uma coluna inteira. Para obter detalhes sobre o comportamento da política de atribuição, consulte Atribuição de armazenamento.
Ao alterar qualquer tipo numérico para decimal, a precisão total deve ser igual ou maior que a precisão inicial. Se o senhor também aumentar a escala, a precisão total deverá aumentar em um valor correspondente.
A meta mínima para os tipos byte, short e int é decimal(10,0). A meta mínima para long é decimal(20,0).
Se você quiser adicionar duas casas decimais a um campo com decimal(10,1), o alvo mínimo é decimal(12,3).
Ativar ampliação de texto
Você pode ativar a ampliação de tipo em uma tabela existente definindo a propriedade da tabela delta.enableTypeWidening como true:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
Você também pode ativar a ampliação de texto durante a criação da tabela:
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
Quando o senhor ativa a ampliação de tipo, ele define o recurso de tabela typeWidening, que atualiza os protocolos de leitura e gravação. O senhor deve usar o site Databricks Runtime 15.4 ou o acima for para interagir com tabelas com a ampliação de tipos ativada. Se os clientes externos também interagirem com a tabela, verifique se eles suportam esse recurso da tabela. Consulte Delta Lake recurso compatibilidade e protocolos.
Aplicar manualmente uma alteração de tipo
Use o comando ALTER COLUMN para alterar manualmente os tipos:
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
Esta operação atualiza o esquema da tabela sem sobrescrever os arquivos de dados subjacentes. Consulte ALTER TABLE para obter mais detalhes.
Ampliar os tipos com a evolução automática do esquema
A evolução do esquema funciona com a ampliação do tipo para atualizar os tipos de dados nas tabelas de destino para que correspondam ao tipo de dados recebidos.
Sem a ampliação do tipo ativada, a evolução do esquema sempre tenta fazer o downcast dos dados para corresponder aos tipos de coluna na tabela de destino. Se o senhor não quiser ampliar automaticamente os tipos de dados nas tabelas de destino, desative a ampliação de tipos antes de executar cargas de trabalho com a evolução do esquema ativada.
Para usar a evolução do esquema para ampliar o tipo de dados de uma coluna durante a ingestão, o senhor deve atender às seguintes condições:
- A execução do comando de gravação com a evolução automática do esquema ativada.
- A tabela de destino tem a ampliação de tipos ativada.
- O tipo de coluna de origem é mais largo do que o tipo de coluna de destino.
- A ampliação de tipo suporta a mudança de tipo.
As incompatibilidades de tipos que não atendem a todas essas condições seguem as regras normais de imposição de esquema. Ver imposição de esquema.
Os exemplos a seguir demonstram como o alargamento de tipos funciona com a evolução do esquema para operações de escrita comuns.
- Python
- Scala
- SQL
# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
-- Example 1: Automatic type widening in INSERT INTO
---- Enable schema evolution
SET spark.databricks.delta.schema.autoMerge.enabled = true;
---- Insert data with BIGINT value column - automatically widens INT to BIGINT
INSERT INTO target_table SELECT * FROM source_table;
-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Desativar o recurso de tabela de ampliação de tipos
Você pode evitar a ampliação acidental de tipos em tabelas habilitadas definindo a propriedade como false:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
Essa configuração impede futuras alterações de tipo na tabela, mas não remove o recurso de ampliação de tipo da tabela nem desfaz alterações de tipo anteriores.
Se o senhor precisar remover completamente o recurso da tabela de ampliação de tipo, poderá usar o comando DROP FEATURE, conforme mostrado no exemplo a seguir:
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
As tabelas que habilitaram a ampliação de tipo usando o Databricks Runtime 15.4 LTS exigem a eliminação do recurso typeWidening-preview.
Ao desativar o alargamento de tipos, o Databricks reescreve todos os arquivos de dados que não estão em conformidade com o esquema de tabela atual. Consulte Remover um recurso de tabela Delta Lake e fazer downgrade do protocolo da tabela.
transmissão de uma tabela Delta
O suporte para ampliação de tipos em transmissão estruturada está disponível no Databricks Runtime 16.4 LTS e versões superiores.
Quando transmitido de uma tabela Delta com alargamento de tipo ativado, você pode configurar o alargamento de tipo automático para consultas de transmissão habilitando a evolução do esquema com a opção mergeSchema na tabela de destino. A tabela de destino deve ter a ampliação de tipo ativada. Consulte Ativar ampliação de tipo.
- Python
- Scala
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
Quando mergeSchema está habilitado e a tabela de destino tem a ampliação de tipo habilitada:
- As alterações de tipo são aplicadas automaticamente à tabela subsequente, sem necessidade de intervenção manual.
- Novas colunas são adicionadas automaticamente ao esquema da tabela subsequente.
Sem mergeSchema ativado, os valores são tratados de acordo com a configuração spark.sql.storeAssignmentPolicy , que por default converte os valores para corresponder ao tipo coluna de destino. Para obter mais informações sobre o comportamento da política de atribuição, consulte Atribuição de armazenamento.
Confirmação de alteração de tipo manual
Ao utilizar uma tabela Delta , você pode fornecer um local de acompanhamento de esquema para rastrear alterações de esquema não aditivas, incluindo alterações de tipo. Fornecer um local de acompanhamento de esquema é obrigatório no Databricks Runtime 18.0 e versões anteriores, e opcional no Databricks Runtime 18.1 e versões posteriores.
- Python
- Scala
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
Após fornecer um local de acompanhamento de esquema, a transmissão evolui seu esquema rastreado quando detecta uma mudança de tipo e então para. Nesse momento, você pode tomar qualquer medida necessária para lidar com a mudança de tipo, como habilitar a ampliação de tipo na tabela subsequente ou atualizar a consulta de transmissão.
Para retomar o processamento, defina a configuração do Spark spark.databricks.delta.streaming.allowSourceColumnTypeChange ou a opção de leitura do DataFrame allowSourceColumnTypeChange:
- Python
- Scala
- SQL
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
O ID do ponto de verificação <checkpoint_id> e a versão da tabela de origem Delta Lake <delta_source_table_version> são exibidos na mensagem de erro quando a transmissão é interrompida.
Pipeline declarativoLakeFlow Spark
O suporte para ampliação de tipos no pipeline LakeFlow Spark Declarative está disponível no canal PREVIEW.
Você pode habilitar a ampliação de tipos para o pipeline declarativo LakeFlow Spark no nível pipeline ou para tabelas individuais. O alargamento de tipos permite que os tipos de coluna sejam alargados automaticamente durante a execução pipeline , sem a necessidade de uma refresh completa das tabelas de transmissão. As alterações de tipo em visões materializadas sempre acionam um recálculo completo e, quando uma alteração de tipo é aplicada a uma tabela de origem, as visões materializadas que dependem dessa tabela também precisam ser recálculadas completamente para refletir os novos tipos.
Habilitar a ampliação de tipos para todo o pipeline
Para habilitar a ampliação de tipos para todas as tabelas em um pipeline, defina a configuração do pipeline pipelines.enableTypeWidening:
- JSON
- YAML
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
configuration:
pipelines.enableTypeWidening: 'true'
Habilitar a ampliação de tipos para tabelas específicas
Você também pode habilitar a ampliação de tipos para tabelas individuais definindo a propriedade da tabela delta.enableTypeWidening:
- Python
- SQL
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
Compatibilidade com leitores subsequentes
Tabelas com ampliação de tipo habilitada só podem ser lidas no Databricks Runtime 15.4 LTS e versões superiores. Se você deseja que uma tabela com ampliação de tipo habilitada em seu pipeline seja legível para leitores no Databricks Runtime 14.3 e versões anteriores, você deve:
- Desative a ampliação de tipo removendo a propriedade
delta.enableTypeWidening/pipelines.enableTypeWideningou definindo-a como falsa e acione uma refresh completa da tabela. - Ative Modede Compatibilidade no seu tablet.
Delta Sharing
O suporte para Type Widening em Delta Sharing está disponível em Databricks Runtime 16.1 e acima.
O compartilhamento de uma tabela Delta Lake com a ampliação de tipos ativada é suportado em Databricks-to-Databricks Delta Sharing. O provedor e o destinatário devem estar em Databricks Runtime 16.1 ou acima.
Para ler o Change Data Feed de uma tabela do Delta Lake com ampliação de tipo ativada usando o Delta Sharing, o senhor deve definir o formato de resposta como delta:
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
A leitura do feed de dados de alteração em todas as alterações de tipo não é suportada. Em vez disso, o senhor deve dividir as operações em duas leituras separadas, uma terminando na versão da tabela que contém a alteração de tipo e a outra começando na versão que contém a alteração de tipo.
Limitações
Compatibilidade com o Apache Iceberg
Apache Iceberg não é compatível com todas as alterações de tipo cobertas pela ampliação de tipo, consulte Iceberg evolução do esquema. Em particular, o Databricks não é compatível com as seguintes alterações de tipo:
byte,short,int,longadecimaloudouble- aumento da escala decimal
dateparatimestampNTZ
Ao habilitar o UniForm com compatibilidade com o Iceberg em uma tabela do Delta Lake, a aplicação de uma dessas alterações de tipo resulta em um erro. Consulte a seção "Ler tabelas Delta com clientes Iceberg".
Se você aplicar uma dessas alterações de tipo não suportadas a uma tabela do Delta Lake, terá duas opções:
-
Regenerar metadados do Iceberg: Use o seguinte comando para regenerar os metadados do Iceberg sem o recurso de ampliação da tabela de tipos:
SQLALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')Isso permite manter a compatibilidade com o Uniform mesmo após aplicar alterações de tipo incompatíveis.
-
Desativar o recurso de ampliação de tabela de tipos: consulte Desativar o recurso de ampliação de tabela de tipos.
Funções dependentes do tipo
Algumas funções SQL retornam resultados que dependem do tipo de dados de entrada. Por exemplo, a funçãohash retorna valores de hash diferentes para o mesmo valor lógico se o tipo do argumento for diferente: hash(1::INT) retorna um resultado diferente de hash(1::BIGINT).
Outras funções dependentes do tipo são: xxhash64, bit_get, bit_reverse, typeof.
Para obter resultados estáveis em consultas que utilizam essas funções, converta explicitamente os valores para o tipo desejado:
- Python
- Scala
- SQL
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
.selectExpr("hash(CAST(a AS BIGINT))")
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
Outras limitações
- Não é possível fornecer um local de acompanhamento de esquema usando SQL quando apresentado a partir de uma tabela Delta Lake com uma alteração de tipo.
- Não é possível compartilhar uma tabela com a ampliação de tipo ativada para consumidores que não sejam doDatabricks usando Delta Sharing.