As APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com o pipeline declarativo LakeFlow
O pipeline declarativo LakeFlow simplifica a captura de dados de alterações (CDC) (CDC) com as APIs AUTO CDC
e AUTO CDC FROM SNAPSHOT
.
As APIs AUTO CDC
substituem as APIs APPLY CHANGES
e têm a mesma sintaxe. As APIs APPLY CHANGES
ainda estão disponíveis, mas a Databricks recomenda usar as APIs AUTO CDC
em seu lugar.
A interface que você usa depende da origem dos dados de alteração:
- Use
AUTO CDC
para processar alterações de um feed de dados de alterações (CDF). - Use
AUTO CDC FROM SNAPSHOT
(visualização pública e disponível somente para Python) para processar alterações no instantâneo do banco de dados.
Anteriormente, a instrução MERGE INTO
era comumente usada para processar registros CDC no Databricks. Entretanto, MERGE INTO
pode produzir resultados incorretos devido a registros fora de sequência ou requer lógica complexa para reordenar os registros.
A API AUTO CDC
é suportada nas interfaces SQL e Python do pipeline declarativo LakeFlow . A API AUTO CDC FROM SNAPSHOT
é suportada na interface Python do pipeline declarativo LakeFlow .
Tanto AUTO CDC
quanto AUTO CDC FROM SNAPSHOT
oferecem suporte à atualização de tabelas usando SCD tipo 1 e tipo 2:
- Use o SCD tipo 1 para atualizar registros diretamente. a história não é mantida para registros atualizados.
- Use o SCD tipo 2 para manter um histórico de registros, em todas as atualizações ou em atualizações de um conjunto específico de colunas.
Para sintaxe e outras referências, consulte AUTO CDC para SQLdo pipeline declarativo LakeFlow, AUTO CDC para Pythondo pipeline declarativo LakeFlow e AUTO CDC FROM Snapshot para Pythondo pipeline declarativo LakeFlow.
Este artigo descreve como atualizar tabelas no seu pipeline declarativo LakeFlow com base em alterações nos dados de origem. Para saber como registrar e consultar informações de alterações em nível de linha para tabelas Delta , consulte Usar feed de dados de alterações Delta Lake no Databricks.
Requisitos
Para usar as APIs CDC , seu pipeline deve ser configurado para usar o pipeline declarativo LakeFlow serverless ou as edições Pro
ou Advanced
do pipeline declarativo LakeFlow .
Como o CDC é implementado com a API AUTO CDC?
Ao manipular automaticamente registros fora de sequência, a API AUTO CDC no pipeline declarativo LakeFlow garante o processamento correto de registros CDC e elimina a necessidade de desenvolver lógica complexa para manipular registros fora de sequência. Você deve especificar uma coluna nos dados de origem na qual sequenciar os registros, o que o pipeline declarativo LakeFlow interpreta como uma representação monotonicamente crescente da ordenação adequada dos dados de origem. O pipeline declarativo LakeFlow manipula automaticamente dados que chegam fora de ordem. Para alterações do tipo 2 SCD , o pipeline declarativo LakeFlow propaga os valores de sequenciamento apropriados para as colunas __START_AT
e __END_AT
da tabela de destino. Deve haver uma atualização distinta por key em cada valor de sequenciamento, e valores de sequenciamento NULL não são suportados.
Para executar o processamento CDC com AUTO CDC
, primeiro crie uma tabela de transmissão e, em seguida, use a instrução AUTO CDC ... INTO
em SQL ou a função create_auto_cdc_flow()
em Python para especificar a origem, a chave e o sequenciamento do feed de alterações. Para criar a tabela de transmissão de destino, use a instrução CREATE OR REFRESH STREAMING TABLE
em SQL ou a função create_streaming_table()
em Python. Veja os exemplos de processamento SCD tipo 1 e tipo 2 .
Para obter detalhes de sintaxe, consulte a referênciaSQL do pipeline declarativo LakeFlow ou a referênciaPython.
Como o CDC é implementado com a API AUTO CDC FROM SNAPSHOT
?
Visualização
A API AUTO CDC FROM SNAPSHOT
está em Visualização Pública.
AUTO CDC FROM SNAPSHOT
é uma API declarativa que determina eficientemente alterações nos dados de origem comparando uma série de Snapshots em ordem e, em seguida, executa o processamento necessário para o processamento CDC dos registros no Snapshot. AUTO CDC FROM SNAPSHOT
é suportado apenas pela interface Python do pipeline declarativo LakeFlow .
AUTO CDC FROM SNAPSHOT
suporta ingestão de Snapshot de vários tipos de fonte:
- Use a ingestão periódica de Snapshot para ingerir Snapshot de uma tabela ou view existente.
AUTO CDC FROM SNAPSHOT
tem uma interface simples e otimizada para oferecer suporte à ingestão periódica de Snapshot de um objeto de banco de dados existente. Um novo Snapshot é ingerido com cada atualização pipeline , e o tempo de ingestão é usado como a versão do Snapshot. Quando um pipeline é executado no modo contínuo, vários Snapshots são ingeridos com cada atualização pipeline em um período determinado pela configuração do intervalo de gatilho para o fluxo que contém o processamentoAUTO CDC FROM SNAPSHOT
. - Use a ingestão de Snapshot histórico para processar arquivos que contêm Snapshot de banco de dados, como Snapshot gerado de um banco de dados Oracle ou MySQL ou de um data warehouse.
Para executar o processamento CDC de qualquer tipo de fonte com AUTO CDC FROM SNAPSHOT
, primeiro crie uma tabela de transmissão e, em seguida, use a função create_auto_cdc_from_snapshot_flow()
em Python para especificar o Snapshot, a chave e outros argumentos necessários para implementar o processamento. Veja os exemplos de ingestão periódica de Snapshot e ingestão histórica de Snapshot .
O Snapshot passado para a API deve estar em ordem crescente por versão. Se o pipeline declarativo LakeFlow detectar um Snapshot fora de ordem, um erro será gerado.
Para obter detalhes de sintaxe, consulte a referênciaPython do pipeline declarativo LakeFlow .
Use várias colunas para sequenciamento
Você pode sequenciar por várias colunas (por exemplo, um registro de data e hora e um ID para desempatar), você pode usar uma STRUCT para combiná-las: ela ordena pelo primeiro campo da STRUCT primeiro e, em caso de empate, considera o segundo campo, e assim por diante.
Exemplo em SQL:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Exemplo em Python:
sequence_by = struct("timestamp_col", "id_col")
Limitações
A coluna usada para sequenciamento deve ser um tipo de dado classificável.
Exemplo: processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF
As seções a seguir fornecem exemplos de consultas SCD tipo 1 e tipo 2 do pipeline declarativo LakeFlow que atualizam tabelas de destino com base em eventos de origem de um feed de dados de alteração que:
- Cria novos registros de usuário.
- Exclui um registro de usuário.
- Atualiza registros de usuários. No exemplo do SCD tipo 1, as últimas operações
UPDATE
chegam atrasadas e são descartadas da tabela de destino, demonstrando o tratamento de eventos fora de ordem.
Os exemplos a seguir pressupõem familiaridade com a configuração e atualização do pipeline declarativo LakeFlow . Veja o tutorial: Construir um pipeline ETL usando captura de dados de alterações (CDC) com pipeline declarativo LakeFlow.
Para executar esses exemplos, você deve começar criando um dataset de amostra. Consulte Gerar dados de teste.
A seguir encontram-se registros de entrada para esses exemplos:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Se você cancelar o comentário da linha final nos dados de exemplo, ele inserirá o seguinte registro que especifica onde os registros devem ser truncados:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Todos os exemplos a seguir incluem opções para especificar as operações DELETE
e TRUNCATE
, mas cada uma é opcional.
Processar atualizações do tipo 1 do SCD
O exemplo a seguir demonstra o processamento de atualizações do SCD tipo 1:
- Python
- SQL
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Após executar o exemplo do SCD tipo 1, a tabela de destino contém os seguintes registros:
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
Depois de executar o exemplo do SCD tipo 1 com o registro TRUNCATE
adicional, os registros 124
e 126
são truncados devido à operação TRUNCATE
em sequenceNum=3
, e a tabela de destino contém o seguinte registro:
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
Processar atualizações do tipo 2 do SCD
O exemplo a seguir demonstra o processamento de atualizações do SCD tipo 2:
- Python
- SQL
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Após executar o exemplo do SCD tipo 2, a tabela de destino contém os seguintes registros:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | null |
Uma consulta SCD tipo 2 também pode especificar um subconjunto de colunas de saída a serem rastreadas para histórico na tabela de destino. Alterações em outras colunas são atualizadas no local em vez de gerar novos registros históricos. O exemplo a seguir demonstra a exclusão da coluna city
do acompanhamento:
O exemplo a seguir demonstra o uso do histórico de rastreamento com o SCD tipo 2:
- Python
- SQL
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Depois de executar este exemplo sem o registro TRUNCATE
adicional, a tabela de destino contém os seguintes registros:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
Gerar dados de teste
O código abaixo é fornecido para gerar um dataset de exemplo para uso nas consultas de exemplo presentes neste tutorial. Supondo que você tenha as credenciais adequadas para criar um novo esquema e criar uma nova tabela, você pode executar essas instruções com um Databricks SQL. O código a seguir não se destina a ser executado como parte do pipeline declarativo LakeFlow :
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Exemplo: Processamento periódico de snapshots
O exemplo a seguir demonstra o processamento SCD tipo 2 que ingere o Snapshot de uma tabela armazenada em mycatalog.myschema.mytable
. Os resultados do processamento são gravados em uma tabela chamada target
.
mycatalog.myschema.mytable
registros no carimbo de data/hora 2024-01-01 00:00:00
Chave | Valor |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
registros no carimbo de data/hora 2024-01-01 12:00:00
Chave | Valor |
---|---|
2 | b2 |
3 | a3 |
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Após o processamento do Snapshot, a tabela de destino contém os seguintes registros:
Chave | Valor | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
Exemplo: Processamento de instantâneo histórico
O exemplo a seguir demonstra o processamento SCD tipo 2 que atualiza uma tabela de destino com base em eventos de origem de dois Snapshots armazenados em um sistema de armazenamento cloud :
Snapshot em timestamp
, armazenado em /<PATH>/filename1.csv
Chave | Coluna de Rastreamento | Coluna sem rastreamento |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
Snapshot em timestamp + 5
, armazenado em /<PATH>/filename2.csv
Chave | Coluna de Rastreamento | Coluna sem rastreamento |
---|---|---|
2 | a2_novo | b2 |
3 | a3 | b3 |
4 | a4 | b4_novo |
O exemplo de código a seguir demonstra o processamento de atualizações SCD tipo 2 com estes Snapshots:
from pyspark import pipelines as dp
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dp.create_streaming_live_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Após o processamento do Snapshot, a tabela de destino contém os seguintes registros:
Chave | Coluna de Rastreamento | Coluna sem rastreamento | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_novo | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_novo | 1 | null |
Adicionar, alterar ou excluir dados em uma tabela de transmissão de destino
Se o seu pipeline publicar tabelas no Unity Catalog, você poderá usar instruções de linguagem de manipulação de dados (DML), incluindo instruções de inserção, atualização, exclusão e merge , para modificar as tabelas de transmissão de destino criadas pelas instruções AUTO CDC ... INTO
.
- As declarações DML que modificam o esquema de tabela de uma tabela de streaming não são suportadas. Certifique-se de que suas instruções DML não tentem evoluir o esquema da tabela.
- As instruções DML que atualizam uma tabela de transmissão podem ser executadas somente em um cluster Unity Catalog compartilhado ou em um SQL warehouse usando Databricks Runtime 13.3 LTS e acima.
- Como a transmissão requer somente anexação de fonte de dados, se o seu processamento exigir transmissão de uma tabela de transmissão de origem com alterações (por exemplo, por instruções DML), defina o sinalizador skipChangeCommits ao ler a tabela de transmissão de origem. Quando
skipChangeCommits
é definido, as transações que excluem ou modificam registros na tabela de origem são ignoradas. Se o seu processamento não exigir uma tabela de transmissão, você poderá usar uma view materializada (que não tenha a restrição de somente anexação) como tabela de destino.
Como o pipeline declarativo LakeFlow usa uma coluna SEQUENCE BY
especificada e propaga valores de sequenciamento apropriados para as colunas __START_AT
e __END_AT
da tabela de destino (para SCD tipo 2), você deve garantir que as instruções DML usem valores válidos para essas colunas para manter a ordem correta dos registros. Veja Como o CDC é implementado com a API AUTO CDC?.
Para obter mais informações sobre o uso de instruções DML com tabelas de transmissão, consulte Adicionar, alterar ou excluir dados em uma tabela de transmissão.
O exemplo seguinte insere um registro ativo com uma sequência inicial de 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Ler um feed de dados de alteração de uma tabela de destino AUTO CDC
No Databricks Runtime 15.2 e acima, você pode ler um feed de dados de alteração de uma tabela de transmissão que é o destino de consultas AUTO CDC
ou AUTO CDC FROM SNAPSHOT
da mesma forma que você lê um feed de dados de alteração de outras tabelas Delta . Os seguintes itens são necessários para ler o feed de dados de alteração de uma tabela de transmissão de destino:
- A tabela de transmissão de destino deve ser publicada no Unity Catalog. Consulte Usar Unity Catalog com seu pipeline declarativo LakeFlow.
- Para ler o feed de dados alterados da tabela de transmissão de destino, você deve usar Databricks Runtime 15.2 ou superior. Para ler o feed de dados alterados em um pipeline diferente, o pipeline deve ser configurado para usar Databricks Runtime 15.2 ou superior.
Você lê o feed de dados de alteração de uma tabela de transmissão de destino que foi criada no pipeline declarativo LakeFlow da mesma forma que lê um feed de dados de alteração de outras tabelas Delta . Para saber mais sobre como usar a funcionalidade de feed de dados de alteração do Delta, incluindo exemplos em Python e SQL, consulte Usar feed de dados de alteração do Delta Lake no Databricks.
O registro do feed de dados de alteração inclui metadados que identificam o tipo de evento de alteração. Quando um registro é atualizado em uma tabela, os metadados dos registros de alteração associados normalmente incluem valores _change_type
definidos como eventos update_preimage
e update_postimage
.
Entretanto, os valores _change_type
serão diferentes se forem feitas atualizações na tabela de transmissão de destino que incluam alterações nos valores key primária. Quando as alterações incluem atualizações na chave primária, os campos de metadados _change_type
são definidos como eventos insert
e delete
. Alterações na chave primária podem ocorrer quando atualizações manuais são feitas em um dos campos key com uma instrução UPDATE
ou MERGE
ou, para tabelas SCD tipo 2, quando o campo __start_at
muda para refletir um valor de sequência começar anterior.
A consulta AUTO CDC
determina os valores key primária, que diferem para o processamento SCD tipo 1 e SCD tipo 2:
- Para o processamento SCD tipo 1 e a interface Python do pipeline declarativo LakeFlow , a key primária é o valor do parâmetro
keys
na funçãocreate_auto_cdc_flow()
. Para a interface SQL do pipeline declarativo LakeFlow , a key primária são as colunas definidas pela cláusulaKEYS
na instruçãoAUTO CDC ... INTO
. - Para SCD tipo 2, a key primária é o parâmetro
keys
ou cláusulaKEYS
mais o valor de retorno das operaçõescoalesce(__START_AT, __END_AT)
, onde__START_AT
e__END_AT
são as colunas correspondentes da tabela de transmissão de destino.
Obter dados sobre registros processados por uma consulta CDC do pipeline declarativo LakeFlow
As seguintes métricas são capturadas somente por consultas AUTO CDC
e não por consultas AUTO CDC FROM SNAPSHOT
.
As seguintes métricas são capturadas por consultas AUTO CDC
:
num_upserted_rows
: O número de linhas de saída inseridas no dataset durante uma atualização.num_deleted_rows
: O número de linhas de saída existentes excluídas do dataset durante uma atualização.
A métrica num_output_rows
, saída para fluxos não CDC, não é capturada para consultas AUTO CDC
.
Quais objetos de dados são usados para o processamento CDC do pipeline declarativo LakeFlow ?
- Essas estruturas de dados se aplicam somente ao processamento
AUTO CDC
, não ao processamentoAUTO CDC FROM SNAPSHOT
. - Essas estruturas de dados se aplicam somente quando a tabela de destino é publicada no Hive metastore. Se um pipeline for publicado no Unity Catalog, as tabelas de suporte internas ficarão inacessíveis aos usuários.
Quando você declara a tabela de destino no Hive metastore, são criadas duas estruturas de dados:
- Uma visualização usando o nome atribuído à tabela de destino.
- Uma tabela de suporte interna usada pelo pipeline declarativo LakeFlow para gerenciar o processamento CDC . Esta tabela é nomeada adicionando
__apply_changes_storage_
ao nome da tabela de destino.
Por exemplo, se você declarar uma tabela de destino chamada dp_cdc_target
, verá uma view chamada dp_cdc_target
e uma tabela chamada __apply_changes_storage_dp_cdc_target
no metastore. A criação de uma view permite que o pipeline declarativo LakeFlow filtre as informações extras (por exemplo, tombstones e versões) necessárias para lidar com dados fora de ordem. Para view os dados processados, consulte a view de destino. Como o esquema da tabela __apply_changes_storage_
pode mudar para oferecer suporte a recursos ou melhorias futuras, você não deve consultar a tabela para uso em produção. Se você adicionar dados manualmente à tabela, os registros serão considerados anteriores a outras alterações porque as colunas de versão estarão ausentes.