メインコンテンツまでスキップ

Azure Synapse Analytics でデータのクエリを実行する

Azure Synapse コネクタを使用して Databricks からAzure Synapseにアクセスできます。 コネクタは、Azure Synapse の COPY ステートメントを使用して、Databricks クラスターと Azure Synapse インスタンス間で大量のデータを効率的に転送し、一時的なステージングに Azure Data Lake Storage Gen2 ストレージ アカウントを使用します。

備考

実験段階

この記事で説明する構成は 試験段階です。 試験的な機能は現状のまま提供され、 Databricks を通じて顧客のテクニカル サポートを通じてサポートされることはありません。 クエリ フェデレーションを完全にサポートするには、代わりに レイクハウスフェデレーションを使用して、 Databricks ユーザーが Unity Catalog 構文ツールとデータガバナンス ツールを利用できるようにする必要があります。

Azure Synapse Analyticsは、超並列処理(MPP)を活用して、ペタバイト規模のデータ間で複雑なクエリを迅速に実行するクラウドベースのエンタープライズデータウェアハウスです。

important

このコネクタは、Synapse 専用プール インスタンスでのみ使用するためのものであり、他の Synapse コンポーネントと互換性はありません。

注記

COPY は、Azure Data Lake Storage Gen2 インスタンスでのみ使用できます。 Polybase の操作の詳細については、「 Databricks と Azure Synapse と PolyBase の接続 (レガシ)」を参照してください。

Synapse の構文例

Synapse のクエリは、Scala、Python、SQL、R で行うことができます。次のコード例では、ストレージ アカウント キーを使用し、ストレージ資格情報を Databricks から Synapse に転送します。

注記

Azure portal によって提供される接続文字列を使用すると、Spark ドライバーと JDBC 接続を介して Azure Synapse インスタンス間で送信されるすべてのデータに対して Secure Sockets Layer (SSL) 暗号化が有効になります。 SSL 暗号化が有効になっていることを確認するには、接続文字列で encrypt=true を検索します。

important

Unity Catalogで定義された外部ロケーションは、tempDirロケーションとしてサポートされていません。

Databricks では、利用可能な最も安全な認証フローを使用することをお勧めします。 この例で説明する認証フローには、他のフローには存在しないリスクが伴います。 このフローは、マネージド ID などの他のより安全なフローが実行可能でない場合にのみ使用してください。

Scala

// Set up the storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")

// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
val df: DataFrame = spark.read
.format("sqldw")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 1433 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* If schemaName not provided, default to "dbo". */
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.load()

// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()

// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()

// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.

df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.save()

Databricks と Synapse 間の認証はどのように機能しますか?

Azure Synapse コネクタでは、次の 3 種類のネットワーク接続が使用されます。

  • Spark ドライバーから Azure Synapse への接続
  • Spark クラスター to Azure storage アカウント
  • Azure Synapse から Azure ストレージ アカウントへの接続

Azure ストレージへのアクセスの構成

Databricks と Synapse はどちらも、一時的なデータ ストレージに使用する Azure ストレージ アカウントへの特権アクセスが必要です。

Azure Synapse では、ストレージ アカウントへのアクセスに SAS を使用することはサポートされていません。 両方のサービスのアクセスを設定するには、次のいずれかを実行します。

必要な Azure Synapse のアクセス許可

バックグラウンドで COPY を使用するため、Azure Synapse コネクタでは、JDBC 接続ユーザーに、接続された Azure Synapse インスタンスで次のコマンドを実行するアクセス許可が必要です。

宛先テーブルが Azure Synapse に存在しない場合は、上記のコマンドに加えて、次のコマンドを実行するアクセス許可が必要です。

次の表は、 COPYによる書き込みに必要な権限をまとめたものです。

パーミッション (既存のテーブルへの挿入)

パーミッション (新しいテーブルへの挿入)

データベースの一括操作の管理

INSERT

データベースの一括操作の管理

INSERT

CREATE TABLE

ALTER ON SCHEMA :: dbo

DatabricksからSynapseへの接続をOAuth 2.0でサービスプリンシパルで構成する

Azure Synapse Analyticsへの認証は、基盤となるストレージアカウントにアクセスできるサービスプリンシパルを使用して行うことができます。サービスプリンシパル credentials を使用して Azure storage アカウントにアクセスする方法の詳細については、「Azure Data Lake Storage Gen2 Storage と Blob Storage に接続する」を参照してください。接続構成 Databricks Synapse コネクタ オプションのリファレンスenableServicePrincipalAuth オプションを true に設定して、コネクタがサービスプリンシパルで認証できるようにする必要があります。

