構造化ストリーミングによる Azure Synapse への書き込み
このドキュメントは廃止されており、更新されない可能性があります。
Azure Synapse コネクタは、バッチ書き込みで一貫したユーザー エクスペリエンスを提供する Azure SynapseMicrosoft の効率的でスケーラブルな構造化ストリーミング書き込みサポートを提供し、Databricks クラスターと Azure Synapse インスタンス間の大規模なデータ転送に COPY
を使用します。
Databricks と Synapse 間の構造化ストリーミングのサポートにより、インクリメンタル ETL ジョブを構成するためのシンプルなセマンティクスが提供されます。 Databricks から Synapse へのデータの読み込みに使用されるモデルでは、ほぼリアルタイムのワークロードの SLA 要件を満たさない可能性がある待機時間が発生します。 「 Azure Synapse Analytics でのデータのクエリ」を参照してください。
Synapseへの書き込みのストリーミングでサポートされている出力モード
Azure Synapse コネクタは、レコードの追加と集計の Append
出力モードと Complete
出力モードをサポートしています。 出力モードと互換性マトリックスの詳細については、 構造化ストリーミング ガイドを参照してください。
Synapse フォールト トレランスのセマンティクス
デフォルトにより、Azure Synapse ストリーミングは、Azure SynapseDBFSのチェックポイントの場所、Azure Synapse のチェックポイントテーブル、およびストリーミングがあらゆるタイプのエラー、再試行を処理できるようにするためのロックメカニズムの組み合わせを使用してクエリの進行状況を確実に追跡することにより、 テーブルにデータを書き込むためのエンドツーエンドの 正確に一度 の保証を提供します。 クエリの再起動。
オプションで、 Azure Synapse オプションを false
spark.databricks.sqldw.streaming.exactlyOnce.enabled
に設定することで、 ストリーミングの制限の少ない at-least-once セマンティクスを選択できます。その場合、断続的な接続失敗や Azure Synapse へのクエリの終了が発生した場合に、データの重複が発生する可能性があります。
Azure Synapse に書き込むための構造化ストリーミング構文
次のコード例は、Scala と Python の構造化ストリーミングを使用した Synapse への書き込みのストリーミングを示しています。
- Scala
- Python
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()
// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", "100000") \
.option("numPartitions", "16") \
.load()
# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("checkpointLocation", "/tmp_checkpoint_location") \
.start()
構成の完全な一覧については、「 Azure Synapse Analytics でデータのクエリを実行する」を参照してください。
Synapse ストリーミング チェックポイント テーブル管理
Azure Synapse コネクタは、新しいストリーミング クエリの開始時に作成されるストリーミング チェックポイント テーブルを削除し ません 。 この動作は、オブジェクトストレージに通常指定される checkpointLocation
と一致しています。 Databricks では、今後実行されないクエリのチェックポイント テーブルを定期的に削除することをお勧めします。
デフォルトでは、すべてのチェックポイント・テーブルの名前は <prefix>_<query-id>
です。ここで、 <prefix>
はデフォルト値 databricks_streaming_checkpoint
の設定可能なプレフィックス、 query_id
は _
文字を削除したストリーミング・クエリ ID です。
古いストリーミング クエリまたは削除されたストリーミング クエリのすべてのチェックポイント テーブルを検索するには、次のクエリを実行します。
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
プレフィックスは、Spark SQL 設定オプション spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
で設定できます。
Databricks Synapse コネクタ ストリーミング オプションのリファレンス
Spark SQL で提供される OPTIONS
では、 バッチ オプションに加えて、ストリーミングの次のオプションがサポートされています。
パラメーター | 必須 | デフォルト | 注 |
---|---|---|---|
| あり | デフォルトなし | 構造化ストリーミングがメタデータとチェックポイント情報を書き込むために使用する DBFS 上の場所。 構造化ストリーミング・プログラミング・ガイドの チェックポイント処理による障害からのリカバリー を参照してください。 |
| いいえ | 0 | ストリーミングのマイクロバッチの定期的なクリーンアップのために保持する(最新の)一時ディレクトリの数を示します。 |
checkpointLocation
と numStreamingTempDirsToKeep
は、Databricks から Azure Synapse の新しいテーブルへの書き込みのストリーミングにのみ関連します。