オプション
このページでは、Databricks 上の構造化ストリーミングを使用して Apache Kafka の読み取りと書き込みを行うための構成オプションについて説明します。
Databricks Kafka コネクタは、Apache Spark Kafka コネクタ上に構築されており、すべての標準 Kafka 構成オプションをサポートしています。kafka.で始まるオプションはすべて、基盤となる Kafka クライアントに直接渡されます。たとえば、 .option("kafka.max.poll.records", "500") Kafka コンシューマーのmax.poll.recordsプロパティを設定します。利用可能な Kafka プロパティの完全なリストについては、 Kafka 構成ドキュメントを参照してください。
構造化ストリーミングのソースおよびシンクのオプションの完全なリストについては、 Kafkaおよび構造化ストリーミング + Kafka 統合ガイドを参照してください。
必須オプション
必要なオプションの詳細については、 Kafkaを参照してください。
読み取りと書き込みの両方に次のオプションが必要です。
Key | 説明 |
|---|---|
| カンマ区切りのホストリスト Kafkaブローカーのアドレス。Kafkaクライアントの Kafkaからデータが取得できない場合は、このブローカーアドレスリストを確認して、アドレスに誤りがないか確認してください。ブローカーのアドレスリストが間違っている場合、エラーが発生しない可能性があります。Kafkaクライアントは、ブローカーがいずれ利用可能になると想定し、ネットワークエラーが発生した場合は永久に再試行します。 |
Kafkaの読み取りを行う際には、消費するトピックを識別するために、以下のオプションのうちいずれか1つを正確に指定する必要があります。
subscribesubscribePatternassign
Kafka に書き込むときに、必要に応じてtopicオプションを設定して、すべての行の宛先トピックを指定できます。設定されていない場合、DataFrame にはtopic列が含まれている必要があります。
一般的なリーダーオプション
Kafka から読み取るときによく使用されるオプションは次のとおりです。
Key | 説明 |
|---|---|
| Kafkaから読み込む最小パーティション数。 |
| Sparkパーティションあたりの最大レコード数。 |
| データが失われた可能性がある場合に、クエリを失敗させるかどうか。 |
| トリガー間隔ごとに処理されるオフセットの最大数。 |
| クエリが読み取りを開始するオフセット。 |
| バッチクエリの場合、どこで読み込みを停止すべきか。 |
| 自動生成されたコンシューマ グループ ID のカスタマイズされたプレフィックス。 |
| Kafkaからの読み取り中に使用するグループ ID。 予期せぬ動作を引き起こす可能性があるため、使用には十分注意してください。もちろん、各クエリはデータを読み取るための一意のグループ ID を生成します。 これにより、各クエリが独自のコンシューマーグループを持つことが保証され、他のコンシューマーからの干渉を回避できるとともに、各クエリが購読しているトピックのすべてのパーティションを読み取ることができるようになります。Kafkaグループ ベースの承認などの一部のシナリオでは、特定の承認されたグループ ID を使用してデータを読み取ることができます。 同じ ID を持つクエリは互いに干渉し合い、データの一部しか読み取れない可能性があります。 バッチやストリーミングのワークロードを実行するとき、またはクエリを連続して開始および再起動するときに、干渉が発生する可能性があります。 問題を最小限に抑えるため、Kafkaコンシューマーの設定 |
| 出力に Kafka メッセージ ヘッダーを含めるかどうか。 |
|
|
構造化ストリーミングのソースおよびシンクのオプションの完全なリストについては、 Kafkaおよび構造化ストリーミング + Kafka 統合ガイドを参照してください。
一般的なライターオプション
Kafka に書き込むときによく使用されるオプションは次のとおりです。
Key | 説明 |
|---|---|
| すべての行のトピックを設定します。これは、データ内の |
| 行に Kafka ヘッダーを含めるかどうか。 |
Databricks Runtime 13.3 LTS 以降には、デフォルトでべき等書き込みを有効にするkafka-clientsライブラリの新しいバージョンが含まれています。Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているもののIDEMPOTENT_WRITEが有効になっていない場合、書き込みは失敗します。この問題を解決するには、Kafka 2.8.0 以上にアップグレードするか、 .option("kafka.enable.idempotence", "false")を設定します。
構造化ストリーミングのソースおよびシンクのオプションの完全なリストについては、 Kafkaおよび構造化ストリーミング + Kafka 統合ガイドを参照してください。
認証オプション
Databricks 、 Unity Catalog認証情報、SASL/ SSL 、 AWS MSK、 Azure Event Hubs、Google クラウドマネージドKafkaのクラウド固有のオプションなど、 Kafkaの複数の認証方法をサポートしています。
Databricks 、クラウド管理のKafkaへの認証にUnity Catalog資格情報を使用することをお勧めします。
オプション | 説明 |
|---|---|
| クラウド管理型Kafkaサービス ( AWS MSK、 Azure Event Hubs、または Google クラウド管理型Kafka ) に対する認証のためのUnity Catalogサービス 認証情報の名前。 Databricks Runtime 16.1 以降で利用できます。 |
| サービス資格情報の OAuth スコープ。Databricks が Kafka サービスのスコープを自動的に推測できない場合にのみこれを設定します。 |
Unity Catalog サービス資格情報を使用する場合、 kafka.sasl.mechanism 、 kafka.sasl.jaas.config 、 kafka.security.protocolなどの SASL/SSL オプションを指定する必要はありません。
一般的な SASL/SSL オプションは次のとおりです。
オプション | 説明 |
|---|---|
| ブローカーと通信するために使用されるプロトコル(例: |
| SASLメカニズム(例: |
| JAASログイン設定文字列。 |
| SASL認証用のログインコールバックハンドラの完全修飾クラス名。 |
| SASL認証用のクライアントコールバックハンドラーの完全修飾クラス名。 |
| SSLトラストストアファイルの場所。 |
| SSLトラストストアファイルのパスワード。 |
| SSLキーストアファイルの場所。 |
| SSLキーストアファイルのパスワード。 |
完全な認証設定手順については、 「認証」を参照してください。
その他のリソース
- 構造化ストリーミング + Kafka 統合ガイド(Apache Spark ドキュメント)
- Apache Kafka の構成