メインコンテンツまでスキップ

Apache Kafka と Databricks によるストリーム処理

この記事では、Databricks 上で構造化ストリーミングワークロードを実行する際に、Apache Kafka をソースまたはシンクとして使用する方法について説明します。

Kafka の詳細については、Kafka のドキュメントを参照してください。

Kafka からのデータの読み取り

以下に、Kafkaからのストリーミング読み込みの例を挙げています。

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)

以下の例の通り、Databricksは、Kafka データソースのバッチ読み込みのセマンティクスもサポートしています。

Python
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)

増分バッチ読み込みの場合、Databricks では Kafka を Trigger.AvailableNowと共に使用することをお勧めします。 増分バッチ処理の構成を参照してください。

Databricks Runtime 13.3 LTS 以降では、Databricks は Kafka データを読み取るための SQL 関数を提供します。SQLを使用したストリーミングは、DLT または Databricks SQLのストリーミングテーブルでのみサポートされています。read_kafkaテーブル値関数を参照してください。

Kafka 構造化ストリーミング リーダーを構成する

Databricks は、Kafka 0.10 以降への接続を構成するためのデータ形式として kafka キーワードを提供しています。

Kafka の最も一般的な構成を以下に示します。

サブスクライブするトピックを指定する方法は複数あります。次のパラメーターの 1 つのみを指定する必要があります。

オプション

説明

subscribe

トピックのコンマ区切りリスト

サブスクライブするトピックのリストです。

subscribePattern

Javaの正規表現文字列

トピックをサブスクライブするのに使われるパターンです。

assign

JSON 文字列 {"topicA":[0,1],"topic":[2,4]}

コンシュームする特定の topicPartitions です。

その他の注目すべき構成:

オプション

デフォルト値

説明

kafka.bootstrap.servers

ホストのコンマ区切りリスト.

値がありません

[必須] Kafka の bootstrap.servers 構成です。Kafka からのデータがない場合は、まずブローカーのアドレスリストを確認してください。ブローカーのアドレスリストが正しくない場合、エラーが発生しない可能性があります。これは、ブローカーが最終的に利用可能になるとKafka クライアントが想定しており、ネットワークエラーが発生した場合には永久に再試行を繰り返すためです。

failOnDataLoss

true または false

true

[任意] データが失われた可能性がある場合にクエリーを失敗させるかどうか:トピックの削除、処理前のトピックの切り捨てなど、多くのシナリオが原因で、クエリーは Kafka からのデータの読み取りに永久に失敗する可能性があります。データが失われた可能性があるかどうかを保守的に見積もるようになっています。これにより、誤警報が引き起こされる場合があります。期待どおりに動作しない場合、またはデータが失われていてもクエリーの処理を継続したい場合は、このオプションを false に設定してください。

minPartitions

整数 > = 0、0 = 無効

0 (無効)

[オプション] Kafka から読み取るパーティションの最小数。minPartitions オプションを使用して、Kafka から読み取るために任意の最小パーティションを使用するように Spark を設定できます。通常、Spark は Kafka の topicPartitions と Kafka から消費される Spark パーティションを 1 対 1 でマッピングしています。minPartitions オプションを Kafka の topicPartitions よりも大きな値に設定すると、Spark は大きなKafka パーティションを分割して小さくします。このオプションは、負荷のピーク時、データスキュー時、およびストリームが遅れているときに設定して、処理速度を向上させることができます。これには各トリガーで Kafka コンシューマーを初期化するというコストがかかり、Kafka への接続時に SSL を使用する場合はパフォーマンスに影響を与える可能性があります。

kafka.group.id

Kafka コンシューマーグループの ID

設定されていません

[オプション] Kafka からの読み取り中に使用するグループ ID。これは注意して使用してください。デフォルトでは、各クエリはデータを読み取るための一意のグループ ID を生成します。これにより、各クエリには他のコンシューマからの干渉を受けない独自のコンシューマグループが確保され、サブスクライブされたトピックのすべてのパーティションを読み取ることができます。一部のシナリオ(Kafka グループベースの承認など)では、特定の承認されたグループ ID を使用してデータを読み取ることが必要な場合があります。必要に応じて、グループ ID を設定することができます。ただし、予期しない動作が発生する可能性があるため、細心の注意を払って実行してください。 - 同じグループ ID で同時に実行されるクエリ (バッチとストリーミングの両方) は、互いに干渉し、各クエリがデータの一部のみを読み取る可能性があります。 - これは、クエリが立て続けに開始/再開された場合にも発生する可能性があります。 このような問題を最小限に抑えるには、Kafka コンシューマー構成 session.timeout.ms を非常に小さく設定します。

startingOffsets

earliest、latest

latest

