メインコンテンツまでスキップ

オプション

このページでは、Databricks 上の構造化ストリーミングを使用して Apache Kafka の読み取りと書き込みを行うための構成オプションについて説明します。

Databricks Kafka コネクタは、Apache Spark Kafka コネクタ上に構築されており、すべての標準 Kafka 構成オプションをサポートしています。kafka.で始まるオプションはすべて、基盤となる Kafka クライアントに直接渡されます。たとえば、 .option("kafka.max.poll.records", "500") Kafka コンシューマーのmax.poll.recordsプロパティを設定します。利用可能な Kafka プロパティの完全なリストについては、 Kafka 構成ドキュメントを参照してください。

このページに記載されていない追加の構造化ストリーミング ソースおよびシンク オプションについては、 「構造化ストリーミング + Kafka統合ガイド」を参照してください。

必須オプション

読み取りと書き込みの両方に次のオプションが必要です。

オプション

Value

説明

kafka.bootstrap.servers

カンマ区切りのホストリスト

Kafka bootstrap.servers構成。Kafka からのデータがない場合、まずブローカーのアドレス リストを確認してください。ブローカーのアドレス リストが正しくない場合は、エラーが発生しない可能性があります。これは、Kafka クライアントがブローカーが最終的に利用可能になると想定し、ネットワーク エラーが発生した場合に永久に再試行するためです。

Kafka から読み取る場合は、消費するトピックを識別するために、次のいずれかのオプションも指定する必要があります。

オプション

Value

説明

subscribe

カンマ区切りのトピックリスト

サブスクライブするトピックのリストです。

subscribePattern

Java正規表現文字列

トピックをサブスクライブするのに使われるパターンです。

assign

JSON文字列 {"topicA":[0,1],"topicB":[2,4]}

コンシュームする特定の topicPartitions です。

Kafka に書き込むときに、必要に応じてtopicオプションを設定して、すべての行の宛先トピックを指定できます。設定されていない場合、DataFrame にはtopic列が含まれている必要があります。

一般的なリーダーオプション

Kafka から読み取るときによく使用されるオプションは次のとおりです。

オプション

Value

デフォルト

説明

minPartitions

INT

なし

Kafka から読み取るパーティションの最小数。通常、Spark は Kafka トピック パーティションごとに 1 つのパーティションを作成します。この値を高く設定すると、大きな Kafka パーティションが小さな Spark パーティションに分割され、並列処理が向上します。データの偏りやピーク負荷の処理に役立ちます。注意: これを有効にすると、トリガーごとに Kafka コンシューマーが再初期化され、SSL 使用時のパフォーマンスに影響する可能性があります。

maxRecordsPerPartition

LONG

なし

Spark パーティションあたりのレコードの最大数。設定すると、Spark は Kafka パーティションを分割し、各 Spark パーティションに最大でこの数のレコードが含まれるようになります。minPartitionsと一緒に使用できます。両方が設定されている場合、Spark はパーティションが多くなる方を使用します。

failOnDataLoss

BOOLEAN

true

データが失われた可能性がある場合にクエリを失敗させるかどうか。トピックの削除、処理前のトピックの切り捨てなど、さまざまなシナリオにより、クエリは Kafka からのデータの読み取りに永続的に失敗する可能性があります。データが失われた可能性があるかどうかを控えめに推定します。場合によっては、誤報が発生する可能性があります。期待どおりに動作しない場合、またはデータが失われてもクエリの処理を続行する場合は、このオプションをfalseに設定します。

maxOffsetsPerTrigger

LONG

なし

[ストリーミングのみ] トリガー間隔ごとに処理されるオフセットの最大数のレート制限。オフセットの合計数はトピック パーティション間で比例的に分割されます。 より高度なフロー制御を行うには、 minOffsetsPerTrigger (トリガー前の最小オフセット) とmaxTriggerDelay (最大待機時間、デフォルトは15m ) を使用することもできます。詳細については、 Spark Kafka 統合ガイドを参照してください。

startingOffsets

earliestlatest 、またはJSON文字列

latest

クエリの開始時に読み取りを開始する場所を決定します。最も古い利用可能なオフセットから読み取るにはearliest使用し、ストリームの開始後に新しいデータのみを読み取るにはlatest使用し、各トピック パーティションの開始オフセットを指定するには JSON 文字列を使用します (例: {"topicA":{"0":23,"1":-2},"topicB":{"0":-2}} )。JSON では、 -2最も古いもの、 -1最新のものを指します。 ストリーミング クエリの場合、これは新しいクエリが開始されたときにのみ適用され、再開すると常にクエリが中断された場所から再開されます。新しく検出されたパーティションはearliestから始まります。 注: バッチ クエリの場合、 latest (暗黙的に、または JSON で-1を使用して) は許可されません。代わりに特定のタイムスタンプから開始するには、 startingTimestampまたはstartingOffsetsByTimestampを使用します。

endingOffsets

latest またはJSON文字列

latest

