メインコンテンツまでスキップ

オプション

このページでは、Databricks 上の構造化ストリーミングを使用して Apache Kafka の読み取りと書き込みを行うための構成オプションについて説明します。

Databricks Kafka コネクタは、Apache Spark Kafka コネクタ上に構築されており、すべての標準 Kafka 構成オプションをサポートしています。kafka.で始まるオプションはすべて、基盤となる Kafka クライアントに直接渡されます。たとえば、 .option("kafka.max.poll.records", "500") Kafka コンシューマーのmax.poll.recordsプロパティを設定します。利用可能な Kafka プロパティの完全なリストについては、 Kafka 構成ドキュメントを参照してください。

構造化ストリーミングのソースおよびシンクのオプションの完全なリストについては、 Kafkaおよび構造化ストリーミング + Kafka 統合ガイドを参照してください。

必須オプション

必要なオプションの詳細については、 Kafkaを参照してください。

読み取りと書き込みの両方に次のオプションが必要です。

Key

説明

kafka.bootstrap.servers

カンマ区切りのホストリスト Kafkaブローカーのアドレス。Kafkaクライアントのbootstrap.serversプロパティを設定します。

Kafkaからデータが取得できない場合は、このブローカーアドレスリストを確認して、アドレスに誤りがないか確認してください。ブローカーのアドレスリストが間違っている場合、エラーが発生しない可能性があります。Kafkaクライアントは、ブローカーがいずれ利用可能になると想定し、ネットワークエラーが発生した場合は永久に再試行します。

Kafkaの読み取りを行う際には、消費するトピックを識別するために、以下のオプションのうちいずれか1つを正確に指定する必要があります。

  • subscribe
  • subscribePattern
  • assign

Kafka に書き込むときに、必要に応じてtopicオプションを設定して、すべての行の宛先トピックを指定できます。設定されていない場合、DataFrame にはtopic列が含まれている必要があります。

一般的なリーダーオプション

Kafka から読み取るときによく使用されるオプションは次のとおりです。

Key

説明

minPartitions

Kafkaから読み込む最小パーティション数。

maxRecordsPerPartition

Sparkパーティションあたりの最大レコード数。

failOnDataLoss

データが失われた可能性がある場合に、クエリを失敗させるかどうか。

maxOffsetsPerTrigger

トリガー間隔ごとに処理されるオフセットの最大数。

startingOffsets

クエリが読み取りを開始するオフセット。

endingOffsets

バッチクエリの場合、どこで読み込みを停止すべきか。

groupIdPrefix

自動生成されたコンシューマ グループ ID のカスタマイズされたプレフィックス。

kafka.group.id

Kafkaからの読み取り中に使用するグループ ID。

予期せぬ動作を引き起こす可能性があるため、使用には十分注意してください。もちろん、各クエリはデータを読み取るための一意のグループ ID を生成します。 これにより、各クエリが独自のコンシューマーグループを持つことが保証され、他のコンシューマーからの干渉を回避できるとともに、各クエリが購読しているトピックのすべてのパーティションを読み取ることができるようになります。Kafkaグループ ベースの承認などの一部のシナリオでは、特定の承認されたグループ ID を使用してデータを読み取ることができます。

同じ ID を持つクエリは互いに干渉し合い、データの一部しか読み取れない可能性があります。 バッチやストリーミングのワークロードを実行するとき、またはクエリを連続して開始および再起動するときに、干渉が発生する可能性があります。

問題を最小限に抑えるため、Kafkaコンシューマーの設定session.timeout.msを非常に小さく設定してください。

includeHeaders

出力に Kafka メッセージ ヘッダーを含めるかどうか。

bytesEstimateWindowLength

estimatedTotalBytesBehindLatestメトリクスを介して残りのバイトを推定するために使用される時間枠。

構造化ストリーミングのソースおよびシンクのオプションの完全なリストについては、 Kafkaおよび構造化ストリーミング + Kafka 統合ガイドを参照してください。

一般的なライターオプション

Kafka に書き込むときによく使用されるオプションは次のとおりです。

Key

説明

topic

すべての行のトピックを設定します。これは、データ内のtopic列よりも優先されます。

includeHeaders

行に Kafka ヘッダーを含めるかどうか。

重要

Databricks Runtime 13.3 LTS 以降には、デフォルトでべき等書き込みを有効にするkafka-clientsライブラリの新しいバージョンが含まれています。Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているもののIDEMPOTENT_WRITEが有効になっていない場合、書き込みは失敗します。この問題を解決するには、Kafka 2.8.0 以上にアップグレードするか、 .option("kafka.enable.idempotence", "false")を設定します。

構造化ストリーミングのソースおよびシンクのオプションの完全なリストについては、 Kafkaおよび構造化ストリーミング + Kafka 統合ガイドを参照してください。

認証オプション

Databricks 、 Unity Catalog認証情報、SASL/ SSL 、 AWS MSK、 Azure Event Hubs、Google クラウドマネージドKafkaのクラウド固有のオプションなど、 Kafkaの複数の認証方法をサポートしています。

Databricks 、クラウド管理のKafkaへの認証にUnity Catalog資格情報を使用することをお勧めします。

オプション

説明

databricks.serviceCredential

クラウド管理型Kafkaサービス ( AWS MSK、 Azure Event Hubs、または Google クラウド管理型Kafka ) に対する認証のためのUnity Catalogサービス 認証情報の名前。 Databricks Runtime 16.1 以降で利用できます。

databricks.serviceCredential.scope

サービス資格情報の OAuth スコープ。Databricks が Kafka サービスのスコープを自動的に推測できない場合にのみこれを設定します。

Unity Catalog サービス資格情報を使用する場合、 kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolなどの SASL/SSL オプションを指定する必要はありません。

一般的な SASL/SSL オプションは次のとおりです。

オプション

説明

kafka.security.protocol

ブローカーと通信するために使用されるプロトコル(例: SASL_SSLSSLPLAINTEXT )。

kafka.sasl.mechanism

SASLメカニズム(例: PLAINSCRAM-SHA-256SCRAM-SHA-512OAUTHBEARERAWS_MSK_IAM )。

kafka.sasl.jaas.config

JAASログイン設定文字列。

kafka.sasl.login.callback.handler.class

SASL認証用のログインコールバックハンドラの完全修飾クラス名。

kafka.sasl.client.callback.handler.class

SASL認証用のクライアントコールバックハンドラーの完全修飾クラス名。

kafka.ssl.truststore.location

SSLトラストストアファイルの場所。

kafka.ssl.truststore.password

SSLトラストストアファイルのパスワード。

kafka.ssl.keystore.location

SSLキーストアファイルの場所。

kafka.ssl.keystore.password

SSLキーストアファイルのパスワード。

完全な認証設定手順については、 「認証」を参照してください。

その他のリソース