[オプション] クエリが開始されるときの開始点。最も古いオフセットからの「earliest」、または各 TopicPartition の開始オフセットを指定する JSON 文字列のいずれかです。json では、オフセットとして -2 を使用して最も古いものを参照し、-1 を使用して最新のものを参照できます。注:バッチクエリの場合、latest(暗黙的または json で -1 を使用) は許可されません。ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断したところから再開されます。クエリ中に新しく検出されたパーティションは、できるだけ早く開始されます。

他のオプションの構成については、Structured Streaming Kafka Integration Guide を参照してください。

Kafka レコードのスキーマ

Kafka レコードのスキーマは次のとおりです。

タイプ

キー

binary

binary

トピック

string

パーティション

int

オフセット

ロング

タイムスタンプ

ロング

timestampType

int

keyvalue は常に、ByteArrayDeserializer を持つバイト配列として逆シリアル化されます。DataFrame 操作(cast("string")など)を使用して、キーと値を明示的に逆シリアル化します。

Kafka へのデータの書き込み

以下は、Kafka へのストリーミング書き込みの例です:

Python
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)

Databricks は、次の例に示すように、Kafka データシンクへのバッチ書き込みセマンティクスもサポートしています。

Python
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)

Kafka 構造化ストリーミング ライターの構成

important

Databricks Runtime 13.3 LTS 以降には、デフォルトによるべき等書き込みを可能にする新しいバージョンの kafka-clients ライブラリが含まれています。 Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているが、 IDEMPOTENT_WRITE が有効になっていない場合、書き込みは失敗し、 org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error stateというエラー メッセージが表示されます。

このエラーを解決するには、Kafka バージョン 2.8.0 以降にアップグレードするか、構造化ストリーミングライターの設定時に .option(“kafka.enable.idempotence”, “false”) を設定します。

DataStreamWriter に提供されたスキーマは、Kafka シンクと対話します。次のフィールドを使用することができます。

列名

必須またはオプション

タイプ

key

オプション

STRING または BINARY

value

必須

STRING または BINARY

headers

オプション

ARRAY

topic

オプション(topic がライターオプションとして設定されている場合は無視されます)

STRING

partition

オプション

INT

以下は、Kafka への書き込み時に設定される一般的なオプションです。

オプション

デフォルト値

説明

kafka.boostrap.servers

カンマで区切られたリスト <host:port>

なし

[必須] Kafka の bootstrap.servers 構成です。

topic

STRING

設定されていません

[オプション] 書き込まれるすべての行のトピックを設定します。このオプションは、データに存在するすべてのトピック列よりも優先されます。

includeHeaders

BOOLEAN

false

[オプション] Kafka ヘッダーを行に含めるかどうか。

他のオプションの構成については、Structured Streaming Kafka Integration Guide を参照してください。

Kafka メトリクスの取得

avgOffsetsBehindLatestmaxOffsetsBehindLatest 、および minOffsetsBehindLatest メトリクスを使用して、サブスクライブされたすべてのトピック間で、ストリーミングクエリが利用可能な最新のオフセットよりも遅れているオフセット数の平均、最小、最大を取得できます。「メトリクスをインタラクティブに読み取る」を参照。

注記

Databricks Runtime 9.1 以降で利用可能です。

estimatedTotalBytesBehindLatest の値を調べて、クエリプロセスがサブスクライブされたトピックから消費しなかった推定合計バイト数を取得します。この推定値は、過去 300 秒間に処理されたバッチに基づきます。推定の基準となる時間枠は、オプション bytesEstimateWindowLength を別の値に設定することで変更できます。たとえば、10 分に設定するには、次のようにします。

Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。

JSON
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}

SSL を使用して Databricks を Kafka に接続する

