Apache Kafka に接続する
この記事では、Databricks 上で構造化ストリーミングワークロードを実行する際に、Apache Kafka をソースまたはシンクとして使用する方法について説明します。
Kafkaの詳細については、 Apache Kafkaドキュメントを参照してください。
Kafkaからデータを読み取る
Databricks は、Kafka への接続を構成するためのデータ形式としてkafkaキーワードを提供します。以下はストリーミング読み取りの例です。
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
Databricks は、次の例に示すように、バッチ読み取りセマンティクスもサポートしています。
- Python
- Scala
- SQL
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
増分バッチ読み込みの場合、Databricks では Kafka をTrigger.AvailableNowと共に使用することをお勧めします。AvailableNow : 増分バッチ処理を参照してください。
Databricks Runtime 13.3 LTS 以降では、Databricks は Kafka データを読み取るための SQL 関数も提供します。SQLを使用したストリーミングは、 LakeFlow Spark宣言型パイプラインまたはDatabricks SQLのストリーミング テーブルでのみサポートされます。 read_kafkaテーブル値関数を参照してください。
Kafka構造化ストリーミングリーダーを構成する
バッチ クエリとストリーミング クエリの両方において、Kafka ソースに次のオプションを設定する必要があります。
オプション | Value | 説明 |
|---|---|---|
| カンマ区切りのホストリスト | Kafka クラスターブートストラップサーバー |
さらに、サブスクライブするトピックを指定するには、次のいずれかのオプションが必要です。
オプション | Value | 説明 |
|---|---|---|
| トピックのコンマ区切りリスト | サブスクライブするトピックのリストです。 |
| Javaの正規表現文字列 | トピックをサブスクライブするのに使われるパターンです。 |
| JSON 文字列 | コンシュームする特定の topicPartitions です。 |
利用可能なオプションの完全なリストについては、オプションページを参照してください。
Kafkaレコードのスキーマ
Kafka 構造化ストリーミング リーダーによって返されるレコードのスキーマは次のようになります。
列 | Type |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
keyとvalueは常にByteArrayDeserializerを使用してバイト配列として逆シリアル化されます。キーと値を明示的に逆シリアル化するには、DataFrame 操作 ( cast("string")やfrom_avroなど) を使用します。
Kafkaにデータを書き込む
以下は、Kafka へのストリーミング書き込みの例です:
- Python
- Scala
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
Databricks は、次の例に示すように、Kafka データシンクへのバッチ書き込みセマンティクスもサポートしています。
- Python
- Scala
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Kafka構造化ストリーミング ライターを構成する
Databricks Runtime 13.3 LTS 以降には、デフォルトでべき等書き込みを有効にするkafka-clientsライブラリの新しいバージョンが含まれています。Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているもののIDEMPOTENT_WRITEが有効になっていない場合、書き込みは失敗し、エラー メッセージorg.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error stateが表示されます。
このエラーを解決するには、 Kafkaバージョン 2.8.0 以降にアップグレードするか、構造化ストリーミング ライターの構成時に.option(“kafka.enable.idempotence”, “false”)を設定します。
以下は、Kafka への書き込み時に設定される一般的なオプションです。
オプション | Value | デフォルト値 | 説明 |
|---|---|---|---|
| カンマで区切られたリスト | なし | [必須] Kafka の |
|
| 設定されていません | [オプション] 書き込まれるすべての行のトピックを設定します。このオプションは、データに存在するすべてのトピック列よりも優先されます。 |
|
|
| [オプション] Kafka ヘッダーを行に含めるかどうか。 |
利用可能なオプションの完全なリストについては、オプションページを参照してください。
Kafkaライターのスキーマ
Kafka にデータを書き込む場合、提供される DataFrame には次のフィールドが含まれる場合があります。
列名 | 必須またはオプション | Type |
|---|---|---|
| オプション |
|
| 必須 |
|
| オプション |
|
| オプション( |
|
| オプション |
|
認証
Databricks 、 Unity Catalog認証情報、SASL/ SSL 、 AWS MSK、 Azure Event Hubs、Google クラウドマネージドKafkaのクラウド固有のオプションなど、 Kafkaの複数の認証方法をサポートしています。 認証を参照してください。
Kafkaメトリクスの取得
avgOffsetsBehindLatest 、 maxOffsetsBehindLatest 、およびminOffsetsBehindLatestメトリクスを使用して、ストリーミング クエリがKafkaに比べてどの程度遅れているかを監視できます。 これらは、Kafka の最新のオフセットを基準として、サブスクライブされているすべてのトピック パーティション全体の平均、最大、最小のオフセット ラグを報告します。「メトリクスを対話的に読む」を参照してください。
クエリがまだ消費していないデータ量を推定するには、 estimatedTotalBytesBehindLatestメトリクスを使用します。 このメトリクスは、過去 300 秒間に処理されたバッチに基づいて、サブスクライブされたすべてのパーティションにわたって残っている合計バイト数を推定します。 bytesEstimateWindowLengthオプションを設定することで、この見積りに使用される時間枠を変更できます。たとえば、10 分に設定するには、次のようにします。
- Python
- Scala
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
詳細については、 Databricksのモニタリング構造化ストリーミング クエリを参照してください。
コード例: Kafka から Delta へ
次の例は、Kafka から Delta テーブルにデータを継続的にストリーミングするための完全なワークフローを示しています。このパターンは、急速なデータ取り込みワークロードに最適です。
この例では、固定された JSON スキーマを使用します。Avro や Protobuf などの他の形式の場合は、 from_avroまたはfrom_protobufを使用します。スキーマ レジストリと統合することもできます。スキーマ レジストリの例を参照してください。
- Python
- Scala
- SQL
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);