メインコンテンツまでスキップ

Amazon Kinesis に接続する

構造化ストリーミングを使用して、Amazon Kinesis へのデータの読み書きを行います。

Databricksは、すべてのS3トラフィックがAWSネットワーク上でルーティングされるように、S3 VPCエンドポイントを有効にすることを推奨します。

注記

Kinesisストリームを削除して再作成した場合、既存のチェックポイント ディレクトリを再利用してストリーミング クエリを再開することはできません。 チェックポイントディレクトリを削除し、クエリを最初からやり直す必要があります。構造化ストリーミングでは、ストリームを中断したり再開したりすることなく、シャード数を増やすことでシャーディングを再実行できます。

クエリのレイテンシに関するトラブルシューティングの推奨事項については、 「Kinesis を使用したレイテンシ削減のための推奨事項」を参照してください。

Amazon Kinesis で認証する

Databricks Runtime 16.1以降では、 Databricks Serviceの認証情報を使用してKinesisへの接続を管理することをDatabricks推奨しています。 サービス資格情報の作成を参照してください。

サービス資格情報を使用するには、次の手順を実行します。

  1. ロールを使用して、 Databricksにアクセスするために必要な権限を持つ サービスの資格情報を作成します。IAMKinesis「ステップ 1: IAMロールを作成する」を参照してください。
  2. ストリーミング読み取りを定義するときに、 serviceCredentialオプションを使用してサービス資格情報の名前を指定します。
注記

Kinesisソースには、 ListShardsGetRecords 、およびGetShardIterator権限が必要です。 Amazon: Access Denied例外が発生した場合は、 IAMロールにこれらの権限があることを確認してください。 IAMを使用したAmazon Kinesisデータストリームリソースへのアクセスの制御」を参照してください。

代替認証方法

Databricks Runtime 16.0 以前では、 Databricks資格情報は使用できません。 Databricksには、以下の代替認証方法があります。

インスタンスプロファイル

コンピュートの設定中にインスタンスプロファイルをアタッチします。 「インスタンスプロファイル」を参照してください。

インスタンスプロファイルは、標準アクセスモード(旧称:共有アクセスモード)ではサポートされていません。「標準コンピュートの要件と制限事項」を参照してください。

アクセスキーを直接使用する

awsAccessKeyawsSecretKeyオプションを設定してください。

キーを使用する場合は、Databricks シークレットを使用してキーを保存します。「秘密管理」を参照してください。

ロール IAM引き受ける

一部のコンピュート構成では、 roleArnオプションを使用してIAMロールを引き受けることができます。 役割を引き受けるには、役割を引き受ける権限を持ってクラスターを起動するか、 awsAccessKeyawsSecretKeyを通じてアクセスキーを提供します。

この方法は、アカウント間認証をサポートしています。詳細については、 IAMを使用したAWSアカウント間のアクセスの委任」を参照してください。

オプションで、外部 ID を roleExternalId で指定し、セッション名を roleSessionNameで指定できます。

スキーマ

Kinesis は、次のスキーマのレコードを返します。

タイプ

partitionKey

string

data

binary

stream

string

shardId

string

sequenceNumber

string

approximateArrivalTimestamp

タイムスタンプ

data列のデータを逆シリアル化するには、そのフィールドを文字列にキャストします。

クイックスタート

次のノートブックは、Kinesisを使用した構造化ストリーミングを使用して WordCount を実行する方法を示しています。

Kinesis WordCount と構造化ストリーミングノートブック

ノートブックを新しいタブで開く

Kinesis オプションの設定

Databricks Runtime 13.3 LTS 以降では、Kinesis で Trigger.AvailableNow を使用できます。 「 Kinesis レコードを増分バッチとして取り込む」を参照してください。

Databricks Runtime 16.1以降では、 streamARNを使用してKinesisソースを識別できます。 すべての Databricks Runtime バージョンにおいて、 streamNameまたはstreamARNいずれかを指定する必要がありますが、両方を指定することはできません。

警告

アクティブなストリーミングクエリでは、 streamNamestreamARNを切り替えないでください。Databricks 、ストリームの途中でのこれらのオプションの変更をサポートしていません。 クエリを再起動すると、重複レコードが発生したり、データが失われたりする可能性があります。streamNameからstreamARNに切り替えるには、新しいチェックポイントディレクトリを使用して新しいストリーミングクエリを開始します。

オプションの完全なリストについては、Kinesisを参照してください。

低遅延モニタリングとアラート

