JDBC を使用したデータベースのクエリ
従来のクエリ フェデレーションのドキュメントは廃止されており、更新されない可能性があります。このコンテンツに記載されている製品、サービス、またはテクノロジは、Databricks によって公式に承認またはテストされたものではありません。レイクハウスフェデレーションとはを参照してください。
Databricksでは、JDBCを使用した外部データベースへの接続がサポートされています。この記事では、これらの接続を構成して使用するための基本的な構文を、Python、SQL、およびScala の例とともに提供します。
実験段階
この記事で説明する構成は 試験段階です。試験的な機能は現状のまま提供され、 Databricks を通じて顧客のテクニカル サポートを通じてサポートされることはありません。 クエリ フェデレーションを完全にサポートするには、代わりに レイクハウスフェデレーションを使用して、 Databricks ユーザーが Unity Catalog 構文ツールとデータガバナンス ツールを利用できるようにする必要があります。
Partner Connect は、多くの外部外部データソースとデータを同期するための最適化された統合を提供します。「Databricks Partner Connect とは」を参照してください。
この記事の例では、JDBC URL にユーザー名とパスワードは含まれていません。Databricks では、シークレット を使用してデータベースの資格情報を格納することをお勧めします。例えば:
- Python
- Scala
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
DatabricksSQLでSparkシークレットを参照するには 、クラスターの開始時に 設定プロパティを設定する必要があります 。
シークレット管理の完全な例については、「 チュートリアル: Databricks シークレットを作成して使用する」を参照してください。
このリリースでは、Databricks on Google CloudでSpark UIのサポートは利用できません。
JDBC によるデータの読み取り
JDBCを使用してデータを読み取るには、多数の設定を構成する必要があります。各データベースは、 <jdbc-url>
に対して異なる形式を使用する点に注意してください。
- Python
- SQL
- Scala
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Sparkは、データベーステーブルからスキーマを自動的に読み取り、その型をSpark SQL型にマップします。
- Python
- SQL
- Scala
employees_table.printSchema
DESCRIBE employees_table_vw
employees_table.printSchema
この JDBC テーブルに対してクエリーを実行できます。
- Python
- SQL
- Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
JDBC を使用したデータの書き込み
JDBCを使用したテーブルへのデータの保存では、読み取りと同様の構成が使用されます。次の例を参照してください。
- Python
- SQL
- Scala
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
デフォルトの動作では、新しいテーブルの作成が試行され、その名前のテーブルが既に存在する場合はエラーがスローされます。
次の構文を使用すると、既存のテーブルにデータを追加できます。
- Python
- SQL
- Scala
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
次の構文を使用すると、既存のテーブルを上書きできます。
- Python
- SQL
- Scala
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
JDBC クエリの並列処理の制御
デフォルトでは、JDBCドライバは1つのスレッドのみでソースデータベースを照会します。読み込みのパフォーマンスを向上させるには、Databricksがデータベースに対して同時に実行するクエリーの数を制御するために、いくつかのオプションを指定する必要があります。小規模なクラスターの場合、numPartitions
オプションをクラスター内のExecutorコアの数と同じに設定すると、すべてのノードがデータを並列にクエリーできるようになります。
大規模なクラスターでnumPartitions
を高い値に設定すると、同時クエリーが多すぎてサービスが過負荷になる可能性があり、リモートデータベースのパフォーマンスが低下する可能性があります。これは、アプリケーションデータベースでは特に厄介です。この値を50より大きく設定する場合は注意が必要です。
partitionColumn
のソースデータベースで計算されたインデックスを持つ列を選択することにより、クエリーを高速化します。
次のコード例は、8つのコアを持つクラスターの並列処理を構成する方法を示しています。
- Python
- SQL
- Scala
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Databricksでは、JDBCを構成するためのすべてのApache Sparkオプションがサポートされています。
JDBCを使用してデータベースに書き込む場合、Apache Sparkはメモリ内のパーティションの数を使用して並列処理を制御します。並列処理を制御するために、書き込み前にデータを再パーティション化できます。大規模なクラスターでは多数のパーティションを避けて、リモートデータベースの過負荷を回避します。次の例は、書き込み前に8つのパーティションに再パーティション化する方法を示しています。
- Python
- SQL
- Scala
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
クエリをデータベース エンジンにプッシュダウンする
クエリー全体をデータベースにプッシュダウンして、結果だけを返すことができます。table
パラメーターは、読み取るJDBCテーブルを識別します。SQLクエリーFROM
句で有効なものなら何でも使用できます。
- Python
- SQL
- Scala
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
クエリごとにフェッチされる行数を制御する
JDBCドライバには、リモートデータベースから一度にフェッチされるロー数を制御する、fetchSize
パラメーターがあります。
設定 | 結果 |
---|---|
低すぎる | 多くのラウンドトリップによる待機時間が長い(クエリーごとに返される行が少ない) |
高すぎます | メモリ不足エラー(1つのクエリーで返されるデータが多すぎる) |
最適値はワークロードによって異なります。考慮事項は次のとおりです。
- クエリーによって返される列の数はいくつか?
- どのようなデータ型が返されるか?
- 各列の文字列はどのくらいの期間返されるか?
システムのデフォルト値は非常に小さく、チューニングの恩恵を受ける可能性があります。例:オラクルのデフォルトのfetchSize
は10です。100に増やすと、実行する必要があるクエリーの総数が10分の1に減ります。JDBCの結果はネットワークトラフィックであるため、非常に大きな数は避けますが、多くのデータセットでは最適な値が数千になる可能性があります。
次の例のように、fetchSize
オプションを使用します。
- Python
- SQL
- Scala
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()