必要に応じて、 Azure Synapse Analytics 接続に別のサービスプリンシパルを使用できます。 次の例では、ストレージアカウントにサービスプリンシパル credentials を、 Synapseにオプションのサービスプリンシパル credentials を設定します。

ini
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token

; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>

バッチ書き込みでサポートされている保存モード

Azure Synapse コネクタは、ErrorIfExistsIgnoreAppend、および Overwrite の保存モードをサポートし、デフォルト モードは ErrorIfExistsです。Apache Sparkでサポートされている保存モードの詳細については、保存モードに関するSpark SQL資料を参照してください。

Databricks Synapse コネクタ オプションのリファレンス

Spark SQL で提供される OPTIONS は、次の設定をサポートしています。

パラメーター

必須

デフォルト

dbTable

はい ( query が指定されていない場合のみ)

デフォルトなし

Azure Synapse で作成または読み取りを行うテーブル。 このパラメーターは、データを Azure Synapse に保存し直すときに必要です。

また、 {SCHEMA NAME}.{TABLE NAME} を使用して、特定のスキーマのテーブルにアクセスすることもできます。 スキーマ名を指定しない場合は、JDBC ユーザーに関連付けられたデフォルトのスキーマが使用されます。

以前にサポートされていた dbtable バリアントは非推奨となり、将来のリリースでは無視されます。 代わりに「キャメルケース」の名前を使用してください。

query

はい ( dbTable が指定されていない場合のみ)

デフォルトなし

Azure Synapse で読み取るクエリ。

クエリで参照されるテーブルの場合、 {SCHEMA NAME}.{TABLE NAME} を使用して特定のスキーマのテーブルにアクセスすることもできます。 スキーマ名を指定しない場合は、JDBC ユーザーに関連付けられたデフォルトのスキーマが使用されます。

user

いいえ

デフォルトなし

Azure Synapse ユーザー名。 passwordオプションと組み合わせて使用する必要があります。ユーザーとパスワードが URL で渡されていない場合にのみ使用できます。 両方を渡すとエラーになります。

password

いいえ

デフォルトなし

Azure Synapse のパスワード。 userオプションと組み合わせて使用する必要があります。ユーザーとパスワードが URL で渡されていない場合にのみ使用できます。 両方を渡すとエラーになります。

url

あり

デフォルトなし

サブプロトコルとして sqlserver が設定された JDBC URL。 Azure portal によって提供される接続文字列を使用することをお勧めします。 JDBC 接続の SSL 暗号化が有効になるため、 encrypt=true を設定することを強くお勧めします。 userpasswordが別々に設定されている場合は、URLに含める必要はありません。

jdbcDriver

いいえ

JDBC URL のサブプロトコルによって決定されます

使用する JDBC ドライバーのクラス名。 このクラスはクラスパス上に存在する必要があります。 ほとんどの場合、適切なドライバ・クラス名はJDBC URLのサブプロトコルによって自動的に決定されるため、このオプションを指定する必要はありません。

以前にサポートされていた jdbc_driver バリアントは非推奨となり、将来のリリースでは無視されます。 代わりに「キャメルケース」の名前を使用してください。

tempDir

あり

デフォルトなし

abfss URI。Azure Synapse 専用の Blob Storage コンテナーを使用することをお勧めします。

以前にサポートされていた tempdir バリアントは非推奨となり、将来のリリースでは無視されます。 代わりに「キャメルケース」の名前を使用してください。

Unity Catalogで定義された外部ロケーションtempDirロケーションとして使用することはできません。

tempCompression

いいえ

SNAPPY

Spark と Azure Synapse の両方で一時的にエンコード/デコードするために使用される圧縮アルゴリズム。 現在サポートされている値は、 UNCOMPRESSEDSNAPPYGZIPです。

forwardSparkAzureStorageCredentials

いいえ

False

trueの場合、ライブラリは、Spark が Blob ストレージ コンテナーへの接続に使用しているストレージ アカウント アクセス キーの資格情報を自動的に検出し、それらの資格情報を JDBC 経由で Azure Synapse に転送します。これらの認証情報は、JDBC クエリの一部として送信されます。 したがって、このオプションを使用する場合は、JDBC 接続の SSL 暗号化を有効にすることを強くお勧めします。