アラート機能のユースケースでは、低遅延が求められる。遅延を最小限に抑えるには:

  • ストリーミングクエリがKinesisストリームの唯一のコンシューマであることを確認して、フェッチパフォーマンスを最適化し、 Kinesisレート制限を回避します。
  • 取得したデータを可能な限り高速に処理するには、オプションmaxFetchDurationに200msなどの小さな値を設定してください。このオプションはトレードオフの関係にあります。各バッチで最新のレコードが確実に処理されることを保証するよりも、バッチごとの処理速度を優先するからです。例えば、 Trigger.AvailableNow使用する場合、値が小さいとクエリが Kinesis ストリームの最新レコードに追いつかなくなる可能性があります。
  • オプション minFetchPeriod を 210ms に設定して、できるだけ頻繁にフェッチします。
  • オプションshardsPerTaskを設定するか、クラスターを# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTaskとなるように構成してください。これにより、バックグラウンドでのプリフェッチタスクとストリーミングクエリタスクが同時に実行されることが保証されます。

クエリが5秒ごとにデータを受信している場合、 Kinesisのレート制限を超える可能性があります。設定を確認してください。

Kinesisのメトリクスを監視する

Kinesisは、ワークスペースごとに、コンシューマーがストリームの開始時点からどれだけ遅れているかをミリ秒単位で報告します。avgMsBehindLatestmaxMsBehindLatest 、およびminMsBehindLatestメトリクスは、ストリーミング クエリ プロセスにおけるすべてのワークスペースの平均、最小、最大のミリ秒を提供します。 Databricksのモニタリング構造化ストリーミング クエリ」を参照してください。

ノートブックでストリームを実行している場合は、ストリーミングクエリの進行状況ダッシュボードの 「生データ」 タブにあるメトリクスを確認してください。以下に例を示します。

JSON
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}

Kinesis レコードを増分バッチとして取り込む

Databricks Runtime 13.3 LTS 以降では、Databricks はインクリメンタル バッチ セマンティクスのための Trigger.AvailableNow Kinesis データソースの使用をサポートしています。次に、基本設定について説明します。

  1. マイクロバッチの読み取りが available now モードでトリガーされると、Databricks クライアントによって現在の時刻が記録されます。
  2. Databricks は、この記録時刻と前のチェックポイントの間のタイムスタンプを持つすべてのレコードについて、ソースシステムにポーリングします。
  3. Databricks は、Trigger.AvailableNow セマンティクスを使用してこれらのレコードを読み込みます。

Databricks 、ベストエフォート型メカニズムを使用して、ストリーミング クエリ実行時にKinesisストリームに存在するすべてのレコードを試行して消費します。 タイムスタンプのわずかな差異やデータソースにおける順序の保証の欠如により、トリガーされたバッチには一部のレコードが含まれない可能性があります。省略されたレコードは、次にトリガーされるマイクロバッチで処理されます。

注記

レコードがあってもクエリが Kinesis ストリームからのレコードのフェッチに引き続き失敗する場合は、 maxFetchDuration 値を増やしてみてください。

AvailableNowを参照してください: 増分バッチ処理

Kinesis への書き込み

Kinesis にデータを書き込むための ForeachSink として、次のコードスニペットを使用します。これには、Dataset[(String, Array[Byte])] が必要です。

注記

以下のコードスニペットは、「厳密に 1 回」ではなく、「 最低 1 回 」のセマンティクスを提供しています。

Kinesis Foreach シンクノートブック

ノートブックを新しいタブで開く

Kinesisでレイテンシを削減するための推奨事項

このセクションでは、 Kinesisストリームの遅延に関するさまざまな原因をトラブルシューティングするための推奨事項を示します。

Kinesisソースはバックグラウンド スレッドでSparkジョブを実行し、 Kinesisデータを定期的にプリフェッチし、そのデータをSparkエグキューゼター メモリにキャッシュします。 各プリフェッチ ステップが完了すると、ストリーミング クエリでキャッシュされたデータを処理できます。 プリフェッチのステップは、観測されるエンドツーエンドの遅延とスループットに大きな影響を与える。

プリフェッチのレイテンシーを短縮

クエリの待機時間を最小限に抑え、リソース使用量を最大化するように最適化するには、次の計算を使用します。

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.

重要

minFetchPeriod Kinesisシャードに対して、 ReadProvisionedThroughputExceededに達するまで複数のGetRecords API呼び出しを作成できます。例外が発生した場合でも、コネクタがKinesisシャードの利用率を最大化するため、問題にならない可能性があります。

レート制限エラーが多すぎることによる速度低下を回避

コネクタは、レート制限エラーが発生するたびに Kinesis から読み取られるデータ量を半分に減らし、このイベントをメッセージとともにログに記録します。 "Hit rate limit. Sleeping for 5 seconds."

