メインコンテンツまでスキップ

リアルタイム mode in 構造化ストリーミング

備考

プレビュー

この機能は パブリック プレビュー段階です。

このページでは、5 ミリ秒というエンドツーエンドの待機時間で超低遅延のデータ処理を可能にする構造化ストリーミングのトリガータイプであるリアルタイムモードについて説明します。このモードは、ストリーミング データへの即時の応答が必要な運用ワークロード向けに設計されています。

リアルタイム モードは、Databricks Runtime 16.4 LTS 以降で使用できます。

運用ワークロード

ストリーミング ワークロードは、分析ワークロードと運用ワークロードに大きく分けることができます。

  • 分析ワークロードでは、通常はメダリオンアーキテクチャに従ってデータ取り込みと変換を使用します (たとえば、ブロンズ、シルバー、ゴールドのテーブルにデータを取り込むなど)。
  • 運用ワークロードは、リアルタイムデータを消費し、ビジネスロジックを適用し、ダウンストリームのアクションや決定をトリガーします。

運用ワークロードの例を次に示します。

  • 不正行為のスコアがしきい値を超えた場合に、異常な場所、大規模な取引サイズ、急速な支出パターンなどの要因に基づいて、クレジット カード取引をリアルタイムでブロックまたはフラグを立てます。
  • クリックストリームデータでユーザーがジーンズを5分間閲覧していた場合にプロモーションメッセージを配信し、その後15分以内に購入した場合は25%の割引を提供します。

一般に、運用ワークロードは、1秒未満のエンドツーエンドのレイテンシーの必要性によって特徴付けられます。これは、 Apache Spark 構造化ストリーミングのリアルタイムモードで実現できます。

リアルタイム モードが低遅延を実現する方法

リアルタイムモードは、次の方法で実行アーキテクチャを改善します。

  • 実行時間の長いバッチ (デフォルトは 5 分) を実行し、ソースでデータが使用可能になったときに処理されます。
  • クエリのすべてのステージは同時にスケジュールされます。これには、使用可能なタスクスロットの数が、バッチ内のすべてのステージのタスクの数以上である必要があります。
  • データは、ストリーミング シャッフルを使用して生成されるとすぐにステージ間で渡されます。

処理が終了すると、 バッチ 次の開始前に、 ストリーミング チェックポイントが進行し、 最後の パケット の メトリクス 使用可能になります。 バッチが長い場合、これらのアクティビティの頻度が低くなり、障害が発生した場合の再生時間が長くなり、メトリクスの可用性が遅れる可能性があります。一方、バッチが小さい場合、これらのアクティビティの頻度が高くなり、レイテンシーに影響を与える可能性があります。Databricks では、ターゲット ワークロードと要件に対してリアルタイム モードをベンチマークし、適切なトリガー間隔を見つけることをお勧めします。

クラスタリング構成

構造化ストリーミングでリアルタイム モードを使用するには、クラシック ストリーミングを構成する必要がありますLakeFlow Job

  1. Databricks ワークスペースで、左上隅にある [ 新規 ] をクリックします。 [More ] を選択し、[ クラスタリング ] をクリックします。

  2. クリア なPhoton加速

  3. 「オートスケールを有効にする」 をクリアします。

  4. [Advanced performance ] で、[ Use spot instances ] をオフにします。

  5. [詳細 および アクセスモード ]で、[ 手動 ]をクリックし、[ 専用(旧称:シングルユーザー)] を選択します。

  6. [Spark (Spark の Spark)] で、[ Spark config (Spark の設定 )] に次のように入力します。

    spark.databricks.streaming.realTimeMode.enabled true
  7. 作成 をクリックします。

クラスタリング サイズの要件

クラスタリングに十分なタスク スロットがある場合は、クラスタリングごとに 1 つのリアルタイム ジョブを実行できます。

低遅延モードで実行するには、使用可能なタスク スロットの合計数が、すべてのクエリ ステージのタスク数以上である必要があります。

スロット計算の例

シングルステージのステートレスパイプライン(Kafka ソース + シンク):