ストレージ認証を設定するときは、 useAzureMSIforwardSparkAzureStorageCredentials の 1 つだけを trueに設定する必要があります。 または、enableServicePrincipalAuthtrue に設定し、 JDBC 認証とストレージ認証の両方にサービスプリンシパルを使用することもできます。 forwardSparkAzureStorageCredentials オプションでは、マネージドサービス ID またはサービスプリンシパルを使用したストレージへの認証はサポートされていません。ストレージ アカウントのアクセス キーのみがサポートされています。

以前にサポートされていた forward_spark_azure_storage_credentials バリアントは非推奨となり、将来のリリースでは無視されます。 代わりに「キャメルケース」の名前を使用してください。

useAzureMSI

いいえ

False

trueにすると、ライブラリは作成するデータベース スコープの資格情報に対して IDENTITY = 'Managed Service Identity' を指定し、 SECRET は指定しません。

ストレージ認証を設定するときは、 useAzureMSIforwardSparkAzureStorageCredentials の 1 つだけを trueに設定する必要があります。 または、enableServicePrincipalAuthtrue に設定し、 JDBC 認証とストレージ認証の両方にサービスプリンシパルを使用することもできます。

enableServicePrincipalAuth

いいえ

False

true場合、ライブラリは提供されたサービスプリンシパルAzure クレデンシャルを使用して ストレージ アカウントに接続し、Azure Synapse Analytics 経由でJDBC します。

forward_spark_azure_storage_credentials または useAzureMSItrueに設定されている場合、ストレージ認証ではそのオプションがサービスプリンシパルよりも優先されます。

tableOptions

いいえ

CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN

Azure Synapse テーブル セットを作成するときに テーブル オプション を指定するために使用される文字列 dbTable. この文字列は、Azure Synapse に対して発行される CREATE TABLE SQL ステートメントの WITH 句に文字通り渡されます。

以前にサポートされていた table_options バリアントは非推奨となり、将来のリリースでは無視されます。 代わりに「キャメルケース」の名前を使用してください。

preActions

いいえ

デフォルトなし(空の文字列)

Azure Synapse インスタンスにデータを書き込む前に Azure Synapse で実行される SQL コマンドの ; 区切りの一覧。 これらの SQL コマンドは、Azure Synapse によって受け入れられる有効なコマンドである必要があります。

これらのコマンドのいずれかが失敗した場合、そのコマンドはエラーとして扱われ、書き込み操作は実行されません。

postActions

いいえ

デフォルトなし(空の文字列)

コネクタが Azure Synapse インスタンスにデータを正常に書き込んだ後に Azure Synapse で実行される SQL コマンドの ; 分離されたリスト。 これらの SQL コマンドは、Azure Synapse によって受け入れられる有効なコマンドである必要があります。

これらのコマンドのいずれかが失敗した場合、エラーとして扱われ、データが Azure Synapse インスタンスに正常に書き込まれた後に例外が発生します。

maxStrLength

いいえ

256

StringType in Spark は in の [NVARCHAR(maxStrLength) タイプ] にマップされます Azure Synapse。 maxStrLength を使用して、Azure Synapse で名前がdbTableを持つテーブル内のすべての NVARCHAR(maxStrLength) 型の列の文字列の長さを設定できます。

以前にサポートされていた maxstrlength バリアントは非推奨となり、将来のリリースでは無視されます。 代わりに「キャメルケース」の名前を使用してください。

applicationName

いいえ

Databricks-User-Query

各クエリの接続のタグ。 指定しない場合、または値が空の文字列の場合、タグのデフォルト値が JDBC URL に追加されます。 デフォルト値は、 Azure DB モニタリング ツールがクエリに対して偽の SQL インジェクション アラートを発生させるのを防ぎます。

maxbinlength

いいえ

デフォルトなし

BinaryType列の列の長さを制御します。このパラメーターは VARBINARY(maxbinlength)と訳されます。

identityInsert

いいえ

False

trueに設定すると、IDENTITY_INSERT モードが有効になり、DataFrame で指定された値が Azure Synapse テーブルの ID 列に挿入されます。

IDENTITY カラムへの値の明示的な挿入を参照してください。

externalDataSource

いいえ

デフォルトなし

Azure Synapseからデータを読み取るための事前プロビジョニング external データソース外部データソースは PolyBase でのみ使用でき、コネクタはデータを読み込むためにスコープ付き資格情報と外部データソースを作成する必要がないため、CONTROL アクセス許可の要件がなくなります。

