Amazon Kinesis に接続する
構造化ストリーミングを使用して、Amazon Kinesis へのデータの読み書きを行います。
Databricksは、すべてのS3トラフィックがAWSネットワーク上でルーティングされるように、S3 VPCエンドポイントを有効にすることを推奨します。
Kinesisストリームを削除して再作成した場合、既存のチェックポイント ディレクトリを再利用してストリーミング クエリを再開することはできません。 チェックポイントディレクトリを削除し、クエリを最初からやり直す必要があります。構造化ストリーミングでは、ストリームを中断したり再開したりすることなく、シャード数を増やすことでシャーディングを再実行できます。
クエリのレイテンシに関するトラブルシューティングの推奨事項については、 「Kinesis を使用したレイテンシ削減のための推奨事項」を参照してください。
Amazon Kinesis で認証する
Databricks Runtime 16.1以降では、 Databricks Serviceの認証情報を使用してKinesisへの接続を管理することをDatabricks推奨しています。 サービス資格情報の作成を参照してください。
サービス資格情報を使用するには、次の手順を実行します。
- ロールを使用して、 Databricksにアクセスするために必要な権限を持つ サービスの資格情報を作成します。IAMKinesis「ステップ 1: IAMロールを作成する」を参照してください。
- ストリーミング読み取りを定義するときに、
serviceCredentialオプションを使用してサービス資格情報の名前を指定します。
Kinesisソースには、 ListShards 、 GetRecords 、およびGetShardIterator権限が必要です。 Amazon: Access Denied例外が発生した場合は、 IAMロールにこれらの権限があることを確認してください。 IAMを使用したAmazon Kinesisデータストリームリソースへのアクセスの制御」を参照してください。
代替認証方法
Databricks Runtime 16.0 以前では、 Databricks資格情報は使用できません。 Databricksには、以下の代替認証方法があります。
インスタンスプロファイル
コンピュートの設定中にインスタンスプロファイルをアタッチします。 「インスタンスプロファイル」を参照してください。
インスタンスプロファイルは、標準アクセスモード(旧称:共有アクセスモード)ではサポートされていません。「標準コンピュートの要件と制限事項」を参照してください。
アクセスキーを直接使用する
awsAccessKeyとawsSecretKeyオプションを設定してください。
キーを使用する場合は、Databricks シークレットを使用してキーを保存します。「秘密管理」を参照してください。
ロール IAM引き受ける
一部のコンピュート構成では、 roleArnオプションを使用してIAMロールを引き受けることができます。 役割を引き受けるには、役割を引き受ける権限を持ってクラスターを起動するか、 awsAccessKeyとawsSecretKeyを通じてアクセスキーを提供します。
この方法は、アカウント間認証をサポートしています。詳細については、 IAMを使用したAWSアカウント間のアクセスの委任」を参照してください。
オプションで、外部 ID を roleExternalId で指定し、セッション名を roleSessionNameで指定できます。
スキーマ
Kinesis は、次のスキーマのレコードを返します。
列 | タイプ |
|---|---|
| string |
| binary |
| string |
| string |
| string |
| タイムスタンプ |
data列のデータを逆シリアル化するには、そのフィールドを文字列にキャストします。
クイックスタート
次のノートブックは、Kinesisを使用した構造化ストリーミングを使用して WordCount を実行する方法を示しています。
Kinesis WordCount と構造化ストリーミングノートブック
Kinesis オプションの設定
Databricks Runtime 13.3 LTS 以降では、Kinesis で Trigger.AvailableNow を使用できます。 「 Kinesis レコードを増分バッチとして取り込む」を参照してください。
Databricks Runtime 16.1以降では、 streamARNを使用してKinesisソースを識別できます。 すべての Databricks Runtime バージョンにおいて、 streamNameまたはstreamARNいずれかを指定する必要がありますが、両方を指定することはできません。
アクティブなストリーミングクエリでは、 streamNameとstreamARNを切り替えないでください。Databricks 、ストリームの途中でのこれらのオプションの変更をサポートしていません。 クエリを再起動すると、重複レコードが発生したり、データが失われたりする可能性があります。streamNameからstreamARNに切り替えるには、新しいチェックポイントディレクトリを使用して新しいストリーミングクエリを開始します。
Kinesisデータソースの一般的な設定を次に示します。
オプション | 値 | デフォルト | 説明 |
|---|---|---|---|
| ストリーム名のコンマ区切りリスト | なし | サブスクライブするストリーム名 |
| KinesisストリームARNsのコンマ区切りリスト。たとえば、 | なし | サブスクライブするストリームの ARNs 。 Databricks Runtime 16.1 以降で使用できます。 |
| 指定するストリームのリージョン | ローカルで解決されたリージョン | ストリームが定義されているリージョン |
| Kinesis データストリームのリージョン | ローカルで解決されたリージョン | Kinesis Data Streams のリージョンエンドポイント |
|
|
| ストリーム内のどこから読み取りを開始するか。 タイムスタンプの Java デフォルト形式 ( |
| 正の整数 | 10,000 | KinesisへのAPIリクエストごとに読み込むレコード数。Kinesis Producerを使用してサブレコードが単一のレコードに集約されたかどうかによって、返されるレコードの数が増える可能性があります。 。 |
| データレートを表す正の 10 進数(MB/秒単位) | 1.0(最大 = 2.0) | シャードごとにデータをプリフェッチする速度。このオプションはフェッチのレートを制限し、Kinesisのスロットリングを回避します。Kinesisが許容する最大転送速度は2.0MB/秒です。 |
| 期間を表す文字列。例:1 秒の場合は | 400ms(最小 = 200ms) | 連続するプリフェッチ試行間の待機時間。このオプションはフェッチの頻度を制限し、Kinesisのスロットリングを回避します。Kinesisでは最大5回のフェッチ/秒しか許可されないため、200msが最小値となります。 |
| 期間を表す文字列。例:1 分の場合は | 10s | プリフェッチされた新しいデータを処理に利用可能にする前にバッファリングする時間。 |
| バイトを表す文字列。例: | 20gb | 次のトリガーのためにバッファリングするデータの量。このオプションは停止条件であり、厳密な上限ではありません。この値で指定されているよりも多くのデータがバッファリングされる可能性があります。 |
| 正の整数 | 5 | Sparkタスクごとに並行してプリフェッチするKinesisシャードの数。 理想的には、クエリのレイテンシを最小限に抑え、リソースの使用量を最大化するために、 |
| 期間を表す文字列。例:2 分の場合は | 1s | Kinesisのリシャーディングをポーリングする間隔。 |
| 文字列 | デフォルトはありません。 | Databricks サービスの資格情報の名前。「サービス資格情報の作成」を参照してください。 |
| 文字列 | デフォルトはありません。 | AWS アクセスキー。 |
| 文字列 | デフォルトはありません。 | アクセスキーに対応する AWS シークレットアクセスキー。 |
| 文字列 | デフォルトはありません。 | Kinesis にアクセスするときに引き受けるロールの Amazon リソース名(ARN)。 |
| 文字列 | デフォルトはありません。 | AWS アカウントへのアクセスを委任する場合に使用できる任意の値。「外部 ID の使用方法」を参照してください。 |
| 文字列 | デフォルトはありません。 | 同じロールが異なるプリンシパルによって、または異なる理由で引き受けられる場合に、セッションを一意に識別するための、引き受けられるロールセッションの識別子。 |
| 正の整数 | 10,000,000 | 自動結合が発生するしきい値。平均ブロックサイズがこの値より小さい場合、プリフェッチされたブロックが |
| 正の整数 | 128,000,000 | 結合後のおおよそのブロックサイズ。 |
|
|
| ストリーミング クエリを実行するコンシューマーの種類。 「 ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定」を参照してください。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
|
|
| クエリの終了時に拡張ファンアウト コンシューマーを登録解除するかどうか。 |
| 正の整数 (最大 10000)。 | 100 | シャードの一覧表示中に API 呼び出しごとに読み取るシャードの最大数。 |
| すべてのストリームで使用する単一のコンシューマー名、またはカンマ区切りのコンシューマー名のリスト。 | ストリーミング クエリ ID | EFO モードで Kinesis サービスにクエリを登録するために使用されるコンシューマー名。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
| 空の文字列またはカスタムプレフィックス文字列。 |
| EFO モードで Kinesis サービスにコンシューマーを登録するために consumerName と共に使用されるプレフィックス。 Databricks Runtime 16.0 以降で使用できます。 |
| デュレーション文字列 (例: 1 秒 (最大 3600 秒) の "1s")。 | 300番台 | Kinesis EFO コンシューマー登録がチェックされ、更新される間隔。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
| コンシューマ名または ARNsのカンマ区切りリスト。 | なし | EFO モードの既存のコンシューマーの識別子。 Databricks Runtime 16.1 以降で使用できます。 |
|
| なし | コンシューマ ID が名前か ARNsかを指定します。 Databricks Runtime 16.1 以降で使用できます。 |
デフォルトのオプション値を使用すると、2 つのリーダー (Spark またはその他のリーダー) が Kinesis ストリームを同時に消費しても、Kinesis のレート制限に達することはありません。顧客数が増えた場合は、それに応じてオプションを調整してください。例えば、 maxFetchRateを減らしてminFetchPeriodを増やす必要があるかもしれません。
低遅延モニタリングとアラート
アラート機能のユースケースでは、低遅延が求められる。遅延を最小限に抑えるには:
- ストリーミングクエリがKinesisストリームの唯一のコンシューマであることを確認して、フェッチパフォーマンスを最適化し、 Kinesisレート制限を回避します。
- 取得したデータを可能な限り高速に処理するには、オプション
maxFetchDurationに200msなどの小さな値を設定してください。このオプションはトレードオフの関係にあります。各バッチで最新のレコードが確実に処理されることを保証するよりも、バッチごとの処理速度を優先するからです。例えば、Trigger.AvailableNow使用する場合、値が小さいとクエリが Kinesis ストリームの最新レコードに追いつかなくなる可能性があります。 - オプション
minFetchPeriodを 210ms に設定して、できるだけ頻繁にフェッチします。 - オプション
shardsPerTaskを設定するか、クラスターを# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTaskとなるように構成してください。これにより、バックグラウンドでのプリフェッチタスクとストリーミングクエリタスクが同時に実行されることが保証されます。
クエリが5秒ごとにデータを受信している場合、 Kinesisのレート制限を超える可能性があります。設定を確認してください。
Kinesisのメトリクスを監視する
Kinesisは、ワークスペースごとに、コンシューマーがストリームの開始時点からどれだけ遅れているかをミリ秒単位で報告します。avgMsBehindLatest 、 maxMsBehindLatest 、およびminMsBehindLatestメトリクスは、ストリーミング クエリ プロセスにおけるすべてのワークスペースの平均、最小、最大のミリ秒を提供します。 Databricksのモニタリング構造化ストリーミング クエリ」を参照してください。
ノートブックでストリームを実行している場合は、ストリーミングクエリの進行状況ダッシュボードの 「生データ」 タブにあるメトリクスを確認してください。以下に例を示します。
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}
Kinesis レコードを増分バッチとして取り込む
Databricks Runtime 13.3 LTS 以降では、Databricks はインクリメンタル バッチ セマンティクスのための Trigger.AvailableNow Kinesis データソースの使用をサポートしています。次に、基本設定について説明します。
- マイクロバッチの読み取りが available now モードでトリガーされると、Databricks クライアントによって現在の時刻が記録されます。
- Databricks は、この記録時刻と前のチェックポイントの間のタイムスタンプを持つすべてのレコードについて、ソースシステムにポーリングします。
- Databricks は、
Trigger.AvailableNowセマンティクスを使用してこれらのレコードを読み込みます。
Databricks 、ベストエフォート型メカニズムを使用して、ストリーミング クエリ実行時にKinesisストリームに存在するすべてのレコードを試行して消費します。 タイムスタンプのわずかな差異やデータソースにおける順序の保証の欠如により、トリガーされたバッチには一部のレコードが含まれない可能性があります。省略されたレコードは、次にトリガーされるマイクロバッチで処理されます。
レコードがあってもクエリが Kinesis ストリームからのレコードのフェッチに引き続き失敗する場合は、 maxFetchDuration 値を増やしてみてください。
AvailableNowを参照してください: 増分バッチ処理。
Kinesis への書き込み
次のコードスニペットは、Kinesis にデータを書き込むための ForeachSink として使用できます。これには、Dataset[(String, Array[Byte])] が必要です。
以下のコードスニペットは、「厳密に 1 回」ではなく、「 最低 1 回 」のセマンティクスを提供しています。
Kinesis Foreach シンクノートブック
Kinesisでレイテンシを削減するための推奨事項
このセクションでは、 Kinesisストリームの遅延に関するさまざまな原因をトラブルシューティングするための推奨事項を示します。
Kinesisソースはバックグラウンド スレッドでSparkジョブを実行し、 Kinesisデータを定期的にプリフェッチし、そのデータをSparkエグキューゼター メモリにキャッシュします。 各プリフェッチ ステップが完了すると、ストリーミング クエリでキャッシュされたデータを処理できます。 プリフェッチのステップは、観測されるエンドツーエンドの遅延とスループットに大きな影響を与える。
プリフェッチのレイテンシーを短縮
クエリの待機時間を最小限に抑え、リソース使用量を最大化するように最適化するには、次の計算を使用します。
total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.
minFetchPeriod Kinesisシャードに対して、 ReadProvisionedThroughputExceededに達するまで複数のGetRecords API呼び出しを作成できます。例外が発生した場合でも、コネクタがKinesisシャードの利用率を最大化するため、問題にならない可能性があります。
レート制限エラーが多すぎることによる速度低下を回避
コネクタは、レート制限エラーが発生するたびに Kinesis から読み取られるデータ量を半分に減らし、このイベントをメッセージとともにログに記録します。 "Hit rate limit. Sleeping for 5 seconds."
ストリームが追いついてくる過程で、これらのエラーが表示されることがあります。ストリームが追いついた後にこれらのエラーが表示される場合は、AWS の Kinesis の容量を増やすか、Spark のプリフェッチオプションを調整することで、ワークロードを調整する必要があるかもしれません。
データをディスクにあふれさせないようにする
Kinesisストリームのデータ量が急激に増加すると、割り当てられたバッファ容量がいっぱいになり、新しいデータを追加するのに十分な速さで空き容量が確保されない可能性があります。 Sparkバッファからディスクにデータを書き出すと、ストリーム処理が遅くなり、ログに次のようなメッセージを含むイベントが表示されます。
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
スピルを回避するには、ノードを追加するかノードあたりのメモリを増やすことでクラスタ メモリ容量を増やすか、構成を減らしますfetchBufferSize 。
中断されたS3書き込みタスク
Sparkの投機的実行を有効にして、ストリーム処理の進行を妨げる可能性のある中断されたタスクを終了させます。タスクが過度に強制終了されないようにするには、この設定の分位数と乗数を慎重に調整してください。Databricksでは、 spark.speculation.multiplierを3に、 spark.speculation.quantileを0.95に設定し、必要に応じて調整することを推奨しています。
ステートフルストリームにおけるチェックポイント処理によるレイテンシを削減する
Databricks では、ステートフル ストリーミング クエリの変更ログ チェックポイント処理と共に RocksDB を使用することをお勧めします。 「変更ログのチェックポイント設定を有効にする」を参照してください。
ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定
Databricks Runtime 11.3以上では、Databricks Runtime KinesisコネクタがAmazon Kinesisの拡張ファンアウト(EFO)機能の使用をサポートします。
Kinesisの強化されたファンアウト機能は、コンシューマーごとにシャードあたり2MB/秒の専用スループットを提供し(ストリームあたり最大20個のコンシューマー)、プルモードではなくプッシュモードでレコードを配信します。
EFO モードで設定された構造化ストリーミングクエリは、専用のスループットと一意のコンシューマ名とコンシューマARN ( Amazonリソース名) をもつコンシューマとして、 Kinesis Data Stream に登録します。
デフォルトでは、Databricks はストリーミングクエリ ID にdatabricks_という接頭辞を付けて新しいコンシューマーに名前を付けます。必要に応じて、 consumerNamePrefixまたはconsumerNameオプションを指定することで、この動作を上書きできます。consumerNameは、文字、数字、および特殊文字_ . -のみを含む文字列でなければなりません。
クエリの再起動時に、 Kinesisソースはポーリング モードを使用して、コミットされていない最新のバッチが存在する場合はそれを再生します。 ストリームが未コミットのバッチを再生した後、ソースは以降の読み取りのためにEFOモードに戻ります。
登録済みの EFO コンシューマーは、 Amazon Kinesis で追加料金が発生します。 クエリのティアダウン時にコンシューマーを自動的に登録解除するには、 requireConsumerDeregistration オプションを trueに設定します。 Databricks は、ドライバーのクラッシュやノード障害などのイベントに対する登録解除を保証できません。 ジョブが失敗した場合、Databricks では、Kinesis の過剰な料金を防ぐために、登録済みのコンシューマーを直接管理することをお勧めします。
高度なコンシューマー構成オプション
Databricks Runtime 16.1以降では、オプションregisteredConsumerIdとregisteredConsumerIdTypeを使用して、既存のコンシューマーのEFOモードでの読み取りを設定します。
- Python
- Scala
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)
val kinesis = spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
既存のコンシューマは、名前または ARN を使用して指定できます。 設定で指定された Kinesis ストリームソースごとに 1 つのコンシューマー ID を指定する必要があります。
Databricks ノートブックを使用したオフラインの消費者管理
AWSアカウント コンソールでコンシューマを手動で構成するのではなく、 AWSKinesisConsumerManagerユーティリティを使用して、 Kinesisデータ ストリームのコンシューマをプログラムで登録、一覧表示、または登録解除します。 例えば、このユーティリティを使用して新しいストリームのコンシューマーを作成したり、ストリームを完全に停止する場合は、このユーティリティを使用してAWS上のコンシューマーを削除したりできます。
コンシューマ マネージャ ユーティリティは、コンピュートが専用アクセス モードに設定されているScalaでのみ使用できます。 アクセスモードを参照してください。
Databricksノートブックでこのユーティリティを使用するには:
-
アクティブなクラスターに接続された新しい Databricks ノートブックで、必要な認証情報を使用して
AWSKinesisConsumerManagerを作成します。Scalaimport com.databricks.sql.kinesis.AWSKinesisConsumerManager
val manager = AWSKinesisConsumerManager.newManager()
.option("serviceCredential", serviceCredentialName)
.option("region", kinesisRegion)
.create() -
コンシューマーをリストして表示します。
Scalaval consumers = manager.listConsumers("<stream name>")
display(consumers) -
指定されたストリームのコンシューマを登録します。
Scalaval consumerARN = manager.registerConsumer("<stream name>", "<consumer name>") -
指定されたストリームのコンシューマーの登録を解除します。
Scalamanager.deregisterConsumer("<stream name>", "<consumer name>")