maxPartitions = 8 の場合、少なくとも 8 つのスロットが必要です。maxPartitions が設定されていない場合は、Kafka トピックパーティションの数を使用します。

2 段階のステートフル パイプライン (Kafka ソース + シャッフル):

maxPartitions = 8 でパーティションのシャッフル = 20 の場合、8 + 20 = 28 スロットが必要です。

3 段階のパイプライン (Kafka ソース + シャッフル + 再パーティション):

maxPartitions = 8 で、それぞれ 20 の 2 つのシャッフル ステージの場合、8 + 20 + 20 = 48 スロットが必要です。

主な考慮事項

クラスタリングを設定するときは、次の点を考慮してください。

  • マイクロバッチモードとは異なり、リアルタイムタスクはデータを待っている間、アイドル状態のままになる可能性があるため、リソースの浪費を避けるためには、適切なサイジングが不可欠です。

  • 目標使用率レベル (50% など) を目指すには、次のように調整します。

    • maxPartitions ( Kafka対象)
    • spark.sql.shuffle.partitions (シャッフルステージ用)
  • Databricks では、オーバーヘッドを減らすために、各タスクが複数の Kafka パーティションを処理するように maxPartitions を設定することをお勧めします。

  • ワーカーごとにタスクスロットを調整して、単純な 1 ステージジョブのワークロードに一致させます。

  • シャッフルを多用するジョブの場合、エクスペリメントはバックログを回避し、そこから調整するシャッフルパーティションの最小数を見つけます。クラスタリングに十分なスロットがない場合、ジョブはスケジュールされません。

注記

Databricks Runtime 16.4 LTS 以降では、すべてのリアルタイム パイプラインでチェックポイント v2 が使用され、リアルタイム モードとマイクロバッチ モードをシームレスに切り替えることができます。

クエリ構成

リアルタイム トリガーを有効にして、クエリを低遅延モードで実行するように指定する必要があります。さらに、リアルタイムトリガーは更新モードでのみサポートされます。例えば:

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode(“update”)
.trigger(RealTimeTrigger.apply())
.start()

RealTimeTrigger は、チェックポイント間隔を指定する引数も受け入れることができます。たとえば、次のコードは、チェックポイント間隔が 5 分であることを示しています。

Scala
.trigger(RealTimeTrigger.apply("5 minutes"))

オブザーバビリティ

以前は、エンドツーエンドのクエリレイテンシはバッチのデュテンシと密接に関連していたため、バッチのデュテンシはクエリのレイテンシの良い指標となっていました。ただし、この方法はリアルタイム モードでは適用されなくなり、レイテンシを測定するための別のアプローチが必要になります。エンドツーエンドのレイテンシはワークロードに固有であり、ビジネスロジックでしか正確に測定できない場合があります。たとえば、ソースのタイムスタンプが Kafka で出力される場合、レイテンシは Kafka の出力タイムスタンプとソースのタイムスタンプの差として計算できます。

エンドツーエンドのレイテンシは、ストリーミングプロセス中に収集された部分的な情報に基づいて、いくつかの方法で見積もることができます。

StreamingQueryProgress を使用する

次のメトリクスは StreamingQueryProgress イベントに含まれており、ドライバー ログに自動的に記録されます。 また、 StreamingQueryListeneronQueryProgress() コールバック関数からアクセスすることもできます。QueryProgressEvent.json() または toString() には、追加のリアルタイム モード メトリクスが含まれています。

  1. 処理レイテンシ (processingLatencyMs)。 リアルタイム・モード・クエリがレコードを読み取ってから、次のステージまたはダウンストリームに書き込まれるまでの経過時間。シングルステージクエリの場合、これはE2Eレイテンシと同じ期間を測定します。このメトリクスは、タスクごとに報告されます。
  2. ソース キューイング遅延 (sourceQueuingLatencyMs)。 レコードがメッセージバスに正常に書き込まれてから (Kafka のログ追加時間など) から、レコードがリアルタイムモードクエリによって最初に読み取られるまでにかかった時間。このメトリクスは、タスクごとに報告されます。
  3. E2E 遅延 (e2eLatencyMs)。 レコードがメッセージ・バスに正常に書き込まれてから、レコードがリアルタイム・モード・クエリによってダウンストリームに書き込まれるまでの時間。このメトリクスは、すべてのタスクによって処理されたすべてのレコードのバッチごとに集計されます。

