Atualizar o esquema da tabela Delta Lake

O Delta Lake permite atualizar o esquema de uma mesa. Os seguintes tipos de alterações são suportados:

  • Adicionando novas colunas (em posições arbitrárias)

  • Reordenando colunas existentes

  • Renomeando colunas existentes

Você pode fazer essas alterações explicitamente usando DDL ou implicitamente usando DML.

Importante

Uma atualização em um esquema de tabela Delta é uma operação que entra em conflito com todas as operações de gravação Delta concorrentes.

Ao atualizar o esquema de uma tabela Delta, as transmissões que leem dessa tabela são interrompidos.Se você quiser que a transmissão continue, você deve reiniciá-la. Para métodos recomendados, consulte Considerações de produção para transmissão estruturada.

Atualize explicitamente o esquema para adicionar colunas

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Por padrão, a anulabilidade é true.

Para adicionar uma coluna a um campo aninhado, utilize:

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Por exemplo, se o esquema antes de executar o ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) for:

- root
| - colA
| - colB
| +-field1
| +-field2

o esquema depois é:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Observação

A adição de colunas aninhadas é compatível apenas para estruturas. Matrizes e mapas não são compatíveis.

Atualizar explicitamente o esquema para alterar o comentário ou a ordenação da coluna

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Para alterar uma coluna em um campo aninhado, use:

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Por exemplo, se o esquema antes de executar o ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST for:

- root
| - colA
| - colB
| +-field1
| +-field2

o esquema depois é:

- root
| - colA
| - colB
| +-field2
| +-field1

Atualizar explicitamente o esquema para substituir colunas

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Por exemplo, ao executar o seguinte DDL:

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

se o esquema anterior for:

- root
| - colA
| - colB
| +-field1
| +-field2

o esquema depois é:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Atualizar explicitamente o esquema para renomear colunas

Observação

Esse recurso está disponível em Databricks Runtime 10.4 LTS e acima.

Para renomear colunas sem alterar os dados existentes das colunas, é necessário ativar o mapeamento de colunas para a tabela. Consulte Renomear e eliminar colunas com o mapeamento de colunas do Delta Lake.

Para renomear uma coluna:

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Para renomear um campo aninhado:

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Por exemplo, quando você executa o seguinte comando:

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Se o esquema anterior for:

- root
| - colA
| - colB
| +-field1
| +-field2

Então o esquema depois é:

- root
| - colA
| - colB
| +-field001
| +-field2

Consulte Renomear e eliminar colunas com o mapeamento de colunas do Delta Lake.

Atualizar explicitamente o esquema para soltar colunas

Observação

Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.

Para eliminar colunas como uma operação que afeta apenas os metadados sem reescrever arquivos de dados, é necessário ativar o mapeamento de colunas para a tabela.Consulte Renomear e eliminar colunas com o mapeamento de colunas do Delta Lake.

Importante

Soltar uma coluna de metadados não exclui os dados subjacentes da coluna em arquivos. Para limpar os dados da coluna descartada, você pode usar REORG TABLE para regravar arquivos. Em seguida, você pode usar o VACUUM para excluir fisicamente os arquivos que contêm os dados da coluna descartada.

Para soltar uma coluna:

ALTER TABLE table_name DROP COLUMN col_name

Para soltar várias colunas:

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Atualizar explicitamente o esquema para alterar o tipo ou o nome da coluna

Você pode modificar o tipo ou nome de uma coluna ou eliminar uma coluna reescrevendo a tabela. Para fazer isto, utilize a opção overwriteSchema.

O exemplo a seguir mostra a alteração de um tipo de coluna:

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

O exemplo a seguir mostra a alteração de um nome de coluna:

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Permitir a evolução do esquema

O senhor pode ativar a evolução do esquema seguindo um dos seguintes procedimentos:

Databricks recomenda ativar a evolução do esquema para cada operação de gravação em vez de definir um Spark conf.

