Apache Kafka と Databricks によるストリーム処理
この記事では、Databricks 上で構造化ストリーミングワークロードを実行する際に、Apache Kafka をソースまたはシンクとして使用する方法について説明します。
Kafka の詳細については、Kafka のドキュメントを参照してください。
Kafka からのデータの読み取り
以下に、Kafkaからのストリーミング読み込みの例を挙げています。
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
以下の例の通り、Databricksは、Kafka データソースのバッチ読み込みのセマンティクスもサポートしています。
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
増分バッチ読み込みの場合、Databricks では Kafka を Trigger.AvailableNow
と共に使用することをお勧めします。 増分バッチ処理の構成を参照してください。
Databricks Runtime 13.3 LTS 以降では、Databricks は Kafka データを読み取るための SQL 関数を提供します。SQLを使用したストリーミングは、DLT または Databricks SQLのストリーミングテーブルでのみサポートされています。read_kafka
テーブル値関数を参照してください。
Kafka 構造化ストリーミング リーダーを構成する
Databricks は、Kafka 0.10 以降への接続を構成するためのデータ形式として kafka
キーワードを提供しています。
Kafka の最も一般的な構成を以下に示します。
サブスクライブするトピックを指定する方法は複数あります。次のパラメーターの 1 つのみを指定する必要があります。
オプション | 値 | 説明 |
---|---|---|
subscribe | トピックのコンマ区切りリスト | サブスクライブするトピックのリストです。 |
subscribePattern | Javaの正規表現文字列 | トピックをサブスクライブするのに使われるパターンです。 |
assign | JSON 文字列 | コンシュームする特定の topicPartitions です。 |
その他の注目すべき構成:
オプション | 値 | デフォルト値 | 説明 |
---|---|---|---|
kafka.bootstrap.servers | ホストのコンマ区切りリスト. | 値がありません | [必須] Kafka の |
failOnDataLoss |
|
| [任意] データが失われた可能性がある場合にクエリーを失敗させるかどうか:トピックの削除、処理前のトピックの切り捨てなど、多くのシナリオが原因で、クエリーは Kafka からのデータの読み取りに永久に失敗する可能性があります。データが失われた可能性があるかどうかを保守的に見積もるようになっています。これにより、誤警報が引き起こされる場合があります。期待どおりに動作しない場合、またはデータが失われていてもクエリーの処理を継続したい場合は、このオプションを |
minPartitions | 整数 > = 0、0 = 無効 | 0 (無効) | [オプション] Kafka から読み取るパーティションの最小数。 |
kafka.group.id | Kafka コンシューマーグループの ID | 設定されていません | [オプション] Kafka からの読み取り中に使用するグループ ID。これは注意して使用してください。デフォルトでは、各クエリはデータを読み取るための一意のグループ ID を生成します。これにより、各クエリには他のコンシューマからの干渉を受けない独自のコンシューマグループが確保され、サブスクライブされたトピックのすべてのパーティションを読み取ることができます。一部のシナリオ(Kafka グループベースの承認など)では、特定の承認されたグループ ID を使用してデータを読み取ることが必要な場合があります。必要に応じて、グループ ID を設定することができます。ただし、予期しない動作が発生する可能性があるため、細心の注意を払って実行してください。 - 同じグループ ID で同時に実行されるクエリ (バッチとストリーミングの両方) は、互いに干渉し、各クエリがデータの一部のみを読み取る可能性があります。 - これは、クエリが立て続けに開始/再開された場合にも発生する可能性があります。 このような問題を最小限に抑えるには、Kafka コンシューマー構成 |
startingOffsets | earliest、latest | latest | [オプション] クエリが開始されるときの開始点。最も古いオフセットからの「earliest」、または各 TopicPartition の開始オフセットを指定する JSON 文字列のいずれかです。json では、オフセットとして -2 を使用して最も古いものを参照し、-1 を使用して最新のものを参照できます。注:バッチクエリの場合、latest(暗黙的または json で -1 を使用) は許可されません。ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断したところから再開されます。クエリ中に新しく検出されたパーティションは、できるだけ早く開始されます。 |
他のオプションの構成については、Structured Streaming Kafka Integration Guide を参照してください。
Kafka レコードのスキーマ
Kafka レコードのスキーマは次のとおりです。
列 | タイプ |
---|---|
キー | binary |
値 | binary |
トピック | string |
パーティション | int |
オフセット | ロング |
タイムスタンプ | ロング |
timestampType | int |
key
と value
は常に、ByteArrayDeserializer
を持つバイト配列として逆シリアル化されます。DataFrame 操作(cast("string")
など)を使用して、キーと値を明示的に逆シリアル化します。
Kafka へのデータの書き込み
以下は、Kafka へのストリーミング書き込みの例です:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Databricks は、次の例に示すように、Kafka データシンクへのバッチ書き込みセマンティクスもサポートしています。
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Kafka 構造化ストリーミング ライターの構成
Databricks Runtime 13.3 LTS 以降には、デフォルトによるべき等書き込みを可能にする新しいバージョンの kafka-clients
ライブラリが含まれています。 Kafka シンクがバージョン 2.8.0 以下を使用し、ACL が設定されているが、 IDEMPOTENT_WRITE
が有効になっていない場合、書き込みは失敗し、 org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
というエラー メッセージが表示されます。
このエラーを解決するには、Kafka バージョン 2.8.0 以降にアップグレードするか、構造化ストリーミングライターの設定時に .option(“kafka.enable.idempotence”, “false”)
を設定します。
DataStreamWriter に提供されたスキーマは、Kafka シンクと対話します。次のフィールドを使用することができます。
列名 | 必須またはオプション | タイプ |
---|---|---|
| オプション |
|
| 必須 |
|
| オプション |
|
| オプション( |
|
| オプション |
|
以下は、Kafka への書き込み時に設定される一般的なオプションです。
オプション | 値 | デフォルト値 | 説明 |
---|---|---|---|
| カンマで区切られたリスト | なし | [必須] Kafka の |
|
| 設定されていません | [オプション] 書き込まれるすべての行のトピックを設定します。このオプションは、データに存在するすべてのトピック列よりも優先されます。 |
|
|
| [オプション] Kafka ヘッダーを行に含めるかどうか。 |
他のオプションの構成については、Structured Streaming Kafka Integration Guide を参照してください。
Kafka メトリクスの取得
avgOffsetsBehindLatest
、 maxOffsetsBehindLatest
、および minOffsetsBehindLatest
メトリクスを使用して、サブスクライブされたすべてのトピック間で、ストリーミングクエリが利用可能な最新のオフセットよりも遅れているオフセット数の平均、最小、最大を取得できます。「メトリクスをインタラクティブに読み取る」を参照。
Databricks Runtime 9.1 以降で利用可能です。
estimatedTotalBytesBehindLatest
の値を調べて、クエリプロセスがサブスクライブされたトピックから消費しなかった推定合計バイト数を取得します。この推定値は、過去 300 秒間に処理されたバッチに基づきます。推定の基準となる時間枠は、オプション bytesEstimateWindowLength
を別の値に設定することで変更できます。たとえば、10 分に設定するには、次のようにします。
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
ノートブックでストリームを実行している場合、ストリーミングクエリの進行状況ダッシュボードの 生データ タブに次のメトリクスが表示されます。
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
SSL を使用して Databricks を Kafka に接続する
Kafka への SSL 接続を有効にするには、Confluent のドキュメント「 SSL による暗号化と認証」の手順に従います。 ここで説明する構成 (オプションとして、プレフィックス kafka.
を付けて提供できます。 たとえば、トラスト ストアの場所をプロパティ kafka.ssl.truststore.location
で指定します。
Databricks では、次のことをお勧めします。
- 証明書をクラウド・オブジェクト・ストレージに保管します。 証明書へのアクセスは、 にアクセスできるクラスタ Kafkaのみに制限できます。 データガバナンス with Unity Catalogを参照してください。
- 証明書のパスワードをシークレットスコープ に シークレットとして保存します 。
次の例では、オブジェクトストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
で に AmazonManaged ストリーミングを使用するKafkaIAM
プレビュー
この機能は、Databricks Runtime 13.3 LTS 以降で パブリック プレビュー 段階です。
Databricks を使用して、IAM を使用して Amazon Managed Streaming for Kafka (MSK) に接続できます。 MSK の設定手順については、「Amazon MSK の設定を参照してください。
次の設定は、IAM を使用して MSK に接続する場合にのみ必要です。 Apache Spark Kafka コネクタによって提供されるオプションを使用して、MSK への接続を構成することもできます。
Databricks インスタンスプロファイルを使用して MSK への接続を管理することをお勧めします。 「インスタンスプロファイル」を参照してください。
インスタンスプロファイルを使用してMSKに接続するには、次のオプションを構成する必要があります。
- Scala
- Python
"kafka.sasl.mechanism" -> "AWS_MSK_IAM",
"kafka.sasl.jaas.config" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.client.callback.handler.class" ->
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"kafka.sasl.mechanism": "AWS_MSK_IAM",
"kafka.sasl.jaas.config":
"shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required;",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.client.callback.handler.class":
"shadedmskiam.software.amazon.msk.auth.iam.IAMClientCallbackHandler"
オプションで、インスタンスプロファイルの代わりに IAM ユーザーまたは IAMロールを使用してMSKへの接続を構成できます。 AWS アクセスキーとシークレットキーの値は、環境変数 AWS_ACCESS_KEY_ID
と AWS_SECRET_ACCESS_KEY
を使用して指定する必要があります。 「Spark 構成プロパティまたは環境変数でのシークレットの使用」を参照してください。
さらに、IAMロールを使用して接続を構成する場合は、以下の例のようにkafka.sasl.jaas.config
に指定された値を変更して、ロールのARNを含める必要があります:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::123456789012:role/msk_client_role"
Microsoft Entra ID と Azure Event Hubs を使用したサービスプリンシパル authentication
Databricks では、Event Hubs サービスを使用した Spark ジョブの認証がサポートされています。 この認証は、Microsoft Entra ID を使用した OAuth 経由で行われます。
Databricks は Microsoft 次のコンピュート環境でクライアント ID とシークレットを使用した Entra ID 認証をサポートします。
- Databricks Runtime 12.2 LTS 以降で、専用アクセス モード (以前のシングル ユーザー アクセス モード) で構成されたコンピュート。
- Databricks Runtime 14.3 LTS 以降、標準アクセス モード (以前の共有アクセス モード) で構成されたコンピュート。
- Unity Catalog を使用せずに構成された DLT パイプライン。
Databricks は、証明書を使用した Microsoft Entra ID 認証を、コンピュート環境や Unity Catalogで構成された DLT パイプラインでサポートしていません。
この認証は、標準アクセスモードのコンピュートや DLT Unity Catalog 機能しません。
構造化ストリーミング Kafka Connector の構成
Microsoft Entra ID を使用して認証を実行するには、次の値が必要です。
-
テナント ID。 これは、 Microsoft Entra ID サービス タブにあります。
-
clientID (アプリケーション ID とも呼ばれます)。
-
クライアントシークレット。 これを手に入れたら、シークレットとして Databricks Workspaceに追加する必要があります。 このシークレットを追加するには、「 シークレット管理」を参照してください。
-
EventHubs トピック。 トピックの一覧は、特定の Event Hubs 名前空間ページの エンティティ セクションの Event Hubs セクションにあります。複数のトピックを操作するには、Event Hubs レベルで IAMロールを設定します。
-
EventHubs サーバー。 これは、特定の Event Hubs 名前空間 の概要ページで確認できます。
さらに、Entra ID を使用するには、Kafka に OAuth SASL メカニズムを使用するように指示する必要があります (SASL は汎用プロトコルであり、OAuth は SASL の「メカニズム」の一種です)。
kafka.security.protocol
は右の値である必要がありますSASL_SSL
kafka.sasl.mechanism
は右の値である必要がありますOAUTHBEARER
kafka.sasl.login.callback.handler.class
は、影付きの Kafka クラスのログインコールバックハンドラに対してkafkashaded
の値を持つ Java クラスの完全修飾名である必要があります。 正確なクラスについては、次の例を参照してください。
例
次に、実行中の例を見てみましょう。
- Python
- Scala
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
潜在的なエラーの処理
-
ストリーミングオプションはサポートされていません。
Unity Catalog で構成された DLT パイプラインでこの認証メカニズムを使用しようとすると、次のエラーが表示される場合があります。
このエラーを解決するには、サポートされているコンピュート構成を使用します。 「Microsoft Entra ID を使用したサービスプリンシパル認証」と「Event Hubs Azure」を参照してください。
-
新しい
KafkaAdminClient
を作成できませんでした。これは、次の認証オプションのいずれかが正しくない場合に Kafka がスローする内部エラーです。
- クライアント ID (アプリケーション ID とも呼ばれます)
- テナントID
- EventHubs サーバー
このエラーを解決するには、これらのオプションの値が正しいことを確認します。
また、この例でデフォルトで提供されている設定オプション (変更しないように求められたもの) を変更した場合 (
kafka.security.protocol
など) にも、このエラーが表示されることがあります。 -
返されるレコードはありません
DataFrame を表示または処理しようとしているのに結果が得られない場合は、UI に次のように表示されます。
このメッセージは、認証は成功したが、EventHubs がデータを返さなかったことを意味します。 考えられるいくつかの(決して網羅的ではありませんが)理由は次のとおりです。
- 間違った EventHubs トピックを指定しました。
startingOffsets
のデフォルトの Kafka 設定オプションはlatest
で、現在、トピックを通じてデータを受け取っていません。Kafka の最も初期のオフセットからデータの読み取りを開始するようにstartingOffsetstoearliest
を設定できます。