よくある質問
Databricks で Kafka を使用する際によく寄せられる質問。
Kafka オプションがサポートされていないか認識されないというエラーが表示されるのはなぜですか?
よくある間違いは、Kafka ネイティブ構成オプションを設定するときにkafka.プレフィックスを忘れてしまうことです。Kafka クライアントに直接渡されるすべてのオプションには、プレフィックスとしてkafka.を付ける必要があります。
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Spark Kafka コネクタ固有のオプション ( subscribe 、 startingOffsets 、 maxOffsetsPerTriggerなど) にはプレフィックスは必要ありません。完全なリストについてはオプションを参照してください。
シェーディングされた Kafka クラスに関するエラーが発生するのはなぜですか?
Databricks では、シェーディングされた Kafka クラス (プレフィックスがkafkashaded.またはshadedmskiam. ) を使用する必要があります。RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCEDようなエラーが表示された場合は、網掛けされたクラス名を使用する必要があります。
org.apache.kafka.*クラスにはkafkashaded.プレフィックスが必要です。例えば:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModulesoftware.amazon.msk.*クラスにはshadedmskiam.プレフィックスが必要です。例えば:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
Kafka に接続するとTimeoutExceptionが表示されるのはなぜですか?
一般的な原因は次のとおりです:
- ネットワーク接続 : コンピュート クラスターがKafkaブローカーに到達できません。 ファイアウォール ルール、セキュリティ グループ、VPC 構成を確認します。
- ブートストラップ サーバーが間違っています :
kafka.bootstrap.serversホスト名とポートが正しいことを確認してください。 - DNS 解決 : Kafka ブローカーのホスト名が Databricks ネットワークから解決できることを確認します。
- SSL/TLS の問題 : SSL を使用している場合は、証明書が正しく構成されていることを確認してください。
プライベート リンクまたは VPC ピアリングの設定では、正しいネットワーク ルートが確立されていることを確認します。
Kafka ではバッチ モードとストリーミング モードのどちらを使用すればよいですか?
使用ケースによって異なります:
- ストリーミング モード (
spark.readStream): 継続的なデータ処理または低遅延の取り込みが必要な場合に使用します。 - バッチ モード (
spark.read): 1 回限りのデータ ロード、バックフィル、またはデバッグに使用します。startingOffsetsとendingOffsetsの両方が必要です。
、 、 リアルタイムAvailableNow``ProcessingTimeモード などのトリガー間隔の構成の詳細については 、「構造化ストリーミング トリガー間隔の構成」を 参照してください。
単一のストリームで複数の Kafka トピックを読み取ることはできますか?
はい、使用できます:
subscribe: トピックのコンマ区切りリストを指定します (例:.option("subscribe", "topic1,topic2"))。subscribePattern: トピック名を一致させるには、Java 正規表現パターンを使用します (例:.option("subscribePattern", "topic-.*"))。
Kafka LakeFlow Spark宣言型パイプラインで使用するにはどうすればよいですか?
LakeFlow Spark宣言型パイプラインは、 Kafkaソースのネイティブ サポートを提供します。 Kafkaから読み取るストリーミング テーブルを定義できます。
- Python
- SQL
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
LakeFlow Spark宣言型パイプラインのストリーミング ソースの詳細については、「パイプラインへのデータの読み込み」を参照してください。
Kafka のキーと値の列をデシリアル化するにはどうすればよいですか?
key列とvalue列はバイナリ ( BINARY型) として返されます。データ形式に基づいて逆シリアル化するには、DataFrame 操作を使用します。
- 文字列データ :
cast("string")を使用してバイナリを文字列に変換します。 - JSON データ : 文字列にキャストした後、
from_json()を使用します。from_json関数を参照してください。 - Avro データ : Avro でエンコードされたデータを逆シリアル化するには
from_avro()を使用します。「ストリーミング Avro データの読み取りと書き込み」を参照してください。 - プロトコル バッファー : protobuf データを逆シリアル化するには
from_protobuf()を使用します。プロトコル バッファーの読み取りと書き込みを参照してください。
べき等書き込みエラーが発生するのはなぜですか?
Databricks Runtime 13.3 LTS 以降には、デフォルトでべき等書き込みを有効にするkafka-clientsライブラリの新しいバージョンが含まれています。Kafka クラスターがバージョン 2.8.0 以下を使用しており、ACL が設定されているもののIDEMPOTENT_WRITEが有効になっていない場合、書き込みはorg.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error stateで失敗します。
このエラーを解決するには、 Kafkaバージョン 2.8.0 以降にアップグレードするか、構造化ストリーミング ライターの構成時に.option("kafka.enable.idempotence", "false")を設定します。
KAFKA_DATA_LOSS_ERRORとは何ですか? また、どうすれば解決できますか?
このエラーは、チェックポイントに保存されたオフセットが Kafka で使用できなくなったことを Kafka ソースが検出した場合に発生します。通常は次の理由によります。
- ストリームはKafka保存期間よりも長く停止しました。
- Kafka トピック データが削除されたか、トピックが再作成されました。
- Kafka ブローカーでデータ損失が発生しました。
解決するには:
- データ損失が許容される場合 :
.option("failOnDataLoss", "false")を設定して、ストリームが最も早い利用可能なオフセットから続行できるようにします。 - データ損失が許容できない場合 : チェックポイントをリセットして
earliestオフセットから再処理するか、不足している Kafka データを復元します。
詳細については、 「KAFKA_DATA_LOSS エラー条件」を参照してください。
Kafka からデータを読み取る速度を制御するにはどうすればよいですか?
maxOffsetsPerTriggerオプションを使用して、マイクロバッチごとに処理されるオフセット数 (おおよそのレコード数) を制限します。これにより、下流の処理に負担をかけたり、バックログを処理するときにメモリの問題を引き起こしたりする可能性のある大規模なバッチを回避できます。
- Python
- Scala
- SQL
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
または、 minPartitionsやmaxRecordsPerPartitionなどのオプションを使用して、バッチごとに作成される Spark パーティションの数を制御します。
ストリームが最新の Kafka オフセットからどれくらい遅れているかを監視するにはどうすればよいですか?
ストリーミング クエリの進行状況で利用可能なavgOffsetsBehindLatest 、 maxOffsetsBehindLatest 、およびminOffsetsBehindLatestメトリクスを使用します。 これらは、サブスクライブされているすべてのトピック パーティション全体で、ストリームが最新の利用可能なオフセットからどれだけ遅れているかを報告します。Databricksのモニタリング構造化ストリーミング クエリ」を参照してください。
estimatedTotalBytesBehindLatest使用して、まだ処理されていないデータの合計バイト数を推定することもできます。
Kafka ストリームの初期化が遅いのはなぜですか?
Kafka ストリームでは次の処理に時間がかかります。
- Kafka クラスターに接続し、メタデータを取得します。
- トピックのパーティションを検出します。
- 初期オフセットを取得します。
オンプレミスまたはリモートの Kafka クラスターの場合、ネットワーク遅延が初期化時間に大きな影響を与える可能性があります。頻繁に再起動するトリガー/スケジュールされたパイプラインを実行している場合は、初期化のオーバーヘッドの繰り返しを回避するために、継続的なストリーミング モードの使用を検討してください。
Sparkエグゼキューターを追加してもKafkaスループットが増えないのはなぜですか?
Kafka ブローカーが飽和状態になると、Spark エグゼキューターを追加してもスループットは向上せず、コストが増加します。
Kafka がボトルネックになっている兆候:
- コアを追加してもスループットが停滞します。
- Kafka ブローカーの CPU またはネットワーク使用率が高くなっています。
- Spark タスクはすぐに完了しますが、新しいデータを待機します。
これを解決するには、ブローカーを追加するかパーティション数を増やして負荷を分散し、Kafka クラスターを拡張します。
Kafkaストリーミングのコストとコンピュートの使用率を最適化するにはどうすればよいですか?
マイクロバッチおよびAvailableNowモードの場合:
- クラスターの適切なサイズを設定する : メトリクスを監視し、ピーク負荷に合わせて適切な固定クラスター サイズを設定します。
maxOffsetsPerTrigger使用 : 負荷の急増時にリソースの使用を制御するためにバッチ サイズを制限します。- オートスケールを避けてください : ストリーミング ジョブを継続的に実行し、ノードを追加または削除するとタスクの再バランスのオーバーヘッドが発生します。
- データ スキューの削減 : パーティションの偏りにより、一部のタスクが他のタスクよりも大幅に多くのデータを処理することになり、全体的なバッチの完了が遅くなり、アイドル状態のタスクでコンピュート リソースが浪費される混乱が生じます。 よりバランスの取れた処理を実現するために、
minPartitionsオプションを使用して、大きな Kafka パーティションを小さな Spark パーティションに分割します。
リアルタイム モードでは、データを待機している間タスクがアイドル状態になる可能性があるため、適切なサイズ設定が特に重要です。重要な考慮事項:
- 各タスクが複数の Kafka パーティションを処理してオーバーヘッドを削減するように
maxPartitionsを設定します。 - シャッフルを多用するジョブの場合は
spark.sql.shuffle.partitionsを調整します。
ラケット モードのクラスターのサイジングに関するガイダンスについては、「構造化ストリーミング」の「ラケットモード」を参照してください。
トピックにデータが存在するにもかかわらず、ストリームがレコードを返さないのはなぜですか?
一般的な原因は次のとおりです:
- 間違った
startingOffsets設定 : デフォルト値はlatestで、ストリームの開始後に到着した新しいデータのみを読み取ります。既存のデータを読み取るには、startingOffsetsをearliestに設定します。 - トピック名が間違っています : 正しいトピックをサブスクライブしていることを確認してください。
- 認証の問題 : ストリームは正常に接続されていますが、トピックから読み取る権限がありません。Kafka ACL を確認してください。
- オフセットの有効期限 : ストリームが長時間停止され、チェックポイント内のオフセットの有効期限が切れた場合 (Kafka の保持によって削除された場合)、チェックポイントをリセットするか、
failOnDataLossを調整する必要がある場合があります。