メインコンテンツまでスキップ

Apache Kafka に接続する

このページでは、Databricks 上で構造化ストリーミングワークロードを実行する際に、Apache Kafka をソースまたはシンクとして使用する方法について説明します。

Kafkaの詳細については、 Apache Kafkaドキュメントを参照してください。

Kafkaからデータを読み取る

Kafka への接続を構成するには、kafka 形式を使用します。以下に、ストリーミング読み込みの例を挙げています。

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)

次の例に示すように、Databricks は Kafka からのバッチ読み込みもサポートしています。

Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)

増分バッチ読み込みの場合、Databricks では Kafka をTrigger.AvailableNowと共に使用することをお勧めします。AvailableNow : 増分バッチ処理を参照してください。

Databricks Runtime 13.3 LTS 以降では、Databricks は Kafka データを読み取るための SQL 関数も提供します。SQLを使用したストリーミングは、 LakeFlow Spark宣言型パイプラインまたはDatabricks SQLのストリーミング テーブルでのみサポートされます。 read_kafkaテーブル値関数を参照してください。

Kafka構造化ストリーミングリーダーを構成する

バッチ クエリとストリーミング クエリの両方で、Kafka ソースのブートストラップ サーバーを次のオプションで設定する必要があります:

キー

Value

説明

kafka.bootstrap.servers

カンマ区切りのホストリスト

Kafka クラスターブートストラップサーバー

サブスクリプションのトピックを設定するには、次のいずれかのオプションを指定する必要があります。

オプション

Value

説明

subscribe

トピックのコンマ区切りリスト

サブスクライブするトピックのリストです。

subscribePattern

Javaの正規表現文字列

トピックをサブスクライブするのに使われるパターンです。

assign

JSON 文字列 {"topicA":[0,1],"topic":[2,4]}

消費するtopicPartitionsを指定します。

利用可能なオプションの完全なリストについては、「Kafka」を参照してください。

Kafka レコードのスキーマ

Kafka 構造化ストリーミング リーダーは次のスキーマの行を返します。

Type

key

binary

value

binary

topic

string

partition

int

offset

long

timestamp

long

timestampType

int

keyvalueは常にByteArrayDeserializerを使用してバイト配列として逆シリアル化されます。キーと値を明示的に逆シリアル化するには、DataFrame 操作 ( cast("string")from_avroなど) を使用します。

Kafkaにデータを書き込む

以下は、Kafka へのストリーミング書き込みの例です:

Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)

Databricks は、次の例に示すように、Kafka データシンクへのバッチ書き込みセマンティクスもサポートしています。

Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)

Kafka構造化ストリーミング ライターを構成する

重要

Databricks Runtime 13.3 LTS 以降には、デフォルトでべき等書き込みを有効にするkafka-clientsライブラリの新しいバージョンが含まれています。Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているもののIDEMPOTENT_WRITEが有効になっていない場合、書き込みは失敗し、エラー メッセージorg.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error stateが表示されます。

このエラーを解決するには、 Kafkaバージョン 2.8.0 以降にアップグレードするか、構造化ストリーミング ライターの構成時に.option(“kafka.enable.idempotence”, “false”)を設定します。

以下は、Kafka への書き込み時の一般的なオプションです。

キー

Value

デフォルト値

説明

kafka.boostrap.servers

カンマで区切られたリスト <host:port>

なし

必須。Kafka の bootstrap.servers 構成です。

topic

STRING

設定されていません

オプション。書き込まれるすべての行のトピックを設定します。このオプションは、データに存在するすべてのトピック列よりも優先されます。

includeHeaders

BOOLEAN

false

オプション。Kafka ヘッダーを行に含めるかどうか。

利用可能なオプションの完全なリストについては、「Kafka シンク」を参照してください。

Kafkaライターのスキーマ

Kafka にデータを書き込む場合、提供される DataFrame には次のフィールドが含まれる場合があります。

列名

必須またはオプション

Type

key

オプション

STRING または BINARY

value

必須

STRING または BINARY

headers

オプション

ARRAY

topic

オプション(topic がライターオプションとして設定されている場合は無視されます)

STRING

partition

オプション

INT

認証

Databricks 、 Unity Catalog認証情報、SASL/ SSL 、 AWS MSK、 Azure Event Hubs、Google クラウドマネージドKafkaのクラウド固有のオプションなど、 Kafkaの複数の認証方法をサポートしています。 認証を参照してください。

Kafkaメトリクスの取得

ストリーミングクエリでKafkaに対する遅延を監視するには、avgOffsetsBehindLatestmaxOffsetsBehindLatestminOffsetsBehindLatestのメトリクスを使用します。これらのメトリクスは、Kafka の最新のオフセットを基準として、すべてのサブスクライブ済みトピックパーティションにおける平均、最大、最小のオフセット遅延を提供します。「メトリクスをインタラクティブに読み取る」を参照。

注記

Databricks Runtime 17.1以降では、各マイクロバッチの完了後に最新のKafkaオフセットが取得されます。データを継続的に受信するトピックでは、バックログ メトリクスに小さく永続的なゼロ以外の値が表示される場合があります。 これは想定される動作であり、ストリームが遅延していることを示すものではありません。

Databricks Runtime 17.0以前のバージョンでは、最新のKafkaオフセットはマイクロバッチの開始時に取得されます。ストリーミングクエリがマイクロバッチの開始時に利用可能なすべてのレコードを一貫して消費する場合、バックログメトリクスは0返す可能性があります。

クエリが読み取る残りのデータ量を推定するには、estimatedTotalBytesBehindLatestメトリクスを使用してください。このメトリクスは、過去 300 秒間に処理されたバッチに基づいて、すべてのサブスクライブ済みパーティションに残っているバイトの合計数を推定します。この推定で使用される時間枠は、bytesEstimateWindowLength オプションを設定することで変更できます。

たとえば、ウィンドウ長を10分に設定するには、次のようにします。

Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。

JSON
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}

詳細については、 Databricksのモニタリング構造化ストリーミング クエリを参照してください。

Kafka から Delta Lake への例

以下の例は、KafkaからDelta Lakeテーブルに継続的にデータをストリーミングするための完全なワークフローを示しています。このアプローチは、ほぼリアルタイムデータ取り込みワークロードに活用できます。

この例では、固定された JSON スキーマを使用します。Avro や Protobuf などの他の形式の場合は、 from_avroまたはfrom_protobufを使用します。スキーマ レジストリと統合することもできます。スキーマ レジストリの例を参照してください。

Python
from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)

query.awaitTermination()