メインコンテンツまでスキップ

JDBC を使用したデータベースのクエリ

Databricksでは、JDBCを使用した外部データベースへの接続がサポートされています。この記事では、これらの接続を構成して使用するための基本的な構文を、Python、SQL、およびScala の例とともに提供します。

備考

実験段階

この記事で説明する構成は 試験段階です。 試験的な機能は現状のまま提供され、 Databricks を通じて顧客のテクニカル サポートを通じてサポートされることはありません。 クエリ フェデレーションを完全にサポートするには、代わりに レイクハウスフェデレーションを使用して、 Databricks ユーザーが Unity Catalog 構文ツールとデータガバナンス ツールを利用できるようにする必要があります。

Partner Connect は、多くの外部外部データソースとデータを同期するための最適化された統合を提供します。 「Databricks Partner Connect とは」を参照してください。

important

この記事の例では、JDBC URL にユーザー名とパスワードは含まれていません。 Databricks では、シークレット を使用してデータベースの資格情報を格納することをお勧めします。 例えば:

Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

DatabricksSQLでSparkシークレットを参照するには 、クラスターの開始時に 設定プロパティを設定する必要があります 。

シークレット管理の完全な例については、「 チュートリアル: Databricks シークレットを作成して使用する」を参照してください。

注記

このリリースでは、Databricks on Google CloudでSpark UIのサポートは利用できません。

JDBC によるデータの読み取り

JDBCを使用してデータを読み取るには、多数の設定を構成する必要があります。各データベースは、 <jdbc-url>に対して異なる形式を使用する点に注意してください。

Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)

Sparkは、データベーステーブルからスキーマを自動的に読み取り、その型をSpark SQL型にマップします。

Python
employees_table.printSchema

この JDBC テーブルに対してクエリーを実行できます。

Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

JDBC を使用したデータの書き込み

JDBCを使用したテーブルへのデータの保存では、読み取りと同様の構成が使用されます。次の例を参照してください。

Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)

デフォルトの動作では、新しいテーブルの作成が試行され、その名前のテーブルが既に存在する場合はエラーがスローされます。

次の構文を使用すると、既存のテーブルにデータを追加できます。

Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)

次の構文を使用すると、既存のテーブルを上書きできます。

Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)

JDBC クエリの並列処理の制御

デフォルトでは、JDBCドライバは1つのスレッドのみでソースデータベースを照会します。読み込みのパフォーマンスを向上させるには、Databricksがデータベースに対して同時に実行するクエリーの数を制御するために、いくつかのオプションを指定する必要があります。小規模なクラスターの場合、numPartitionsオプションをクラスター内のExecutorコアの数と同じに設定すると、すべてのノードがデータを並列にクエリーできるようになります。

警告

大規模なクラスターでnumPartitionsを高い値に設定すると、同時クエリーが多すぎてサービスが過負荷になる可能性があり、リモートデータベースのパフォーマンスが低下する可能性があります。これは、アプリケーションデータベースでは特に厄介です。この値を50より大きく設定する場合は注意が必要です。

注記

partitionColumnのソースデータベースで計算されたインデックスを持つ列を選択することにより、クエリーを高速化します。

次のコード例は、8つのコアを持つクラスターの並列処理を構成する方法を示しています。

Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
注記

Databricksでは、JDBCを構成するためのすべてのApache Sparkオプションがサポートされています。

JDBCを使用してデータベースに書き込む場合、Apache Sparkはメモリ内のパーティションの数を使用して並列処理を制御します。並列処理を制御するために、書き込み前にデータを再パーティション化できます。大規模なクラスターでは多数のパーティションを避けて、リモートデータベースの過負荷を回避します。次の例は、書き込み前に8つのパーティションに再パーティション化する方法を示しています。

Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)

クエリをデータベース エンジンにプッシュダウンする

クエリー全体をデータベースにプッシュダウンして、結果だけを返すことができます。tableパラメーターは、読み取るJDBCテーブルを識別します。SQLクエリーFROM句で有効なものなら何でも使用できます。

Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)

クエリごとにフェッチされる行数を制御する

JDBCドライバには、リモートデータベースから一度にフェッチされるロー数を制御する、fetchSizeパラメーターがあります。

設定

結果

低すぎる

多くのラウンドトリップによる待機時間が長い(クエリーごとに返される行が少ない)

高すぎます

メモリ不足エラー(1つのクエリーで返されるデータが多すぎる)

最適値はワークロードによって異なります。考慮事項は次のとおりです。

  • クエリーによって返される列の数はいくつか?
  • どのようなデータ型が返されるか?
  • 各列の文字列はどのくらいの期間返されるか?

システムのデフォルト値は非常に小さく、チューニングの恩恵を受ける可能性があります。例:オラクルのデフォルトのfetchSizeは10です。100に増やすと、実行する必要があるクエリーの総数が10分の1に減ります。JDBCの結果はネットワークトラフィックであるため、非常に大きな数は避けますが、多くのデータセットでは最適な値が数千になる可能性があります。

次の例のように、fetchSizeオプションを使用します。

Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)