Amazon Kinesis に接続する
この記事では、構造化ストリーミングを使用して Amazon Kinesis に対してデータを読み書きする方法について説明します。
Databricks では、S3 VPC エンドポイントを有効にして、すべての S3 トラフィックが AWS ネットワーク上でルーティングされるようにすることをお勧めします。
Kinesis ストリームを削除して再作成すると、既存のチェックポイントディレクトリを再利用してストリーミングクエリを再開することはできません。 チェックポイント・ディレクトリを削除して、それらのクエリを最初から開始する必要があります。 構造化ストリーミングでリシャーディングするには、ストリームを中断したり再起動したりせずにシャードの数を増やします。
「Kinesis の使用に関する推奨事項」を参照してください。
Amazon Kinesis で認証する
Databricks では、Databricks サービス資格情報を使用して Kinesis への接続を管理することをお勧めします。 「サービス資格情報を使用して外部クラウド サービスへのアクセスを管理する」を参照してください。Databricks サービスの資格情報には、Databricks Runtime 16.2 以降が必要です。
サービス資格情報を使用するには、次の手順を実行します。
- ロールを使用して、 Databricksにアクセスするために必要な権限を持つ サービスの資格情報を作成します。IAMKinesis「ステップ 1: IAMロールを作成する」を参照してください。
- ストリーミング読み取りを定義するときに、
serviceCredential
オプションを使用して、サービス資格情報の名前を指定します。
Kinesis ソースには、 ListShards
、 GetRecords
、および GetShardIterator
のアクセス許可が必要です。 Amazon: Access Denied
例外が発生した場合は、 IAMロールにこれらの権限があることを確認してください。 詳細については 、「AmazonKinesisを使用した Data ストリーム リソースへのアクセスの制御IAM 」を参照してください。
代替認証方法
Databricks サービスの資格情報が使用できない場合、Databricks には次の代替認証方法が用意されています。
インスタンスプロファイル
コンピュートの設定中にインスタンスプロファイルをアタッチします。 「インスタンスプロファイル」を参照してください。
インスタンスプロファイルは、標準アクセスモード (以前の共有アクセスモード) ではサポートされていません。 Unity Catalogのコンピュート アクセス モードの制限を参照してください。
キーを直接使用する
アクセスにキーを使用するには、読み取りの設定中にオプション awsAccessKey
と awsSecretKey
を使用してキーを指定します。
キーを使用する場合は、Databricks シークレットを使用してキーを格納する必要があります。 シークレット管理を参照してください。
ロール IAM引き受ける
一部のコンピュート構成では、roleArn
オプションを使用してIAMロールを引き受けることができます。ロールを引き受けるには、ロールを引き受ける権限でクラスターを起動するか、 awsAccessKey
と awsSecretKey
を使用してアクセス キーを提供します。
この方法では、クロスアカウント認証をサポートできます。 クロスアカウント認証の詳細については、「ロールを使用した AWSアカウント間でのアクセスの委任IAM 」を参照してください。
オプションで、外部 ID を roleExternalId
で指定し、セッション名を roleSessionName
で指定できます。
スキーマ
Kinesis は、次のスキーマのレコードを返します。
列 | タイプ |
---|---|
partitionKey | string |
data | binary |
stream | string |
ShardID | string |
sequenceNumber | string |
approximateArrivalTimestamp | タイムスタンプ |
data
列のデータを逆シリアル化するには、フィールドを文字列にキャストします。
クイックスタート
次のノートブックは、Kinesisを使用した構造化ストリーミングを使用して WordCount を実行する方法を示しています。
Kinesis WordCount と構造化ストリーミングノートブック
Kinesis オプションの設定
Databricks Runtime 13.3 LTS 以降では、Kinesis で Trigger.AvailableNow
を使用できます。 「 Kinesis レコードを増分バッチとして取り込む」を参照してください。
Databricks Runtime 16.1 以降では、 streamARN
を使用して Kinesis ソースを識別できます。
すべての Databricks Runtime バージョンで、 streamName
または streamARN
を指定する必要があります。 これらのオプションのうち 1 つだけを指定できます。
Kinesisデータソースの一般的な設定を次に示します。
オプション | 値 | デフォルト | 説明 |
---|---|---|---|
streamName | ストリーム名のコンマ区切りリスト | なし | サブスクライブするストリーム名 |
ストリームARN | KinesisストリームARNsのコンマ区切りリスト。たとえば、 | なし | サブスクライブするストリームの ARNs 。 Databricks Runtime 16.1 以降で使用できます。 |
region | 指定するストリームのリージョン | ローカルで解決されたリージョン | ストリームが定義されているリージョン |
endpoint | Kinesis データストリームのリージョン | ローカルで解決されたリージョン | Kinesis Data Streams のリージョンエンドポイント |
initialPosition |
|
| ストリーム内のどこから読み取りを開始するか。 タイムスタンプの Java デフォルト形式 ( |
maxRecordsPerFetch | 正の整数 | 10,000 | Kinesis への API リクエストごとに読み取られるレコードの数。Kinesis Producer Library を使用してサブレコードを 1 つのレコードに集約しているかどうかに応じて、返されるレコードの数が実際にはさらに多くなる場合があります。 |
maxFetchRate | データレートを表す正の 10 進数(MB/秒単位) | 1.0(最大 = 2.0) | シャードごとのデータのプリフェッチ速度。これは、フェッチのレートを制限し、Kinesis のスロットリングを回避するためのオプションです。2.0MB/s は Kinesis で許可される最大レートです。 |
minFetchPeriod | 期間を表す文字列。例:1 秒の場合は | 400ms(最小 = 200ms) | 連続するプリフェッチ試行間の待ち時間。これは、フェッチの頻度を制限し、Kinesis のスロットリングを回避するためのオプションです。Kinesis では 1 秒あたり最大 5 回のフェッチが許可されるため、最小値は 200 ミリ秒となります。 |
maxFetchDuration | 期間を表す文字列。例:1 分の場合は | 10s | プリフェッチされた新しいデータが処理可能になるまでのバッファー時間。 |
fetchBufferSize | バイトを表す文字列。例: | 20gb | 次のトリガーのためにバッファーするデータの量。これは停止条件として使用されるものであり、厳密な上限ではないため、この値に指定された値よりも多くのデータがバッファーされる可能性があります。 |
shardsPerTask | 正の整数 | 5 | 1 つの Spark タスクで並行してプリフェッチする Kinesis シャードの数。クエリーのレイテンシーを最小限に抑え、リソースの使用量を最大化するには、 |
shardFetchInterval | 期間を表す文字列。例:2 分の場合は | 1s | リシャーディングのために Kinesis をポーリングする頻度。 |
サービスクレデンシャル | 文字列 | デフォルトはありません。 | Databricks サービスの資格情報の名前。 「サービス資格情報を使用して外部クラウド サービスへのアクセスを管理する」を参照してください。 |
awsAccessKey | 文字列 | デフォルトはありません。 | AWS アクセスキー。 |
awsSecretKey | 文字列 | デフォルトはありません。 | アクセスキーに対応する AWS シークレットアクセスキー。 |
roleArn | 文字列 | デフォルトはありません。 | Kinesis にアクセスするときに引き受けるロールの Amazon リソース名(ARN)。 |
roleExternalId | 文字列 | デフォルトはありません。 | AWS アカウントへのアクセスを委任する場合に使用できる任意の値。「外部 ID の使用方法」を参照してください。 |
roleSessionName | 文字列 | デフォルトはありません。 | 同じロールが異なるプリンシパルによって、または異なる理由で引き受けられる場合に、セッションを一意に識別するための、引き受けられるロールセッションの識別子。 |
coalesceThresholdBlockSize | 正の整数 | 10,000,000 | 自動結合が発生するしきい値。平均ブロックサイズがこの値より小さい場合、プリフェッチされたブロックが |
coalesceBinSize | 正の整数 | 128,000,000 | 結合後のおおよそのブロックサイズ。 |
consumerMode |
|
| ストリーミング クエリを実行するコンシューマーの種類。 「 ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定」を参照してください。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
requireConsumerDeregistration |
|
| クエリの終了時に拡張ファンアウト コンシューマーを登録解除するかどうか。 |
maxShardsPerDescribeの | 正の整数 (最大 10000)。 | 100 | シャードの一覧表示中に API 呼び出しごとに読み取るシャードの最大数。 |
消費者の名前 | すべてのストリームで使用する単一のコンシューマ名、またはコンマ区切りのコンマ名のリスト。 | ストリーミング クエリ ID | EFO モードで Kinesis サービスにクエリを登録するために使用されるコンシューマー名。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
consumerNameプレフィックス | 空の文字列またはカスタムプレフィックス文字列。 | databricks_ | EFO モードで Kinesis サービスにコンシューマーを登録するために consumerName と共に使用されるプレフィックス。 Databricks Runtime 16.0 以降で使用できます。 |
consumerRefreshInterval(消費者リフレッシュ間隔) | デュレーション文字列 (例: 1 秒 (最大 3600 秒) の "1s")。 | 300番台 | Kinesis EFO コンシューマー登録がチェックされ、更新される間隔。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
registeredConsumerId (登録された消費者ID) | コンシューマ名または ARNsのカンマ区切りリスト。 | なし | EFO モードの既存のコンシューマーの識別子。 Databricks Runtime 16.1 以降で使用できます。 |
registeredConsumerIdType (登録済み消費者 IDTYPE) |
| なし | コンシューマ ID が名前か ARNsかを指定します。 Databricks Runtime 16.1 以降で使用できます。 |
オプションのデフォルト値には、2 つのリーダー(Spark またはその他)が Kinesis のレート制限に達することなく同時に Kinesis ストリームを消費できるような値が選択されています。コンシューマーの数が多い場合は、それに応じてオプションを調整する必要があります。例えば、maxFetchRate
を減らし、minFetchPeriod
を増やす必要がある場合があります。
低遅延モニタリングとアラート
アラートのユースケースでは、レイテンシーを小さくすることが望まれます。これを実現するには:
- Kinesis ストリームのコンシューマーが 1 つだけ(つまり、コンシューマーがストリーミングクエリーのみであり、他のコンシューマーは存在しないこと)になるようにしてください。これにより、Kinesis のレート制限に達することなく、可能な限り迅速にフェッチできるように最適化できます。
- オプション
maxFetchDuration
を小さい値 (たとえば 200ms) に設定すると、フェッチされたデータの処理をできるだけ早く開始できます。Trigger.AvailableNow
を使用している場合、Kinesis ストリームの最新のレコードに追いつくことができない可能性が高くなります。 - オプション
minFetchPeriod
を 210ms に設定して、できるだけ頻繁にフェッチします。 - オプション
shardsPerTask
を設定するか、# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask
となるようにクラスターを設定します。これにより、バックグラウンドのプリフェッチタスクとストリーミングクエリータスクを同時に実行できるようになります。
クエリーが 5 秒ごとにデータを受信していることが観測された場合、Kinesis のレート制限に達している可能性があります。設定を見直してください。
Kinesis はどのようなメトリクスをレポートしますか?
Kinesis は、各ワークスペースのストリームの開始からコンシューマーが遅れたミリ秒数を報告します。 ストリーミング クエリ プロセス内のすべてのワークスペースの平均、最小、および最大ミリ秒数を avgMsBehindLatest
、 maxMsBehindLatest
、および minMsBehindLatest
メトリクスとして取得できます。 「ソースメトリクスオブジェクト (Kinesis)」を参照してください。
ノートブックでストリームを実行している場合は、次の例のように、ストリーミング クエリの進行状況ダッシュボードの [Raw Data] (生データ ) タブにメトリクスが表示されます。
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}
Kinesis レコードを増分バッチとして取り込む
Databricks Runtime 13.3 LTS 以降では、Databricks はインクリメンタル バッチ セマンティクスのための Trigger.AvailableNow
Kinesis データソースの使用をサポートしています。次に、基本設定について説明します。
- マイクロバッチの読み取りが available now モードでトリガーされると、Databricks クライアントによって現在の時刻が記録されます。
- Databricks は、この記録時刻と前のチェックポイントの間のタイムスタンプを持つすべてのレコードについて、ソースシステムにポーリングします。
- Databricks は、
Trigger.AvailableNow
セマンティクスを使用してこれらのレコードを読み込みます。
Databricks は、ベストエフォート メカニズムを使用して、ストリーミング クエリの実行時に Kinesis ストリームに存在するすべてのレコードを使用しようとします。 タイムスタンプにわずかな違いがあり、データソースでの順序付けには保証がないため、一部のレコードがトリガーされたバッチに含まれない場合があります。 省略されたレコードは、次にトリガーされるマイクロバッチの一部として処理されます。
レコードがあってもクエリが Kinesis ストリームからのレコードのフェッチに引き続き失敗する場合は、 maxFetchDuration
値を増やしてみてください。
増分バッチ処理の構成を参照してください。
Kinesis への書き込み
次のコードスニペットは、Kinesis にデータを書き込むための ForeachSink
として使用できます。これには、Dataset[(String, Array[Byte])]
が必要です。
以下のコードスニペットは、「厳密に 1 回」ではなく、「 最低 1 回 」のセマンティクスを提供しています。
Kinesis Foreach シンクノートブック
Kinesis の使用に関する推奨事項
Kinesis クエリでは、さまざまな理由でレイテンシーが発生する可能性があります。 このセクションでは、遅延のトラブルシューティングに関する推奨事項について説明します。
Kinesis ソースは、バックグラウンドスレッドで Spark ジョブを実行して、Kinesis データを定期的にプリフェッチし、Spark エグゼキューターのメモリにキャッシュします。 ストリーミング クエリは、各プリフェッチ ステップが完了した後にキャッシュされたデータを処理し、データを処理に使用できるようにします。 プリフェッチ ステップは、観測されるエンドツーエンドのレイテンシとスループットに大きく影響します。
プリフェッチのレイテンシーを短縮
クエリの待機時間を最小限に抑え、リソース使用量を最大化するように最適化するには、次の計算を使用します。
total number of CPU cores in the cluster (across all executors)
>= total number of Kinesis shards
/ shardsPerTask
.
minFetchPeriod
は、Kinesis シャードが にヒットするまで、複数の GetRecords API コールを作成できますReadProvisionedThroughputExceeded
例外が発生した場合、コネクタが Kinesis シャードを最大限に活用するため、問題を示すものではありません。
レート制限エラーが多すぎることによる速度低下を回避
コネクタは、レート制限エラーが発生するたびに Kinesis から読み取られるデータ量を半分に減らし、このイベントをメッセージとともにログに記録します。 "Hit rate limit. Sleeping for 5 seconds."
ストリームが追いついているときにこれらのエラーが表示されるのは一般的ですが、追いついた後は、これらのエラーは表示されなくなります。 その場合は、Kinesis 側からチューニングする (容量を増やす) か、プリフェッチオプションを調整する必要があります。
データをディスクにあふれさせないようにする
Kinesis ストリームが突然急増すると、割り当てられたバッファ容量がいっぱいになり、新しいデータを追加するのに十分な速さでバッファが空にならない可能性があります。
このような場合、Spark はバッファーからディスクにブロックをスピルし、処理を遅くし、ストリームのパフォーマンスに影響を与えます。 このイベントは、次のようなメッセージと共にログに表示されます。
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
この問題に対処するには、クラスター・メモリー容量を増やす (ノードを追加するか、ノードあたりのメモリーを増やす) か、構成パラメーター を調整します fetchBufferSize
.
S3 書き込みタスクのハング
Spark 投機を有効にして、ストリーム処理の進行を妨げるハング タスクを終了できます。 タスクが過度に積極的に終了しないようにするには、この設定の分位数と乗数を慎重に調整します。 良い出発点は、 spark.speculation.multiplier
を 3
に、 spark.speculation.quantile
を 0.95
に設定することです。
ステートフルストリームのチェックポイント設定に関連するレイテンシーを削減
Databricks では、ステートフル ストリーミング クエリの変更ログ チェックポイント処理と共に RocksDB を使用することをお勧めします。 「変更ログのチェックポイント設定を有効にする」を参照してください。
ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定
Databricks Runtime 11.3以上では、Databricks Runtime KinesisコネクタがAmazon Kinesisの拡張ファンアウト(EFO)機能の使用をサポートします。
Kinesis enhanced fan-outは、シャードあたり、コンシューマーあたり2MB/秒の専用スループット(Kinesisストリームあたり最大20コンシューマー)で拡張ファンアウトストリームコンシューマーのサポートし、プルモードではなくプッシュモードで配信を記録する機能です。
By デフォルト, EFO モードで構成された構造化ストリーミング クエリは、それ自体を専用のスループットを持つコンシューマーとして登録するARN AmazonKinesisData ストリーム内の一意のコンシューマー名とコンシューマー ( リソース番号.
デフォルトでは、 Databricks はストリーミング クエリ ID と databricks_
プレフィックスを使用して、新しいコンシューマーに名前を付けます。 必要に応じて、 consumerNamePrefix
または consumerName
オプションを指定して、この動作をオーバーライドできます。 consumerName
は、文字、数字、特殊文字で構成される文字列である必要があります_ . -
。
登録済みの EFO コンシューマーは、 Amazon Kinesis で追加料金が発生します。 クエリのティアダウン時にコンシューマーを自動的に登録解除するには、 requireConsumerDeregistration
オプションを true
に設定します。 Databricks は、ドライバーのクラッシュやノード障害などのイベントに対する登録解除を保証できません。 ジョブが失敗した場合、Databricks では、Kinesis の過剰な料金を防ぐために、登録済みのコンシューマーを直接管理することをお勧めします。
高度なコンシューマー構成オプション
Databricks Runtime 16.1 以降では、次の例のように、オプション registeredConsumerId
と registeredConsumerIdType
を使用して、既存のコンシューマーを使用して EFO モードで読み取りを構成できます。
- Python
- Scala
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)
val kinesis = spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
既存のコンシューマは、名前または ARN を使用して指定できます。 設定で指定された Kinesis ストリームソースごとに 1 つのコンシューマー ID を指定する必要があります。
Databricks ノートブックを使用したオフラインの消費者管理
Databricksは、Kinesisデータストリームに関連するコンシューマーの登録、一覧表示、登録解除を行うコンシューマー管理ユーティリティを提供します。次のコードは、Databricksノートブックでこのユーティリティを使用する方法を示しています。
-
アクティブなクラスターに接続された新しいDatabricksノートブックで、必要な認証情報を提供して
AWSKinesisConsumerManager
を作成します。Scalaimport com.databricks.sql.kinesis.AWSKinesisConsumerManager
val manager = AWSKinesisConsumerManager.newManager()
.option("serviceCredential", serviceCredentialName)
.option("region", kinesisRegion)
.create() -
コンシューマーをリストして表示します。
Scalaval consumers = manager.listConsumers("<stream name>")
display(consumers) -
特定のストリームのコンシューマーを登録します。
Scalaval consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
-
特定のストリームのコンシューマーの登録を解除します。
Scalamanager.deregisterConsumer("<stream name>", "<consumer name>")