オプション
このページでは、Databricks 上の構造化ストリーミングを使用して Apache Kafka の読み取りと書き込みを行うための構成オプションについて説明します。
Databricks Kafka コネクタは、Apache Spark Kafka コネクタ上に構築されており、すべての標準 Kafka 構成オプションをサポートしています。kafka.で始まるオプションはすべて、基盤となる Kafka クライアントに直接渡されます。たとえば、 .option("kafka.max.poll.records", "500") Kafka コンシューマーのmax.poll.recordsプロパティを設定します。利用可能な Kafka プロパティの完全なリストについては、 Kafka 構成ドキュメントを参照してください。
このページに記載されていない追加の構造化ストリーミング ソースおよびシンク オプションについては、 「構造化ストリーミング + Kafka統合ガイド」を参照してください。
必須オプション
読み取りと書き込みの両方に次のオプションが必要です。
オプション | Value | 説明 |
|---|---|---|
| カンマ区切りのホストリスト | Kafka |
Kafka から読み取る場合は、消費するトピックを識別するために、次のいずれかのオプションも指定する必要があります。
オプション | Value | 説明 |
|---|---|---|
| カンマ区切りのトピックリスト | サブスクライブするトピックのリストです。 |
| Java正規表現文字列 | トピックをサブスクライブするのに使われるパターンです。 |
| JSON文字列 | コンシュームする特定の topicPartitions です。 |
Kafka に書き込むときに、必要に応じてtopicオプションを設定して、すべての行の宛先トピックを指定できます。設定されていない場合、DataFrame にはtopic列が含まれている必要があります。
一般的なリーダーオプション
Kafka から読み取るときによく使用されるオプションは次のとおりです。
オプション | Value | デフォルト | 説明 |
|---|---|---|---|
|
| なし | Kafka から読み取るパーティションの最小数。通常、Spark は Kafka トピック パーティションごとに 1 つのパーティションを作成します。この値を高く設定すると、大きな Kafka パーティションが小さな Spark パーティションに分割され、並列処理が向上します。データの偏りやピーク負荷の処理に役立ちます。注意: これを有効にすると、トリガーごとに Kafka コンシューマーが再初期化され、SSL 使用時のパフォーマンスに影響する可能性があります。 |
|
| なし | Spark パーティションあたりのレコードの最大数。設定すると、Spark は Kafka パーティションを分割し、各 Spark パーティションに最大でこの数のレコードが含まれるようになります。 |
|
|
| データが失われた可能性がある場合にクエリを失敗させるかどうか。トピックの削除、処理前のトピックの切り捨てなど、さまざまなシナリオにより、クエリは Kafka からのデータの読み取りに永続的に失敗する可能性があります。データが失われた可能性があるかどうかを控えめに推定します。場合によっては、誤報が発生する可能性があります。期待どおりに動作しない場合、またはデータが失われてもクエリの処理を続行する場合は、このオプションを |
|
| なし | [ストリーミングのみ] トリガー間隔ごとに処理されるオフセットの最大数のレート制限。オフセットの合計数はトピック パーティション間で比例的に分割されます。 より高度なフロー制御を行うには、 |
|
|
| クエリの開始時に読み取りを開始する場所を決定します。最も古い利用可能なオフセットから読み取るには |
|
|
| [バッチのみ] バッチクエリが終了するときのエンドポイント。 最新のオフセットまで読み取るには |
|
|
| 自動生成されたコンシューマー グループ ID のプレフィックス。コネクタはクエリごとに一意の |
|
| なし | Kafka からの読み取り中に使用するグループ ID。注意して使用してください。デフォルトでは、各クエリはデータを読み取るための一意のグループ ID を生成します。これにより、各クエリには、他のコンシューマーからの干渉を受けない独自のコンシューマー グループが存在するようになり、サブスクライブされているトピックのすべてのパーティションを読み取ることができるようになります。一部のシナリオ (Kafka グループベースの承認など) では、特定の承認済みグループ ID を使用してデータを読み取る必要がある場合があります。オプションでグループ ID を設定できます。ただし、予期しない動作が発生する可能性があるため、細心の注意を払って実行してください。 - 同じグループ ID を持つクエリ (バッチとストリーミングの両方) を同時に実行すると、相互に干渉し、各クエリがデータの一部のみを読み取る可能性があります。 - クエリが立て続けに開始/再開された場合にも、この問題が発生する可能性があります。このような問題を最小限に抑えるには、Kafka コンシューマー構成 |
|
|
| 出力に Kafka メッセージ ヘッダーを含めるかどうか。 |
|
|
| [ストリーミングのみ] |
一般的なライターオプション
Kafka に書き込むときによく使用されるオプションは次のとおりです。
オプション | Value | デフォルト | 説明 |
|---|---|---|---|
|
| なし | すべての行のトピックを設定します。これにより、データ内の |
|
|
| 行に Kafka ヘッダーを含めるかどうか。 |
Databricks Runtime 13.3 LTS 以降には、デフォルトでべき等書き込みを有効にするkafka-clientsライブラリの新しいバージョンが含まれています。Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているもののIDEMPOTENT_WRITEが有効になっていない場合、書き込みは失敗します。この問題を解決するには、Kafka 2.8.0 以上にアップグレードするか、 .option("kafka.enable.idempotence", "false")を設定します。
認証オプション
Databricks 、 Unity Catalog認証情報、SASL/ SSL 、 AWS MSK、 Azure Event Hubs、Google クラウドマネージドKafkaのクラウド固有のオプションなど、 Kafkaの複数の認証方法をサポートしています。
Databricks 、クラウド管理のKafkaへの認証にUnity Catalog資格情報を使用することをお勧めします。
オプション | Value | 説明 |
|---|---|---|
|
| クラウド管理型Kafkaサービス ( AWS MSK、 Azure Event Hubs、または Google クラウド管理型Kafka ) に対する認証のためのUnity Catalogサービス 認証情報の名前。 Databricks Runtime 16.1 以降で利用できます。 |
|
| サービス資格情報の OAuth スコープ。Databricks が Kafka サービスのスコープを自動的に推測できない場合にのみこれを設定します。 |
Unity Catalog サービス資格情報を使用する場合、 kafka.sasl.mechanism 、 kafka.sasl.jaas.config 、 kafka.security.protocolなどの SASL/SSL オプションを指定する必要はありません。
一般的な SASL/SSL オプションは次のとおりです。
オプション | Value | 説明 |
|---|---|---|
|
| ブローカーとの通信に使用されるプロトコル (例: |
|
| SASL メカニズム (例: |
|
| JAAS ログイン構成文字列。 |
|
| SASL 認証のログイン コールバック ハンドラーの完全修飾クラス名。 |
|
| SASL 認証用のクライアント コールバック ハンドラーの完全修飾クラス名。 |
|
| SSL 信頼ストア ファイルの場所。 |
|
| SSL 信頼ストア ファイルのパスワード。 |
|
| SSL キーストア ファイルの場所。 |
|
| SSL キー ストア ファイルのパスワード。 |
完全な認証設定手順については、 「認証」を参照してください。
その他のリソース
- 構造化ストリーミング + Kafka 統合ガイド(Apache Spark ドキュメント)
- Apache Kafka の構成