例えば:

"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},

ジョブで Observe API を使用する

Observe API は、別のジョブを起動せずにレイテンシを測定するのに役立ちます。ソース データの到着時刻を概算したソース タイムスタンプがあり、シンクに到達する前にそのタイムスタンプが渡された場合、またはタイムスタンプを渡す方法が見つかった場合は、Observe API を使用して各バッチのレイテンシを見積もることができます。

Scala
val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

この例では、エントリを出力する前に現在のタイムスタンプが記録され、このタイムスタンプとレコードのソースタイムスタンプの差を計算することでレイテンシーが推定されます。結果は進行状況レポートに含まれ、リスナーが利用できるようになります。出力例を次に示します。

"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}

何がサポートされているのですか?

環境

クラスタータイプ

サポート

専用(旧:シングルユーザー)

Yes

標準(旧:共有)

No

LakeFlow Declarative パイプライン Classic

No

LakeFlow Declarative パイプライン サーバレス

No

サーバーレス

No

言語

言語

サポート

Scala

Yes

Java

Yes

Python

No

実行モード

実行 Mode

サポート

更新モード

Yes

追加モード

No

コンプリートモード

No

ソース

ソース

サポート

Apache Kafka

Yes

AWS MSK

Yes

Eventhub (Kafka Connector を使用)

Yes

Kinesis

はい(EFOモードのみ)

Google Pub/Sub

No

Apache パルサー

No

シンク

シンク

サポート

Apache Kafka

Yes

Eventhub (Kafka Connector を使用)

Yes

Kinesis

No

Google Pub/Sub

No

Apache パルサー

No

任意のシンク (forEachWriter を使用)

Yes

演算子

演算子

サポート

ステートレス操作

-選定

Yes

-投射

Yes

UDF

  • Scala UDF

Yes

  • Python UDF (英語)

No

集計

-和

Yes

-数える

Yes

-マックス

Yes

-分

Yes

-平均

Yes

集計関数

Yes

ウィンドウ

-タンブリング

Yes

-スライディング

Yes

-セッション

No

重複排除

  • ドロップ重複

はい (状態は無制限です)

  • dropDuplicatesWithinウォーターマーク

No

ストリーム - テーブル結合

  • 放送テーブル(小さいもの)

Yes

ストリーム - ストリーム結合

No

(フラット)MapGroupsWithState (英語)

No

transformWithState(トランスフォーム・ウィズ・ステート)

はい(一部異なります)

union

はい (一部制限があります)

For each

Yes

forEachBatch

No

マップパーティション

Yes

transformWithState をリアルタイム モードで使用する

カスタムステートフルアプリケーションを構築するために、Databricks は Apache Spark 構造化ストリーミングのAPIである transformWithStateをサポートしています。APIとコード スニペットの詳細については、「カスタム ステートフル アプリケーションの構築」を参照してください。

ただし、API がリアルタイム モードで動作する方法と、マイクロバッチ アーキテクチャを活用する従来のストリーミング クエリとの間には、いくつかの違いがあります。

  • リアルタイム モード handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) のメソッドは、各行に対して呼び出されます。

    • inputRows イテレータは 1 つの値を返します。マイクロバッチモードでは、キーごとに 1 回呼び出され、 inputRows イテレータはマイクロバッチ内のキーのすべての値を返します。
    • 彼らのコードを書くときは、この違いを認識する必要があります。
  • イベント時間タイマーは、リアルタイム モードではサポートされていません。

  • リアルタイムモードでは、データの到着に応じてタイマーの発射が遅れます。それ以外の場合、データがない場合は、実行時間の長いバッチの終了時に発生します。たとえば、タイマーが 10:00:00 に起動することになっていて、同時にデータ到着がない場合、タイマーは起動されません。代わりに、データが 10:00:10 に到着した場合、タイマーは 10 秒の遅延で起動されます。または、データが到着せず、実行時間の長いバッチが終了している場合は、実行時間の長いバッチを終了する前にタイマーが実行されます。

