JDBCを使用したデータベースのクエリー

Databricksでは、JDBCを使用した外部データベースへの接続がサポートされています。この記事では、これらの接続を構成して使用するための基本的な構文を、Python、SQL、およびScala の例とともに提供します。

実験段階

この記事で説明する設定は試験段階のものです。実験的な機能はそのまま提供されるもので、Databricksが顧客テクニカルサポートを通じてサポートするものではありません。クエリーフェデレーションを完全にサポートするには、代わりにレイクハウスフェデレーションを使用する必要があります。これにより、DatabricksユーザーはUnity Catalog構文とデータガバナンスツールを利用できるようになります。

Partner Connectは、多くの外部データソースとデータを同期するための最適化された統合を提供します。「 Databricks Partner Connectとは」を参照してください。

重要

この記事に挙げる例には、JDBC URLのユーザー名とパスワードは含まれていません。Databricksでは、シークレット を使用してデータベースの資格情報を保存することを推奨しています。例:

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

SQLでDatabricksシークレットを参照するには、クラスターの開始時にSpark構成プロパティを構成する必要があります。

シークレット管理の完全な例については、「 チュートリアル: Databricks シークレットを作成して使用する」を参照してください。

クラウド接続を確立する

Databricks VPCは、Sparkクラスターのみを許可するように設定されています。別のインフラストラクチャに接続する場合は、VPCピアリングの使用がベストプラクティスになります。VPCピアリングが確立されたら、クラスターの netcat ユーティリティで確認できます。

%sh nc -vz <jdbcHostname> <jdbcPort>

JDBCを用いたデータの読み取り

JDBCを使用してデータを読み取るには、多数の設定を構成する必要があります。各データベースは、 <jdbc-url>に対して異なる形式を使用する点に注意してください。

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)
val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Sparkは、データベーステーブルからスキーマを自動的に読み取り、その型をSpark SQL型にマップします。

employees_table.printSchema
DESCRIBE employees_table_vw
employees_table.printSchema

この JDBC テーブルに対してクエリーを実行できます。

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

JDBCを使用したデータの書き込み

JDBCを使用したテーブルへのデータの保存では、読み取りと同様の構成が使用されます。次の例を参照してください。

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)
CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw
employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

デフォルトの動作では、新しいテーブルの作成が試行され、その名前のテーブルが既に存在する場合はエラーがスローされます。

次の構文を使用すると、既存のテーブルにデータを追加できます。

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)
CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

次の構文を使用すると、既存のテーブルを上書きできます。

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)
CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;
employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

JDBCクエリーの並列処理の制御

デフォルトでは、JDBCドライバは1つのスレッドのみでソースデータベースを照会します。読み込みのパフォーマンスを向上させるには、Databricksがデータベースに対して同時に実行するクエリーの数を制御するために、いくつかのオプションを指定する必要があります。小規模なクラスターの場合、numPartitionsオプションをクラスター内のExecutorコアの数と同じに設定すると、すべてのノードがデータを並列にクエリーできるようになります。

警告

大規模なクラスターでnumPartitionsを高い値に設定すると、同時クエリーが多すぎてサービスが過負荷になる可能性があり、リモートデータベースのパフォーマンスが低下する可能性があります。これは、アプリケーションデータベースでは特に厄介です。この値を50より大きく設定する場合は注意が必要です。

注:

partitionColumnのソースデータベースで計算されたインデックスを持つ列を選択することにより、クエリーを高速化します。

次のコード例は、8つのコアを持つクラスターの並列処理を構成する方法を示しています。

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)
val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

注:

Databricksでは、JDBCを構成するためのすべてのApache Sparkオプションがサポートされています。

JDBCを使用してデータベースに書き込む場合、Apache Sparkはメモリ内のパーティションの数を使用して並列処理を制御します。並列処理を制御するために、書き込み前にデータを再パーティション化できます。大規模なクラスターでは多数のパーティションを避けて、リモートデータベースの過負荷を回避します。次の例は、書き込み前に8つのパーティションに再パーティション化する方法を示しています。

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)
CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

クエリーをデータベースエンジンにプッシュダウンする

クエリー全体をデータベースにプッシュダウンして、結果だけを返すことができます。tableパラメーターは、読み取るJDBCテーブルを識別します。SQLクエリーFROM句で有効なものなら何でも使用できます。

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

クエリーごとにフェッチされる行数を制御する

JDBCドライバには、リモートデータベースから一度にフェッチされるロー数を制御する、fetchSizeパラメーターがあります。

設定

結果

低すぎる

多くのラウンドトリップによる待機時間が長い(クエリーごとに返される行が少ない)

高すぎます

メモリ不足エラー(1つのクエリーで返されるデータが多すぎる)

最適値はワークロードによって異なります。考慮事項は次のとおりです。

  • クエリーによって返される列の数はいくつか?

  • どのようなデータ型が返されるか?

  • 各列の文字列はどのくらいの期間返されるか?

システムのデフォルト値は非常に小さく、チューニングの恩恵を受ける可能性があります。例:オラクルのデフォルトのfetchSizeは10です。100に増やすと、実行する必要があるクエリーの総数が10分の1に減ります。JDBCの結果はネットワークトラフィックであるため、非常に大きな数は避けますが、多くのデータセットでは最適な値が数千になる可能性があります。

次の例のように、fetchSizeオプションを使用します。

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)
val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()