たとえば、外部データソースを使用する場合の使用方法と必要なアクセス許可の一覧については、「 外部データソース オプションを使用した PolyBase の必要な Azure Synapse アクセス許可」を参照してください。

maxErrors

いいえ

0

読み込み操作がキャンセルされる前に、読み取りおよび書き込み中に拒否できる行の最大数。 拒否された行は無視されます。 たとえば、10 件中 2 件のレコードにエラーがある場合、処理されるレコードは 8 件のみです。

CREATE 外部テーブルのREJECT_VALUEドキュメントCOPY の MAXERRORS ドキュメントを参照してください。

inferTimestampNTZType

いいえ

False

trueの場合、Azure Synapse TIMESTAMP 型の値は、読み取り中に (タイム ゾーンなしのタイムスタンプ) TimestampNTZType として解釈されます。それ以外の場合、基になる Azure Synapse テーブルの種類に関係なく、すべてのタイムスタンプは TimestampType として解釈されます。

注記
  • tableOptionspreActionspostActionsmaxStrLength は、Databricks から Azure Synapse の新しいテーブルにデータを書き込む場合にのみ関連します。
  • すべてのデータソースオプション名では大文字と小文字が区別されませんが、わかりやすくするために「キャメルケース」で指定することをお勧めします。

Azure Synapse へのクエリ プッシュダウン

Azure Synapse コネクタは、一連の最適化ルールを実装します をクリックして、次の演算子を Azure Synapse にプッシュします。

  • Filter
  • Project
  • Limit

Project 演算子と Filter 演算子は、次の式をサポートしています。

  • ほとんどのブール論理演算子
  • 比較
  • 基本的な算術演算
  • 数値キャストと文字列キャスト

Limit演算子の場合、プッシュダウンは順序が指定されていない場合にのみサポートされます。例えば:

SELECT TOP(10) * FROM tableですが、 SELECT TOP(10) * FROM table ORDER BY colは除くことができます。

注記

Azure Synapse コネクタは、文字列、日付、またはタイムスタンプを操作する式をプッシュダウンしません。

Azure Synapse コネクタでビルドされたクエリー プッシュダウンは、デフォルトで有効になります。 無効にするには、 spark.databricks.sqldw.pushdownfalseに設定します。

Temporary データマネジメント

Azure Synapse コネクタは、Azure ストレージ コンテナーに作成した一時ファイルを削除し ません 。 Databricks では、ユーザーが指定した tempDir 場所にある一時ファイルを定期的に削除することをお勧めします。

データのクリーンアップを容易にするために、Azure Synapse コネクタはデータ ファイルを tempDirの直下に格納するのではなく、 <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/という形式のサブディレクトリを作成します。 特定のしきい値 (2 日など) より古いサブディレクトリを再帰的に削除するために、定期的なジョブを (Databricks ジョブ機能などを使用して) 設定することができます。これは、そのしきい値より長く実行される Spark ジョブは存在しないことを前提としています。

より簡単な方法は、コンテナ全体を定期的に削除し、同じ名前で新しいコンテナを作成することです。 これには、Azure Synapse コネクタによって生成される一時データ専用のコンテナを使用し、コネクタに関連するクエリが実行されていないことを保証できる時間枠を見つける必要があります。

一時オブジェクト管理

Azure Synapse コネクタは、Databricks クラスターと Azure Synapse インスタンス間のデータ転送を自動化します。Azure Synapse テーブルからデータを読み取ったり、クエリを実行したり、Azure Synapse テーブルにデータを書き込んだりするために、Azure Synapse コネクタは、 DATABASE SCOPED CREDENTIALEXTERNAL DATA SOURCEEXTERNAL FILE FORMATEXTERNAL TABLE などの一時オブジェクトをバックグラウンドで作成します。 これらのオブジェクトは、対応する Spark ジョブの期間中のみ存在し、自動的に削除されます。

クラスターが Azure Synapse コネクタを使用してクエリを実行している場合、 Spark ドライバー プロセスがクラッシュまたは強制的に再起動された場合、またはクラスターが強制的に終了または再起動された場合、一時オブジェクトは削除されない可能性があります。 これらのオブジェクトの識別と手動削除を容易にするために、Azure Synapse コネクタは、Azure Synapse インスタンスで作成されたすべての中間一時オブジェクトの名前に " tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>" という形式のタグをプレフィックスとして付けます。

リークしたオブジェクトを定期的に探すには、次のようなクエリを使用します。

  • SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'