レイクハウスフェデレーションのパフォーマンスに関する推奨事項
この記事では、レイクハウスフェデレーション クエリのパフォーマンスを向上させるためのガイダンスを提供します。
複数の述語をAND演算子で結合します。
Databricks Runtime は、ネットワーク経由で取得されるレコードの数を減らすために、述語をリモート データベース エンジンにプッシュダウンしようとします。述語をプッシュダウンできない場合、リモート データベース エンジンで実行されるクエリでは述語が除外されるため、Databricks Runtime を使用してフィルター処理を実行する必要があります。ただし、フィルターの特定の部分をプッシュダウンできない場合でも、 AND演算子で結合すればフィルターの別の部分をプッシュダウンすることは可能です。
非プッシュ可能な述語でフィルタリングする
ノートブックまたはDatabricks SQLクエリ エディタで次のクエリを実行して、 ILIKE式を使用して名前でレコードをフィルタリングします。
SELECT * FROM foreign_catalog.schema.table WHERE name ILIKE 'john'
適切な翻訳がないため、 ILIKE式をリモート データベース (MySQL など) にプッシュダウンできません。フィルタリングは Databricks Runtime を使用して実行する必要があります。
リモート データベースに送信されたクエリは、すべてのレコードを返します。
SELECT * FROM catalog.schema.table
プッシュ可能な述語とプッシュ不可能な述語でフィルタリングする
ノートブックまたはDatabricks SQLクエリエディタで次のクエリを実行して、レコードを名前と日付の両方でフィルタリングします。
SELECT * FROM foreign_catalog.schema.table WHERE name ILIKE 'john' AND date > '2025-05-01'
ILIKE式は適切な変換がないため、リモートデータベース (MySQL など) にプッシュダウンできませんが、日付比較はプッシュダウンできます。名前によるフィルタリングは依然としてDatabricks Runtimeを使用して行う必要がありますが、日付比較によって取得するレコード数を削減できます。
リモート データベースに送信されたクエリは、レコードのサブセットを返します。
SELECT * FROM catalog.schema.table WHERE date > '2025-05-01'
リモートデータベースで実行されるクエリを確認する
リモート データベースに送信されるクエリを確認するには、 EXPLAIN FORMATTEDコマンドを実行します。
適応型クエリ実行のため、実際のクエリはEXPLAIN FORMATTED出力のクエリと異なる場合があります。
リモートデータベースから取得するバッチのサイズを設定する
JDBC転送プロトコルを使用する以下のコネクタを設定することで、リモートシステムからデータを取得する方法を制御できます。
- Databricks
- Microsoft SQL Server
- Microsoft Azure Synapse
- MySQL
- Oracle
- PostgreSQL
- セールスフォース・データ360
- テラデータ
JDBC フェッチ・サイズは、ラウンド・トリップごとにフェッチするロー数を決定します。デフォルトでは、ほとんどの JDBC コネクターはデータをアトミックにフェッチします。これにより、データ量が使用可能なメモリを超える可能性があります。
メモリ不足エラーを回避するには、 fetchSize パラメーターを設定します。 fetchSize が 0 以外の値に設定されている場合、コネクタはデータをバッチで読み取ります。バッチあたりの最大行数は、 fetchSizeの値と等しくなります。Databricks では、バッチ内の行数が少なすぎるとクエリ全体の実行時間が長くなる可能性があるため、大きな fetchSize 値 ( 100,000など) を指定することをお勧めします。
この問題により、ワーカー ノードは、並列ではなくバッチでデータを読み取ることができるようになります。
コンピュートの要件:
- コンピュートは Databricks Runtime 16.1以上で使用する必要があります。 SQLウェアハウスは Pro または サーバレスで、2024.50 を使用する必要があります。
SELECT * FROM mySqlCatalog.schema.table WITH ('fetchSize' 100000)
パーティション サイズ パラメーター (Snowflake) を設定します
Snowflake 、複数のパーティションにあるデータをフェッチできるため、複数のエグゼキューターの連携と並列処理が可能になります。
partition_size_in_mb問題を設定して、適切なパーティション サイズを選択します。
この問題は、各パーティションの推奨される非圧縮サイズを指定します。 パーティション数を減らすには、より大きな値を指定してください。デフォルト値は100 (MB)です。
partition_size_in_mbは推奨サイズを設定します。パーティションの実際のサイズは異なる場合があります。
コンピュートの要件:
- コンピュートは Databricks Runtime 16.1以上で使用する必要があります。 SQLウェアハウスは Pro または サーバレスで、2024.50 を使用する必要があります。
SELECT * FROM snowflakeCatalog.schema.table WITH ('partition_size_in_mb' 1000)
JDBC コネクタの並列読み取りを有効にする
JDBC 転送プロトコルをサポートするコネクタは、クエリをパーティション化することでデータを並列に読み取ることができます。次のコネクタの並列読み取りを構成できます。
- Databricks
- Microsoft SQL Server
- Microsoft Azure Synapse
- MySQL
- Oracle
- PostgreSQL
- Redshift
- セールスフォース・データ360
- テラデータ
これにより、複数のエグゼキューターが同時にデータをフェッチできるようになり、大規模なテーブルのパフォーマンスが大幅に向上します。
並列読み取りを有効にするには、次の点を指定します。
numPartitions: 並列処理に使用するパーティションの数partitionColumn: クエリのパーティション分割に使用される数値列の名前lowerBound: パーティションストライドの決定に使用されるpartitionColumnの最小値upperBound: パーティションストライドの決定に使用されるpartitionColumnの最大値
lowerBoundとupperBound値は、パーティションのストライドを決定するためだけに使用され、テーブル内の行をフィルタリングするためには使用されません。テーブル内のすべての行が分割され、返されます。
パーティション列は次のようになります。
- 数値列
- 範囲全体に均等に分布
- パフォーマンスを向上させるためのインデックス付き列
コンピュートの要件:
- Databricks Runtime 17.1 以降ではコンピュートを使用する必要があります。 SQLは Pro またはサーバレスであり、2025.25 を使用する必要があります。
次の例では、クエリはid列に基づいて 4 つの並列パーティションに分割され、各パーティションは約 250 個の ID の範囲を処理します ( 1と1000の間には、各idに対して 1 つのレコードがあると仮定します)。
SELECT * FROM mySqlCatalog.schema.table WITH (
'numPartitions' 4,
'partitionColumn' 'id',
'lowerBound' 1,
'upperBound' 1000
)
並列読み取りに対応したソースデータベースビューを使用する
フェデレーションテーブルを参照するDatabricks作成のビューをクエリする場合、並列読み取りはサポートされていません。代わりに、ソースデータベースにビューを作成してください。
ビューで並列読み取りを使用するには、Databricksではなくソースデータベースでビューを作成してください。
ソースデータベースでは:
CREATE VIEW my_source_database_view AS SELECT * FROM source_database_schema.table;
次に、 Databricksから、並列読み取りを使用してビューをクエリします。
SELECT * FROM myfederated_catalog.schema.my_source_database_view WITH (
'numPartitions' 4,
'partitionColumn' 'id',
'lowerBound' 1,
'upperBound' 1000
)
レイクハウスフェデレーションでプッシュダウンに参加する
プレビュー
この機能は パブリック プレビュー段階です。
Databricksレイクハウスフェデレーションで結合プッシュダウンがどのように機能するかを学びます。
プッシュダウン参加の概要
結合プッシュダウンは、データを取得してローカルで結合を実行する代わりに、Databricks が結合操作をリモート データベース エンジンにプッシュするクエリ最適化手法です。これにより、リモート データベースの組み込み結合機能を活用して、ネットワーク トラフィックが大幅に削減され、クエリ パフォーマンスが向上します。
サポートされているデータソース
次のデータソースは結合プッシュダウンをサポートしています。
- Oracle
- PostgreSQL
- MySQL
- SQL Server
- テラデータ
- Redshift
- Snowflake
- BigQuery
この機能は一般提供されており、Redshift、Snowflake、BigQuery ではデフォルトで有効になっています。次の制限と要件は、Oracle、PostgreSQL、MySQL、SQL Server、および Teradata コネクタにのみ適用されます。
要件
- Databricks Runtime 17.2 以降ではコンピュートを使用する必要があります。
- SQLは Pro またはサーバレスであり、2025.30 を使用する必要があります。
- Databricks UI の プレビュー ページで、 フェデレーション クエリの結合プッシュダウン (パブリック プレビュー) をオンにする必要があります。
制限事項
- 内部結合、左外部結合、右外部結合のみがサポートされます。
- 結合の子のエイリアスは、Databricks Runtime 17.3 以降でのみサポートされます。
ノード階層の要件
結合をプッシュダウンするには、左側と右側の子ブランチ内のすべてのノードもプッシュ可能である必要があります。以下のルールが適用されます。
- サポートされている子ノード : プッシュダウンが成功するには、クエリ プラン内の結合の下に結合、フィルター、サンプル、およびスキャン ノードのみを表示できます。
- サポートされていない子ノード : 結合の下の左または右のブランチのいずれかに制限、オフセット、または集計操作が表示される場合、結合をプッシュダウンできません。
- 結合の上の操作 : 集計、制限、およびオフセットの操作は、結合の上に適用するとプッシュダウンできます。
サポートされている結合パターンとサポートされていない結合パターンを識別する
-- Supported: Join two table scans
SELECT *
FROM table1
INNER JOIN table2
ON col_from_table1 = col_from_table2 + 1
-- Supported: Join two table scans with a nested select query
SELECT *
FROM (SELECT a FROM table1) q1
INNER JOIN (SELECT a FROM table2) q2
ON q1.a = q2.a + 1
-- Supported: Child subqueries with aliases in projection (:re[DBR] 17.3+)
SELECT *
FROM (SELECT a AS a1 FROM table1) t1
INNER JOIN (SELECT a AS a2 FROM table2) t2
ON t1.a1 = t2.a2 + 1
-- Supported: Join with filters below
SELECT *
FROM (SELECT * FROM table1 WHERE a > 10) t1
INNER JOIN (SELECT * FROM table2 WHERE b < 20) t2
ON t1.id = t2.id
-- Supported: Aggregate on top of join
SELECT COUNT(*)
FROM table1 t1
INNER JOIN table2 t2
ON t1.id = t2.id
-- Not supported: Join on top of aggregate
SELECT *
FROM (SELECT id, COUNT(*) as cnt FROM table1 GROUP BY id) t1
INNER JOIN table2 t2
ON t1.id = t2.id
-- Not supported: Join on top of limit
SELECT *
FROM (SELECT * FROM table1 LIMIT 100) t1
INNER JOIN table2 t2
ON t1.id = t2.id
可観測性
EXPLAIN FORMATTED使用して、結合がプッシュダウンされていることを確認します。
EXPLAIN FORMATTED
SELECT *
FROM foreign_catalog.schema.table1 t1
INNER JOIN foreign_catalog.schema.table2 t2
ON t1.id = t2.id
結合プッシュダウンが成功したことを示す出力例:
== Physical Plan ==
*(1) Scan JDBCRelation
PushedFilters: [id = id_1],
PushedJoins:
[L]: Relation: foreign_catalog.schema.table1
PushedFilters: [ID IS NOT NULL]
[R]: Relation: foreign_catalog.schema.table2
PushedFilters: [ID IS NOT NULL]
この出力では、
id_1列名が重複している場合にあいまいさを解決するために Databricks が自動的に生成する別名です。PushedJoinsの上のPushedFiltersは、リモート データベースにプッシュされる実際の結合条件を表します。- 各リレーション([L] および [R])の
PushedFiltersは、各テーブルに適用された追加のフィルター述語を示します。