Azure Synapse Analytics でデータのクエリを実行する
従来のクエリ フェデレーションのドキュメントは廃止されており、更新されない可能性があります。このコンテンツに記載されている製品、サービス、またはテクノロジは、Databricks によって公式に承認またはテストされたものではありません。レイクハウスフェデレーションとはを参照してください。
Azure SynapseDatabricksAzure SynapseCOPY
Azure SynapseDatabricksコネクタを使用して から にアクセスできる コネクタでは、Azure Synapse の ステートメントを使用して、Azure データレイク Storage ストレージ アカウントを使用して クラスタリングと インスタンス間で大量のデータを効率的に転送できます。一時的なステージングには使用できません。
実験段階
この記事で説明する構成は 試験段階です。試験的な機能は現状のまま提供され、 Databricks を通じて顧客のテクニカル サポートを通じてサポートされることはありません。 クエリ フェデレーションを完全にサポートするには、代わりに レイクハウスフェデレーションを使用して、 Databricks ユーザーが Unity Catalog 構文ツールとデータガバナンス ツールを利用できるようにする必要があります。
Azure Synapse Analyticsは、超並列処理(MPP)を活用して、ペタバイト規模のデータ間で複雑なクエリを迅速に実行するクラウドベースのエンタープライズデータウェアハウスです。
このコネクタは、Synapse 専用プール インスタンスでのみ使用するためのものであり、他の Synapse コンポーネントと互換性はありません。
COPY
は Azure データレイク Storage インスタンスでのみ使用できます。 Polybase の操作の詳細については、「 Databricks と Azure Synapse と PolyBase の接続 (レガシ)」を参照してください。
Synapse の構文例
Synapse のクエリは、Scala、Python、SQL、R で行うことができます。次のコード例では、ストレージ アカウント キーを使用し、ストレージ資格情報を Databricks から Synapse に転送します。
Azure portal によって提供される接続文字列を使用すると、Spark ドライバーと JDBC 接続を介して Azure Synapse インスタンス間で送信されるすべてのデータに対して Secure Sockets Layer (SSL) 暗号化が有効になります。SSL 暗号化が有効になっていることを確認するには、接続文字列で encrypt=true
を検索します。
Unity Catalogで定義された外部ロケーションは、tempDir
ロケーションとしてサポートされていません。
Databricks では、利用可能な最も安全な認証フローを使用することをお勧めします。この例で説明する認証フローには、他のフローには存在しないリスクが伴います。このフローは、マネージド ID などの他のより安全なフローが実行可能でない場合にのみ使用してください。
- Scala
- Python
- SQL
- R
// Set up the storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
val df: DataFrame = spark.read
.format("sqldw")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 1433 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* If schemaName not provided, default to "dbo". */
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.load()
// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()
// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()
// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.save()
# Set up the storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
df = spark.read
.format("sqldw")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 1433 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # If schemaName not provided, default to "dbo".
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.load()
# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.load()
# Load data from an Azure Synapse query.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", "select x, count(*) as cnt from table group by x") \
.load()
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.save()
-- Set up the storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;
-- Read data using SQL. The following example applies to Databricks Runtime 11.3 LTS and above.
CREATE TABLE example_table_in_spark_read
USING sqldw
OPTIONS (
host '<hostname>',
port '<port>' /* Optional - will use default port 1433 if not specified. */
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* If schemaName not provided, default to "dbo". */
forwardSparkAzureStorageCredentials 'true',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Read data using SQL. The following example applies to Databricks Runtime 10.4 LTS and below.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbtable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;
# Load SparkR
library(SparkR)
# Set up the storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Load data from an Azure Synapse query.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
query = "select x, count(*) as cnt from table group by x",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
df,
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
Databricks と Synapse 間の認証はどのように機能しますか?
Azure Synapse コネクタでは、次の 3 種類のネットワーク接続が使用されます。
- Spark ドライバーから Azure Synapse への接続
- Spark クラスター to Azure storage アカウント
- Azure Synapse から Azure ストレージ アカウントへの接続
Azure ストレージへのアクセスの構成
Databricks と Synapse はどちらも、一時的なデータ ストレージに使用する Azure ストレージ アカウントへの特権アクセスが必要です。
Azure Synapse では、ストレージ アカウントへのアクセスに SAS を使用することはサポートされていません。両方のサービスのアクセスを設定するには、次のいずれかを実行します。
- ストレージ アカウントのアカウント キーとシークレットを使用し、
forwardSparkAzureStorageCredentials
をtrue
に設定します。「Spark のプロパティを設定する」を参照して、Azure Storage にアクセスするための Azure 資格情報を構成します。 - Azure データレイク Storage を OAuth 2.0 認証で使用し、
enableServicePrincipalAuth
をtrue
に設定します。「2.0 を使用した から への接続とサービスプリンシパルの設定DatabricksSynapse OAuth」を参照してください。 - Azure Synapseインスタンスをマネージドサービス ID を持つように設定し、
useAzureMSI
をtrue
に設定します。
必要な Azure Synapse のアクセス許可
バックグラウンドで COPY
を使用するため、Azure Synapse コネクタでは、JDBC 接続ユーザーに、接続された Azure Synapse インスタンスで次のコマンドを実行するアクセス許可が必要です。
宛先テーブルが Azure Synapse に存在しない場合は、上記のコマンドに加えて、次のコマンドを実行するアクセス許可が必要です。
次の表は、 COPY
による書き込みに必要な権限をまとめたものです。
パーミッション (既存のテーブルへの挿入) | パーミッション (新しいテーブルへの挿入) |
---|---|
データベースの一括操作の管理 INSERT | データベースの一括操作の管理 INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
DatabricksからSynapseへの接続をOAuth 2.0でサービスプリンシパルで構成する
Azure Synapse Analyticsへの認証は、基盤となるストレージアカウントにアクセスできるサービスプリンシパルを使用して行うことができます。サービスプリンシパル credentials を使用して Azure storage アカウントにアクセスする方法の詳細については、「Azure データレイク Storage と Blob Storage に接続する」を参照してください。接続構成Databricks Synapseコネクタ オプションのリファレンスで enableServicePrincipalAuth
オプションを true
に設定して、コネクタがサービスプリンシパルで認証できるようにする必要があります。
必要に応じて、 Azure Synapse Analytics 接続に別のサービスプリンシパルを使用できます。 次の例では、ストレージアカウントにサービスプリンシパル credentials を、 Synapseにオプションのサービスプリンシパル credentials を設定します。
- ini
- Scala
- Python
- R
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token
; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")
# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
バッチ書き込みでサポートされている保存モード
Azure Synapse コネクタは、ErrorIfExists
、Ignore
、Append
、および Overwrite
の保存モードをサポートし、デフォルト モードは ErrorIfExists
です。Apache Sparkでサポートされている保存モードの詳細については、保存モードに関するSpark SQL資料を参照してください。
Databricks Synapse コネクタ オプションのリファレンス
Spark SQL で提供される OPTIONS
は、次の設定をサポートしています。
パラメーター | 必須 | デフォルト | 注 |
---|---|---|---|
| はい ( | デフォルトなし | Azure Synapse で作成または読み取りを行うテーブル。このパラメーターは、データを Azure Synapse に保存し直すときに必要です。 また、 以前にサポートされていた |
| はい ( | デフォルトなし | Azure Synapse で読み取るクエリ。 クエリで参照されるテーブルの場合、 |
| いいえ | デフォルトなし | Azure Synapse ユーザー名。 |
| いいえ | デフォルトなし | Azure Synapse のパスワード。 |
| あり | デフォルトなし | サブプロトコルとして |
| いいえ | JDBC URL のサブプロトコルによって決定されます | 使用する JDBC ドライバーのクラス名。このクラスはクラスパス上に存在する必要があります。ほとんどの場合、適切なドライバ・クラス名はJDBC URLのサブプロトコルによって自動的に決定されるため、このオプションを指定する必要はありません。 以前にサポートされていた |
| あり | デフォルトなし |
以前にサポートされていた Unity Catalogで定義された外部ロケーションを |
| いいえ |
| Spark と Azure Synapse の両方で一時的にエンコード/デコードするために使用される圧縮アルゴリズム。現在サポートされている値は、 |
| いいえ | False |
ストレージ認証を設定するときは、 以前にサポートされていた |
| いいえ | False | を ストレージ認証を設定するときは、 |
| いいえ | False |
|
| いいえ |
| Azure Synapse テーブル セットを作成するときに テーブル オプション を指定するために使用される文字列 以前にサポートされていた |
| いいえ | デフォルトなし(空の文字列) | Azure Synapse インスタンスにデータを書き込む前に Azure Synapse で実行される SQL コマンドの これらのコマンドのいずれかが失敗した場合、そのコマンドはエラーとして扱われ、書き込み操作は実行されません。 |
| いいえ | デフォルトなし(空の文字列) | コネクタが Azure Synapse インスタンスにデータを正常に書き込んだ後に Azure Synapse で実行される SQL コマンドの これらのコマンドのいずれかが失敗した場合、エラーとして扱われ、データが Azure Synapse インスタンスに正常に書き込まれた後に例外が発生します。 |
| いいえ | 256 |
以前にサポートされていた |
| いいえ |
| 各クエリの接続のタグ。指定しない場合、または値が空の文字列の場合、タグのデフォルト値が JDBC URL に追加されます。デフォルト値は、 Azure DB モニタリング ツールがクエリに対して偽の SQL インジェクション アラートを発生させるのを防ぎます。 |
| いいえ | デフォルトなし |
|
| いいえ | False |
IDENTITY カラムへの値の明示的な挿入を参照してください。 |
| いいえ | デフォルトなし | Azure Synapseからデータを読み取るための事前プロビジョニング external データソース外部データソースは PolyBase でのみ使用でき、コネクタはデータを読み込むためにスコープ付き資格情報と外部データソースを作成する必要がないため、CONTROL アクセス許可の要件がなくなります。 たとえば、外部データソースを使用する場合の使用方法と必要なアクセス許可の一覧については、「 外部データソース オプションを使用した PolyBase の必要な Azure Synapse アクセス許可」を参照してください。 |
| いいえ | 0 | 読み込み操作がキャンセルされる前に、読み取りおよび書き込み中に拒否できる行の最大数。拒否された行は無視されます。たとえば、10 件中 2 件のレコードにエラーがある場合、処理されるレコードは 8 件のみです。 CREATE 外部テーブルのREJECT_VALUEドキュメントと COPY の MAXERRORS ドキュメントを参照してください。 |
| いいえ | False |
|
tableOptions
、preActions
、postActions
、maxStrLength
は、Databricks から Azure Synapse の新しいテーブルにデータを書き込む場合にのみ関連します。- すべてのデータソースオプション名では大文字と小文字が区別されませんが、わかりやすくするために「キャメルケース」で指定することをお勧めします。
Azure Synapse へのクエリ プッシュダウン
Azure Synapse コネクタは、一連の最適化ルールを実装します をクリックして、次の演算子を Azure Synapse にプッシュします。
Filter
Project
Limit
Project
演算子と Filter
演算子は、次の式をサポートしています。
- ほとんどのブール論理演算子
- 比較
- 基本的な算術演算
- 数値キャストと文字列キャスト
Limit
演算子の場合、プッシュダウンは順序が指定されていない場合にのみサポートされます。例えば:
SELECT TOP(10) * FROM table
ですが、 SELECT TOP(10) * FROM table ORDER BY col
は除くことができます。
Azure Synapse コネクタは、文字列、日付、またはタイムスタンプを操作する式をプッシュダウンしません。
Azure Synapse コネクタでビルドされたクエリー プッシュダウンは、デフォルトで有効になります。 無効にするには、 spark.databricks.sqldw.pushdown
を false
に設定します。
Temporary データマネジメント
Azure Synapse コネクタは、Azure ストレージ コンテナーに作成した一時ファイルを削除し ません 。Databricks では、ユーザーが指定した tempDir
場所にある一時ファイルを定期的に削除することをお勧めします。
データのクリーンアップを容易にするために、Azure Synapse コネクタはデータ ファイルを tempDir
の直下に格納するのではなく、 <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/
という形式のサブディレクトリを作成します。特定のしきい値 (2 日など) より古いサブディレクトリを再帰的に削除するために、定期的なジョブを (Databricks ジョブ機能などを使用して) 設定することができます。これは、そのしきい値より長く実行される Spark ジョブは存在しないことを前提としています。
より簡単な方法は、コンテナ全体を定期的に削除し、同じ名前で新しいコンテナを作成することです。これには、Azure Synapse コネクタによって生成される一時データ専用のコンテナを使用し、コネクタに関連するクエリが実行されていないことを保証できる時間枠を見つける必要があります。
一時オブジェクト管理
Azure Synapse コネクタは、Databricks クラスターと Azure Synapse インスタンス間のデータ転送を自動化します。Azure Synapse テーブルからデータを読み取ったり、クエリを実行したり、Azure Synapse テーブルにデータを書き込んだりするために、Azure Synapse コネクタは、 DATABASE SCOPED CREDENTIAL
、 EXTERNAL DATA SOURCE
、 EXTERNAL FILE FORMAT
、 EXTERNAL TABLE
などの一時オブジェクトをバックグラウンドで作成します。 これらのオブジェクトは、対応する Spark ジョブの期間中のみ存在し、自動的に削除されます。
クラスターが Azure Synapse コネクタを使用してクエリを実行している場合、 Spark ドライバー プロセスがクラッシュまたは強制的に再起動された場合、またはクラスターが強制的に終了または再起動された場合、一時オブジェクトは削除されない可能性があります。 これらのオブジェクトの識別と手動削除を容易にするために、Azure Synapse コネクタは、Azure Synapse インスタンスで作成されたすべての中間一時オブジェクトの名前に " tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>
" という形式のタグをプレフィックスとして付けます。
リークしたオブジェクトを定期的に探すには、次のようなクエリを使用します。
SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'