Substituir seletivamente os dados com o Delta Lake
O Databricks aproveita a funcionalidade do Delta Lake para oferecer suporte a duas opções distintas para substituições seletivas:
- A opção
replaceWhere
substitui atomicamente todos os registros que correspondem a um determinado predicado. - Você pode substituir diretórios de dados com base em como as tabelas são particionadas usando substituições de partição dinâmica.
Para a maioria das operações, a Databricks recomenda usar replaceWhere
para especificar quais dados devem ser substituídos.
Se os dados tiverem sido sobrescritos acidentalmente, você poderá usar a restauração para desfazer a alteração.
Substituição seletiva arbitrária por replaceWhere
Você pode substituir seletivamente somente os dados que correspondam a uma expressão arbitrária.
SQL requer Databricks Runtime 12.2 LTS ou acima.
O comando a seguir substitui atomicamente os eventos em janeiro na tabela de destino, que é particionada por start_date
, pelos dados em replace_data
:
- Python
- Scala
- SQL
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Esse código de exemplo grava os dados em replace_data
, valida se todas as linhas correspondem ao predicado e executa uma substituição atômica usando a semântica overwrite
. Se algum valor nas operações estiver fora da restrição, essa operação falhará com um erro em default.
Você pode alterar esse comportamento para valores overwrite
dentro do intervalo de predicados e insert
registros que estão fora do intervalo especificado. Para fazer isso, desative a verificação de restrição definindo spark.databricks.delta.replaceWhere.constraintCheck.enabled
como false usando uma das seguintes configurações:
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Comportamento legado
O comportamento herdado do site default fazia com que replaceWhere
sobrescrevesse os dados que correspondiam a um predicado somente sobre colunas de partição. Com esse modelo legado, o comando a seguir substituiria atomicamente o mês de janeiro na tabela de destino, que é particionada por date
, pelos dados em df
:
- Python
- Scala
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Se quiser voltar ao comportamento antigo, você pode desativar o sinalizador spark.databricks.delta.replaceWhere.dataColumns.enabled
:
- Python
- Scala
- SQL
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
Sobrescreve partições dinâmicas
A substituição dinâmica de partições atualiza apenas as partições para as quais os novos dados foram gravados. Ele sobrescreve todos os dados existentes nessas partições e deixa os outros inalterados.
A Databricks oferece suporte a duas abordagens:
REPLACE USING
(recomendado) - Funciona em todos os tipos de compute, incluindo Databricks SQL warehouses, serverless compute e compute clássico. Não é necessário definir uma configuração de sessão Spark.partitionOverwriteMode
(legado) - Requer o compute clássico e a configuração de uma sessão Spark. Não compatível com Databricks SQL ou serverless compute.
As seções abaixo demonstram como utilizar cada abordagem.
A partição dinâmica sobrescreve com REPLACE USING
Visualização
Esse recurso está em Public Preview.
Databricks Runtime A versão 16.3 e superior suporta sobrescritas dinâmicas de partições para tabelas particionadas utilizando o comando ` REPLACE USING
`. Este método permite substituir seletivamente os dados em todos os tipos de compute, sem a necessidade de definir uma configuração de sessão Spark. REPLACE USING
Permite um comportamento de sobrescrita atômico e independente de computeque funciona em armazéns Databricks SQL, serverless compute e compute clássicos.
REPLACE USING
sobrescreve apenas as partições de destino dos dados recebidos. Todas as outras partições permanecem inalteradas.
O exemplo a seguir demonstra como usar a substituição dinâmica de partições com REPLACE USING
. Atualmente, é possível utilizar apenas SQL, não Python ou Scala. Para obter detalhes, consulte INSERT na referência da linguagem SQL.
INSERT INTO TABLE events
REPLACE USING (event_id, start_date)
SELECT * FROM source_data
Lembre-se das seguintes restrições e comportamentos para substituições dinâmicas de partições:
- Você deve especificar o conjunto completo das colunas de partição da tabela na cláusula
USING
. - Sempre valide se os dados gravados tocam somente nas partições esperadas. Uma única linha na partição errada pode, sem querer, sobrescrever a partição inteira.
Se você precisar de uma lógica de correspondência mais personalizável do que a que REPLACE USING
suporta, como tratar os valores de NULL
como iguais, use o REPLACE ON
complementar. Consulte INSERT
para obter detalhes.
A partição dinâmica sobrescreve com partitionOverwriteMode
(legacy)
Visualização
Esse recurso está em Public Preview.
Databricks Runtime 11.3 O LTS e acima suportam sobrescritas dinâmicas de partições para tabelas particionadas usando o modo de sobrescrita: INSERT OVERWRITE
em SQL ou uma gravação DataFrame com df.write.mode("overwrite")
. Este tipo de substituição está disponível apenas para armazéns clássicos compute, não para armazéns Databricks SQL ou serverless compute.
Configure o modo de substituição dinâmica de partição definindo a configuração da sessão Spark spark.sql.sources.partitionOverwriteMode
como dynamic
. Como alternativa, você pode definir a opção DataFrameWriter
partitionOverwriteMode
como dynamic
. Se presente, a opção específica da consulta substitui o modo definido na configuração da sessão. O endereço de e-mail do default para spark.sql.sources.partitionOverwriteMode
é static
.
O exemplo a seguir demonstra o uso de partitionOverwriteMode
:
- SQL
- Python
- Scala
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Lembre-se das seguintes restrições e comportamentos para partitionOverwriteMode
:
- Você não pode definir
overwriteSchema
comotrue
. - Não é possível especificar
partitionOverwriteMode
ereplaceWhere
nas mesmas operaçõesDataFrameWriter
. - Se você especificar uma condição
replaceWhere
usando uma opçãoDataFrameWriter
, o Delta Lake aplicará essa condição para controlar quais dados serão sobrescritos. Essa opção tem precedência sobre a configuraçãopartitionOverwriteMode
em nível de sessão. - Sempre valide se os dados gravados tocam somente nas partições esperadas. Uma única linha na partição errada pode, sem querer, sobrescrever a partição inteira.