メインコンテンツまでスキップ

Spark データソース

Spark データソース API を使用すると、Databricks から直接、外部データベースのデータを読み書きできます。Sparkエンジンの完全な柔軟性が必要な場合、ソースでネイティブクエリを実行したい場合、または外部システムへの書き込みアクセスが必要な場合にのみ使用してください。一般的に、Databricks は、自動 Spark または SQL クエリプッシュダウンによるガバナンスされた読み取り専用のアクセスを推奨しています。「クエリフェデレーションとは」を参照してください。

SparkデータソースAPIには、接続性、クエリ実行、およびスキーマ検出の点で特定の動作があります。

  • 主要なワークロードとそれに続くすべてのSpark変換は、Databricks Spark クラスター上で実行されます。
  • queryオプションを使用すると、指定されたSQLステートメントは外部データソースで完全に実行されます。Spark は、クエリ文字列に対して変換プッシュダウンを行わずに結果を取得します。
  • 接続には、Databricksに同梱されているコネクタ、ユーザーが提供するJDBCドライバ、またはPySparkカスタムデータソースのいずれかが必要です。
  • Spark は、外部データベーステーブルからスキーマを自動的に読み取り、その型を Spark SQL 型にマップします。

バンドルされたコネクタを使用する

Databricks Runtime には、一般的なデータソース用の最適化されたコネクタが含まれています。完全なリストについては、「サポートされているバンドル コネクタ」を参照してください。

バンドルコネクタは、完全なJDBC URL文字列の代わりに、「host」と「port」を個別のオプションとして使用します。

パススルー クエリーを使用してデータを読み込む

queryオプションを使用すると、データが Spark に到達する前に、ソース データベースでフィルタリングおよび結合ロジックが実行されるようになります。ガバナンスされた読み取りアクセス、自動クエリプッシュダウン、およびビューを介したUnity Catalogの権限委譲が必要な場合は、代わりにリモートクエリをご検討ください。

Python
df = (spark.read
.format("sqlserver")
.option("host", "sql-server-instance.database.windows.net")
.option("user", dbutils.secrets.get(scope="<scope>", key="<user>"))
.option("password", dbutils.secrets.get(scope="<scope>", key="<password>"))
.option("database", "<database-name>")
.option("query", "SELECT id, name FROM users WHERE active = 1")
.load())

データの書き込み

データの書き込み方法を制御するために、.mode()を使用して書き込みモードを指定します。既存のテーブルに行を追加するにはappendを使用するか、その内容をoverwriteで置き換えます。

Python
(df.write
.format("sqlserver")
.mode("overwrite")
.option("host", "sql-server-instance.database.windows.net")
.option("user", dbutils.secrets.get(scope="<scope>", key="<user>"))
.option("password", dbutils.secrets.get(scope="<scope>", key="<password>"))
.option("database", "<database-name>")
.option("dbtable", "<table-name>")
.save())

JDBC UC接続を使用する

ソース固有のコネクタがバンドルされていない場合、または特定の JDBC ドライバーバージョンを使用したい場合は、JDBC Unity Catalog 接続を使用してください。これにより、資格情報の管理を一元化し、独自のJDBCドライバーを使用できます。

JDBC Unity Catalog接続には、バンドルされたコネクタまたは生のJDBCドライバーを直接使用する場合よりも、いくつかの利点があります。JDBC Unity Catalog接続を使用すると、以下を実行できます:

  • JDBCに対応する任意のデータベースで、独自のJDBCドライバーJARをご用意ください。
  • 接続を一度作成し、サーバレス、標準、および専用クラスター全体で再利用できます。
  • Unity Catalog接続オブジェクトを使用して、データソースへの統制されたアクセスを利用します。
  • クエリーするユーザーから接続資格情報を非表示にする。
  • Spark データソース API を介して、外部データベースの読み書きを行います。

JDBC Unity Catalog 接続を使用するには、Spark オプションで databricks.connection を指定します。

Python
df = (spark.read
.format("jdbc")
.option("databricks.connection", "<connection-name>")
.option("query", "SELECT * FROM external_table")
.load())

セットアップ手順については、「JDBC接続」を参照してください。

専用クラスターではカスタムコネクタを使用する

専用の(クラシック)クラスターでは、Databricks Runtime にバンドルされていないサードパーティ製の Spark データソース コネクタまたは JDBC ドライバーをインストールできます。

このアプローチは次の場合に用います:

  • MongoDB、Cassandra、Couchbase、またはElasticsearchなどのシステムには、サードパーティのSparkコネクタが必要です。
  • ランタイムにバンドルされていない特定のドライバーバージョンが必要です。
  • Unity Catalog 接続を設定せずに、クラスターに直接 JDBC ドライバーをインストールします。

