Níveis de isolamento e conflitos de gravação em Databricks

O nível de isolamento de uma tabela define o grau em que uma transação deve ser isolada de modificações feitas por operações concorrentes. Os conflitos de gravação no Databricks dependem do nível de isolamento.

O Delta Lake fornece garantias ACID de transações entre leituras e gravações. Isso significa que:

  • Vários gravadores em vários clusters podem modificar simultaneamente uma partição de tabela. Os gravadores veem uma view Snapshot consistente da tabela e as gravações ocorrem em uma ordem serial.

    • Os leitores continuam a ver uma view Snapshot consistente da tabela com a qual o Job do Databricks começa, mesmo quando uma tabela é modificada durante um Job.

Consulte O que são garantias ACID no Databricks?.

Observação

Databricks usa Delta Lake para todas as tabelas por default. Este artigo descreve o comportamento do Delta Lake no Databricks.

Importante

As alterações de metadados fazem com que todas as operações de gravação concorrente falhem. Essas operações incluem alterações no protocolo da tabela, nas propriedades da tabela ou no esquema de dados.

As leituras de transmissão falham quando encontram um commit que altera os metadados da tabela. Se quiser que a transmissão continue, o senhor deve reiniciá-la. Para conhecer os métodos recomendados, consulte Considerações sobre produção para transmissão estruturada.

A seguir, exemplos de consultas que alteram os metadados:

-- Set a table property.
ALTER TABLE table-name SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

-- Enable a feature using a table property and update the table protocol.
ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);

-- Drop a table feature.
ALTER TABLE table_name DROP FEATURE deletionVectors;

-- Upgrade to UniForm.
REORG TABLE table_name APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2));

-- Update the table schema.
ALTER TABLE table_name ADD COLUMNS (col_name STRING);

Escreva conflitos com simultaneidade em nível de linha

A simultaneidade em nível de linha reduz conflitos entre operações de gravação simultâneas, detectando alterações em nível de linha e resolvendo automaticamente conflitos que ocorrem quando gravações simultâneas atualizam ou excluem linhas diferentes no mesmo arquivo de dados.

A simultaneidade em nível de linha está geralmente disponível no Databricks Runtime 14.2 e acima. A simultaneidade em nível de linha é suportada pelo site default nas seguintes condições:

  • Tabelas com vetores de exclusão ativados e sem particionamento.

  • Tabelas com clusters líquidos, a menos que o senhor tenha desativado os vetores de exclusão.

As tabelas com partições não oferecem suporte à simultaneidade em nível de linha, mas ainda podem evitar conflitos entre OPTIMIZE e todas as outras operações de gravação quando os vetores de exclusão estão ativados. Consulte Limitações para simultaneidade em nível de linha.

Para outras versões do Databricks Runtime, consulte Comportamento de visualização de simultaneidade em nível de linha (legado).

A tabela a seguir descreve quais pares de operações de gravação podem entrar em conflito em cada nível de isolamento com a simultaneidade em nível de linha habilitada.

Observação

Tabelas com colunas de identidade não suportam transações concorrentes. Consulte Usar colunas de identidade no Delta Lake.

INSERIR (1)

ATUALIZAR, EXCLUIR, merge EM

Otimizar

INSERIR

Não pode entrar em conflito

ATUALIZAR, EXCLUIR, merge EM

Não é possível entrar em conflito em WriteSerializable. Pode entrar em conflito em Serializable ao modificar a mesma linha; consulte Limitações para simultaneidade em nível de linha.

MERGE INTO requer Photon para resolução de conflitos em nível de linha. Pode entrar em conflito ao modificar a mesma linha; consulte Limitações para simultaneidade em nível de linha.

Otimizar

Não pode entrar em conflito

Não pode entrar em conflito

Não pode entrar em conflito

Importante

(1) Todas as operações INSERT nas tabelas acima descrevem operações de acréscimo que não leem nenhum dado da mesma tabela antes de commit. As operações INSERT que contêm subconsultas que leem a mesma tabela suportam a mesma simultaneidade que MERGE.

Escreva conflitos sem simultaneidade em nível de linha

A tabela a seguir descreve quais pares de operações de gravação podem entrar em conflito em cada nível de isolamento.

As tabelas não suportam simultaneidade em nível de linha se tiverem partições definidas ou não tiverem vetores de exclusão habilitados. O Databricks Runtime 14.2 ou acima é necessário para simultaneidade em nível de linha.

Observação

Tabelas com colunas de identidade não suportam transações concorrentes. Consulte Usar colunas de identidade no Delta Lake.

INSERIR (1)

ATUALIZAR, EXCLUIR, merge EM

Otimizar

INSERIR

Não pode entrar em conflito

ATUALIZAR, EXCLUIR, merge EM

Não é possível entrar em conflito em WriteSerializable. Pode entrar em conflito em Serializable; veja evitar conflitos com partições.

Pode entrar em conflito em Serializable e WriteSerializable; veja evitar conflitos com partições.

Otimizar

Não pode entrar em conflito

Não é possível entrar em conflito com tabelas com vetores de exclusão ativados. Pode entrar em conflito de outra forma.