ストリームが追いついてくる過程で、これらのエラーが表示されることがあります。ストリームが追いついた後にこれらのエラーが表示される場合は、AWS の Kinesis の容量を増やすか、Spark のプリフェッチオプションを調整することで、ワークロードを調整する必要があるかもしれません。

ディスク流出を防ぐ

Kinesisストリームのデータ量が急激に増加すると、割り当てられたバッファ容量がいっぱいになり、新しいデータを追加するのに十分な速さで空き容量が確保されない可能性があります。 Sparkバッファからディスクにデータを書き出すと、ストリーム処理が遅くなり、ログに次のようなメッセージを含むイベントが表示されます。

Bash
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

スピルを回避するには、ノードを追加するかノードあたりのメモリを増やすことでクラスタ メモリ容量を増やすか、構成を減らしますfetchBufferSize

中断されたS3書き込みタスク

Sparkの投機的実行を有効にして、ストリーム処理の進行を妨げる可能性のある中断されたタスクを終了させます。タスクが過度に強制終了されないようにするには、この設定の分位数と乗数を慎重に調整してください。Databricksでは、 spark.speculation.multiplier3に、 spark.speculation.quantile0.95に設定し、必要に応じて調整することを推奨しています。

ステートフルストリームにおけるチェックポイント処理によるレイテンシを削減する

Databricks では、ステートフル ストリーミング クエリの変更ログ チェックポイント処理と共に RocksDB を使用することをお勧めします。 「変更ログのチェックポイント設定を有効にする」を参照してください。

ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定

Databricks Runtime 11.3以上では、Databricks Runtime KinesisコネクタがAmazon Kinesisの拡張ファンアウト(EFO)機能の使用をサポートします。

Kinesisの強化されたファンアウト機能は、コンシューマーごとにシャードあたり2MB/秒の専用スループットを提供し(ストリームあたり最大20個のコンシューマー)、プルモードではなくプッシュモードでレコードを配信します。

EFO モードで設定された構造化ストリーミングクエリは、専用のスループットと一意のコンシューマ名とコンシューマARN ( Amazonリソース名) をもつコンシューマとして、 Kinesis Data Stream に登録します。

デフォルトでは、Databricks はストリーミングクエリ ID にdatabricks_という接頭辞を付けて新しいコンシューマーに名前を付けます。必要に応じて、 consumerNamePrefixまたはconsumerNameオプションを指定することで、この動作を上書きできます。consumerNameは、文字、数字、および特殊文字_ . -のみを含む文字列でなければなりません。

クエリの再起動時に、 Kinesisソースはポーリング モードを使用して、コミットされていない最新のバッチが存在する場合はそれを再生します。 ストリームが未コミットのバッチを再生した後、ソースは以降の読み取りのためにEFOモードに戻ります。

重要

登録済みの EFO コンシューマーは、 Amazon Kinesis で追加料金が発生します。 クエリのティアダウン時にコンシューマーを自動的に登録解除するには、 requireConsumerDeregistration オプションを trueに設定します。 Databricks は、ドライバーのクラッシュやノード障害などのイベントに対する登録解除を保証できません。 ジョブが失敗した場合、Databricks では、Kinesis の過剰な料金を防ぐために、登録済みのコンシューマーを直接管理することをお勧めします。

Databricks ノートブックを使用したオフラインの消費者管理

AWSアカウント コンソールでコンシューマを手動で構成するのではなく、 AWSKinesisConsumerManagerユーティリティを使用して、 Kinesisデータ ストリームのコンシューマをプログラムで登録、一覧表示、または登録解除します。 例えば、このユーティリティを使用して新しいストリームのコンシューマーを作成したり、ストリームを完全に停止する場合は、このユーティリティを使用してAWS上のコンシューマーを削除したりできます。

コンシューマ マネージャ ユーティリティは、コンピュートが専用アクセス モードに設定されているScalaでのみ使用できます。 アクセスモードを参照してください。

Databricksノートブックでこのユーティリティを使用するには:

  1. アクティブなクラスターに接続された新しい Databricks ノートブックで、必要な認証情報を使用してAWSKinesisConsumerManagerを作成します。

    Scala
    import com.databricks.sql.kinesis.AWSKinesisConsumerManager

    val manager = AWSKinesisConsumerManager.newManager()
    .option("serviceCredential", serviceCredentialName)
    .option("region", kinesisRegion)
    .create()
  2. コンシューマーをリストして表示します。

    Scala
    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
  3. 指定されたストリームのコンシューマを登録します。

    Scala
    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
  4. 指定されたストリームのコンシューマーの登録を解除します。

    Scala
    manager.deregisterConsumer("<stream name>", "<consumer name>")