Kafka への SSL 接続を有効にするには、Confluent のドキュメント「 SSL による暗号化と認証」の手順に従います。 ここで説明する構成 (オプションとして、プレフィックス kafka.を付けて提供できます。 たとえば、トラスト ストアの場所をプロパティ kafka.ssl.truststore.locationで指定します。

Databricks では、次のことをお勧めします。

  • 証明書をクラウド・オブジェクト・ストレージに保管します。 証明書へのアクセスは、 にアクセスできるクラスタ Kafkaのみに制限できます。 データガバナンス with Unity Catalogを参照してください。
  • 証明書のパスワードをシークレットスコープ に シークレットとして保存します 。

次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。

Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

で に AmazonManaged ストリーミングを使用するKafkaIAM

備考

プレビュー

この機能は、Databricks Runtime 13.3 LTS 以降で パブリック プレビュー 段階です。

Databricks を使用して、IAM を使用して Amazon Managed Streaming for Kafka (MSK) に接続できます。 MSK の設定手順については、「Amazon MSK の設定を参照してください。

注記

次の設定は、IAM を使用して MSK に接続する場合にのみ必要です。 Apache Spark Kafka コネクタによって提供されるオプションを使用して、MSK への接続を構成することもできます。

Databricks インスタンスプロファイルを使用して MSK への接続を管理することをお勧めします。 「インスタンスプロファイル」を参照してください。

インスタンスプロファイルを使用してMSKに接続するには、次のオプションを構成する必要があります。

Scala
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"

オプションで、インスタンスプロファイルの代わりに IAM ユーザーまたは IAMロールを使用してMSKへの接続を構成できます。 AWS アクセスキーとシークレットキーの値は、環境変数 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYを使用して指定する必要があります。 「Spark 構成プロパティまたは環境変数でのシークレットの使用」を参照してください。

さらに、IAMロールを使用して接続を構成する場合は、以下の例のようにkafka.sasl.jaas.configに指定された値を変更して、ロールのARNを含める必要があります:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::123456789012:role/msk_client_role"

Microsoft Entra ID と Azure Event Hubs を使用したサービスプリンシパル authentication

Databricks では、Event Hubs サービスを使用した Spark ジョブの認証がサポートされています。 この認証は、Microsoft Entra ID を使用した OAuth 経由で行われます。

AAD 認証図

Databricks は Microsoft 次のコンピュート環境でクライアント ID とシークレットを使用した Entra ID 認証をサポートします。

  • Databricks Runtime 12.2 LTS 以降で、専用アクセス モード (以前のシングル ユーザー アクセス モード) で構成されたコンピュート。
  • Databricks Runtime 14.3 LTS 以降、標準アクセス モード (以前の共有アクセス モード) で構成されたコンピュート。
  • Unity Catalog を使用せずに構成された DLT パイプライン。

Databricks は、証明書を使用した Microsoft Entra ID 認証を、コンピュート環境や Unity Catalogで構成された DLT パイプラインでサポートしていません。

この認証は、標準アクセスモードのコンピュートや DLT Unity Catalog 機能しません。

構造化ストリーミング Kafka Connector の構成

Microsoft Entra ID を使用して認証を実行するには、次の値が必要です。

  • テナント ID。 これは、 Microsoft Entra ID サービス タブにあります。

  • clientID (アプリケーション ID とも呼ばれます)。

  • クライアントシークレット。 これを手に入れたら、シークレットとして Databricks Workspaceに追加する必要があります。 このシークレットを追加するには、「 シークレット管理」を参照してください。

  • EventHubs トピック。 トピックの一覧は、特定の Event Hubs 名前空間ページの エンティティ セクションの Event Hubs セクションにあります。複数のトピックを操作するには、Event Hubs レベルで IAMロールを設定します。

  • EventHubs サーバー。 これは、特定の Event Hubs 名前空間 の概要ページで確認できます。

    Event Hubs 名前空間

さらに、Entra ID を使用するには、Kafka に OAuth SASL メカニズムを使用するように指示する必要があります (SASL は汎用プロトコルであり、OAuth は SASL の「メカニズム」の一種です)。

  • kafka.security.protocol は右の値である必要があります SASL_SSL
  • kafka.sasl.mechanism は右の値である必要があります OAUTHBEARER
  • kafka.sasl.login.callback.handler.class は、影付きの Kafka クラスのログインコールバックハンドラに対して kafkashaded の値を持つ Java クラスの完全修飾名である必要があります。 正確なクラスについては、次の例を参照してください。

次に、実行中の例を見てみましょう。

Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

潜在的なエラーの処理

  • ストリーミングオプションはサポートされていません。

    Unity Catalog で構成された DLT パイプラインでこの認証メカニズムを使用しようとすると、次のエラーが表示される場合があります。

    サポートされていないストリーミングエラー

    このエラーを解決するには、サポートされているコンピュート構成を使用します。 「Microsoft Entra ID を使用したサービスプリンシパル認証」と「Event Hubs Azure」を参照してください。

  • 新しい KafkaAdminClientを作成できませんでした。

    これは、次の認証オプションのいずれかが正しくない場合に Kafka がスローする内部エラーです。

    • クライアント ID (アプリケーション ID とも呼ばれます)
    • テナントID
    • EventHubs サーバー

    このエラーを解決するには、これらのオプションの値が正しいことを確認します。

    また、この例でデフォルトで提供されている設定オプション (変更しないように求められたもの) を変更した場合 ( kafka.security.protocolなど) にも、このエラーが表示されることがあります。

  • 返されるレコードはありません

    DataFrame を表示または処理しようとしているのに結果が得られない場合は、UI に次のように表示されます。

    結果メッセージなし

    このメッセージは、認証は成功したが、EventHubs がデータを返さなかったことを意味します。 考えられるいくつかの(決して網羅的ではありませんが)理由は次のとおりです。

    • 間違った EventHubs トピックを指定しました。
    • startingOffsets のデフォルトの Kafka 設定オプションは latestで、現在、トピックを通じてデータを受け取っていません。Kafka の最も初期のオフセットからデータの読み取りを開始するように startingOffsetstoearliest を設定できます。