Não é possível entrar em conflito com tabelas com vetores de exclusão ativados. Pode entrar em conflito de outra forma.

Importante

(1) Todas as operações INSERT nas tabelas acima descrevem operações de acréscimo que não leem nenhum dado da mesma tabela antes de commit. As operações INSERT que contêm subconsultas que leem a mesma tabela suportam a mesma simultaneidade que MERGE.

Limitações para simultaneidade em nível de linha

Algumas limitações se aplicam à simultaneidade em nível de linha. Para as operações seguintes, a resolução de conflitos segue a simultaneidade normal para conflitos de escrita em Databricks. Consulte Conflitos de gravação sem simultaneidade em nível de linha.

  • OPTIMIZE comandos com ZORDER BY.

  • comando com cláusulas condicionais complexas, incluindo o seguinte:

    • Condições em tipos de dados complexos, como estruturas, matrizes ou mapas.

    • Condições usando expressões e subconsultas não determinísticas.

    • Condições que contêm subconsultas correlacionadas.

  • Para o comando MERGE , você deve usar um predicado explícito na tabela de destino para filtrar as linhas que correspondem à tabela de origem. Para resolução merge , o filtro é usado para verificar apenas as linhas que possam entrar em conflito com base nas condições do filtro em operações simultâneas.

Observação

A detecção de conflitos em nível de linha pode aumentar o tempo total de execução. No caso de muitas transações simultâneas, o escritor prioriza a latência em vez da resolução de conflitos e conflitos podem ocorrer.

Todas as limitações para vetores de exclusão também se aplicam. Consulte Limitações.

Quando Delta Lake confirma sem ler a tabela?

Delta Lake INSERT ou operações de acréscimo não lêem o estado da tabela antes commit se as seguintes condições forem atendidas:

  1. A lógica é expressa usando lógica SQL INSERT ou modo de acréscimo.

  2. A lógica não contém subconsultas ou condicionais que façam referência à tabela de destino das operações de gravação.

Como em outro commit, o Delta Lake valida e resolve as versões da tabela no commit usando metadados nos logs de transação, mas nenhuma versão da tabela é realmente lida.

Observação

Muitos padrões comuns usam operações MERGE para inserir dados com base nas condições da tabela. Embora possa ser possível reescrever essa lógica usando instruções INSERT , se qualquer expressão condicional fizer referência a uma coluna na tabela de destino, essas instruções terão as mesmas limitações de simultaneidade que MERGE.

Escreva níveis de isolamento serializáveis x serializáveis

O nível de isolamento de uma tabela define o grau em que uma transação deve ser isolada de modificações feitas por transações concorrentes. Delta Lake em Databricks dá suporte a dois níveis de isolamento: Serializable e WriteSerializable.

  • Serializável: O nível de isolamento mais forte. Ele garante que as operações de gravação confirmadas e todas as leituras sejam serializáveis. As operações são permitidas desde que exista uma sequência serial de execução uma a uma que gere o mesmo resultado visto na tabela. Para as operações de escrita, a sequência serial é exatamente a mesma vista na história da tabela.

  • WriteSerializable (default): Um nível de isolamento mais fraco que Serializable. Ele garante apenas que as operações de gravação (ou seja, não as leituras) sejam serializáveis. No entanto, isso ainda é mais forte do que o isolamento de instantâneo . WriteSerializable é o nível de isolamento default porque fornece grande equilíbrio de consistência de dados e disponibilidade para as operações mais comuns.

    Neste modo, o conteúdo da tabela Delta pode ser diferente do que se espera da sequência de operações vista na tabela história. Isso ocorre porque esse modo permite que certos pares de gravações concorrentes (digamos, operações X e Y) procedam de forma que o resultado seja como se Y fosse executado antes de X (ou seja, serializável entre eles), mesmo que a história mostrasse que Y foi cometido depois de X. Para impedir essa reordenação, defina o nível de isolamento da tabela como Serializável para causar falha nessas transações.

As operações de leitura sempre usam isolamento Snapshot . O nível de isolamento de escrita determina se é ou não possível para um leitor ver um Snapshot de uma tabela, que segundo a história, “nunca existiu”.

Para o nível Serializable, um leitor sempre vê apenas as tabelas que estão de acordo com a história. Para o nível WriteSerializable, um leitor pode ver uma tabela que não existe nos logs Delta.

Por exemplo, considere txn1, uma exclusão de execução longa e txn2, que insere dados excluídos por txn1. txn2 e txn1 se completam e são registrados nessa ordem na história. Segundo a história, os dados inseridos em txn2 não deveriam existir na tabela. Para o nível serializável, um leitor nunca veria dados inseridos por txn2. No entanto, para o nível WriteSerializable, um leitor pode, em algum momento, ver os dados inseridos por txn2.

Para obter mais informações sobre quais tipos de operações podem entrar em conflito entre si em cada nível de isolamento e os possíveis erros, consulte Evite conflitos usando particionamento e condições de comando separadas.

Definir o nível de isolamento

Você define o nível de isolamento usando o comando ALTER TABLE .

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)