制限

ソースの制限

Kinesis では、ポーリングモードはサポートされていません。さらに、頻繁にパーティションを再分割すると、レイテンシーに悪影響を与える可能性があります。

ユニオンの制限

Union には、いくつかの制限があります。

  • 自己結合はサポートされていません。

    • Kafka : 同じソースデータフレームオブジェクトと、そこからユニオン派生したデータフレームを使用することはできません。回避策: 同じソースから読み取る異なるデータフレームを使用します。
    • Kinesis : 同じ設定の同じ Kinesis ソースから派生したデータフレームをユニオンすることはできません。回避策: 異なる Dataframe を使用するだけでなく、各 DataFrame に異なる 'consumerName' オプションを割り当てることができます。
  • ユニオンの前に定義されたステートフル演算子 ( aggregatededuplicatetransformWithStateなど) はサポートされていません。

  • バッチソースとの結合はサポートされていません。

次の例は、サポートされているクエリを示しています。

ステートレス クエリ

単一または複数ステージのステートレスクエリがサポートされています。

Kafka ソースから Kafka シンクへ

この例では、Kafka ソースから読み取り、Kafka シンクに書き込みます。

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

パーティション

この例では、Kafka ソースから読み取り、データを 20 個のパーティションに再パーティション分割して、Kafka シンクに書き込みます。

現在の実装上の制限により、再パーティションを使用する前に、Spark 構成 spark.sql.execution.sortBeforeRepartitionfalse に設定してください。

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")


spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

ストリームスナップショット結合 (ブロードキャストのみ)

この例では、Kafka から読み取り、データを静的テーブルと結合して、Kafka シンクに書き込みます。静的テーブルをブロードキャストするストリーム静的結合のみがサポートされているため、静的テーブルはメモリに収まる必要があることに注意してください。

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

Kinesis ソースから Kafka シンクへ

この例では、Kinesis ソースから読み取り、Kafka シンクに書き込みます。

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
.format("kinesis")
.option(REGION_KEY, regionName)
.option(AWS_ACCESS_ID_KEY, awsAccessKeyId)
.option(AWS_SECRET_KEY, awsSecretAccessKey)
.option(CONSUMER_MODE_KEY, CONSUMER_MODE_EFO)
.option(CONSUMER_NAME_KEY, kinesisSourceStream.consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

組合

この例では、2 つの異なるトピックの 2 つの Kafka DataFrames をユニオンし、 Kafka シンクに書き込みます。

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic1)
.load()

val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic2)
.load()

df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

ステートフル クエリ

重複 排除

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 40)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

集合体

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 20)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()

アグリゲーションとの結合

この例では、最初に 2 つの異なるトピックの 2 つの Kafka DataFrames を結合し、次に集計を行います。 最終的には、Kafka シンクに書き込みます。

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic1)
.load()

val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic2)
.load()

df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

TransformWithState(トランスフォーム・ウィズ・ステート)

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/

class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}

override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2

val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)

(key, oldValue + 1, sourceTimestamp)
}
}
}

spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 20)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
注記

構造化ストリーミングのリアルタイムモードと他の実行モードが StatefulProcessor を実行する方法には違いがあります transformWithState「リアルタイム モードでの transformWithState の使用」を参照してください。

シンク

foreachSink による Postgres への書き込み

Scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable

/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin

private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0

private var connection: Connection = _

/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)

if (bufferSize == 0) {
return
}

var upsertStatement: PreparedStatement = null

try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}

upsertStatement.executeBatch()
connection.commit()

bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}

override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}

override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}

override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}


spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()

陳列

important

この機能は、Databricks Runtime 17.1 以降で使用できます。

表示レートソース

この例では、レート ソースから読み取り、ストリーミング DataFrame をノートブックに表示します。

Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime("30 seconds"), outputMode=OutputMode.Update())