メインコンテンツまでスキップ

JDBC を使用したデータベースのクエリ

important

従来のクエリ フェデレーションのドキュメントは廃止されており、更新されない可能性があります。このコンテンツに記載されている製品、サービス、またはテクノロジは、Databricks によって公式に承認またはテストされたものではありません。レイクハウスフェデレーションとはを参照してください。

Databricksでは、JDBCを使用した外部データベースへの接続がサポートされています。この記事では、これらの接続を構成して使用するための基本的な構文を、Python、SQL、およびScala の例とともに提供します。

備考

実験段階

この記事で説明する構成は 試験段階です。試験的な機能は現状のまま提供され、 Databricks を通じて顧客のテクニカル サポートを通じてサポートされることはありません。 クエリ フェデレーションを完全にサポートするには、代わりに レイクハウスフェデレーションを使用して、 Databricks ユーザーが Unity Catalog 構文ツールとデータガバナンス ツールを利用できるようにする必要があります。

Partner Connect は、多くの外部外部データソースとデータを同期するための最適化された統合を提供します。「Databricks Partner Connect とは」を参照してください。

important

この記事の例では、JDBC URL にユーザー名とパスワードは含まれていません。Databricks では、シークレット を使用してデータベースの資格情報を格納することをお勧めします。例えば:

Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

DatabricksSQLでSparkシークレットを参照するには 、クラスターの開始時に 設定プロパティを設定する必要があります 。

シークレット管理の完全な例については、「 チュートリアル: Databricks シークレットを作成して使用する」を参照してください。

クラウド接続の確立

Databricks VPC は、 Spark クラスターのみを許可するように設定されています。 別のインフラストラクチャに接続する場合、ベストプラクティスは VPC ピアリングを使用することです。 VPCピアリングが確立されたら、クラスターのnetcatユーティリティで確認できます。

Bash
%sh nc -vz <jdbcHostname> <jdbcPort>

JDBC によるデータの読み取り

JDBCを使用してデータを読み取るには、多数の設定を構成する必要があります。各データベースは、 <jdbc-url>に対して異なる形式を使用する点に注意してください。

Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)

Sparkは、データベーステーブルからスキーマを自動的に読み取り、その型をSpark SQL型にマップします。

Python
employees_table.printSchema

この JDBC テーブルに対してクエリーを実行できます。

Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

JDBC を使用したデータの書き込み

JDBCを使用したテーブルへのデータの保存では、読み取りと同様の構成が使用されます。次の例を参照してください。

Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)

デフォルトの動作では、新しいテーブルの作成が試行され、その名前のテーブルが既に存在する場合はエラーがスローされます。

次の構文を使用すると、既存のテーブルにデータを追加できます。

Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)

次の構文を使用すると、既存のテーブルを上書きできます。

Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)

JDBC クエリの並列処理の制御

デフォルトでは、JDBCドライバは1つのスレッドのみでソースデータベースを照会します。読み込みのパフォーマンスを向上させるには、Databricksがデータベースに対して同時に実行するクエリーの数を制御するために、いくつかのオプションを指定する必要があります。小規模なクラスターの場合、numPartitionsオプションをクラスター内のExecutorコアの数と同じに設定すると、すべてのノードがデータを並列にクエリーできるようになります。

警告

大規模なクラスターでnumPartitionsを高い値に設定すると、同時クエリーが多すぎてサービスが過負荷になる可能性があり、リモートデータベースのパフォーマンスが低下する可能性があります。これは、アプリケーションデータベースでは特に厄介です。この値を50より大きく設定する場合は注意が必要です。

注記

partitionColumnのソースデータベースで計算されたインデックスを持つ列を選択することにより、クエリーを高速化します。

次のコード例は、8つのコアを持つクラスターの並列処理を構成する方法を示しています。

Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
注記

Databricksでは、JDBCを構成するためのすべてのApache Sparkオプションがサポートされています。

JDBCを使用してデータベースに書き込む場合、Apache Sparkはメモリ内のパーティションの数を使用して並列処理を制御します。並列処理を制御するために、書き込み前にデータを再パーティション化できます。大規模なクラスターでは多数のパーティションを避けて、リモートデータベースの過負荷を回避します。次の例は、書き込み前に8つのパーティションに再パーティション化する方法を示しています。

Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)

クエリをデータベース エンジンにプッシュダウンする

クエリー全体をデータベースにプッシュダウンして、結果だけを返すことができます。tableパラメーターは、読み取るJDBCテーブルを識別します。SQLクエリーFROM句で有効なものなら何でも使用できます。

Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)

クエリごとにフェッチされる行数を制御する

JDBCドライバには、リモートデータベースから一度にフェッチされるロー数を制御する、fetchSizeパラメーターがあります。

設定

結果

低すぎる

多くのラウンドトリップによる待機時間が長い(クエリーごとに返される行が少ない)

高すぎます

メモリ不足エラー(1つのクエリーで返されるデータが多すぎる)

最適値はワークロードによって異なります。考慮事項は次のとおりです。

  • クエリーによって返される列の数はいくつか?
  • どのようなデータ型が返されるか?
  • 各列の文字列はどのくらいの期間返されるか?

システムのデフォルト値は非常に小さく、チューニングの恩恵を受ける可能性があります。例:オラクルのデフォルトのfetchSizeは10です。100に増やすと、実行する必要があるクエリーの総数が10分の1に減ります。JDBCの結果はネットワークトラフィックであるため、非常に大きな数は避けますが、多くのデータセットでは最適な値が数千になる可能性があります。

次の例のように、fetchSizeオプションを使用します。

Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)