メインコンテンツまでスキップ

リアルタイムモードの例

備考

プレビュー

この機能は パブリック プレビュー段階です。

このページでは、単純なステートレス変換からカスタム状態管理を使用した複雑なステートフル処理まで、構造化ストリーミングでのログイン モード クエリの実用的なコード例を提供します。 概念と構成については「構造化ストリーミング」の「進行モード」を参照してください。実践的なチュートリアルについては「進行モードで始める」を参照してください

前提条件

このページの例を実行するには、次のものが必要です。

  • リアルタイム モード クラスターが構成され、実行されています。詳細なセットアップ手順については、 「リアルタイム モードの開始」を参照してください。

  • Databricks Runtime 16.4 LTS 以上。

  • サポートされているストリーミング ソースとシンクへのアクセス:

    • Kafkaの例: 入力/出力トピックが構成されたKafkaブローカー
    • Kinesis の例: AWS 認証情報と拡張ファンアウト (EFO) モード用に構成された Kinesis ストリーム
    • カスタム シンクの例: ターゲット データベースまたはサービスが構成されている (提供されている例では PostgreSQL)
  • 構造化ストリーミングの概念に関する基本的な知識。ストリーミングを初めて使用する場合は、構造化ストリーミングの概念を参照してください。

注記

例では、 broker_addressinput_topiccheckpoint_locationなどのプレースホルダー値を使用しています。コードを実行する前に、これらを実際の構成値に置き換えてください。

ステートレスクエリの例

ステートレス クエリは、レコード間の状態を維持せずに各レコードを個別に処理します。これらのクエリは通常、状態ストレージを管理したり、ルックアップを実行したりする必要がないため、ステートフル クエリよりも単純で、レイテンシが低くなります。変換、フィルタリング、静的データとの結合、ルーティング操作にはステートレス クエリを使用します。

Kafka ソースから Kafka シンクへ

この例では、Kafka ソースから読み取り、Kafka シンクに書き込みます。

Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

再分割

この例では、Kafka ソースから読み取り、データを 20 個のパーティションに再分割し、Kafka シンクに書き込みます。

現在の実装の制限により、再パーティションを使用する前に Spark 構成spark.sql.execution.sortBeforeRepartition falseに設定する必要があります。

Python
# Sorting is not supported in repartition with real-time mode, so you must set this to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

ストリームスナップショット参加(ブロードキャストのみ)

この例では、Kafka から読み取り、データを静的テーブルと結合し、Kafka シンクに書き込みます。静的テーブルをブロードキャストするストリーム静的結合のみがサポートされています。つまり、静的テーブルはメモリに収まる必要があります。

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `static_table_location` has a column 'lookupKey'.

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

Kinesis ソースから Kafka シンクへ

この例では、Kinesis ソースから読み取り、Kafka シンクに書き込みます。

Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("partitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

連合

この例では、2 つの異なるトピックからの 2 つのKafka DataFrames結合し、 Kafkaシンクに書き込みます。

Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)

df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)

query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

ステートフルクエリの例

ステートフル クエリはレコード全体の状態情報を維持し、重複排除、集約、ウィンドウ化などの操作を可能にします。これらのクエリは、時間の経過や複数のイベントにわたって情報を追跡する必要があるユースケースに不可欠です。リアルタイム モードは、マイクロ バッチ モードと同じセマンティクスでステートフル操作をサポートしますが、レイテンシを低くするためにデータを継続的に処理します。ステートフル クエリは状態を維持および更新する必要があるため、ステートレス クエリよりも多くのメモリとコンピュート リソースを必要とします。

重複排除

この例では、 timestamp列とvalue列に基づいてレコードの重複を排除します。

Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

集約

この例では、レコードをtimestampvalueでグループ化し、出現回数をカウントします。

Python
from pyspark.sql.functions import col

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

集約を伴う結合

この例では、まず 2 つの異なるトピックからの 2 つのKafka DataFrames結合し、次に集計を実行します。 最終的には、Kafka シンクに書き込みます。

Python
from pyspark.sql.functions import col

df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)

df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)

query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

状態による変換

この例では、 transformWithStateを使用して、TTL (存続時間) でカスタム状態を維持します。プロセッサは、各キーに表示されるレコードの数をカウントします。

Python
from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)

The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""

def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)

def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

def close(self) -> None:
pass


output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
注記

リアルタイム モードと構造化ストリーミングの他の実行モードでは、 transformWithStateStatefulProcessor実行する方法に違いがあります。「リアルタイム モードで transformWithState を使用する」を参照してください。

開発とテスト

インタラクティブ開発のためのディスプレイの使用

display関数を使用すると、リアルタイム ストリーミング データをノートブック内で直接視覚化できます。これは、外部シンクや本番運用インフラストラクチャをセットアップせずに、インタラクティブな開発、テスト、およびデバッグ モードのクエリに役立ちます。

realTimeトリガーを持つdisplay関数は、Databricks Runtime 17.1 以降で使用できます。Kafkaまたはカスタム シンクを使用して本番運用にデプロイする前に、開発中にdisplayを使用してクエリ ロジックとデータ変換を検証します。 displayでレート ソースを使用する完全な例については、 「リアルタイム モードの開始」を参照してください。

表示レートソース

この例では、レート ソースから読み取り、ストリーミング DataFrame をノートブックに表示します。

Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

カスタムシンクの例

組み込み構造化ストリーミング サポートを持たない宛先にストリーミング データを書き込む必要がある場合は、 foreachSinkを使用してカスタム書き込みロジックを実装します。 カスタム シンクを使用すると、データの書き込み方法を完全に制御できるため、任意のデータベース、API、またはストレージ システムと統合できます。次の例は、JDBC を使用して PostgreSQL データベースに書き込む方法を示しています。

foreachSink を使用して PostgreSQL に書き込む

Scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable

/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin

private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0

private var connection: Connection = _

/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)

if (bufferSize == 0) {
return
}

var upsertStatement: PreparedStatement = null

try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}

upsertStatement.executeBatch()
connection.commit()

bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}

override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}

override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}

override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}


spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()

次のステップ

これらの短期モードの例を確認したところで、知識を深め、本番運用に対応したストリーミング アプリケーションを構築するためのリソースを以下に示します。