Databricks と Azure Synapse を PolyBase (レガシ) で接続する
このドキュメントは廃止されており、更新されない可能性があります。 このコンテンツに記載されている製品、サービス、またはテクノロジはサポートされなくなりました。 「 Azure Synapse Analytics でのデータのクエリ」を参照してください。
への接続には、 データレイク StorageDatabricks でデフォルトCOPY
機能を使用することをお勧めします。AzureAzure Synapseこの記事には、PolyBase と BLOB ストレージに関する従来のドキュメントが含まれています。
Azure Synapse Analytics (旧SQLデータウェアハウス)は、超並列処理(MPP)を活用して、ペタバイト規模のデータ間で複雑なクエリを迅速に実行するクラウドベースのエンタープライズデータウェアハウスです。Azure をビッグデータ ソリューションの主要コンポーネントとして使用します。単純な PolyBase T-SQL クエリまたはAzureCOPY ステートメントを使用してビッグデータを にインポートし、MPP の力を利用してハイパフォーマンス アナリティクスを実行します。統合と分析を行うと、データウェアハウスは、ビジネスが知見を得るための信頼できる唯一の情報源となります。
Azure SynapseDatabricksAzure Synapseコネクタ、Apache Spark Azure BlobCOPY
Azure SynapseDatabricksストレージ を使用する のデータソース実装、および クラスターと インスタンス間で大量のデータを効率的に転送するための の PolyBase または ステートメントを使用して、Azure Synapse から にアクセスできます。
Databricks クラスターと Azure Synapse インスタンスはどちらも、共通の Blob ストレージ コンテナにアクセスして、これら 2 つのシステム間でデータを交換します。Databricks では、Apache Spark ジョブは Azure Synapse コネクタによってトリガーされ、Blob Storage コンテナーとの間でデータの読み取りと書き込みが行われます。 Azure Synapse 側では、PolyBase によって実行されるデータの読み込みおよびアンロード操作は、JDBC を介して Azure Synapse コネクタによってトリガーされます。 Databricks Runtime7.0 以降では、COPY
は デフォルト によって使用され、Azure Synapse を介してAzure Synapse コネクタによってデータをJDBC にロードします。
COPY
は、 パフォーマンスが向上する Azure Synapse Gen2 インスタンスでのみ使用できます。 データベースでまだ Gen1 インスタンスを使用している場合は、データベースを Gen2 に移行することをお勧めします。
Azure Synapse コネクタは、クエリを実行するたびに大量のデータを Blob Storage に抽出できるため、対話型クエリよりも ETL に適しています。 同じ Azure Synapse テーブルに対して複数のクエリを実行する予定の場合は、抽出したデータを Parquet などの形式で保存することをお勧めします。
必要条件
Azure Synapse データベースのマスター キー。
認証
Azure Synapse コネクタでは、次の 3 種類のネットワーク接続が使用されます。
- Spark ドライバーから Azure Synapse への接続
- SparkドライバーとエグゼキューターからAzureストレージアカウントへの接続
- Azure Synapse から Azure ストレージ アカウントへの接続
┌─────────┐
┌─────────────────────────>│ STORAGE │<────────────────────────┐
│ Storage acc key / │ ACCOUNT │ Storage acc key / │
│ Managed Service ID / └─────────┘ OAuth 2.0 / │
│ │ │
│ │ │
│ │ Storage acc key / │
│ │ OAuth 2.0 / │
│ │ │
v v ┌──────v────┐
┌──────────┐ ┌──────────┐ │┌──────────┴┐
│ Synapse │ │ Spark │ ││ Spark │
│ Analytics│<────────────────────>│ Driver │<───────────────>│ Executors │
└──────────┘ JDBC with └──────────┘ Configured └───────────┘
username & password / in Spark
次のセクションでは、各接続の認証設定オプションについて説明します。
Spark ドライバーから Azure Synapse へ
SparkAzure SynapseJDBCドライバーは、ユーザー名とパスワードを使用してOAuth を使用するか、認証用のサービス プリンシパルを使用して 2.0 を使用して に接続できます。
ユーザー名とパスワード
Azure portal によって提供される接続文字列を両方の認証の種類に使用することをお勧めします。
Spark ドライバーと Azure Synapse の間で送信されるすべてのデータに対する Secure Sockets Layer (SSL) 暗号化
インスタンスを JDBC 接続経由で取得します。 SSL 暗号化が有効になっていることを確認するには、次の項目を検索します。
接続文字列にencrypt=true
します。
Spark ドライバーが Azure Synapse に到達できるようにするには、次のことをお勧めします Azure portal を使用して Azure Synapse ワークスペースの [セキュリティ] の下にある [ネットワーク] ウィンドウで、 [ Azure サービスとリソースにこのワークスペースへのアクセスを許可する ] を [オン ] に設定します。この設定では、すべての Azure IP アドレスとすべての Azure サブネットからの通信が許可されます。 Spark ドライバーが Azure Synapse インスタンスに到達できるようにします。
OAuth 2.0 with a サービスプリンシパル
Azure Synapse Analyticsへの認証は、基盤となるストレージアカウントにアクセスできるサービスプリンシパルを使用して行うことができます。サービスプリンシパル credentials を使用して Azure storage アカウントにアクセスする方法の詳細については、「Azure データレイク Storage と Blob Storage に接続する」を参照してください。コネクタがサービスプリンシパルで認証できるようにするには、接続構成パラメーターで enableServicePrincipalAuth
オプションを true
に設定する必要があります。
必要に応じて、 Azure Synapse Analytics 接続に別のサービスプリンシパルを使用できます。 ストレージアカウントにサービスプリンシパル credentials を設定し、 Synapseにオプションのサービスプリンシパル credentials を設定する例を次に示します。
- ini
- Scala
- Python
- R
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token
; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")
# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Spark driver and エグゼキューター to Azure storage アカウント
Azure ストレージ コンテナーは、Azure Synapse からの読み取りまたはAzure Synapse への書き込み時に大量のデータを格納する仲介役として機能します。Spark は、abfss
ドライバーを使用して ADLS または Blob Storage に接続します。
次の認証オプションを使用できます。
- ストレージ アカウントのアクセス キーとシークレット
- OAuth 2.0 認証。 OAuth 2.0 とサービスプリンシパルの詳細については、「サービスプリンシパル & Microsoft Entra ID(Azure Active Directory) を使用してストレージにアクセスする」を参照してください。
次の例は、ストレージ アカウント アクセス キー アプローチを使用したこれら 2 つの方法を示しています。 同じことが OAuth 2.0 設定にも当てはまります。
ノートブック セッションの構成 (推奨)
この方法を使用すると、アカウント アクセス キーは、コマンドを実行するノートブックに関連付けられたセッション構成で設定されます。 この構成は、同じクラスターに接続されている他のノートブックには影響しません。 spark
は、ノートブックで提供される SparkSession
オブジェクトです。
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
グローバル Hadoop 構成
このアプローチでは、すべてのノートブックで共有される SparkContext
オブジェクトに関連付けられたグローバル Hadoop 構成が更新されます。
- Scala
- Python
sc.hadoopConfiguration.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
hadoopConfiguration
は、PySpark のすべてのバージョンで公開されているわけではありません。次のコマンドは一部の Spark 内部に依存していますが、すべての PySpark バージョンで動作するはずであり、将来壊れたり変更されたりする可能性は低いです。
sc._jsc.hadoopConfiguration().set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Azure Synapse から Azure ストレージ アカウントへ
Azure Synapse は、一時データの読み込みとアンロード中にもストレージ アカウントに接続します。
ストレージ アカウントのアカウント キーとシークレットを設定している場合は、 forwardSparkAzureStorageCredentials
を true
に設定できます。その場合は、
Azure Synapse コネクタは、ノートブック セッション構成で設定されたアカウント アクセス キーを自動的に検出します。
グローバル Hadoop 構成を作成し、一時的な Azure を作成することで、ストレージ アカウント アクセス キーを接続された Azure Synapse インスタンスに転送します
データベース スコープの資格情報。
または、 ADLS を 2.0 認証で使用する場合 OAuth Azure Synapse インスタンスがマネージドサービス ID (通常は
VNet + サービス エンドポイントのセットアップ) では、useAzureMSI
を true
に設定する必要があります。 この場合、コネクタは、データベースのスコープ付き資格情報に IDENTITY = 'Managed Service Identity'
を指定し、 SECRET
は指定しません。
ストリーミングのサポート
Azure Synapse コネクタは、効率的でスケーラブルな構造化ストリーミング書き込みサポートを提供しAzure Synapse
バッチ書き込みで一貫したユーザー エクスペリエンスを提供し、大規模なデータ転送には PolyBase または COPY
を使用します
Databricks クラスターと Azure Synapse インスタンスの間。バッチ書き込みと同様に、ストリーミングは主に設計されています
ETLの場合、レイテンシが長くなり、場合によってはリアルタイムデータの処理に適さない可能性があります。
フォールトトレランスのセマンティクス
By デフォルト,Azure Synapse ストリーミングは、 テーブルにデータを書き込むためのエンドツーエンドの exactly-once 保証を提供します。 Azure Synapse DBFSのチェックポイントの場所、Azure Synapse のチェックポイントテーブル、
また、ストリーミングがあらゆる種類の障害、再試行、クエリの再起動を処理できるようにするためのロックメカニズム。必要に応じて、次のように設定することで、 Azure Synapse ストリーミングの制限の少ない at-least-once セマンティクスを選択できます
spark.databricks.sqldw.streaming.exactlyOnce.enabled
オプションをfalse
するオプションがあり、その場合はデータが重複します
断続的な接続障害が発生したり、 Azure Synapse クエリが予期せぬ終了したりした場合に発生する可能性があります。
使用量 (バッチ)
APIScalaこのコネクタは、 、Python 、SQL 、R ノートブックのデータソース を介して使用できます。
- Scala
- Python
- SQL
- R
// Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()
// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()
// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.save()
# Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.load()
# Load data from an Azure Synapse query.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", "select x, count(*) as cnt from table group by x") \
.load()
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.save()
-- Otherwise, set up the Blob storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;
# Load SparkR
library(SparkR)
# Otherwise, set up the Blob storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Load data from an Azure Synapse query.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
query = "select x, count(*) as cnt from table group by x",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
df,
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
利用状況(ストリーミング)
Scala および Python ノートブックで構造化ストリーミングを使用してデータを書き込むことができます。
- Scala
- Python
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()
// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", "100000") \
.option("numPartitions", "16") \
.load()
# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("checkpointLocation", "/tmp_checkpoint_location") \
.start()
構成
このセクションでは、コネクタの書き込みセマンティクス、必要なアクセス許可、およびその他の構成パラメーターを構成する方法について説明します。
このセクションの内容:
- バッチ書き込みでサポートされている保存モード
- ストリーミング書き込みでサポートされている出力モード
- セマンティクスの記述
- PolyBase に必要な Azure Synapse のアクセス許可
COPY
ステートメントに必要な Azure Synapse のアクセス許可- パラメータ
- Azure Synapse へのクエリ プッシュダウン
- Temporary データマネジメント
- 一時オブジェクト管理
- ストリーミング・チェックポイント・テーブル管理
バッチ書き込みでサポートされている保存モード
Azure Synapse コネクタは、ErrorIfExists
、Ignore
、Append
、および Overwrite
の保存モードをサポートし、デフォルト モードは ErrorIfExists
です。Apache Sparkでサポートされている保存モードの詳細については、
保存モードに関するSpark SQLドキュメントを参照してください。
ストリーミング書き込みでサポートされている出力モード
Azure Synapse コネクタは、レコードの追加と集計の Append
出力モードと Complete
出力モードをサポートしています。出力モードと互換性マトリックスの詳細については、
構造化ストリーミングガイド。
セマンティクスの記述
COPY
は、Databricks Runtime 7.0 以降で使用できます。
PolyBase に加えて、Azure Synapse コネクタでは COPY
ステートメントがサポートされています。 COPY
ステートメントは、データを Azure Synapse にロードするより便利な方法を提供します
外部テーブルを作成し、データをロードするために必要なパーミッションが少なくて済み、パフォーマンスが向上します。
データ取り込み Azure Synapse.
デフォルトでは、コネクタCOPY
は、
Azure Synapse Gen2 インスタンス、それ以外の場合は PolyBase)。 また、write セマンティクスを
次の設定:
- Scala
- Python
- SQL
- R
// Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
-- Configure the write semantics for Azure Synapse connector in the notebook session conf.
SET spark.databricks.sqldw.writeSemantics=<write-semantics>;
# Load SparkR
library(SparkR)
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.writeSemantics", "<write-semantics>")
ここで、<write-semantics>
は PolyBase を使用するpolybase
か、COPY
ステートメントを使用するcopy
のいずれかです。
PolyBase に必要な Azure Synapse のアクセス許可
PolyBase を使用する場合、Azure Synapse コネクタでは、JDBC 接続ユーザーに次のコマンドを実行するアクセス許可が必要です 接続された Azure Synapse インスタンスで、次の操作を行います。
最初のコマンドの前提条件として、コネクタは、指定された Azure Synapse インスタンスにデータベース マスター キーが既に存在することを前提としています。 そうでない場合は、 CREATE MASTER KEY コマンドを使用してキーを作成できます。
さらに、 dbTable
または query
で参照されるテーブルを通じて Azure Synapse テーブル セットを読み取るには、JDBC ユーザーに必要な Azure Synapse テーブルにアクセスする権限が必要です。 dbTable
を使用して設定された Azure Synapse テーブルにデータを書き戻すには、JDBC ユーザーにこの Azure Synapse テーブルに書き込むアクセス許可が必要です。
次の表は、PolyBase でのすべての操作に必要なアクセス許可をまとめたものです。
オペレーション | 権限 | 外部データソースを使用する場合の権限 |
---|---|---|
バッチ書き込み | コントロール | 「バッチ書き込み」を参照してください。 |
ストリーミング書き込み | コントロール | ストリーミング書き込みを参照してください |
読み取り | コントロール |
PolyBase と外部データソース オプションの必要な Azure Synapse 権限
PolyBase は、事前プロビジョニングされた外部データソースと共に使用できます。 詳細については、パラメーターの externalDataSource
パラメーターを参照してください。
事前プロビジョニングされた外部データソースで PolyBase を使用するには、 Azure Synapse コネクタで [ JDBC 接続ユーザー] に、接続された Azure Synapse インスタンスで次のコマンドを実行するアクセス許可が必要です。
外部データソースを作成するには、まずデータベーススコープの資格情報を作成する必要があります。 次のリンクでは、サービスプリンシパルのスコープ付き資格情報と ABFS ロケーションの外部データソースを作成する方法について説明します。
外部データソースの場所は、コンテナを指している必要があります。 場所がコンテナ内のディレクトリである場合、コネクタは機能しません。
次の表は、外部データソース オプションを使用した PolyBase 書き込み操作のアクセス許可をまとめたものです。
オペレーション | パーミッション (既存のテーブルへの挿入) | パーミッション (新しいテーブルへの挿入) |
---|---|---|
バッチ書き込み | データベースの一括操作の管理 INSERT CREATE TABLE 任意のスキーマを変更する 外部データソースの変更 外部ファイル形式の変更 | データベースの一括操作の管理 INSERT CREATE TABLE 任意のスキーマを変更する 外部データソースの変更 外部ファイル形式の変更 |
ストリーミング書き込み | データベースの一括操作の管理 INSERT CREATE TABLE 任意のスキーマを変更する 外部データソースの変更 外部ファイル形式の変更 | データベースの一括操作の管理 INSERT CREATE TABLE 任意のスキーマを変更する 外部データソースの変更 外部ファイル形式の変更 |
次の表は、外部データソース オプションを使用した PolyBase 読み取り操作のアクセス許可をまとめたものです。
オペレーション | 権限 |
---|---|
読み取り | CREATE TABLE 任意のスキーマを変更する 外部データソースの変更 外部ファイル形式の変更 |
このコネクタを使用して、API Scala、Python 、SQL 、R ノートブックのデータソース を介して読み取ることができます。
- Scala
- Python
- SQL
- R
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("externalDataSource", "<your-pre-provisioned-data-source>")
.option("dbTable", "<your-table-name>")
.load()
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("externalDataSource", "<your-pre-provisioned-data-source>") \
.option("dbTable", "<your-table-name>") \
.load()
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>',
externalDataSource '<your-pre-provisioned-data-source>'
);
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>"
externalDataSource = "<your-pre-provisioned-data-source>")
COPY
ステートメントに必要な Azure Synapse のアクセス許可
COPY
ステートメントを使用する場合、Azure Synapse コネクタでは JDBC 接続ユーザーにアクセス許可が必要です
をクリックして、接続された Azure Synapse インスタンスで次のコマンドを実行します。
宛先テーブルが Azure Synapse に存在しない場合は、上記のコマンドに加えて、次のコマンドを実行するアクセス許可が必要です。
次の表は、 COPY
によるバッチ書き込みとストリーミング書き込みの権限をまとめたものです。
オペレーション | パーミッション (既存のテーブルへの挿入) | パーミッション (新しいテーブルへの挿入) |
---|---|---|
バッチ書き込み | データベースの一括操作の管理 INSERT | データベースの一括操作の管理 INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
ストリーミング書き込み | データベースの一括操作の管理 INSERT | データベースの一括操作の管理 INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
パラメーター
Spark SQL で提供されるパラメーター マップまたは OPTIONS
は、次の設定をサポートしています。
パラメーター | 必須 | デフォルト | 注 |
---|---|---|---|
| はい ( | デフォルトなし | Azure Synapse で作成または読み取りを行うテーブル。 このパラメーターは、データを Azure Synapse に保存し直すときに必要です。 また、 以前にサポートされていた |
| はい ( | デフォルトなし | Azure Synapse で読み取るクエリ。 クエリで参照されるテーブルの場合、 |
| いいえ | デフォルトなし | Azure Synapse ユーザー名。 |
| いいえ | デフォルトなし | Azure Synapse のパスワード。 |
| あり | デフォルトなし | サブプロトコルとして |
| いいえ | JDBC URL のサブプロトコルによって決定されます | 使用する JDBC ドライバーのクラス名。このクラスはクラスパス上に存在する必要があります。ほとんどの場合、適切なドライバ・クラス名はJDBC URLのサブプロトコルによって自動的に決定されるため、このオプションを指定する必要はありません。 以前にサポートされていた |
| あり | デフォルトなし |
以前にサポートされていた |
| いいえ |
| Azure Synapse に書き込むときに一時ファイルを BLOB ストアに保存する形式。 デフォルトは |
| いいえ |
| Spark と Azure Synapse の両方で一時的にエンコード/デコードするために使用される圧縮アルゴリズム。 現在サポートされている値は、 |
| いいえ | False |
Azure Synapse コネクタの現在のバージョンでは、 以前にサポートされていた |
| いいえ | False | を Azure Synapse コネクタの現在のバージョンでは、 |
| いいえ | False |
Azure Synapse コネクタの現在のバージョンでは、 |
| いいえ |
| Azure Synapse テーブル セットを作成するときに テーブル オプション を指定するために使用される文字列 以前にサポートされていた |
| いいえ | デフォルトなし(空の文字列) | Azure Synapse インスタンスにデータを書き込む前に Azure Synapse で実行される SQL コマンドの これらのコマンドのいずれかが失敗した場合、そのコマンドはエラーとして扱われ、書き込み操作は実行されません。 |
| いいえ | デフォルトなし(空の文字列) | コネクタが Azure Synapse インスタンスにデータを正常に書き込んだ後に Azure Synapse で実行される SQL コマンドの これらのコマンドのいずれかが失敗した場合、エラーとして扱われ、データが Azure Synapse インスタンスに正常に書き込まれた後に例外が発生します。 |
| いいえ | 256 |
以前にサポートされていた |
| あり | デフォルトなし | 構造化ストリーミングがメタデータとチェックポイント情報を書き込むために使用する DBFS 上の場所。 構造化ストリーミング・プログラミング・ガイドの チェックポイント処理による障害からのリカバリー を参照してください。 |
| いいえ | 0 | ストリーミングのマイクロバッチの定期的なクリーンアップのために保持する(最新の)一時ディレクトリの数を示します。 |
| いいえ |
| 各クエリの接続のタグ。 指定しない場合、または値が空の文字列の場合、タグのデフォルト値が JDBC URL に追加されます。 デフォルト値は、 Azure DB モニタリング ツールがクエリに対して偽の SQL インジェクション アラートを発生させるのを防ぎます。 |
| いいえ | デフォルトなし |
|
| いいえ | False |
IDENTITY カラムへの値の明示的な挿入を参照してください。 |
| いいえ | デフォルトなし | Azure Synapseからデータを読み取るための事前プロビジョニング external データソース外部データソースは PolyBase でのみ使用でき、コネクタはデータを読み込むためにスコープ付き資格情報と外部データソースを作成する必要がないため、CONTROL アクセス許可の要件がなくなります。 たとえば、外部データソースを使用する場合の使用方法と必要なアクセス許可の一覧については、「 外部データソース オプションを使用した PolyBase の必要な Azure Synapse アクセス許可」を参照してください。 |
| いいえ | 0 | 読み込み操作 (PolyBase または COPY) が取り消される前に、読み取りおよび書き込み中に拒否できる行の最大数。 拒否された行は無視されます。 たとえば、10 件中 2 件のレコードにエラーがある場合、処理されるレコードは 8 件のみです。 CREATE 外部テーブルのREJECT_VALUEドキュメントと COPY の MAXERRORS ドキュメントを参照してください。 |
tableOptions
、preActions
、postActions
、maxStrLength
は、Databricks から Azure Synapse の新しいテーブルにデータを書き込む場合にのみ関連します。externalDataSource
は、Azure Synapse からデータを読み取る場合と、PolyBase セマンティクスを使用して Databricks から Azure Synapse の新しいテーブルにデータを書き込む場合にのみ関連します。externalDataSource
の使用中は、forwardSparkAzureStorageCredentials
やuseAzureMSI
などの他のストレージ認証タイプを指定しないでください。checkpointLocation
とnumStreamingTempDirsToKeep
は、Databricks から Azure Synapse の新しいテーブルへの書き込みのストリーミングにのみ関連します。- すべてのデータソースオプション名では大文字と小文字が区別されませんが、わかりやすくするために「キャメルケース」で指定することをお勧めします。
Azure Synapse へのクエリ プッシュダウン
Azure Synapse コネクタは、一連の最適化ルールを実装します をクリックして、次の演算子を Azure Synapse にプッシュします。
Filter
Project
Limit
Project
演算子と Filter
演算子は、次の式をサポートしています。
- ほとんどのブール論理演算子
- 比較
- 基本的な算術演算
- 数値キャストと文字列キャスト
Limit
演算子の場合、プッシュダウンは順序が指定されていない場合にのみサポートされます。例えば:
SELECT TOP(10) * FROM table
ですが、 SELECT TOP(10) * FROM table ORDER BY col
は除くことができます。
Azure Synapse コネクタは、文字列、日付、またはタイムスタンプを操作する式をプッシュダウンしません。
Azure Synapse コネクタでビルドされたクエリー プッシュダウンは、デフォルトで有効になります。
無効にするには、 spark.databricks.sqldw.pushdown
を false
に設定します。
Temporary データマネジメント
Azure Synapse コネクタは、BLOB ストレージ コンテナーに作成した一時ファイル を削除しません 。そのため、ユーザーが指定した tempDir
場所にある一時ファイルを定期的に削除することをお勧めします。
データのクリーンアップを容易にするために、Azure Synapse コネクタは、 tempDir
のすぐ下にデータ ファイルを格納しません。
代わりに、 <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/
.特定のしきい値 (2 日など) より古いサブディレクトリを再帰的に削除するために、定期的なジョブを (Databricks ジョブ機能などを使用して) 設定することができます。これは、そのしきい値より長く実行される Spark ジョブは存在しないことを前提としています。
より簡単な方法は、コンテナ全体を定期的に削除し、同じ名前で新しいコンテナを作成することです。これには、Azure Synapse コネクタによって生成される一時データ専用のコンテナーを使用し、そのことを行う必要があります コネクタに関連するクエリが実行されていないことを保証できる時間枠を見つけることができます。
一時オブジェクト管理
Azure Synapse コネクタは、Databricks クラスターと Azure Synapse インスタンス間のデータ転送を自動化します。Azure Synapse テーブルまたはクエリからデータを読み取る場合、または Azure Synapse テーブルにデータを書き込む場合、
Azure Synapse コネクタは、 DATABASE SCOPED CREDENTIAL
、 EXTERNAL DATA SOURCE
、 EXTERNAL FILE FORMAT
、
そして舞台裏 EXTERNAL TABLE
。 これらのオブジェクトはライブです
対応する Spark ジョブの期間中のみ、その後は自動的に削除する必要があります。
クラスターが Azure Synapse コネクタを使用してクエリを実行している場合、 Spark ドライバー プロセスがクラッシュまたは強制的に再起動された場合、またはクラスター
が強制的に終了または再始動された場合、一時オブジェクトはドロップされない可能性があります。これらのオブジェクトの識別と手動削除を容易にするために、Azure Synapse コネクタは、Azure Synapse インスタンスで作成されたすべての中間一時オブジェクトの名前に " tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>
" という形式のタグをプレフィックスとして付けます。
リークしたオブジェクトを定期的に探すには、次のようなクエリを使用します。
SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'
ストリーミング・チェックポイント・テーブル管理
Azure Synapse コネクタは、新しいストリーミング クエリの開始時に作成されるストリーミング チェックポイント テーブルを削除し ません 。この動作は、DBFS の checkpointLocation
と一致しています。 そのため、定期的に削除することをお勧めします
チェックポイント テーブルと同時に、今後実行されないクエリやチェックポイントの場所が既に削除されているクエリの DBFS 上のチェックポイントの場所を削除します。
デフォルトでは、すべてのチェックポイント・テーブルの名前は <prefix>_<query-id>
です。ここで、 <prefix>
はデフォルト値 databricks_streaming_checkpoint
の設定可能なプレフィックス、 query_id
は _
文字を削除したストリーミング・クエリ ID です。 古いストリーミング クエリまたは削除されたストリーミング クエリのすべてのチェックポイント テーブルを検索するには、次のクエリを実行します。
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
プレフィックスは、Spark SQL 設定オプション spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
で設定できます。
よくある質問(FAQ)
Azure Synapse コネクタの使用中にエラーが発生しました。 このエラーが Azure Synapse によるものか Databricks によるものかはどうすればわかりますか?
エラーのデバッグに役立つように、Azure Synapse コネクタに固有のコードによってスローされた例外は、 SqlDWException
トレイトを拡張する例外にラップされます。 例外には、次の区別もあります。
SqlDWConnectorException
Azure Synapse コネクタによってスローされたエラーを表しますSqlDWSideException
は、接続された Azure Synapse インスタンスによってスローされたエラーを表します
クエリが "No access key found in the session conf or the global Hadoop conf" というエラーで失敗した場合はどうすればよいですか?
このエラーは、Azure Synapse コネクタが
ノートブック セッション構成のストレージ アカウント アクセス キー、または tempDir
で指定されたストレージ アカウントのグローバル Hadoop 構成。ストレージ アカウント アクセスを適切に構成する方法の例については、「 使用量 (バッチ)」 を参照してください。 Spark テーブルが Azure Synapse コネクタを使用して作成されている場合、
Spark テーブルの読み取りまたは書き込みを行うには、ストレージ アカウントのアクセス資格情報を引き続き指定する必要があります。
Shared Access Signature (SAS) を使用して、 tempDir
で指定された BLOB ストレージ コンテナーにアクセスできますか?
Azure Synapse では、SAS を使用して BLOB ストレージにアクセスすることはサポートされていません。 そのため、Azure Synapse コネクタは、 tempDir
で指定された BLOB ストレージ コンテナーにアクセスするための SAS をサポートしていません。
dbTable
オプションを指定して Azure Synapse コネクタを使用して Spark テーブルを作成し、この Spark テーブルにデータを書き込んでから、この Spark テーブルを削除しました。Azure Synapse 側で作成されたテーブルは削除されますか?
いいえ。 Azure Synapse は外部データソースと見なされます。 dbTable
から名前が設定された Azure Synapse テーブルは、次の場合には削除されません
Spark テーブルが削除されます。
Azure Synapse に データフレーム を書き込むときに、単に .saveAsTable(tableName)
ではなく ".option("dbTable", tableName).save()
" と言う必要があるのはなぜですか?
これは、 .option("dbTable", tableName)
が データベース (つまり、Azure Synapse) テーブルを指し、 .saveAsTable(tableName)
が Spark テーブルを指すという区別を明確にするためです。 実際、この 2 つを組み合わせることもできます。df.write. ... .option("dbTable", tableNameDW).saveAsTable(tableNameSpark)
では 、 Azure Synapse に tableNameDW
というテーブルが作成され、 Spark には Azure Synapse テーブルによってサポートされる tableNameSpark
という外部テーブルが作成されます。
.save()
と.saveAsTable()
の次の違いに注意してください。
df.write. ... .option("dbTable", tableNameDW).mode(writeMode).save()
の場合、writeMode
は期待どおりに Azure Synapse テーブルに対して動作します。df.write. ... .option("dbTable", tableNameDW).mode(writeMode).saveAsTable(tableNameSpark)
の場合、writeMode
は Spark テーブルに対して動作しますが、Azure Synapse に既に存在する場合はtableNameDW
は自動的に上書き されます。
この動作は、他のデータソースへの書き込みと変わりません。 これは、Spark データフレームWriter API の注意事項にすぎません。