JDBCを使用したデータベースのクエリー
Databricksでは、JDBCを使用した外部データベースへの接続がサポートされています。この記事では、これらの接続を構成して使用するための基本的な構文を、Python、SQL、およびScala の例とともに提供します。
実験段階
この記事で説明する設定は試験段階のものです。実験的な機能はそのまま提供されるもので、Databricksが顧客テクニカルサポートを通じてサポートするものではありません。クエリーフェデレーションを完全にサポートするには、代わりにレイクハウスフェデレーションを使用する必要があります。これにより、DatabricksユーザーはUnity Catalog構文とデータガバナンスツールを利用できるようになります。
Partner Connectは、多くの外部データソースとデータを同期するための最適化された統合を提供します。「 Databricks Partner Connectとは」を参照してください。
重要
この記事に挙げる例には、JDBC URLのユーザー名とパスワードは含まれていません。Databricksでは、シークレット を使用してデータベースの資格情報を保存することを推奨しています。例:
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
SQLでDatabricksシークレットを参照するには、クラスターの開始時にSpark構成プロパティを構成する必要があります。
シークレット管理の例の全容については、「シークレットワークフローの例」を参照してください。
クラウド接続を確立する
Databricks VPCは、Sparkクラスターのみを許可するように設定されています。別のインフラストラクチャに接続する場合は、VPCピアリングの使用がベストプラクティスになります。VPCピアリングが確立されたら、クラスターの netcat
ユーティリティで確認できます。
%sh nc -vz <jdbcHostname> <jdbcPort>
JDBCを用いたデータの読み取り
JDBCを使用してデータを読み取るには、多数の設定を構成する必要があります。各データベースは、 <jdbc-url>
に対して異なる形式を使用する点に注意してください。
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Sparkは、データベーステーブルからスキーマを自動的に読み取り、その型をSpark SQL型にマップします。
employees_table.printSchema
DESCRIBE employees_table_vw
employees_table.printSchema
この JDBC テーブルに対してクエリーを実行できます。
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
JDBCを使用したデータの書き込み
JDBCを使用したテーブルへのデータの保存では、読み取りと同様の構成が使用されます。次の例を参照してください。
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
デフォルトの動作では、新しいテーブルの作成が試行され、その名前のテーブルが既に存在する場合はエラーがスローされます。
次の構文を使用すると、既存のテーブルにデータを追加できます。
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
次の構文を使用すると、既存のテーブルを上書きできます。
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
JDBCクエリーの並列処理の制御
デフォルトでは、JDBCドライバは1つのスレッドのみでソースデータベースを照会します。読み込みのパフォーマンスを向上させるには、Databricksがデータベースに対して同時に実行するクエリーの数を制御するために、いくつかのオプションを指定する必要があります。小規模なクラスターの場合、numPartitions
オプションをクラスター内のExecutorコアの数と同じに設定すると、すべてのノードがデータを並列にクエリーできるようになります。
警告
大規模なクラスターでnumPartitions
を高い値に設定すると、同時クエリーが多すぎてサービスが過負荷になる可能性があり、リモートデータベースのパフォーマンスが低下する可能性があります。これは、アプリケーションデータベースでは特に厄介です。この値を50より大きく設定する場合は注意が必要です。
注:
partitionColumn
のソースデータベースで計算されたインデックスを持つ列を選択することにより、クエリーを高速化します。
次のコード例は、8つのコアを持つクラスターの並列処理を構成する方法を示しています。
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
注:
Databricksでは、JDBCを構成するためのすべてのApache Sparkオプションがサポートされています。
JDBCを使用してデータベースに書き込む場合、Apache Sparkはメモリ内のパーティションの数を使用して並列処理を制御します。並列処理を制御するために、書き込み前にデータを再パーティション化できます。大規模なクラスターでは多数のパーティションを避けて、リモートデータベースの過負荷を回避します。次の例は、書き込み前に8つのパーティションに再パーティション化する方法を示しています。
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
クエリーをデータベースエンジンにプッシュダウンする
クエリー全体をデータベースにプッシュダウンして、結果だけを返すことができます。table
パラメーターは、読み取るJDBCテーブルを識別します。SQLクエリーFROM
句で有効なものなら何でも使用できます。
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
クエリーごとにフェッチされる行数を制御する
JDBCドライバには、リモートデータベースから一度にフェッチされるロー数を制御する、fetchSize
パラメーターがあります。
設定 |
結果 |
---|---|
低すぎる |
多くのラウンドトリップによる待機時間が長い(クエリーごとに返される行が少ない) |
高すぎます |
メモリ不足エラー(1つのクエリーで返されるデータが多すぎる) |
最適値はワークロードによって異なります。考慮事項は次のとおりです。
クエリーによって返される列の数はいくつか?
どのようなデータ型が返されるか?
各列の文字列はどのくらいの期間返されるか?
システムのデフォルト値は非常に小さく、チューニングの恩恵を受ける可能性があります。例:オラクルのデフォルトのfetchSize
は10です。100に増やすと、実行する必要があるクエリーの総数が10分の1に減ります。JDBCの結果はネットワークトラフィックであるため、非常に大きな数は避けますが、多くのデータセットでは最適な値が数千になる可能性があります。
次の例のように、fetchSize
オプションを使用します。
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()