メインコンテンツまでスキップ

構造化ストリーミングによる Azure Synapse への書き込み

important

このドキュメントは廃止されており、更新されない可能性があります。

Azure Synapse コネクタは、バッチ書き込みで一貫したユーザー エクスペリエンスを提供する Azure SynapseMicrosoft の効率的でスケーラブルな構造化ストリーミング書き込みサポートを提供し、Databricks クラスターと Azure Synapse インスタンス間の大規模なデータ転送に COPY を使用します。

Databricks と Synapse 間の構造化ストリーミングのサポートにより、インクリメンタル ETL ジョブを構成するためのシンプルなセマンティクスが提供されます。 Databricks から Synapse へのデータの読み込みに使用されるモデルでは、ほぼリアルタイムのワークロードの SLA 要件を満たさない可能性がある待機時間が発生します。 「 Azure Synapse Analytics でのデータのクエリ」を参照してください。

Synapseへの書き込みのストリーミングでサポートされている出力モード

Azure Synapse コネクタは、レコードの追加と集計の Append 出力モードと Complete 出力モードをサポートしています。 出力モードと互換性マトリックスの詳細については、 構造化ストリーミング ガイドを参照してください。

Synapse フォールト トレランスのセマンティクス

デフォルトにより、Azure Synapse ストリーミングは、Azure SynapseDBFSのチェックポイントの場所、Azure Synapse のチェックポイントテーブル、およびストリーミングがあらゆるタイプのエラー、再試行を処理できるようにするためのロックメカニズムの組み合わせを使用してクエリの進行状況を確実に追跡することにより、 テーブルにデータを書き込むためのエンドツーエンドの 正確に一度 の保証を提供します。 クエリの再起動。

オプションで、 Azure Synapse オプションを false spark.databricks.sqldw.streaming.exactlyOnce.enabledに設定することで、 ストリーミングの制限の少ない at-least-once セマンティクスを選択できます。その場合、断続的な接続失敗や Azure Synapse へのクエリの終了が発生した場合に、データの重複が発生する可能性があります。

Azure Synapse に書き込むための構造化ストリーミング構文

次のコード例は、Scala と Python の構造化ストリーミングを使用した Synapse への書き込みのストリーミングを示しています。

Scala
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()

構成の完全な一覧については、「 Azure Synapse Analytics でデータのクエリを実行する」を参照してください。

Synapse ストリーミング チェックポイント テーブル管理

Azure Synapse コネクタは、新しいストリーミング クエリの開始時に作成されるストリーミング チェックポイント テーブルを削除し ません 。 この動作は、オブジェクトストレージに通常指定される checkpointLocation と一致しています。 Databricks では、今後実行されないクエリのチェックポイント テーブルを定期的に削除することをお勧めします。

デフォルトでは、すべてのチェックポイント・テーブルの名前は <prefix>_<query-id>です。ここで、 <prefix> はデフォルト値 databricks_streaming_checkpoint の設定可能なプレフィックス、 query_id_ 文字を削除したストリーミング・クエリ ID です。

古いストリーミング クエリまたは削除されたストリーミング クエリのすべてのチェックポイント テーブルを検索するには、次のクエリを実行します。

SQL
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

プレフィックスは、Spark SQL 設定オプション spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefixで設定できます。

Databricks Synapse コネクタ ストリーミング オプションのリファレンス

Spark SQL で提供される OPTIONS では、 バッチ オプションに加えて、ストリーミングの次のオプションがサポートされています。

パラメーター

必須

デフォルト

checkpointLocation

あり

デフォルトなし

構造化ストリーミングがメタデータとチェックポイント情報を書き込むために使用する DBFS 上の場所。 構造化ストリーミング・プログラミング・ガイドの チェックポイント処理による障害からのリカバリー を参照してください。

numStreamingTempDirsToKeep

いいえ

0

ストリーミングのマイクロバッチの定期的なクリーンアップのために保持する(最新の)一時ディレクトリの数を示します。 0に設定すると、マイクロバッチがコミットされた直後にディレクトリの削除がトリガーされ、それ以外の場合は最新のマイクロバッチの数が保持され、残りのディレクトリが削除されます。-1 を使用して、定期的なクリーンアップを無効にします。

注記

checkpointLocationnumStreamingTempDirsToKeep は、Databricks から Azure Synapse の新しいテーブルへの書き込みのストリーミングにのみ関連します。