em que <level-name> é Serializable ou WriteSerializable.

Por exemplo, para alterar o nível de isolamento do default WriteSerializable para Serializable, execução:

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

Evite conflitos usando particionamento e condições de comando separadas

Em todos os casos marcados como “podem entrar em conflito”, se as duas operações entrarão em conflito depende se elas operam no mesmo conjunto de arquivos. Você pode separar os dois conjuntos de arquivos particionando a tabela pelas mesmas colunas usadas nas condições das operações. Por exemplo, os dois comandos UPDATE table WHERE date > '2010-01-01' ... e DELETE table WHERE date < '2010-01-01' entrarão em conflito se a tabela não for particionada por data, pois ambos podem tentar modificar o mesmo conjunto de arquivos. Particionar a tabela por date evitará o conflito. Portanto, particionar uma tabela de acordo com as condições comumente usadas no comando pode reduzir significativamente os conflitos. No entanto, particionar uma tabela por uma coluna com alta cardinalidade pode levar a outros problemas de desempenho devido ao grande número de subdiretórios.

Exceções de conflito

Quando ocorrer um conflito de transação, você observará uma das seguintes exceções:

ConcurrentAppendException

Esta exceção ocorre quando uma operação concorrente adiciona arquivos na mesma partição (ou em qualquer lugar em uma tabela não particionada) que sua operação lê. As adições de arquivo podem ser causadas por operações INSERT, DELETE, UPDATE ou MERGE .

Com o nível de isolamentodefault de WriteSerializable, os arquivos adicionados por INSERT operações cegas (isto é, operações que anexam dados cegamente sem ler nenhum dado) não entram em conflito com nenhuma operação, mesmo se tocarem na mesma partição (ou em qualquer lugar na uma tabela não particionada). Se o nível de isolamento for definido como Serializable, os anexos cegos poderão entrar em conflito.

Essa exceção geralmente é lançada durante as operações concorrentes DELETE, UPDATE ou MERGE . Embora as operações concorrentes possam estar atualizando fisicamente diferentes diretórios de partição, uma delas pode ler a mesma partição que a outra atualiza simultaneamente, causando um conflito. Você pode evitar isso tornando a separação explícita na condição operações. Considere o seguinte exemplo.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Suponha que você execute o código acima simultaneamente para diferentes datas ou países. Como cada Job está trabalhando em uma partição independente na tabela Delta de destino, você não espera nenhum conflito. No entanto, a condição não é explícita o suficiente e pode varrer a tabela inteira e entrar em conflito com operações concorrentes que atualizam quaisquer outras partições. Em vez disso, você pode reescrever sua declaração para adicionar data e país específicos à condição de merge , conforme mostrado no exemplo a seguir.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Esta operação agora é segura para execução simultânea em diferentes datas e países.

ConcurrentDeleteReadException

Essa exceção ocorre quando uma operação concorrente excluiu um arquivo lido por sua operação. Causas comuns são operações DELETE, UPDATE ou MERGE que regravam arquivos.

ConcurrentDeleteDeleteException

Essa exceção ocorre quando uma operação concorrente excluiu um arquivo que sua operação também excluiu. Isso pode ser causado por duas operações de compactação concorrentes que reescrevem os mesmos arquivos.

MetadataChangedException

Essa exceção ocorre quando uma transação concorrente atualiza os metadados de uma tabela Delta. Causas comuns são operações ALTER TABLE ou gravações em sua tabela Delta que atualizam o esquema da tabela.

ConcurrentTransactionException

Se uma query transmitida usando o mesmo ponto de verificação começar várias vezes simultaneamente e tentar gravar na tabela Delta ao mesmo tempo. Você nunca deve ter duas query de transmissão usando o mesmo ponto de verificação e execução ao mesmo tempo.

ProtocolChangedException

Essa exceção pode ocorrer nos seguintes casos:

  • Quando sua tabela Delta é atualizada para uma nova versão de protocolo. Para que as operações futuras sejam bem-sucedidas, talvez seja necessário atualizar seu Databricks Runtime.

  • Quando vários gravadores estão criando ou substituindo uma tabela ao mesmo tempo.

  • Quando vários gravadores estão gravando em um caminho vazio ao mesmo tempo.

Consulte Como o Databricks gerencia a compatibilidade de recursos do Delta Lake? para mais detalhes.

Comportamento de visualização de simultaneidade em nível de linha (legado)

Esta seção descreve comportamentos de visualização para simultaneidade em nível de linha no Databricks Runtime 14.1 e abaixo. A simultaneidade em nível de linha sempre requer vetores de exclusão.

No Databricks Runtime 13.3 LTS e acima, as tabelas com clusters líquidos habilitados habilitam automaticamente a simultaneidade em nível de linha.

No Databricks Runtime 14.0 e 14.1, você pode habilitar a simultaneidade em nível de linha para tabelas com vetores de exclusão definindo a seguinte configuração para os clusters ou SparkSession:

spark.databricks.delta.rowLevelConcurrencyPreview = true

No Databricks Runtime 14.1 e abaixo, compute não-Photon só dá suporte à simultaneidade em nível de linha para operações DELETE.