チュートリアル: テーブル間のトランザクションを調整する
プレビュー
Unity Catalog で管理される Delta テーブルに書き込むトランザクションは、パブリック プレビュー段階にあります。
Unity Catalog で管理される Iceberg テーブルに書き込むトランザクションは、プライベート プレビュー段階です。このプレビューに参加するには、マネージド Iceberg テーブル プレビュー登録フォームを送信してください。
このチュートリアルでは、トランザクションを使用して複数のステートメントとテーブル間で更新を調整する方法を説明します。自動的にコミットする非対話型トランザクションと、明示的な制御を可能にする対話型トランザクションの両方のトランザクション モードを学習します。このチュートリアルでは、ストアド プロシージャと SQL スクリプトを使用したトランザクションを使用して、Databricks 上にミッション クリティカルなウェアハウス ワークロードを構築する方法も説明します。
要件
-
環境 : Databricks ワークスペースへのアクセス。
-
コンピュート : サポートされるコンピュートのタイプはトランザクション モードによって異なります。
- クラシックまたはサーバレスSQLウェアハウスは、両方のトランザクション モードをサポートします。
- サーバーレス コンピュートは非対話型トランザクションのみをサポートします。
- Databricks Runtime 18.0 以降を実行しているクラシッククラスターは、非対話型トランザクションのみをサポートします。
-
権限 : Unity Catalogスキーマ内の
CREATE TABLE。
サンプルテーブルを設定する
複数ステートメント、複数テーブルのトランザクションで書き込まれるすべてのテーブルは次の条件を満たす必要があります。
- Be Unity Catalogマネージドテーブル ( DeltaまたはIceberg )
- カタログ管理コミットを有効にする
SQL エディターまたはノートブックで 2 つのサンプル テーブルを作成します。
-- Create a table for account data
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
'delta.feature.catalogManaged' = 'supported'
);
-- Create a table for transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
id INT,
account_id INT,
transaction_type STRING,
amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
'delta.feature.catalogManaged' = 'supported'
);
-- To upgrade an existing table, use:
-- ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
-- Insert sample data
INSERT INTO sample_accounts VALUES
(1, 'Alice', 1000.00),
(2, 'Bob', 500.00);
INSERT INTO sample_transactions VALUES
(1, 1, 'deposit', 100.00);
セットアップを確認します。
SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;
出力:
sample_accounts:
id account_name balance
1 Alice 1000.00
2 Bob 500.00
sample_transactions:
id account_id transaction_type amount
1 1 deposit 100.00
非対話型トランザクション
非対話型トランザクションではBEGIN ATOMIC ... END;構文が使用されます。すべてのステートメントは単一のアトミック単位として実行されます。すべてのステートメントが成功すると、Databricks は自動的にコミットします。いずれかのステートメントが失敗した場合、Databricks はすべての変更を自動的にロールバックします。詳細な構文と使用パターンについては、 「非対話型トランザクション」を参照してください。
成功した取引を実行する
両方のテーブルをアトミックに更新します。
BEGIN ATOMIC
-- Update Alice's account balance
UPDATE sample_accounts
SET balance = balance + 100.00
WHERE id = 1;
-- Record the deposit transaction
INSERT INTO sample_transactions
VALUES (2, 1, 'deposit', 100.00);
END;
両方の操作が成功したことを確認します。
-- Alice's balance should now be 1100.00
SELECT * FROM sample_accounts WHERE id = 1;
-- Should show two transaction records
SELECT * FROM sample_transactions;
残高の更新と取引記録の両方が一緒に作成されました。どちらかのステートメントが失敗した場合、どちらの変更もコミットされず、Databricks は副作用なしでトランザクションを終了していました。
SIGNALを使用して条件に基づいてトランザクションを失敗させる
SIGNALBEGIN ATOMIC ... END;ブロック内で 使用すると、ユーザー定義の条件が満たされない場合にトランザクションを失敗させることができます。これはコミット前のデータ検証に役立ちます。
BEGIN ATOMIC
-- Insert new account
INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);
-- Fail the transaction if balance is negative
IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
END IF;
END;
SIGNALはエラーを発生させ、トランザクション全体が自動的にロールバックされます。挿入がロールバックされたことを確認します。
-- Should return 0 rows (the transaction was rolled back by SIGNAL)
SELECT * FROM sample_accounts WHERE id = 3;
失敗時の自動ロールバックを参照
無効なステートメントでトランザクションを実行します。
BEGIN ATOMIC
-- This statement is valid
INSERT INTO sample_accounts VALUES (4, 'David', 300.00);
-- This statement will fail (table does not exist)
INSERT INTO non_existent_table VALUES (1, 2, 3);
END;
トランザクションはエラーにより失敗します。最初のステートメントがロールバックされたことを確認します。
-- Should return 0 rows because the transaction was rolled back
SELECT * FROM sample_accounts WHERE id = 4;
最初のINSERTステートメントは有効でしたが、2 番目のステートメントが失敗したためロールバックされました。これは、トランザクションの「すべてか無か」の保証を示しています。
インタラクティブな取引
対話型トランザクションを使用すると、コミットまたはロールバックするタイミングを明示的に制御できます。BEGIN TRANSACTIONを使用して開始し、コミットして変更を保存するか、 ROLLBACK を使用して変更を破棄します。
変更をコミットする
取引を開始します:
BEGIN TRANSACTION;
変更を加える(まだコミットされていません):
INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;
変更を永続的にすることを約束します:
COMMIT;
変更を確認します。
-- Eve's account should now be visible
SELECT * FROM sample_accounts WHERE id = 5;
-- Bob's balance should be 550.00 (500 + 50)
SELECT * FROM sample_accounts WHERE id = 2;
変更をロールバックする
新しい取引を開始します:
BEGIN TRANSACTION;
変更を加える:
INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);
セッションで変更が表示されていることを確認します。
-- Should show Frank's account (visible in your session only)
SELECT * FROM sample_accounts WHERE id = 6;
変更を破棄するにはロールバックします。
ROLLBACK;
変更が破棄されたことを確認します。
-- Should return 0 rows (the insert was rolled back)
SELECT * FROM sample_accounts WHERE id = 6;
ストアドプロシージャとSQLスクリプトでの使用
トランザクションとストアド プロシージャを組み合わせて、再利用可能なトランザクション ロジックを作成できます。このパターンは、頻繁に実行する複雑な操作に役立ちます。
-
カタログ管理コミットを有効にした2つのテーブルを作成する
SQL
CREATE TABLE orders (order_id STRING, item_sku STRING, quantity_ordered INT)
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
CREATE TABLE inventory (item_sku STRING, quantity_in_stock INT)
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported'); -
ストアドプロシージャを定義する
SQLCREATE OR REPLACE PROCEDURE main.retail.apply_order(
IN p_order_id STRING,
IN p_customer_id STRING,
IN p_order_amount DECIMAL(18,2)
)
LANGUAGE SQL
SQL SECURITY INVOKER
MODIFIES SQL DATA
AS
BEGIN
-- Insert the order
INSERT INTO main.retail.orders (order_id, customer_id, amount)
VALUES (p_order_id, p_customer_id, p_order_amount);
-- Update total sales per customer
MERGE INTO main.retail.total_sales AS t
USING (
SELECT
p_customer_id AS customer_id,
p_order_amount AS order_amount
) s
ON t.customer_id = s.customer_id
WHEN MATCHED THEN
UPDATE SET t.total_amount = t.total_amount + s.order_amount
WHEN NOT MATCHED THEN
INSERT (customer_id, total_amount)
VALUES (s.customer_id, s.order_amount);
END; -
取引を定義する
SQLBEGIN ATOMIC
-- Staging batch id for this transaction
DECLARE new_order_id STRING DEFAULT uuid();
DECLARE v_batch_id STRING DEFAULT uuid();
-- 1) Stage incoming customer and order rows
INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);
-- 2) Drive final writes from staging to production via stored procedure
FOR o AS
SELECT
order_id,
customer_id,
amount
FROM main.retail.orders_staging
WHERE batch_id = v_batch_id
DO
CALL main.retail.apply_order(
o.order_id,
o.customer_id,
o.amount
);
END FOR;
-- 3) Clean up processed staging rows
DELETE FROM main.retail.orders_staging
WHERE batch_id = v_batch_id;
END; -- 4) Commit the transaction
トランザクションの一部が失敗した場合、Databricks はすべての変更を自動的にロールバックします。
掃除
サンプル テーブルを削除します。
DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS inventory;
次のステップ
- トランザクション: トランザクション サポートの概要。
- トランザクション モード: 両方のモードの詳細な構文とパターン。
- カタログ管理コミット: テーブルでトランザクション サポートを有効にします。
- 異なるクライアントからのトランザクションを使用する: JDBC、ODBC、および Python アプリケーションからトランザクションを実行します。