[バッチのみ] バッチクエリが終了するときのエンドポイント。 最新のオフセットまで読み取るにはlatest使用し、トピック パーティションごとに終了オフセットを指定するには JSON 文字列を使用します (例: {"topicA":{"0":50,"1":-1},"topicB":{"0":-1}} )。JSON では、 -1最新を参照します。 -2 (最も古い) は許可されません。代わりに特定のタイムスタンプで終了するには、 endingTimestampまたはendingOffsetsByTimestampを使用します。

groupIdPrefix

STRING

spark-kafka-source (ストリーミング)またはspark-kafka-relation (バッチ)

自動生成されたコンシューマー グループ ID のプレフィックス。コネクタはクエリごとに一意のgroup.id自動的に生成します。このオプションは、生成された ID のプレフィックスをカスタマイズします。kafka.group.idが設定されている場合は無視されます。

kafka.group.id

STRING

なし

Kafka からの読み取り中に使用するグループ ID。注意して使用してください。デフォルトでは、各クエリはデータを読み取るための一意のグループ ID を生成します。これにより、各クエリには、他のコンシューマーからの干渉を受けない独自のコンシューマー グループが存在するようになり、サブスクライブされているトピックのすべてのパーティションを読み取ることができるようになります。一部のシナリオ (Kafka グループベースの承認など) では、特定の承認済みグループ ID を使用してデータを読み取る必要がある場合があります。オプションでグループ ID を設定できます。ただし、予期しない動作が発生する可能性があるため、細心の注意を払って実行してください。 - 同じグループ ID を持つクエリ (バッチとストリーミングの両方) を同時に実行すると、相互に干渉し、各クエリがデータの一部のみを読み取る可能性があります。 - クエリが立て続けに開始/再開された場合にも、この問題が発生する可能性があります。このような問題を最小限に抑えるには、Kafka コンシューマー構成session.timeout.msを非常に小さく設定します。

includeHeaders

BOOLEAN

false

出力に Kafka メッセージ ヘッダーを含めるかどうか。

bytesEstimateWindowLength

STRING

300s

[ストリーミングのみ] estimatedTotalBytesBehindLatestメトリクスを介して残りのバイトを推定するために使用される時間枠。 10m (10 分) や600s (600 秒) のような期間文字列を受け入れます。Kafka取得」を参照してください。

一般的なライターオプション

Kafka に書き込むときによく使用されるオプションは次のとおりです。

オプション

Value

デフォルト

説明

topic

STRING

なし

すべての行のトピックを設定します。これにより、データ内のtopic列が上書きされます。

includeHeaders

BOOLEAN

false

行に Kafka ヘッダーを含めるかどうか。

重要

Databricks Runtime 13.3 LTS 以降には、デフォルトでべき等書き込みを有効にするkafka-clientsライブラリの新しいバージョンが含まれています。Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているもののIDEMPOTENT_WRITEが有効になっていない場合、書き込みは失敗します。この問題を解決するには、Kafka 2.8.0 以上にアップグレードするか、 .option("kafka.enable.idempotence", "false")を設定します。

認証オプション

Databricks 、 Unity Catalog認証情報、SASL/ SSL 、 AWS MSK、 Azure Event Hubs、Google クラウドマネージドKafkaのクラウド固有のオプションなど、 Kafkaの複数の認証方法をサポートしています。

Databricks 、クラウド管理のKafkaへの認証にUnity Catalog資格情報を使用することをお勧めします。

オプション

Value

説明

databricks.serviceCredential

STRING

クラウド管理型Kafkaサービス ( AWS MSK、 Azure Event Hubs、または Google クラウド管理型Kafka ) に対する認証のためのUnity Catalogサービス 認証情報の名前。 Databricks Runtime 16.1 以降で利用できます。

databricks.serviceCredential.scope

STRING

サービス資格情報の OAuth スコープ。Databricks が Kafka サービスのスコープを自動的に推測できない場合にのみこれを設定します。

Unity Catalog サービス資格情報を使用する場合、 kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolなどの SASL/SSL オプションを指定する必要はありません。

一般的な SASL/SSL オプションは次のとおりです。

オプション

Value

説明

kafka.security.protocol

STRING

ブローカーとの通信に使用されるプロトコル (例: SASL_SSLSSLPLAINTEXT )。

kafka.sasl.mechanism

STRING

SASL メカニズム (例: PLAINSCRAM-SHA-256SCRAM-SHA-512OAUTHBEARERAWS_MSK_IAM )。

kafka.sasl.jaas.config

STRING

JAAS ログイン構成文字列。

kafka.sasl.login.callback.handler.class

STRING

SASL 認証のログイン コールバック ハンドラーの完全修飾クラス名。

kafka.sasl.client.callback.handler.class

STRING

SASL 認証用のクライアント コールバック ハンドラーの完全修飾クラス名。

kafka.ssl.truststore.location

STRING

SSL 信頼ストア ファイルの場所。

kafka.ssl.truststore.password

STRING

SSL 信頼ストア ファイルのパスワード。

kafka.ssl.keystore.location

STRING

SSL キーストア ファイルの場所。

kafka.ssl.keystore.password

STRING

SSL キー ストア ファイルのパスワード。

完全な認証設定手順については、 「認証」を参照してください。

その他のリソース