JDBC を使用したデータベースのクエリ
Databricksでは、JDBCを使用した外部データベースへの接続がサポートされています。この記事では、これらの接続を構成して使用するための基本的な構文を、Python、SQL、およびScala の例とともに提供します。
実験段階
この記事で説明する構成は 試験段階です。 試験的な機能は現状のまま提供され、 Databricks を通じて顧客のテクニカル サポートを通じてサポートされることはありません。 クエリ フェデレーションを完全にサポートするには、代わりに レイクハウスフェデレーションを使用して、 Databricks ユーザーが Unity Catalog 構文ツールとデータガバナンス ツールを利用できるようにする必要があります。
Partner Connect は、多くの外部外部データソースとデータを同期するための最適化された統合を提供します。 「Databricks Partner Connect とは」を参照してください。
この記事の例では、JDBC URL にユーザー名とパスワードは含まれていません。 Databricks では、シークレット を使用してデータベースの資格情報を格納することをお勧めします。 例えば:
- Python
- Scala
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
DatabricksSQLでSparkシークレットを参照するには 、クラスターの開始時に 設定プロパティを設定する必要があります 。
シークレット管理の完全な例については、「 チュートリアル: Databricks シークレットを作成して使用する」を参照してください。
このリリースでは、Databricks on Google CloudでSpark UIのサポートは利用できません。
JDBC によるデータの読み取り
JDBCを使用してデータを読み取るには、多数の設定を構成する必要があります。各データベースは、 <jdbc-url>
に対して異なる形式を使用する点に注意してください。
- Python
- SQL
- Scala
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Sparkは、データベーステーブルからスキーマを自動的に読み取り、その型をSpark SQL型にマップします。
- Python
- SQL
- Scala
employees_table.printSchema
DESCRIBE employees_table_vw
employees_table.printSchema
この JDBC テーブルに対してクエリーを実行できます。
- Python
- SQL
- Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
JDBC を使用したデータの書き込み
JDBCを使用したテーブルへのデータの保存では、読み取りと同様の構成が使用されます。次の例を参照してください。
- Python
- SQL
- Scala
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
デフォルトの動作では、新しいテーブルの作成が試行され、その名前のテーブルが既に存在する場合はエラーがスローされます。
次の構文を使用すると、既存のテーブルにデータを追加できます。
- Python
- SQL
- Scala
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
次の構文を使用すると、既存のテーブルを上書きできます。
- Python
- SQL
- Scala
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
JDBC クエリの並列処理の制御
デフォルトでは、JDBCドライバは1つのスレッドのみでソースデータベースを照会します。読み込みのパフォーマンスを向上させるには、Databricksがデータベースに対して同時に実行するクエリーの数を制御するために、いくつかのオプションを指定する必要があります。小規模なクラスターの場合、numPartitions
オプションをクラスター内のExecutorコアの数と同じに設定すると、すべてのノードがデータを並列にクエリーできるようになります。
大規模なクラスターでnumPartitions
を高い値に設定すると、同時クエリーが多すぎてサービスが過負荷になる可能性があり、リモートデータベースのパフォーマンスが低下する可能性があります。これは、アプリケーションデータベースでは特に厄介です。この値を50より大きく設定する場合は注意が必要です。
partitionColumn
のソースデータベースで計算されたインデックスを持つ列を選択することにより、クエリーを高速化します。
次のコード例は、8つのコアを持つクラスターの並列処理を構成する方法を示しています。
- Python
- SQL
- Scala
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Databricksでは、JDBCを構成するためのすべてのApache Sparkオプションがサポートされています。
JDBCを使用してデータベースに書き込む場合、Apache Sparkはメモリ内のパーティションの数を使用して並列処理を制御します。並列処理を制御するために、書き込み前にデータを再パーティション化できます。大規模なクラスターでは多数のパーティションを避けて、リモートデータベースの過負荷を回避します。次の例は、書き込み前に8つのパーティションに再パーティション化する方法を示しています。
- Python
- SQL
- Scala
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
クエリをデータベース エンジンにプッシュダウンする
クエリー全体をデータベースにプッシュダウンして、結果だけを返すことができます。table
パラメーターは、読み取るJDBCテーブルを識別します。SQLクエリーFROM
句で有効なものなら何でも使用できます。
- Python
- SQL
- Scala
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
クエリごとにフェッチされる行数を制御する
JDBCドライバには、リモートデータベースから一度にフェッチされるロー数を制御する、fetchSize
パラメーターがあります。
設定 | 結果 |
---|---|
低すぎる | 多くのラウンドトリップによる待機時間が長い(クエリーごとに返される行が少ない) |
高すぎます | メモリ不足エラー(1つのクエリーで返されるデータが多すぎる) |
最適値はワークロードによって異なります。考慮事項は次のとおりです。
- クエリーによって返される列の数はいくつか?
- どのようなデータ型が返されるか?
- 各列の文字列はどのくらいの期間返されるか?
システムのデフォルト値は非常に小さく、チューニングの恩恵を受ける可能性があります。例:オラクルのデフォルトのfetchSize
は10です。100に増やすと、実行する必要があるクエリーの総数が10分の1に減ります。JDBCの結果はネットワークトラフィックであるため、非常に大きな数は避けますが、多くのデータセットでは最適な値が数千になる可能性があります。
次の例のように、fetchSize
オプションを使用します。
- Python
- SQL
- Scala
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()