コネクタ/ドライバーをインストールします

コンピュート > お使いのクラスター > ライブラリ > 新規インストール から、クラスターにライブラリをインストールします。JAR をダウンロードまたはアップロードせずに、Maven 座標を直接使用できます。ライブラリを有効にするには、クラスターを再起動してください。

データの読み取り

コネクタがインストールされたら、コネクタのフォーマット名と必要な接続オプションを使用してデータを読み込みます。

Python
df = (spark.read
.format("mongodb")
.option("connection.uri", "mongodb://<hostname>:27017")
.option("database", "<database-name>")
.option("collection", "<collection-name>")
.load())

データの書き込み

同じ形式名と接続オプションを使用して、データをソースに書き戻してください。

Python
(df.write
.format("mongodb")
.mode("overwrite")
.option("connection.uri", "mongodb://<hostname>:27017")
.option("database", "<database-name>")
.option("collection", "<collection-name>")
.save())

考慮事項

専用クラスターでカスタムコネクターを使用する際は、以下の点に注意してください。

  • ドライバまたはコネクタは、それがインストールされているクラスタでのみ利用可能です。
  • Databricks SQL、サーバレス、または標準アクセスモードのクラスターでは、カスタムのサードパーティSpark JARはサポートされていません。これらのコンピュートタイプには、バンドルされているコネクタまたはJDBC Unity Catalog接続を使用してください。

PySpark カスタムデータソース

Python DataSource API を使用すると、カスタムデータコネクタをすべてPythonで、JARやJVMベースのライブラリなしで構築できます。REST API、SaaSアプリケーション、またはJDBCインターフェースを持たないシステムに接続する必要がある場合、あるいはプログラムで合成データを生成したい場合に、これを使用します。API はバッチとストリーミングの両方の読み取りと書き込みをサポートします。

注記

PySpark カスタムデータソースには Databricks Runtime 15.4 LTS 以上が必要です。

セットアップ、例、APIリファレンスについては、PySpark カスタムデータソースを参照してください。

統合戦略を比較

以下の表では、ユースケースに最適なアプローチを選択するのに役立つよう、Spark データソース API とレイクハウスフェデレーション、LakeFlow Connect を比較しています。

機能

Spark データソース API

レイクハウスフェデレーション

Lakeflowコネクト

主なユースケース

複雑なETL、カスタムSparkロジック、パススルークエリ

アドホッククエリ、BI レポート

大規模な自動化された取り込み

データ移動

Spark メモリに読み込み済み(一時的)

Spark メモリに読み込み済み(一時的)

Delta Lake にコピー済み(永続化済み)

クエリーの実行

ネイティブのqueryオプションを使用した手動プッシュダウン

Spark および SQL フィルター、結合、集計の自動プッシュダウン

該当なし(テーブル全体の複製)

ガバナンス

Unity Catalog接続(JDBC)またはシークレットスコープ

Unity Catalog(フェデレーションカタログ)

Unity Catalog(マネージド パイプライン)

どのようなタスクにベストなのか

Sparkの最大限の柔軟性を必要とする上級ユーザー

ガバナンスを維持しつつ、データ移動を最小化

本番運用 CDC および取り込みパイプライン

対応するバンドルコネクタ

以下のデータソースは Databricks Runtime にバンドルされており、Spark を介して直接呼び出すことができます。専用クラスターと標準クラスターで読み取りと書き込みがサポートされています。

注記

サーバレスコンピュートへの書き込みは、PostgreSQL、SQL Server、MySQL、Snowflake、およびRedshift向けに公開プレビュー中です。サポートされているコネクタオプションについては、サーバレス書き込みオプションを参照してください。

データソース

spark.format() name

PostgreSQL

"postgresql"

SQL Server

"sqlserver"

MySQL と MariaDB

"mysql"

Snowflake

"snowflake"

Amazon Redshift

"redshift"

Google BigQuery

"bigquery"

Azure Synapse

"SQLDW"

HTTP

"http"

制限事項:

Databricks で Spark データソース API を使用する場合、次の制限が適用されます。

  • バンドルされたデータソースの Spark オプションは、 querydbtable 、および少数のコネクタ固有のオプションに限定されます。
  • カスタムのサードパーティ Spark JAR は、専用のクラスターにのみインストールできます。サーバレスまたは標準クラスターには、バンドルされたコネクタまたはJDBC Unity Catalog接続を使用してください。
  • PySpark カスタムデータソースには Databricks Runtime 15.4 LTS 以上が必要です。