トランザクションモード
プレビュー
Unity Catalog で管理される Delta テーブルに書き込むトランザクションは、パブリック プレビュー段階にあります。
Unity Catalog で管理される Iceberg テーブルに書き込むトランザクションは、プライベート プレビュー段階です。このプレビューに参加するには、マネージド Iceberg テーブル プレビュー登録フォームを送信してください。
トランザクションは、非対話型と対話型の 2 つのモードをサポートします。このページでは、各モードをいつ使用するかについて説明し、実装例を示します。
トランザクションの要件と概要については、 「トランザクション」を参照してください。両方のモードの実践的な練習については、 「チュートリアル: テーブル間でのトランザクションの調整」を参照してください。
複数ステートメント、複数テーブルのトランザクションで書き込まれるすべてのテーブルは次の条件を満たす必要があります。
- Be Unity Catalogマネージドテーブル ( DeltaまたはIceberg )
- カタログ管理コミットを有効にする
非対話型トランザクション
非対話型トランザクションでは、 ATOMICキーワードを使用したSQL スクリプトが使用されます。ATOMIC 複合ステートメントブロックは、すべてのステートメントを単一のアトミック ユニットとして実行します。全員が一緒に成功するか、全員が一緒に失敗するか。
サポートされているコンピュート : SQLウェアハウス、サーバレス コンピュート、またはクラスターランニングDatabricks Runtime 18.0 以降。
サポートされている構文 : SQL、Scala spark.sqlブロック、および PySpark spark.sqlブロックをサポートします。
spark.sql("BEGIN ATOMIC ... END;")を呼び出すことで、構造化ストリーミングのforEachBatch内で非対話型トランザクションを使用できます。 ただし、構造化ストリーミング チェックポイントはトランザクション的には進みません。
構文
BEGIN ATOMIC
statement1;
statement2;
statement3;
END;
すべてのステートメントが成功した場合、Databricks はすべての変更を自動的にコミットします。いずれかのステートメントが失敗した場合、Databricks はすべての変更を自動的にロールバックします。
SQLエディタでの使用
非対話型トランザクションをSQL エディターで直接実行します。ATOMIC 複合ステートメントブロック全体を選択し、単一のステートメントとして実行します。
BEGIN ATOMIC
DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;
INSERT INTO staging_sales
SELECT * FROM raw_sales WHERE load_date = current_date();
MERGE INTO sales AS target
USING staging_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
ノートブックでの使用
SQLセルまたはプログラムAPIsを使用してノートブックで非対話型トランザクションを実行します。
- SQL
- Python
- Scala
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
スケジュールされたジョブでの使用
非対話型トランザクションはコミットとロールバックを自動的に処理するため、スケジュールされたジョブで適切に機能します。
BEGIN ATOMIC
-- Clear previous staging data
DELETE FROM staging_daily_sales WHERE load_date = current_date();
-- Load new data
INSERT INTO staging_daily_sales
SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
FROM raw_sales
WHERE sale_date = current_date() - INTERVAL 1 DAY;
-- Validate row count (fails transaction if no data)
IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
END IF;
-- Merge into production
MERGE INTO daily_sales AS target
USING staging_daily_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
アサーションを含むいずれかのステートメントが失敗した場合、トランザクション全体が自動的にロールバックされます。
JDBC で使用する
外部クライアントは非対話型トランザクションを実行できます。
- JDBC
String sql = """
BEGIN ATOMIC
INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
END;
""";
Statement stmt = conn.createStatement();
stmt.execute(sql);
ステートメント実行APIで使用する
ステートメント実行 APIを使用して非対話型トランザクションを実行します。
import requests
sql = """
BEGIN ATOMIC
INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""
response = requests.post(
f"{workspace_url}/api/2.0/sql/statements",
headers={"Authorization": f"Bearer {token}"},
json={
"warehouse_id": warehouse_id,
"statement": sql,
"wait_timeout": "30s"
}
)
ETLパターン
次のパターンは、非対話型トランザクションを使用した一般的な ETL ワークフローを示しています。
ステージングと検証パターン
このパターンは、データをステージング領域にロードし、データ品質を検証し、検証されたレコードを本番運用テーブルにマージします。
BEGIN ATOMIC
-- Load into staging
INSERT INTO staging_customers
SELECT * FROM external_source
WHERE ingest_date = current_date();
-- Validate data quality
IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
END IF;
-- Merge validated data
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Update metadata
UPDATE etl_metadata
SET last_load_date = current_date(),
rows_processed = (SELECT COUNT(*) FROM staging_customers)
WHERE table_name = 'customers';
END;
ディメンションとファクトテーブルパターン
このパターンは、参照整合性を維持するために、ファクト テーブルをロードする前にディメンション テーブルを更新します。
BEGIN ATOMIC
-- Update dimension tables first
MERGE INTO dim_products AS target
USING staging_products AS source
ON target.product_id = source.product_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
MERGE INTO dim_customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Then load fact table with foreign key references
INSERT INTO fact_sales
SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
FROM staging_sales s
JOIN dim_products p ON s.product_id = p.product_id
JOIN dim_customers c ON s.customer_id = c.customer_id;
END;
エラー処理
BEGIN ATOMIC ... END;ブロック内でステートメントが失敗すると、Databricks はすべての変更をロールバックし、エラー メッセージを返します。
デバッグのヒント:
- エラー メッセージを確認して、どのステートメントが失敗したかを特定します。
- トランザクション ブロック外でステートメントを個別にテストします。
SIGNALを使用して検証チェックを追加し、カスタム エラー メッセージで失敗します。- 追加のコンテキストについては、トランザクション履歴を照会してください。
インタラクティブなトランザクション
対話型トランザクションにより、トランザクションの境界を明示的に制御できます。トランザクションを手動で開始し、ステートメントを実行し、明示的にコミットまたはロールバックします。
サポートされるコンピュート : SQLウェアハウスのみ。
サポートされている構文 : SQL のみ。
構文
BEGIN TRANSACTION;
statement1;
statement2;
COMMIT;
-- or: ROLLBACK;
コミットする前に検証する
コミットする前に対話型トランザクションを使用して結果を検証します。
BEGIN TRANSACTION;
-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();
-- Validate and commit or rollback
BEGIN
DECLARE duplicate_count INT;
SET duplicate_count = (
SELECT COUNT(*) FROM (
SELECT customer_id, COUNT(*) as cnt
FROM staging_customers
WHERE load_date = current_date()
GROUP BY customer_id
HAVING COUNT(*) > 1
)
);
IF duplicate_count > 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
ELSE
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
COMMIT;
END IF;
END;
明示的なロールバック
検証が失敗した場合、またはビジネス ロジックで変更を破棄する必要がある場合は、トランザクションをロールバックします。
BEGIN TRANSACTION;
UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;
-- Check if quantity would go negative
BEGIN
DECLARE new_quantity INT;
SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);
IF new_quantity < 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
ELSE
COMMIT;
END IF;
END;
JDBC で使用する
JDBC ドライバーは、トランザクション内でexecuteUpdate()を使用して DML ステートメントを実行することをサポートしています。サポートされている DML ステートメントの一覧については、 「サポートされている操作」を参照してください。
JDBC クライアントは、自動コミット モードを無効にして対話型トランザクションを使用します。
Connection conn = DriverManager.getConnection(jdbcUrl, properties);
try {
conn.setAutoCommit(false); // Start transaction mode
Statement stmt = conn.createStatement();
stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");
conn.commit(); // Commit the transaction
} catch (SQLException e) {
conn.rollback(); // Roll back on error
throw e;
} finally {
conn.close();
}
サポートされていないJDBC操作
次の JDBC 操作は、対話型トランザクション内ではサポートされません。
カテゴリー | サポートされていない |
|---|---|
カタログまたはスキーマの切り替え |
|
セッション構成の変更 |
|
すべての DatabaseMetaData (すべてのプロトコル) | すべての |
PreparedStatementメタデータ |
|
ストアドプロシージャ |
|
ODBCで使用する
ODBC ドライバーは、トランザクション内でSQLExecute()とSQLExecDirect()を使用して DML ステートメントを実行することをサポートしています。サポートされている DML ステートメントの一覧については、 「サポートされている操作」を参照してください。
ODBC クライアントは、標準の ODBC トランザクション管理関数を使用して、Databricks ODBC ドライバーで対話型トランザクションを使用できます。
サポートされていないODBC操作
次の ODBC 操作は、対話型トランザクション内ではサポートされていません。
カテゴリー | サポートされていない |
|---|---|
すべてのカタログ機能 |
|
接続属性の設定 | カタログの切り替え、分離レベルの変更、アクセスモードの変更 |
SQL翻訳 |
|
Python用Databricks SQLコネクタを使用する
Databricks SQL Connector for Python は、トランザクション内でcursor.execute()を使用して DML ステートメントを実行することをサポートします。サポートされている DML ステートメントの一覧については、 「サポートされている操作」を参照してください。
Python アプリケーションは、 autocommit=Falseを設定することで、 Python 用 Databricks SQL コネクタを使用した対話型トランザクションを使用できます。
from databricks import sql
with sql.connect(
server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
http_path="sql/1.0/warehouses/abc123def456",
access_token="your-access-token",
autocommit=False
) as connection:
with connection.cursor() as cursor:
cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
connection.commit()
サポートされていない Python コネクタ操作
次の Python コネクタ操作は、対話型トランザクション内ではサポートされていません。
カテゴリー | サポートされていない |
|---|---|
すべてのメタデータ |
|
対話型トランザクションにおけるドライバーの制限
対話型トランザクションを使用する場合、すべてのドライバーに次の制限が適用されます。
メタデータ操作は、対話型トランザクション内ではサポートされません。次の操作は、ドライバーやプロトコルに関係なく、トランザクション内で失敗する可能性があります。
ドライバー/プロトコル | Type | 方法 |
|---|---|---|
JDBC |
|
|
ODBC | カタログ機能 |
|
Pythonコネクタ | メタデータメソッド |
|
SQL | メタデータコマンド |
|
SQL |
|
|
すべてのメタデータ操作をトランザクションの外部で実行します。
単一のドライバー接続オブジェクト上の複数のスレッドでトランザクションを実行すると、未定義の動作が発生します。各接続オブジェクトでは一度に 1 つのトランザクションのみを実行します。
分離の挙動
対話型トランザクションでコミットされていない変更は、セッションでのみ表示されます。他のセッションでは、トランザクションが開始される前のテーブルの状態が表示されます。
対話型トランザクションでは、非対話型トランザクションよりも保守的な競合検出が使用され、テーブル レベルで競合が発生する可能性があります (無条件追加を除く)。行レベルの競合検出には、非対話型トランザクション ( BEGIN ATOMIC ... END; ) を使用します。
- 分離を確認するには、サンプル テーブルが存在しない場合は作成します。
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
-
同じセッションで、トランザクションを開始して変更を加えます。
SQLBEGIN TRANSACTION;
INSERT INTO sample_accounts VALUES (10, 'Test', 100.00); -
別の SQL エディター タブまたはノートブック セッション (同じノートブック内の新しいセルではない) で、テーブルをクエリします。
SQL-- Run this in the SECOND session
SELECT * FROM sample_accounts WHERE id = 10;コミットされていない変更は最初のセッションの外部では表示されないため、0 行が返されます。
-
最初のセッションに戻ってコミットします。
SQLCOMMIT; -
2 番目のセッションから再度クエリを実行します。
SQL-- Run this in the SECOND session
SELECT * FROM sample_accounts WHERE id = 10;トランザクションがコミットされたため、行は表示されます。
この分離により、ロールバックされる可能性のあるデータを他のユーザーが読み取ることが防止されます。
トランザクションモードを選択する
シナリオ | 推奨モード |
|---|---|
スケジュールされたETLジョブ | 非対話型 - 自動コミットまたはロールバックによりエラー処理が簡素化されます |
固定されたステートメントシーケンス | 非対話型 - よりシンプルな構文、手動コミットは不要 |
コミット前のデータ検証 | インタラクティブ - 結果を検査し、コミットするかどうかを決定します |
手動制御が必要なJDBCアプリケーション | インタラクティブ - 標準データベーストランザクションパターン |
次のステップ
関連するSQLリファレンス
- ATOMIC 複合ステートメント (非対話型トランザクション) : 自動コミットとロールバックを使用して、複数の SQL ステートメントを単一のアトミック トランザクションとして実行します。
- BEGIN TRANSACTION (対話型トランザクション) : 手動コミットおよびロールバック制御を使用して対話型トランザクションを開始します。
- コミット: 対話型トランザクションをコミットし、すべての変更を永続的にします。
- ROLLBACK : 対話型トランザクションをロールバックし、すべての変更を破棄します。