メインコンテンツまでスキップ

Apache Kafka に接続する

この記事では、Databricks 上で構造化ストリーミングワークロードを実行する際に、Apache Kafka をソースまたはシンクとして使用する方法について説明します。

Kafkaの詳細については、 Apache Kafkaドキュメントを参照してください。

Kafkaからデータを読み取る

Databricks は、Kafka への接続を構成するためのデータ形式としてkafkaキーワードを提供します。以下はストリーミング読み取りの例です。

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)

Databricks は、次の例に示すように、バッチ読み取りセマンティクスもサポートしています。

Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)

増分バッチ読み込みの場合、Databricks では Kafka をTrigger.AvailableNowと共に使用することをお勧めします。AvailableNow : 増分バッチ処理を参照してください。

Databricks Runtime 13.3 LTS 以降では、Databricks は Kafka データを読み取るための SQL 関数も提供します。SQLを使用したストリーミングは、 LakeFlow Spark宣言型パイプラインまたはDatabricks SQLのストリーミング テーブルでのみサポートされます。 read_kafkaテーブル値関数を参照してください。

Kafka構造化ストリーミングリーダーを構成する

バッチ クエリとストリーミング クエリの両方において、Kafka ソースに次のオプションを設定する必要があります。

オプション

Value

説明

kafka.bootstrap.servers

カンマ区切りのホストリスト

Kafka クラスターブートストラップサーバー

さらに、サブスクライブするトピックを指定するには、次のいずれかのオプションが必要です。

オプション

Value

説明

subscribe

トピックのコンマ区切りリスト

サブスクライブするトピックのリストです。

subscribePattern

Javaの正規表現文字列

トピックをサブスクライブするのに使われるパターンです。

assign

JSON 文字列 {"topicA":[0,1],"topic":[2,4]}

コンシュームする特定の topicPartitions です。

利用可能なオプションの完全なリストについては、オプションページを参照してください。

Kafkaレコードのスキーマ

Kafka 構造化ストリーミング リーダーによって返されるレコードのスキーマは次のようになります。

Type

key

binary

value

binary

topic

string

partition

int

offset

long

timestamp

long

timestampType

int

keyvalueは常にByteArrayDeserializerを使用してバイト配列として逆シリアル化されます。キーと値を明示的に逆シリアル化するには、DataFrame 操作 ( cast("string")from_avroなど) を使用します。

Kafkaにデータを書き込む

以下は、Kafka へのストリーミング書き込みの例です:

Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)

Databricks は、次の例に示すように、Kafka データシンクへのバッチ書き込みセマンティクスもサポートしています。

Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)

Kafka構造化ストリーミング ライターを構成する

重要

Databricks Runtime 13.3 LTS 以降には、デフォルトでべき等書き込みを有効にするkafka-clientsライブラリの新しいバージョンが含まれています。Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているもののIDEMPOTENT_WRITEが有効になっていない場合、書き込みは失敗し、エラー メッセージorg.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error stateが表示されます。

このエラーを解決するには、 Kafkaバージョン 2.8.0 以降にアップグレードするか、構造化ストリーミング ライターの構成時に.option(“kafka.enable.idempotence”, “false”)を設定します。

以下は、Kafka への書き込み時に設定される一般的なオプションです。

オプション

Value

デフォルト値

説明

kafka.boostrap.servers

カンマで区切られたリスト <host:port>

なし

[必須] Kafka の bootstrap.servers 構成です。

topic

STRING

設定されていません

[オプション] 書き込まれるすべての行のトピックを設定します。このオプションは、データに存在するすべてのトピック列よりも優先されます。

includeHeaders

BOOLEAN

false

[オプション] Kafka ヘッダーを行に含めるかどうか。

利用可能なオプションの完全なリストについては、オプションページを参照してください。

Kafkaライターのスキーマ

Kafka にデータを書き込む場合、提供される DataFrame には次のフィールドが含まれる場合があります。

列名

必須またはオプション

Type

key

オプション

STRING または BINARY

value

必須

STRING または BINARY

headers

オプション

ARRAY

topic

オプション(topic がライターオプションとして設定されている場合は無視されます)

STRING

partition

オプション

INT

認証

Databricks 、 Unity Catalog認証情報、SASL/ SSL 、 AWS MSK、 Azure Event Hubs、Google クラウドマネージドKafkaのクラウド固有のオプションなど、 Kafkaの複数の認証方法をサポートしています。 認証を参照してください。

Kafkaメトリクスの取得

avgOffsetsBehindLatestmaxOffsetsBehindLatest 、およびminOffsetsBehindLatestメトリクスを使用して、ストリーミング クエリがKafkaに比べてどの程度遅れているかを監視できます。 これらは、Kafka の最新のオフセットを基準として、サブスクライブされているすべてのトピック パーティション全体の平均、最大、最小のオフセット ラグを報告します。「メトリクスを対話的に読む」を参照してください。

クエリがまだ消費していないデータ量を推定するには、 estimatedTotalBytesBehindLatestメトリクスを使用します。 このメトリクスは、過去 300 秒間に処理されたバッチに基づいて、サブスクライブされたすべてのパーティションにわたって残っている合計バイト数を推定します。 bytesEstimateWindowLengthオプションを設定することで、この見積りに使用される時間枠を変更できます。たとえば、10 分に設定するには、次のようにします。

Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。

JSON
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}

詳細については、 Databricksのモニタリング構造化ストリーミング クエリを参照してください。

コード例: Kafka から Delta へ

次の例は、Kafka から Delta テーブルにデータを継続的にストリーミングするための完全なワークフローを示しています。このパターンは、急速なデータ取り込みワークロードに最適です。

この例では、固定された JSON スキーマを使用します。Avro や Protobuf などの他の形式の場合は、 from_avroまたはfrom_protobufを使用します。スキーマ レジストリと統合することもできます。スキーマ レジストリの例を参照してください。

Python
from pyspark.sql.functions import from_json, col

# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"

# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}

# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)

# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)

query.awaitTermination()