Quando o senhor usa opções ou sintaxe para ativar a evolução do esquema em uma operação de gravação, isso tem precedência sobre o Spark conf.

Observação

Não há cláusula de evolução do esquema para declarações INSERT INTO.

Habilitar a evolução do esquema para gravações para adicionar novas colunas

As colunas presentes na consulta de origem, mas ausentes na tabela de destino, são adicionadas automaticamente como parte de uma transação de gravação quando a evolução do esquema está ativada. Consulte Habilitar a evolução do esquema.

O caso é preservado ao anexar uma nova coluna. Novas colunas são adicionadas ao final do esquema da tabela. Se as colunas adicionais estiverem em uma estrutura, elas serão anexadas ao final da estrutura na tabela de destino.

O exemplo a seguir demonstra o uso da opção mergeSchema com o Auto Loader. Consulte O que é o Auto Loader?

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

O exemplo a seguir demonstra o uso da opção mergeSchema com um lote de operações de gravação:

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Evolução automática do esquema para mesclagem do Delta Lake

A evolução de esquema permite que os usuários resolvam discrepâncias de esquema entre a tabela de destino e a tabela de origem durante a operação de mesclagem (merge).Ele lida com os dois casos a seguir:

  1. Uma coluna na tabela de origem não está presente na tabela de destino. A nova coluna é adicionada ao esquema de destino e seus valores são inseridos ou atualizados usando os valores de origem.

  2. Uma coluna na tabela de destino não está presente na tabela de origem. O esquema de destino permanece inalterado; os valores na coluna de destino adicional são deixados inalterados (para UPDATE) ou definidos como NULL (para INSERT).

O senhor deve ativar manualmente a evolução automática do esquema. Consulte Habilitar a evolução do esquema.

Observação

Em Databricks Runtime 12.2 LTS e acima, as colunas e os campos struct presentes na tabela de origem podem ser especificados por nome nas ações de inserção ou atualização. Em Databricks Runtime 11.3 LTS e abaixo, somente as ações INSERT * ou UPDATE SET * podem ser usadas para a evolução do esquema com merge.

Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar a evolução do esquema com structs aninhados dentro de mapas, como map<int, struct<a: int, b: int>>.

Sintaxe de evolução do esquema para mesclagem

Em Databricks Runtime 15.2 e acima, o senhor pode especificar a evolução do esquema em uma declaração merge usando a tabela SQL ou Delta APIs:

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE
from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Exemplo de operações do site merge com evolução do esquema

Aqui estão alguns exemplos dos efeitos da merge operação com e sem a evolução do esquema.

Colunas

Consulta (em SQL)

Comportamento sem evolução do esquema (padrão)

Comportamento com evolução do esquema

Colunas-alvo: key, value

Colunas de origem: key, value, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

O esquema da tabela permanece inalterado; somente as colunas value são key atualizadas/inseridas.

O esquema da tabela é alterado para (key, value, new_value). Os registros existentes com correspondências são atualizados com o value e new_value na origem. Novas linhas são inseridas com o esquema (key, value, new_value).

Colunas-alvo: key, old_value

Colunas de origem: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

UPDATE e INSERT as ações geram um erro porque a coluna de destino não old_value está na origem.

O esquema da tabela é alterado para (key, old_value, new_value). Os registros existentes com correspondências são atualizados com o new_value na origem deixando o old_value inalterado. Novos registros são inseridos com o key, new_value e NULL especificados para o old_value.

Colunas-alvo: key, old_value

Colunas de origem: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET new_value = s.new_value

UPDATE linha um erro porque a coluna new_value não existe na tabela de destino.

O esquema da tabela é alterado para (key, old_value, new_value). Os registros existentes com correspondências são atualizados com o new_value na origem deixando old_value inalterado e os registros não correspondentes têm NULL inseridos para new_value. Ver nota (1).

Colunas-alvo: key, old_value

