メインコンテンツまでスキップ

チュートリアル: テーブル間のトランザクションを調整する

備考

プレビュー

Unity Catalog で管理される Delta テーブルに書き込むトランザクションは、パブリック プレビュー段階にあります。

Unity Catalog で管理される Iceberg テーブルに書き込むトランザクションは、プライベート プレビュー段階です。このプレビューに参加するには、マネージド Iceberg テーブル プレビュー登録フォームを送信してください。

このチュートリアルでは、トランザクションを使用して複数のステートメントとテーブル間で更新を調整する方法を説明します。自動的にコミットする非対話型トランザクションと、明示的な制御を可能にする対話型トランザクションの両方のトランザクション モードを学習します。このチュートリアルでは、ストアド プロシージャと SQL スクリプトを使用したトランザクションを使用して、Databricks 上にミッション クリティカルなウェアハウス ワークロードを構築する方法も説明します。

要件

  • 環境 : Databricks ワークスペースへのアクセス。

  • コンピュート : サポートされるコンピュートのタイプはトランザクション モードによって異なります。

  • 権限 : Unity Catalogスキーマ内のCREATE TABLE

サンプルテーブルを設定する

複数ステートメント、複数テーブルのトランザクションで書き込まれるすべてのテーブルは次の条件を満たす必要があります。

SQL エディターまたはノートブックで 2 つのサンプル テーブルを作成します。

SQL
-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
'delta.feature.catalogManaged' = 'supported'
);

-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
id INT,
account_id INT,
transaction_type STRING,
amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
'delta.feature.catalogManaged' = 'supported'
);

-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

-- Insert sample data
INSERT INTO sample_accounts VALUES
(1, 'Alice', 1000.00),
(2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
(1, 1, 'deposit', 100.00);

セットアップを確認します。

SQL
SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

出力:

sample_accounts:
id account_name balance
1 Alice 1000.00
2 Bob 500.00

sample_transactions:
id account_id transaction_type amount
1 1 deposit 100.00

非対話型トランザクション

非対話型トランザクションではBEGIN ATOMIC ... END;構文が使用されます。すべてのステートメントは単一のアトミック単位として実行されます。すべてのステートメントが成功すると、Databricks は自動的にコミットします。いずれかのステートメントが失敗した場合、Databricks はすべての変更を自動的にロールバックします。詳細な構文と使用パターンについては、 「非対話型トランザクション」を参照してください。

成功した取引を実行する

両方のテーブルをアトミックに更新します。

SQL
BEGIN ATOMIC
-- Update Alice's account balance
UPDATE sample_accounts
SET balance = balance + 100.00
WHERE id = 1;

-- Record the deposit transaction
INSERT INTO sample_transactions
VALUES (2, 1, 'deposit', 100.00);
END;

両方の操作が成功したことを確認します。

SQL
-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;

-- Should show two transaction records
SELECT * FROM sample_transactions;

残高の更新と取引記録の両方が一緒に作成されました。どちらかのステートメントが失敗した場合、どちらの変更もコミットされず、Databricks は副作用なしでトランザクションを終了していました。

SIGNALを使用して条件に基づいてトランザクションを失敗させる

SIGNALBEGIN ATOMIC ... END;ブロック内で 使用すると、ユーザー定義の条件が満たされない場合にトランザクションを失敗させることができます。これはコミット前のデータ検証に役立ちます。

SQL
BEGIN ATOMIC
-- Insert new account
INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

-- Fail the transaction if balance is negative
IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
END IF;
END;

SIGNALはエラーを発生させ、トランザクション全体が自動的にロールバックされます。挿入がロールバックされたことを確認します。

SQL
-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;

失敗時の自動ロールバックを参照

無効なステートメントでトランザクションを実行します。

SQL
BEGIN ATOMIC
-- This statement is valid
INSERT INTO sample_accounts VALUES (4, 'David', 300.00);

-- This statement will fail (table does not exist)
INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

トランザクションはエラーにより失敗します。最初のステートメントがロールバックされたことを確認します。

SQL
-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;

最初のINSERTステートメントは有効でしたが、2 番目のステートメントが失敗したためロールバックされました。これは、トランザクションの「すべてか無か」の保証を示しています。

インタラクティブな取引

対話型トランザクションを使用すると、コミットまたはロールバックするタイミングを明示的に制御できます。BEGIN TRANSACTIONを使用して開始し、コミットして変更を保存するか、 ROLLBACK を使用して変更を破棄します。

変更をコミットする

取引を開始します:

SQL
BEGIN TRANSACTION;

変更を加える(まだコミットされていません):

SQL
INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

変更を永続的にすることを約束します:

SQL
COMMIT;

変更を確認します。

SQL
-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;

-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;

変更をロールバックする

新しい取引を開始します:

SQL
BEGIN TRANSACTION;

変更を加える:

SQL
INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

セッションで変更が表示されていることを確認します。

SQL
-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;

変更を破棄するにはロールバックします。

SQL
ROLLBACK;

変更が破棄されたことを確認します。

SQL
-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;

ストアドプロシージャとSQLスクリプトでの使用

トランザクションとストアド プロシージャを組み合わせて、再利用可能なトランザクション ロジックを作成できます。このパターンは、頻繁に実行する複雑な操作に役立ちます。

  1. カタログ管理コミットを有効にした2つのテーブルを作成する

    SQL

    CREATE TABLE orders (order_id STRING, item_sku STRING, quantity_ordered INT)
    TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

    CREATE TABLE inventory (item_sku STRING, quantity_in_stock INT)
    TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

  2. ストアドプロシージャを定義する

    SQL
    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
    IN p_order_id STRING,
    IN p_customer_id STRING,
    IN p_order_amount DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
    -- Insert the order
    INSERT INTO main.retail.orders (order_id, customer_id, amount)
    VALUES (p_order_id, p_customer_id, p_order_amount);

    -- Update total sales per customer
    MERGE INTO main.retail.total_sales AS t
    USING (
    SELECT
    p_customer_id AS customer_id,
    p_order_amount AS order_amount
    ) s
    ON t.customer_id = s.customer_id
    WHEN MATCHED THEN
    UPDATE SET t.total_amount = t.total_amount + s.order_amount
    WHEN NOT MATCHED THEN
    INSERT (customer_id, total_amount)
    VALUES (s.customer_id, s.order_amount);
    END;

  3. 取引を定義する

    SQL
    BEGIN ATOMIC
    -- Staging batch id for this transaction
    DECLARE new_order_id STRING DEFAULT uuid();
    DECLARE v_batch_id STRING DEFAULT uuid();

    -- 1) Stage incoming customer and order rows
    INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
    VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);

    -- 2) Drive final writes from staging to production via stored procedure
    FOR o AS
    SELECT
    order_id,
    customer_id,
    amount
    FROM main.retail.orders_staging
    WHERE batch_id = v_batch_id
    DO
    CALL main.retail.apply_order(
    o.order_id,
    o.customer_id,
    o.amount
    );
    END FOR;

    -- 3) Clean up processed staging rows
    DELETE FROM main.retail.orders_staging
    WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction

トランザクションの一部が失敗した場合、Databricks はすべての変更を自動的にロールバックします。

掃除

サンプル テーブルを削除します。

SQL
DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;


DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS inventory;

次のステップ

関連するSQLリファレンス