Amazon Kinesisに接続する
この記事では、構造化ストリーミングを使用して Amazon Kinesis に対してデータの読み取りと書き込みを行う方法について説明します。
Databricks では、S3 VPC エンドポイントを有効にして、すべての S3 トラフィックが AWS ネットワークでルーティングされるようにすることをお勧めします。
注:
Kinesis ストリームを削除して再作成した場合、既存のチェックポイントディレクトリを再利用してストリーミングクエリーを再開することはできません。 チェックポイントディレクトリを削除し、クエリーを最初から開始する必要があります。 構造化ストリーミングでは、ストリームを中断または再起動せずにシャードの数を増やすことで、リシャーディングを行うことができます。
「 Kinesis の使用に関する推奨事項」を参照してください。
Amazon Kinesis による認証
Databricks では、インスタンスプロファイルを使用して Kinesis への接続を管理することを推奨しています。 「 インスタンスプロファイル」を参照してください。
警告
場合によっては、共有アクセス モードではサポートされません。 シングル・ユーザー・アクセス・モード、または共有アクセス・モードでの代替認証方法を使用します。 Unity Catalogのコンピュート アクセス モード制限を参照してください。
アクセスにキーを使用する場合は、オプションawsAccessKey
およびawsSecretKey
を使用してキーを指定できます。
roleArn
オプションを使用してIAMロールを引き受けることもできます。オプションで、roleExternalId
で外部IDを指定し、roleSessionName
でセッション名を指定できます。ロールを引き受けるには、そのロールを引き受ける権限でクラスターを起動するか、awsAccessKey
およびawsSecretKey
を介してアクセスキーを提供します。クロスアカウント認証の場合、Databricksでは、roleArn
を使用して引き受けたロールを保持することをお勧めします。このロールは、Databricks AWSアカウントを通じて引き受けることができます。クロスアカウント認証の詳細については、IAMロールを使用した AWSアカウント間のアクセス権限の委譲を参照してください。
注:
Kinesis ソースには、 ListShards
、 GetRecords
、および GetShardIterator
権限が必要です。Amazon: Access Denied
例外が発生した場合は、ユーザーまたはプロファイルにこれらの権限があることを確認してください。詳細については、「IAM を使用した Amazon Kinesis Data Streams リソースへのアクセスの制御」を参照してください。
スキーマ
Kinesis は、次のスキーマを持つレコードを返します。
列 |
タイプ |
---|---|
partitionKey |
string |
data |
binary |
stream |
string |
ShardID |
string |
sequenceNumber |
string |
approximateArrivalTimestamp |
timestamp |
data
列のデータを逆シリアル化するには、フィールドを文字列にキャストします。
Kinesis オプションの設定
重要
Databricks Runtime 13.3 LTS 以降では、Kinesis でTrigger.AvailableNow
を使用できます。 「Kinesis レコードを増分バッチとして取り込む」を参照してください。
Kinesisデータソースの一般的な設定を次に示します。
オプション |
値 |
デフォルト |
説明 |
---|---|---|---|
streamName |
ストリーム名のコンマ区切りリスト |
なし(必須パラメータ) |
サブスクライブするストリーム名 |
region |
指定するストリームのリージョン |
ローカルで解決されたリージョン |
ストリームが定義されているリージョン |
endpoint |
Kinesis データストリームのリージョン |
ローカルで解決されたリージョン |
Kinesis Data Streams のリージョンエンドポイント |
initialPosition |
|
|
ストリーム内のどこから読み取りを開始するか。
|
maxRecordsPerFetch |
正の整数 |
10,000 |
Kinesis への API リクエストごとに読み取られるレコードの数。Kinesis Producer Library を使用してサブレコードを 1 つのレコードに集約しているかどうかに応じて、返されるレコードの数が実際にはさらに多くなる場合があります。 |
maxFetchRate |
データレートを表す正の 10 進数(MB/秒単位) |
1.0(最大 = 2.0) |
シャードごとのデータのプリフェッチ速度。これは、フェッチのレートを制限し、Kinesis のスロットリングを回避するためのオプションです。2.0MB/s は Kinesis で許可される最大レートです。 |
minFetchPeriod |
期間を表す文字列。例:1 秒の場合は |
400ms(最小 = 200ms) |
連続するプリフェッチ試行間の待ち時間。これは、フェッチの頻度を制限し、Kinesis のスロットリングを回避するためのオプションです。Kinesis では 1 秒あたり最大 5 回のフェッチが許可されるため、最小値は 200 ミリ秒となります。 |
maxFetchDuration |
期間を表す文字列。例:1 分の場合は |
10s |
プリフェッチされた新しいデータが処理可能になるまでのバッファー時間。 |
fetchBufferSize |
バイトを表す文字列。例: |
20gb |
次のトリガーのためにバッファーするデータの量。これは停止条件として使用されるものであり、厳密な上限ではないため、この値に指定された値よりも多くのデータがバッファーされる可能性があります。 |
shardsPerTask |
正の整数 |
5 |
1 つの Spark タスクで並行してプリフェッチする Kinesis シャードの数。クエリーのレイテンシーを最小限に抑え、リソースの使用量を最大化するには、 |
shardFetchInterval |
期間を表す文字列。例:2 分の場合は |
1s |
リシャーディングのために Kinesis をポーリングする頻度。 |
awsAccessKey |
文字列 |
デフォルトはありません。 |
AWS アクセスキー。 |
awsSecretKey |
文字列 |
デフォルトはありません。 |
アクセスキーに対応する AWS シークレットアクセスキー。 |
roleArn |
文字列 |
デフォルトはありません。 |
Kinesis にアクセスするときに引き受けるロールの Amazon リソース名(ARN)。 |
roleExternalId |
文字列 |
デフォルトはありません。 |
AWS アカウントへのアクセスを委任する場合に使用できる任意の値。「外部 ID の使用方法」を参照してください。 |
roleSessionName |
文字列 |
デフォルトはありません。 |
同じロールが異なるプリンシパルによって、または異なる理由で引き受けられる場合に、セッションを一意に識別するための、引き受けられるロールセッションの識別子。 |
coalesceThresholdBlockSize |
正の整数 |
10,000,000 |
自動結合が発生するしきい値。平均ブロックサイズがこの値より小さい場合、プリフェッチされたブロックが |
coalesceBinSize |
正の整数 |
128,000,000 |
結合後のおおよそのブロックサイズ。 |
consumerMode |
|
|
ストリーミングクエリーを実行するコンシューマタイプ。 「 ストリーミングクエリー読み取りのための Kinesis 拡張ファンアウト (EFO) の設定」を参照してください。 |
requireConsumerDeregistration |
|
|
クエリー終了時に拡張ファンアウトコンシューマーの登録を解除するかどうか。 |
注:
オプションのデフォルト値には、2 つのリーダー(Spark またはその他)が Kinesis のレート制限に達することなく同時に Kinesis ストリームを消費できるような値が選択されています。コンシューマーの数が多い場合は、それに応じてオプションを調整する必要があります。例えば、maxFetchRate
を減らし、minFetchPeriod
を増やす必要がある場合があります。
低レイテンシーのモニタリングとアラート
アラートのユースケースでは、レイテンシーを小さくすることが望まれます。これを実現するには:
Kinesis ストリームのコンシューマーが 1 つだけ(つまり、コンシューマーがストリーミングクエリーのみであり、他のコンシューマーは存在しないこと)になるようにしてください。これにより、Kinesis のレート制限に達することなく、可能な限り迅速にフェッチできるように最適化できます。
オプション
maxFetchDuration
を小さい値 (200 ミリ秒など) に設定して、フェッチされたデータの処理をできるだけ早く開始します。Trigger.AvailableNow
を使用している場合、これにより、Kinesis ストリームの最新のレコードに追いつかない可能性が高くなります。オプション
minFetchPeriod
を 210 ミリ秒に設定して、できるだけ頻繁にフェッチします。オプション
shardsPerTask
を設定するか、# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask
となるようにクラスターを設定します。これにより、バックグラウンドのプリフェッチタスクとストリーミングクエリータスクを同時に実行できるようになります。
クエリーが 5 秒ごとにデータを受信していることが観測された場合、Kinesis のレート制限に達している可能性があります。設定を見直してください。
Kinesis はどのようなメトリクスを報告するか
Kinesis は、各ワークスペースのストリームの開始からコンシューマーが遅れたミリ秒数を報告します。 ストリーミング クエリ プロセス内のすべてのワークスペース間のミリ秒数の平均、最小、最大は、 avgMsBehindLatest
、 maxMsBehindLatest
、および minMsBehindLatest
メトリクスとして取得できます。 「ソース metrics オブジェクト (Kinesis)」を参照してください。
ノートブックでストリームを実行している場合は、ストリーミングクエリー進行状況ダッシュボードの [ Raw Data ] タブに、次の例のようにメトリクスが表示されます。
{
"sources" : [ {
"description" : "KinesisV2[stream]",
"metrics" : {
"avgMsBehindLatest" : "32000.0",
"maxMsBehindLatest" : "32000",
"minMsBehindLatest" : "32000"
},
} ]
}
Kinesis レコードを増分バッチとして取り込む
Databricks Runtime 13.3 LTS以降では、 Databricks増分バッチ セマンティクスのためにKinesisデータソースで Trigger.AvailableNow
を使用することをサポートしています。 次に、基本的な設定について説明します。
マイクロバッチの読み取りが available now モードでトリガーされると、Databricks クライアントによって現在の時刻が記録されます。
Databricks は、この記録時刻と前のチェックポイントの間のタイムスタンプを持つすべてのレコードについて、ソースシステムにポーリングします。
Databricks は、
Trigger.AvailableNow
セマンティクスを使用してこれらのレコードを読み込みます。
Databricks は、ベストエフォート型メカニズムを使用して、ストリーミングクエリーの実行時に Kinesis ストリームに存在するすべてのレコードの使用を試みます。 タイムスタンプの潜在的な違いが小さく、データソースの順序付けが保証されていないため、一部のレコードはトリガーされたバッチに含まれない可能性があります。 省略されたレコードは、次にトリガーされるマイクロバッチの一部として処理されます。
注:
レコードがあってもクエリーが Kinesis ストリームからのレコードの取得に失敗し続ける場合は、 maxFetchDuration
値を増やしてみてください。
増分バッチ処理の構成を参照してください。
Kinesis への書き込み
次のコードスニペットは、Kinesis にデータを書き込むための ForeachSink
として使用できます。これには、Dataset[(String, Array[Byte])]
が必要です。
注:
以下のコードスニペットは、「厳密に 1 回」ではなく、「最低 1 回」のセマンティクスを提供しています。
Kinesisの使用に関する推奨事項
Kinesis クエリーでは、さまざまな理由でレイテンシーが発生する場合があります。 このセクションでは、遅延のトラブルシューティングに関する推奨事項について説明します。
Kinesis ソースは、バックグラウンドスレッドで Spark ジョブを実行して、Kinesis データを定期的にプリフェッチし、Spark エグゼキューターのメモリにキャッシュします。 ストリーミングクエリーは、各プリフェッチステップの完了後にキャッシュされたデータを処理し、データを処理できるようにします。 プリフェッチのステップは、観測されたエンドツーエンドのレイテンシとスループットに大きく影響します。
プリフェッチの待ち時間を短縮
クエリーのレイテンシーとリソース使用量の最大化を最適化するには、次の計算を使用します。
total number of CPU cores in the cluster (across all executors)
>= total number of Kinesis shards
/ shardsPerTask
.
重要
minFetchPeriod
は、 ReadProvisionedThroughputExceeded
に達するまで、Kinesis シャードに対して複数の GetRecords API コールを作成できます。 例外が発生した場合、コネクタは Kinesis シャードの使用率を最大化するため、問題を示すものではありません。
レート制限エラーが多すぎることによる速度低下を回避
コネクタは、レート制限エラーが発生するたびに Kinesis から読み取られるデータ量を半分に減らし、このイベントをメッセージとともにログに記録します。 "Hit rate limit. Sleeping for 5 seconds."
ストリームが追いつかれているときにこれらのエラーが表示されるのが一般的ですが、キャッチアップ後はこれらのエラーは表示されなくなります。 その場合は、Kinesis 側から (容量を増やして) 調整するか、プリフェッチオプションを調整する必要があります。
ディスクへのデータの流出を防ぐ
Kinesis ストリームに急激なスパイクが発生すると、割り当てられたバッファ容量がいっぱいになり、新しいデータを追加するのに十分な速さでバッファが空にならない可能性があります。
このような場合、Spark はバッファーからディスクにブロックをあふれさせ、処理を遅くし、ストリームのパフォーマンスに影響を与えます。 このイベントは、次のようなメッセージとともにログに表示されます。
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
この問題に対処するには、クラスターのメモリ容量を増やす (ノードを追加するか、ノードあたりのメモリを増やす) か、構成パラメーター fetchBufferSize
を調整します。
S3 書き込みタスクのハングアップ
Spark の投機を有効にして、ストリーム処理の進行を妨げるハング タスクを終了できます。 タスクが過度に積極的に終了しないようにするには、この設定の分位数と乗数を慎重に調整します。 良い出発点は、 spark.speculation.multiplier
を 3
に設定し、 spark.speculation.quantile
を 0.95
に設定することです。
ステートフルストリームでのチェックポイント設定に関連するレイテンシーを削減
Databricks では、ステートフル ストリーミング クエリーに changelog チェックポイント処理で RocksDB を使用することを推奨しています。 「 変更ログのチェックポイントを有効にする」を参照してください。
ストリーミングクエリ読み取り用にKinesis Enhanced Fan-Out (EFO) を設定する
Databricks Runtime 11.3以上では、Databricks Runtime KinesisコネクタがAmazon Kinesisの拡張ファンアウト(EFO)機能の使用をサポートします。
Kinesis enhanced fan-outは、シャードあたり、コンシューマーあたり2MB/秒の専用スループット(Kinesisストリームあたり最大20コンシューマー)で拡張ファンアウトストリームコンシューマーのサポートし、プルモードではなくプッシュモードで配信を記録する機能です。
Structured StreamingクエリーがEFOモードで実行されている場合、専用のスループットを持つコンシューマーとして動作し、Kinesis Data Streamsに登録されます。Kinesis Data Streamsに登録するには、生成されたコンシューマーARN(Amazon Resource Number)を今後の操作に使用できるように、クエリーで一意のコンシューマ名を指定する必要があります。明示的にコンシューマ−名を指定することも、ストリーミングクエリーIDをコンシューマー名として再利用することもできます。Databricksソースによって登録されたすべてのコンシューマーには、「databricks_」というプレフィックスが付いています。以前に登録されたコンシューマーを参照する構造化ストリーミングクエリーは、describeStreamConsumer
が返すconsumerARN
を使用します。
consumerName
フィールドを使用すると、ストリーミングクエリーに一意の名前を指定できます。名前を指定しないことを選択した場合は、ストリーミングクエリーIDが使用されます。consumerName
は、文字、数字、および_
(アンダースコア)、 .
(ドット)、 -
(ハイフン) などの特殊文字で構成される文字列である必要があります。
重要
登録されたEFOコンシューマーには、Amazon Kinesisで追加料金が発生します。クエリーのティアダウン時にコンシューマーを自動的に登録解除するには、requireConsumerDeregistration
オプションをtrue
に設定します。Databricksは、ドライバーのクラッシュやノードの障害などのイベントによる登録解除を保証できません。ジョブが失敗した場合、DatabricksはKinesisの過剰請求を防ぐために、登録されたコンシューマーを直接管理することを推奨します。
Databricksノートブックを使用したオフラインコンシューマー管理
Databricksは、Kinesisデータストリームに関連するコンシューマーの登録、一覧表示、登録解除を行うコンシューマー管理ユーティリティを提供します。次のコードは、Databricksノートブックでこのユーティリティを使用する方法を示しています。
アクティブなクラスターに接続された新しいDatabricksノートブックで、必要な認証情報を提供して
AWSKinesisConsumerManager
を作成します。import com.databricks.sql.kinesis.AWSKinesisConsumerManager val manager = AWSKinesisConsumerManager.newManager() .option("awsAccessKey", awsAccessKeyId) .option("awsSecretKey", awsSecretKey) .option("region", kinesisRegion) .create()
コンシューマーをリストして表示します。
val consumers = manager.listConsumers("<stream name>") display(consumers)
特定のストリームのコンシューマーを登録します。
val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
特定のストリームのコンシューマーの登録を解除します。
manager.deregisterConsumer("<stream name>", "<consumer name>")