構造化ストリーミングは Azure Synapse に書き込みます

Azure Synapse コネクタは、バッチ書き込みで一貫したユーザー エクスペリエンスを提供し、Databricks クラスターと Azure Synapse インスタンス間の大規模なデータ転送に COPY を使用する、Azure Synapse の効率的でスケーラブルな構造化ストリーミング書き込みサポートを提供します。

Databricks と Synapse の間の構造化ストリーミングのサポートにより、増分 ETL ジョブを構成するためのシンプルなセマンティクスが提供されます。 Databricks から Synapse にデータを読み込むために使用されるモデルでは、ほぼリアルタイムのワークロードの SLA 要件を満たさない可能性がある待機時間が発生します。 「 Azure Synapse Analytics のクエリー データ」を参照してください。

Synapse への書き込みのストリーミングでサポートされている出力モード

Azure Synapse コネクタでは、レコードの追加と集計の Append モードと Complete 出力モードがサポートされています。 出力モードと互換性マトリックスの詳細については、 構造化ストリーミング ガイドを参照してください。

Synapse フォールト トレランス セマンティクス

既定により、Azure Synapse ストリーミングは、DBFS のチェックポイントの場所、Azure Synapse のチェックポイント テーブル、およびロック メカニズムの組み合わせを使用してクエリーの進行状況を確実に追跡し、ストリーミングがあらゆる種類のエラー、再試行、クエリーの再起動を処理できるようにすることで、Azure Synapse テーブルにデータを書き込むためのエンド ツー エンド の正確な 保証を提供します。

必要に応じて、 spark.databricks.sqldw.streaming.exactlyOnce.enabled オプションを falseに設定することで、Azure Synapse ストリーミングに対して制限の少ない少なくとも 1 回セマンティクスを選択できますが、その場合、Azure Synapse への接続エラーが断続的に発生した場合や、予期しないクエリの終了が発生した場合にデータの重複が発生する可能性があります。

Azure Synapse に書き込むための構造化ストリーミング構文

次のコード例は、Scala と Python で構造化ストリーミングを使用して Synapse への書き込みをストリーミングする方法を示しています。

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

構成の完全な一覧については、「 Azure Synapse Analytics のクエリー データ」を参照してください。

ストリーミングチェックポイントテーブル管理 Synaps e

Azure Synapse コネクタでは、新しいストリーミング クエリの開始時に作成されるストリーミング チェックポイント テーブルは削除 されません 。 この動作は、オブジェクトストレージに通常指定される checkpointLocation と一致しています。 Databricks では、今後実行されないクエリーのチェックポイント テーブルを定期的に削除することをお勧めします。

デフォルトでは、すべてのチェックポイント・テーブルの名前は <prefix>_<query-id>になります。ここで、 <prefix> はデフォルト値が databricks_streaming_checkpoint を持つ構成可能な接頭部で、 query_id_ 文字が除去されたストリーミング・クエリー ID です。

古いストリーミングまたは削除されたストリーミングクエリーのすべてのチェックポイントテーブルを検索するには、クエリーを実行します。

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

プレフィックスは、Spark SQL 構成オプション spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefixを使用して構成できます。

Databricks Synapse コネクタのストリーミング オプションのリファレンス

Spark SQL で提供される OPTIONS では、バッチ オプションに加えて、ストリーミング用の次の オプションがサポートされています。

パラメーター

必須

デフォルト

checkpointLocation

はい

デフォルトなし

メタデータとチェックポイント情報を書き込むために構造化ストリーミングによって使用される DBFS 上の場所。 構造化ストリーミングの「 チェックポイント処理による障害からの回復 」プログラミングガイドを参照してください。

numStreamingTempDirsToKeep

いいえ

0

ストリーミング内のマイクロバッチを定期的にクリーンアップするために保持する(最新の)一時ディレクトリの数を示します。 0に設定すると、マイクロバッチがコミットされた直後にディレクトリの削除がトリガーされ、それ以外の場合は、最新のマイクロバッチの数が保持され、残りのディレクトリが削除されます。定期的なクリーンアップを無効にするには、 -1 を使用します。

checkpointLocation numStreamingTempDirsToKeep は、Azure Synapse の新しいテーブルへの Databricks からのストリーミング書き込みにのみ関連します。