Tipo de ampliação
Tabelas com ampliação de tipo ativada permitem que você altere os tipos de dados da coluna para um tipo mais amplo sem reescrever os arquivos de dados subjacentes. O senhor pode alterar os tipos de coluna manualmente ou usar a evolução do esquema para desenvolver os tipos de coluna.
A ampliação de tipos está disponível em Databricks Runtime 15.4 LTS e acima. As tabelas com ampliação de tipo ativada só podem ser lidas em Databricks Runtime 15.4 LTS e acima.
O alargamento do tipo requer o Delta Lake. Todas as tabelas gerenciar Unity Catalog usam Delta Lake por default.
Alterações de tipo suportadas
Você pode ampliar os tipos de acordo com as seguintes regras:
Tipo de origem | Tipos mais amplos suportados |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
As alterações de tipo são suportadas para colunas e campos de nível superior aninhados em estruturas, mapas e matrizes.
Por default Spark trunca a parte fracionária de um valor quando uma operação promove um tipo inteiro para decimal ou double e uma ingestão subsequente grava o valor de volta em uma coluna inteira. Para obter detalhes sobre o comportamento da política de atribuição, consulte Atribuição de armazenamento.
Ao alterar qualquer tipo numérico para decimal, a precisão total deve ser igual ou maior que a precisão inicial. Se o senhor também aumentar a escala, a precisão total deverá aumentar em um valor correspondente.
A meta mínima para os tipos byte, short e int é decimal(10,0). A meta mínima para long é decimal(20,0).
Se você quiser adicionar duas casas decimais a um campo com decimal(10,1), o alvo mínimo é decimal(12,3).
Ativar ampliação de texto
Você pode ativar a ampliação de tipo em uma tabela existente definindo a propriedade da tabela delta.enableTypeWidening como true:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
Você também pode ativar a ampliação de texto durante a criação da tabela:
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
Quando o senhor ativa a ampliação de tipo, ele define o recurso de tabela typeWidening, que atualiza os protocolos de leitura e gravação. O senhor deve usar o site Databricks Runtime 15.4 ou o acima for para interagir com tabelas com a ampliação de tipos ativada. Se os clientes externos também interagirem com a tabela, verifique se eles suportam esse recurso da tabela. Consulte Delta Lake recurso compatibilidade e protocolos.
Aplicar manualmente uma alteração de tipo
Use o comando ALTER COLUMN para alterar manualmente os tipos:
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
Esta operação atualiza o esquema da tabela sem sobrescrever os arquivos de dados subjacentes. Consulte ALTER TABLE para obter mais detalhes.
Ampliar os tipos com a evolução automática do esquema
A evolução do esquema funciona com a ampliação do tipo para atualizar os tipos de dados nas tabelas de destino para que correspondam ao tipo de dados recebidos.
Sem a ampliação do tipo ativada, a evolução do esquema sempre tenta fazer o downcast dos dados para corresponder aos tipos de coluna na tabela de destino. Se o senhor não quiser ampliar automaticamente os tipos de dados nas tabelas de destino, desative a ampliação de tipos antes de executar cargas de trabalho com a evolução do esquema ativada.
Para usar a evolução do esquema para ampliar o tipo de dados de uma coluna durante a ingestão, o senhor deve atender às seguintes condições:
- A execução do comando de gravação com a evolução automática do esquema ativada.
- A tabela de destino tem a ampliação de tipos ativada.
- O tipo de coluna de origem é mais largo do que o tipo de coluna de destino.
- A ampliação de tipo suporta a mudança de tipo.
As incompatibilidades de tipos que não atendem a todas essas condições seguem as regras normais de imposição de esquema. Ver imposição de esquema.
Os exemplos a seguir demonstram como o alargamento de tipos funciona com a evolução do esquema para operações de escrita comuns.
- Python
- Scala
- SQL
# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
-- Example 1: Automatic type widening in INSERT INTO
---- Insert data with BIGINT value column - automatically widens INT to BIGINT
INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table;
-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Auto Loader
Visualização
O suporte para ampliação de tipos no Auto Loader está em versão prévia pública.
Auto Loader suporta ampliação de tipo com evolução automática do esquema. Ao usar Auto Loader para ingerir dados em uma tabela Delta com a ampliação de tipos e a evolução do esquema ativadas, os tipos de coluna são automaticamente ampliados para corresponder aos dados recebidos.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Veja Ampliação automática de tipos com Auto Loader. Além disso, a tabela de destino deve ter a ampliação de tipo ativada. Consulte Ativar ampliação de tipo.
Desativar o recurso de tabela de ampliação de tipos
Você pode evitar a ampliação acidental de tipos em tabelas habilitadas definindo a propriedade como false:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
Essa configuração impede futuras alterações de tipo na tabela, mas não remove o recurso de ampliação de tipo da tabela nem desfaz alterações de tipo anteriores.
Se o senhor precisar remover completamente o recurso da tabela de ampliação de tipo, poderá usar o comando DROP FEATURE, conforme mostrado no exemplo a seguir:
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
As tabelas que habilitaram a ampliação de tipo usando o Databricks Runtime 15.4 LTS exigem a eliminação do recurso typeWidening-preview.
Ao desativar o alargamento de tipos, o Databricks reescreve todos os arquivos de dados que não estão em conformidade com o esquema de tabela atual. Consulte Remover um recurso de tabela Delta Lake e fazer downgrade do protocolo da tabela.
transmissão de uma tabela Delta
O suporte para ampliação de tipos em transmissão estruturada está disponível no Databricks Runtime 16.4 LTS e versões superiores.
Quando transmitido de uma tabela Delta com alargamento de tipo ativado, você pode configurar o alargamento de tipo automático para consultas de transmissão habilitando a evolução do esquema com a opção mergeSchema na tabela de destino. A tabela de destino deve ter a ampliação de tipo ativada. Consulte Ativar ampliação de tipo.
- Python
- Scala
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
Quando mergeSchema está habilitado e a tabela de destino tem a ampliação de tipo habilitada:
- As alterações de tipo são aplicadas automaticamente à tabela subsequente, sem necessidade de intervenção manual.
- Novas colunas são adicionadas automaticamente ao esquema da tabela subsequente.
Sem mergeSchema ativado, os valores são tratados de acordo com a configuração spark.sql.storeAssignmentPolicy , que por default converte os valores para corresponder ao tipo coluna de destino. Para obter mais informações sobre o comportamento da política de atribuição, consulte Atribuição de armazenamento.
Confirmação de alteração de tipo manual
Ao utilizar uma tabela Delta , você pode fornecer um local de acompanhamento de esquema para rastrear alterações de esquema não aditivas, incluindo alterações de tipo. Fornecer um local de acompanhamento de esquema é obrigatório no Databricks Runtime 18.0 e versões anteriores, e opcional no Databricks Runtime 18.1 e versões posteriores.
- Python
- Scala
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
Após fornecer um local de acompanhamento de esquema, a transmissão evolui seu esquema rastreado quando detecta uma mudança de tipo e então para. Nesse momento, você pode tomar qualquer medida necessária para lidar com a mudança de tipo, como habilitar a ampliação de tipo na tabela subsequente ou atualizar a consulta de transmissão.
Para retomar o processamento, defina a configuração do Spark spark.databricks.delta.streaming.allowSourceColumnTypeChange ou a opção de leitura do DataFrame allowSourceColumnTypeChange:
- Python
- Scala
- SQL
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
O ID do ponto de verificação <checkpoint_id> e a versão da tabela de origem Delta Lake <delta_source_table_version> são exibidos na mensagem de erro quando a transmissão é interrompida.
Pipeline declarativoLakeFlow Spark
Você pode habilitar a ampliação de tipos para o pipeline declarativo LakeFlow Spark no nível pipeline ou para tabelas individuais. O alargamento de tipos permite que os tipos de coluna sejam alargados automaticamente durante a execução pipeline , sem a necessidade de uma refresh completa das tabelas de transmissão. As alterações de tipo em visões materializadas sempre acionam um recálculo completo e, quando uma alteração de tipo é aplicada a uma tabela de origem, as visões materializadas que dependem dessa tabela também precisam ser recálculadas completamente para refletir os novos tipos.
Habilitar a ampliação de tipos para todo o pipeline
Para habilitar a ampliação de tipos para todas as tabelas em um pipeline, defina a configuração do pipeline pipelines.enableTypeWidening:
- JSON
- YAML
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
configuration:
pipelines.enableTypeWidening: 'true'
Habilitar a ampliação de tipos para tabelas específicas
Você também pode habilitar a ampliação de tipos para tabelas individuais definindo a propriedade da tabela delta.enableTypeWidening:
- Python
- SQL
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
Compatibilidade com leitores subsequentes
Tabelas com ampliação de tipo habilitada só podem ser lidas no Databricks Runtime 15.4 LTS e versões superiores. Se você deseja que uma tabela com ampliação de tipo habilitada em seu pipeline seja legível para leitores no Databricks Runtime 14.3 e versões anteriores, você deve:
- Desative a ampliação de tipo removendo a propriedade
delta.enableTypeWidening/pipelines.enableTypeWideningou definindo-a como falsa e acione uma refresh completa da tabela. - Ative Modede Compatibilidade no seu tablet.
Delta Sharing
O suporte para Type Widening em Delta Sharing está disponível em Databricks Runtime 16.1 e acima.
O compartilhamento de uma tabela Delta Lake com a ampliação de tipos ativada é suportado em Databricks-to-Databricks Delta Sharing. O provedor e o destinatário devem estar em Databricks Runtime 16.1 ou acima.
Para ler o Change Data Feed de uma tabela do Delta Lake com ampliação de tipo ativada usando o Delta Sharing, o senhor deve definir o formato de resposta como delta:
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
A leitura do feed de dados de alteração em todas as alterações de tipo não é suportada. Em vez disso, o senhor deve dividir as operações em duas leituras separadas, uma terminando na versão da tabela que contém a alteração de tipo e a outra começando na versão que contém a alteração de tipo.
Limitações
Compatibilidade com o Apache Iceberg
Apache Iceberg não é compatível com todas as alterações de tipo cobertas pela ampliação de tipo, consulte Iceberg evolução do esquema. Em particular, o Databricks não é compatível com as seguintes alterações de tipo:
byte,short,int,longadecimaloudouble- aumento da escala decimal
dateparatimestampNTZ
Ao habilitar o UniForm com compatibilidade com o Iceberg em uma tabela do Delta Lake, a aplicação de uma dessas alterações de tipo resulta em um erro. Consulte a seção "Ler tabelas Delta com clientes Iceberg".
Se você aplicar uma dessas alterações de tipo não suportadas a uma tabela do Delta Lake, terá duas opções:
-
Regenerar metadados do Iceberg: Use o seguinte comando para regenerar os metadados do Iceberg sem o recurso de ampliação da tabela de tipos:
SQLALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')Isso permite manter a compatibilidade com o Uniform mesmo após aplicar alterações de tipo incompatíveis.
-
Desativar o recurso de ampliação de tabela de tipos: consulte Desativar o recurso de ampliação de tabela de tipos.
Funções dependentes do tipo
Algumas funções SQL retornam resultados que dependem do tipo de dados de entrada. Por exemplo, a funçãohash retorna valores de hash diferentes para o mesmo valor lógico se o tipo do argumento for diferente: hash(1::INT) retorna um resultado diferente de hash(1::BIGINT).
Outras funções dependentes do tipo são: xxhash64, bit_get, bit_reverse, typeof.
Para obter resultados estáveis em consultas que utilizam essas funções, converta explicitamente os valores para o tipo desejado:
- Python
- Scala
- SQL
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
.selectExpr("hash(CAST(a AS BIGINT))")
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
Outras limitações
- Não é possível fornecer um local de acompanhamento de esquema usando SQL quando apresentado a partir de uma tabela Delta Lake com uma alteração de tipo.
- Não é possível compartilhar uma tabela com a ampliação de tipo ativada para consumidores que não sejam doDatabricks usando Delta Sharing.