Colunas de origem: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
  THEN INSERT (key, new_value) VALUES (s.key, s.new_value)

INSERT linha um erro porque a coluna new_value não existe na tabela de destino.

O esquema da tabela é alterado para (key, old_value, new_value). Novos registros são inseridos com os key, new_value e NULL especificados para o old_value. Os registros existentes NULL inseridos para new_value deixando old_value inalterados. Ver nota (1).

(1) Esse comportamento está disponível em Databricks Runtime 12.2 LTS e acima; Databricks Runtime 11.3 LTS e abaixo erro nessa condição.

Excluir colunas com mesclagem Delta Lake

Em Databricks Runtime 12.2 LTS e acima, o senhor pode usar cláusulas EXCEPT nas condições de merge para excluir explicitamente as colunas. O comportamento da palavra-chave EXCEPT varia de acordo com a ativação ou não da evolução do esquema.

Com a evolução do esquema desabilitada, a palavra-chave EXCEPT se aplica à lista de colunas na tabela de destino e permite excluir colunas de UPDATE ou INSERT ações. As colunas excluídas são definidas como null.

Com a evolução do esquema habilitada, a palavra-chave EXCEPT se aplica à lista de colunas na tabela de origem e permite excluir colunas da evolução do esquema. Uma nova coluna na fonte que não esteja presente no destino não será adicionada ao esquema de destino se estiver listada na cláusula EXCEPT. Colunas excluídas que já estão presentes no destino estão definidas como null.

Os exemplos a seguir demonstram esta sintaxe:

Colunas

Consulta (em SQL)

Comportamento sem evolução do esquema (padrão)

Comportamento com evolução do esquema

Colunas-alvo: id, title, last_updated

Colunas de origem: id, title, review, last_updated

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
  THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
  THEN INSERT * EXCEPT (last_updated)

As linhas correspondentes são atualizadas definindo o campo last_updated com a data atual. Novas linhas são inseridas utilizando valores para id e title. O campo excluído last_updated está definido como null. O campo review é ignorado porque não está no destino.

As linhas correspondentes são atualizadas definindo o campo last_updated como a data atual. O esquema foi desenvolvido para adicionar o campo review. Novas linhas são inseridas usando todos os campos de origem, exceto last_updated, que é definido como null.

Colunas-alvo: id, title, last_updated

Colunas de origem: id, title, review, internal_count

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
  THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
  THEN INSERT * EXCEPT (last_updated, internal_count)

INSERT linha um erro porque a coluna internal_count não existe na tabela de destino.

As linhas correspondentes são atualizadas definindo o campo last_updated como a data atual. O campo review é adicionado à tabela de destino, mas o campo internal_count é ignorado. As novas linhas inseridas têm last_updated definido como null.

Como lidar com colunas NullType em atualizações de esquemas

Como o Parquet não é compatível NullType, as colunas NullType são descartadas do DataFrame ao gravar em tabelas Delta, mas ainda são armazenadas no esquema. Quando um tipo de dados diferente é recebido para essa coluna, o Delta Lake mescla o esquema com o novo tipo de dados. Se o Delta Lake receber um NullType para uma coluna existente, o esquema antigo será mantido e a nova coluna será descartada durante a gravação.

NullType em transmissão não é compatível. Como você deve definir esquemas ao usar o streaming, isso deve ser muito raro. NullType também não é aceito para tipos complexos, como ArrayType MapType e.

Substituir esquema de tabela

Por padrão, substituir os dados em uma tabela não substitui o esquema. Ao sobrescrever uma tabela usando mode("overwrite") sem replaceWhere, talvez você ainda queira substituir o esquema dos dados que estão sendo gravados. Você substitui o esquema e o particionamento da tabela definindo a opção overwriteSchema como true:

df.write.option("overwriteSchema", "true")

Importante

Você não pode especificar overwriteSchema como true ao usar a sobregravação de partição dinâmica.