メインコンテンツまでスキップ

リアルタイムモードの例

このページでは、構造化ストリーミングにおけるリアルタイムモードクエリの動作コード例を紹介します。単純なステートレス変換から、カスタム状態管理を伴う複雑なステートフル処理まで、幅広い例を取り上げています。概念については「構造化ストリーミング」の「進行モード」を参照してください。実践的なチュートリアルについては「チュートリアル: 連続ストリーミング ワークロードを実行」を参照してください。

前提条件

このページの例を実行するには、以下が必要です。

注記

例では、 broker_addressinput_topiccheckpoint_locationのようなプレースホルダー値を使用しています。コードを実行する前に、これらの値を実際の構成値に置き換えてください。

ステートレスクエリの例

ステートレスクエリは、レコード間で状態を保持することなく、各レコードを独立して処理します。これらのクエリは、状態ストレージの管理やルックアップの実行を必要としないため、通常、ステートフルクエリよりもシンプルでレイテンシが低くなります。変換、フィルタリング、静的データとの結合、ルーティング操作には、ステートレスクエリを使用してください。

KafkaソースからKafkaシンクへ

この例では、 Kafkaソースから読み込み、 Kafkaシンクに書き込みます。

Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

Repartition

この例では、 Kafkaソースからデータを読み込み、データを20個のパーティションに再分割し、 Kafkaシンクに書き込みます。

現在の実装上の制限により、再パーティションを使用する前に、Spark 構成spark.sql.execution.sortBeforeRepartition falseに設定する必要があります。

Python
# Sorting is not supported in repartition with real-time mode, so you must set this to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

ストリームスナップショット参加(ブロードキャストのみ)

この例では、Kafkaからデータを読み込み、静的テーブルとデータを結合し、Kafkaシンクに書き込みます。静的テーブルをブロードキャストするストリーム静的結合のみがサポートされており、つまり静的テーブルはメモリ内に収まる必要があります。

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `static_table_location` has a column 'lookupKey'.

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

KinesisソースからKafkaシンクへ

この例では、 Kinesisソースから読み込み、 Kafkaシンクに書き込みます。

Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("partitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

Union

この例では、2つの異なるトピックから取得した2つのKafka DataFrames結合し、 Kafkaシンクに書き込みます。

Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)

df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)

query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

ステートフルクエリの例

ステートフルクエリはレコード全体にわたって状態情報を保持し、重複排除、集計、ウィンドウ処理などの操作を可能にします。これらのクエリは、時間の経過に伴う情報追跡や複数のイベントにわたる情報追跡を必要とするユースケースにとって不可欠です。リアルタイムモードは、マイクロバッチモードと同じセマンティクスでステートフルな操作をサポートしますが、レイテンシを低減するためにデータを継続的に処理します。ステートフル クエリは状態を維持および更新する必要があるため、ステートレス クエリよりも多くのメモリとコンピュート リソースを必要とします。

重複排除

この例では、 timestamp列とvalue列に基づいてレコードの重複を削除します。

Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

集約

この例では、レコードをtimestampvalueでグループ化し、出現回数をカウントします。

Python
from pyspark.sql.functions import col

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

集約との結合

この例では、まず2つの異なるトピックからの2つのKafka DataFrames結合し、次に集計を実行します。 最終的には、Kafkaシンクに書き込みます。

Python
from pyspark.sql.functions import col

df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)

df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)

query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)

transformWithState

この例では、 transformWithStateを使用して TTL (有効期限) 付きのカスタム状態を維持します。プロセッサは、各キーに対して検出されたレコードの数をカウントします。

Python
from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)

The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""

def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)

def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

def close(self) -> None:
pass


output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)

query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
注記

構造化ストリーミングにおけるリアルタイムモードとその他の実行モードでは、 transformWithStateStatefulProcessorの実行方法に違いがあります。リアルタイムモードでtransformWithState参照してください。

開発とテスト

display関数を使用すると、ポータル ストリーミング データをノートブックで直接視覚化し、 Kafkaまたはカスタム シンクを使用して本番運用にデプロイする前にクエリ ロジックとデータ変換を検証できます。 これは、外部シンクや本番運用インフラストラクチャをセットアップせずに、インタラクティブな開発、テスト、およびデバッグ モードのクエリに役立ちます。

display関数とrealTimeトリガーは、Databricks Runtime 17.1以降で利用可能です。レートソースをdisplayとともに使用する完全な例については、 「チュートリアル: リアルタイムストリーミングワークロードを実行する」を参照してください。

表示レートソース

この例では、レート ソースから読み取り、ストリーミングDataFrameをノートブックに表示します。

Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

カスタムシンクの例

組み込みの構造化ストリーミングをサポートしていない宛先にストリーミングデータを書き込む必要がある場合は、 foreachSinkを使用してカスタム書き込みロジックを実装してください。カスタムシンクを使用すると、データの書き込み方法を完全に制御できるため、あらゆるデータベース、API、またはストレージシステムと統合できます。以下の例は、JDBCを使用してPostgreSQLデータベースに書き込む方法を示しています。

foreachSinkを使用してPostgreSQLに書き込みます。

Scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable

/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin

private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0

private var connection: Connection = _

/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)

if (bufferSize == 0) {
return
}

var upsertStatement: PreparedStatement = null

try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}

upsertStatement.executeBatch()
connection.commit()

bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}

override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}

override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}

override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}


spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()

その他のリソース

これらのリアルタイムモードの例を確認したところで、知識を深め、本番運用に対応したストリーミング アプリケーションを構築するためのリソースを以下に示します。