Azure Synapse Analyticsのデータへのクエリー
Azure Synapse の COPY
ステートメントを使用して、一時的なステージング用の Azure Data Lake Storage Gen2 ストレージ アカウントを使用して、Databricks クラスターと Azure Synapse インスタンスの間で大量のデータを効率的に転送する Azure Synapse コネクタを使用して Databricks から Azure Synapse にアクセスできます。
実験段階
この記事で説明する構成は 実験的です。 実験的な機能は現状のまま提供され、Databricks による顧客テクニカル サポートを通じてサポートされることはありません。 完全なクエリ フェデレーション サポートを取得するには、代わりにレイクハウス フェデレーションを使用する必要があります。これにより、 DatabricksユーザーはUnity Catalog構文とデータガバナンス ツールを活用できるようになります。
Azure Synapse Analytics は、超並列処理 (MPP) を利用して、ペタバイト規模のデータに対して複雑なクエリーをすばやく実行するクラウドベースのエンタープライズ データ ウェアハウスです。
重要
このコネクタは、Synapse 専用プール インスタンスでのみ使用し、他の Synapse コンポーネントとは互換性がありません。
注:
COPY
は、Azure Data Lake Storage Gen2 インスタンスでのみ使用できます。 Polybase の操作の詳細については、「 Databricks と Azure Synapse を PolyBase に接続する (レガシー)」を参照してください。
Synapseの構文例
Synapse は、Scala、Python、SQL、R でクエリーできます。次のコード例では、ストレージ アカウント キーを使用して、ストレージ資格情報を Databricks から Synapse に転送します。
注:
JDBC 接続を介して Spark ドライバーと Azure Synapse インスタンスの間で送信されるすべてのデータに対して Secure Sockets Layer (SSL) 暗号化を有効にする、Azure portal によって提供される接続文字列を使用します。 SSL 暗号化が有効になっていることを確認するには、接続文字列で encrypt=true
を検索します。
重要
Unity Catalogで定義されている外部ロケーションは、tempDir
ロケーションとしてサポートされていません。
Databricks では、利用可能な最も安全な認証フローを使用することをお勧めします。 この例で説明する認証フローには、他のフローには存在しないリスクが伴います。 このフローは、マネージド ID などの他のより安全なフローが実行可能でない場合にのみ使用してください。
// Set up the storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
val df: DataFrame = spark.read
.format("sqldw")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 1433 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* If schemaName not provided, default to "dbo". */
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.load()
// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()
// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()
// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.save()
# Set up the storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
df = spark.read
.format("sqldw")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 1433 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # If schemaName not provided, default to "dbo".
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.load()
# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.load()
# Load data from an Azure Synapse query.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", "select x, count(*) as cnt from table group by x") \
.load()
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.save()
-- Set up the storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;
-- Read data using SQL. The following example applies to Databricks Runtime 11.3 LTS and above.
CREATE TABLE example_table_in_spark_read
USING sqldw
OPTIONS (
host '<hostname>',
port '<port>' /* Optional - will use default port 1433 if not specified. */
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* If schemaName not provided, default to "dbo". */
forwardSparkAzureStorageCredentials 'true',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Read data using SQL. The following example applies to Databricks Runtime 10.4 LTS and below.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbtable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;
# Load SparkR
library(SparkR)
# Set up the storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Load data from an Azure Synapse query.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
query = "select x, count(*) as cnt from table group by x",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
df,
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
DatabricksとSynapse間の認証はどのように機能するか
Azure Synapse コネクタでは、次の 3 種類のネットワーク接続が使用されます。
Spark ドライバーから Azure Synapse へ
Spark クラスターから Azure ストレージ アカウントへ
Azure Synapse から Azure ストレージ アカウントへ
Azure Storageへのアクセスの構成
Databricks と Synapse の両方に、一時的なデータ ストレージに使用する Azure ストレージ アカウントへの特権アクセスが必要です。
Azure Synapse では、ストレージ アカウントへのアクセスに SAS を使用することはサポートされていません。 両方のサービスのアクセスを構成するには、次のいずれかの操作を行います。
ストレージ アカウントのアカウント キーとシークレットを使用し、
forwardSparkAzureStorageCredentials
をtrue
に設定します。 「 Spark プロパティの設定」を参照して、Azure Storage にアクセスするための Azure 資格情報を構成します。OAuth 2.0 認証で Azure Data Lake Storage Gen2 を使用し、
enableServicePrincipalAuth
をtrue
に設定します。 「サービスプリンシパルを使用した OAuth 2.0 を使用した Databricks から Synapse への接続の構成」を参照してください。マネージドサービス ID を持つように Azure Synapse インスタンスを構成し、
useAzureMSI
をtrue
に設定します。
必要な Azure Synapse アクセス許可
Azure Synapse コネクタはバックグラウンドで COPY
を使用するため、JDBC 接続ユーザーには、接続された Azure Synapse インスタンスで次のコマンドを実行するためのアクセス許可が必要です。
変換先テーブルが Azure Synapse に存在しない場合は、上記のコマンドに加えて、次のコマンドを実行するためのアクセス許可が必要です。
次の表は、 COPY
を使用した書き込みに必要なアクセス許可をまとめたものです。
アクセス許可 (既存のテーブルへの挿入) |
アクセス許可 (新しいテーブルへの挿入) |
---|---|
データベースの一括操作を管理する INSERT |
データベースの一括操作を管理する INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
サービスプリンシパルを使用して OAuth 2.0 を使用して Databricks から Synapse への接続を構成する
Azure Synapse Analytics に対する認証は、基になるストレージ アカウントへのアクセス権を持つサービス プリンシパルを使用して行うことができます。 サービスプリンシパルの資格情報を使用して Azure ストレージ アカウントにアクセスする方法の詳細については、「 Azure Data Lake Storage Gen2 と Blob Storage に接続する」を参照してください。 コネクタがサービスプリンシパルで認証できるようにするには、接続構成の Databricks Synapse コネクタ オプション リファレンスで enableServicePrincipalAuth
オプションを true
に設定する必要があります。
必要に応じて、Azure Synapse Analytics 接続に別のサービス プリンシパルを使用できます。 次の例では、ストレージ アカウントのサービス プリンシパル資格情報と、Synapse のオプションのサービス プリンシパル資格情報を構成します。
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token
; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")
# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
バッチ書き込みでサポートされる保存モード
Azure Synapse コネクタでは、 ErrorIfExists
、 Ignore
、 Append
、 Overwrite
の保存モードがサポートされており、デフォルト モードは ErrorIfExists
です。 Apache Spark でサポートされている保存モードの詳細については、 保存モードに関する Spark SQL のドキュメントを参照してください。
Databricks Synapse コネクタ オプションのリファレンス
Spark SQL で提供される OPTIONS
では、次の設定がサポートされています。
パラメーター |
*必須 |
デフォルト |
注 |
---|---|---|---|
|
はい ( |
デフォルトなし |
Azure Synapse で作成または読み取りを行うテーブル。 このパラメーターは、データを Azure Synapse に保存し直すときに必要です。 また、 以前にサポートされていた |
|
はい ( |
デフォルトなし |
Azure Synapse で読み取るクエリー。 クエリーで参照されるテーブルの場合、 |
|
なし |
デフォルトなし |
Azure Synapse ユーザー名。 |
|
なし |
デフォルトなし |
Azure Synapse のパスワード。 |
|
あり |
デフォルトなし |
サブプロトコルとして |
|
なし |
JDBC URL のサブプロトコルによって決定される |
使用するJDBCドライバのクラス名。 このクラスはクラスパス上に存在する必要があります。 ほとんどの場合、適切なドライバ・クラス名はJDBC URLのサブプロトコルによって自動的に決定されるため、このオプションを指定する必要はありません。 以前にサポートされていた |
|
あり |
デフォルトなし |
以前にサポートされていた Unity Catalogで定義された外部ロケーションを |
|
なし |
|
Spark と Azure Synapse の両方で一時的なエンコード/デコードに使用される圧縮アルゴリズム。 現在サポートされている値は、 |
|
なし |
偽 |
ストレージ認証を設定する場合は、 以前にサポートされていた |
|
なし |
偽 |
ストレージ認証を設定する場合は、 |
|
なし |
偽 |
|
|
なし |
|
以前にサポートされていた |
|
なし |
デフォルトなし(空の文字列) |
Azure Synapse インスタンスにデータを書き込む前に Azure Synapse で実行される SQL コマンドの これらのコマンドのいずれかが失敗すると、エラーとして扱われ、書き込み操作は実行されません。 |
|
なし |
デフォルトなし(空の文字列) |
コネクタが Azure Synapse インスタンスにデータを正常に書き込んだ後に Azure Synapse で実行される SQL コマンドの これらのコマンドのいずれかが失敗した場合、エラーとして扱われ、データが Azure Synapse インスタンスに正常に書き込まれた後に例外が発生します。 |
|
なし |
256 |
以前にサポートされていた |
|
なし |
|
各クエリーの接続のタグ。 指定しない場合、または値が空の文字列の場合、タグのデフォルト値がJDBC URLに追加されます。 既定値は、Azure DB モニタリング ツールがクエリーに対して偽の SQL インジェクション アラートを生成するのを防ぎます。 |
|
なし |
デフォルトなし |
|
|
なし |
偽 |
IDENTITY 列への値の明示的な挿入を参照してください。 |
|
なし |
デフォルトなし |
Azure Synapse からデータを読み取るために事前にプロビジョニングされた外部データ ソース。 外部データ ソースは PolyBase でのみ使用でき、コネクタはデータを読み込むためにスコープ付き資格情報と外部データソースを作成する必要がないため、CONTROL アクセス許可の要件がなくなります。 使用方法と、外部データソースを使用するときに必要なアクセス許可の一覧については、「 外部データ ソース オプションを使用した PolyBase に必要な Azure Synapse アクセス許可」を参照してください。 |
|
なし |
0 |
読み込み操作が取り消される前に、読み取りおよび書き込み中に拒否できる行の最大数。 リジェクトされた行は無視されます。 たとえば、10 個のレコードのうち 2 個にエラーがある場合、8 個のレコードのみが処理されます。 CREATE EXTERNAL TABLE のREJECT_VALUEドキュメントと COPY の MAXERRORS のドキュメントを参照してください。 |
|
なし |
偽 |
|
注:
tableOptions
、preActions
、postActions
、maxStrLength
は、Databricks から Azure Synapse の新しいテーブルにデータを書き込む場合にのみ関連します。すべてのデータソースオプション名では大文字と小文字が区別されませんが、わかりやすくするために「キャメルケース」で指定することをお勧めします。
Azure Synapseへのクエリープッシュダウン
Azure Synapse コネクタは、次の演算子を Azure Synapse にプッシュダウンするための一連の最適化ルールを実装します。
Filter
Project
Limit
Project
演算子と Filter
演算子は、次の式をサポートしています。
ほとんどの Boolean 論理演算子
比較
基本的な算術演算
数値キャストと文字列キャスト
Limit
演算子の場合、プッシュダウンは順序が指定されていない場合にのみサポートされます。例えば:
SELECT TOP(10) * FROM table
ですが、 SELECT TOP(10) * FROM table ORDER BY col
ではありません。
注:
Azure Synapse コネクタでは、文字列、日付、またはタイムスタンプを操作する式はプッシュダウンされません。
Azure Synapse コネクタでビルドされたクエリー プッシュダウンは、デフォルトで有効になります。 無効にするには、 spark.databricks.sqldw.pushdown
を false
に設定します。
Temporary データマネジメント
Azure Synapse コネクタ では、 Azure ストレージ コンテナーに作成された一時ファイルは削除されません。 Databricks では、ユーザーが指定した tempDir
の場所にある一時ファイルを定期的に削除することをお勧めします。
データのクリーンアップを容易にするために、Azure Synapse コネクタはデータ ファイルを tempDir
の直下に格納するのではなく、 <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/
という形式のサブディレクトリを作成します。 特定のしきい値 (2 日など) より古いサブディレクトリを再帰的に削除するために、定期的なジョブを (Databricks ジョブ機能などを使用して) 設定することができます。これは、そのしきい値より長く実行される Spark ジョブは存在しないことを前提としています。
より簡単な方法は、コンテナ全体を定期的に削除し、同じ名前で新しいコンテナを作成することです。 そのためには、Azure Synapse コネクターによって生成される一時データに専用のコンテナーを使用し、コネクターが関与するクエリーが実行されていないことを保証できる時間枠を見つける必要があります。
一時オブジェクトの管理
Azure Synapse コネクタは、Databricks クラスターと Azure Synapse インスタンス間のデータ転送を自動化します。 Azure Synapse テーブルまたはクエリーからデータを読み取ったり、Azure Synapse テーブルにデータを書き込んだりするために、Azure Synapse コネクタは、 DATABASE SCOPED CREDENTIAL
、 EXTERNAL DATA SOURCE
、 EXTERNAL FILE FORMAT
、 EXTERNAL TABLE
などの一時オブジェクトをバックグラウンドで作成します。 これらのオブジェクトは、対応する Spark ジョブの期間中のみ有効であり、自動的に削除されます。
クラスターが Azure Synapse コネクタを使用してクエリーを実行しているときに、Spark ドライバー プロセスがクラッシュしたり、強制的に再起動されたりした場合、またはクラスターが強制的に終了または再起動された場合、一時オブジェクトが削除されないことがあります。 これらのオブジェクトの識別と手動削除を容易にするために、Azure Synapse コネクタでは、Azure Synapse インスタンスで作成されたすべての中間一時オブジェクトの名前に、 tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>
という形式のタグがプレフィックスとして付けられます。
漏洩したオブジェクトは、以下のようなクエリーを使用して定期的に探すことをお勧めします。
SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'