メインコンテンツまでスキップ

Amazon Kinesis に接続する

構造化ストリーミングを使用して、Amazon Kinesis へのデータの読み書きを行います。

Databricksは、すべてのS3トラフィックがAWSネットワーク上でルーティングされるように、S3 VPCエンドポイントを有効にすることを推奨します。

注記

Kinesisストリームを削除して再作成した場合、既存のチェックポイント ディレクトリを再利用してストリーミング クエリを再開することはできません。 チェックポイントディレクトリを削除し、クエリを最初からやり直す必要があります。構造化ストリーミングでは、ストリームを中断したり再開したりすることなく、シャード数を増やすことでシャーディングを再実行できます。

クエリのレイテンシに関するトラブルシューティングの推奨事項については、 「Kinesis を使用したレイテンシ削減のための推奨事項」を参照してください。

Amazon Kinesis で認証する

Databricks Runtime 16.1以降では、 Databricks Serviceの認証情報を使用してKinesisへの接続を管理することをDatabricks推奨しています。 サービス資格情報の作成を参照してください。

サービス資格情報を使用するには、次の手順を実行します。

  1. ロールを使用して、 Databricksにアクセスするために必要な権限を持つ サービスの資格情報を作成します。IAMKinesis「ステップ 1: IAMロールを作成する」を参照してください。
  2. ストリーミング読み取りを定義するときに、 serviceCredentialオプションを使用してサービス資格情報の名前を指定します。
注記

Kinesisソースには、 ListShardsGetRecords 、およびGetShardIterator権限が必要です。 Amazon: Access Denied例外が発生した場合は、 IAMロールにこれらの権限があることを確認してください。 IAMを使用したAmazon Kinesisデータストリームリソースへのアクセスの制御」を参照してください。

代替認証方法

Databricks Runtime 16.0 以前では、 Databricks資格情報は使用できません。 Databricksには、以下の代替認証方法があります。

インスタンスプロファイル

コンピュートの設定中にインスタンスプロファイルをアタッチします。 「インスタンスプロファイル」を参照してください。

インスタンスプロファイルは、標準アクセスモード(旧称:共有アクセスモード)ではサポートされていません。「標準コンピュートの要件と制限事項」を参照してください。

アクセスキーを直接使用する

awsAccessKeyawsSecretKeyオプションを設定してください。

キーを使用する場合は、Databricks シークレットを使用してキーを保存します。「秘密管理」を参照してください。

ロール IAM引き受ける

一部のコンピュート構成では、 roleArnオプションを使用してIAMロールを引き受けることができます。 役割を引き受けるには、役割を引き受ける権限を持ってクラスターを起動するか、 awsAccessKeyawsSecretKeyを通じてアクセスキーを提供します。

この方法は、アカウント間認証をサポートしています。詳細については、 IAMを使用したAWSアカウント間のアクセスの委任」を参照してください。

オプションで、外部 ID を roleExternalId で指定し、セッション名を roleSessionNameで指定できます。

スキーマ

Kinesis は、次のスキーマのレコードを返します。

タイプ

partitionKey

string

data

binary

stream

string

shardId

string

sequenceNumber

string

approximateArrivalTimestamp

タイムスタンプ

data列のデータを逆シリアル化するには、そのフィールドを文字列にキャストします。

クイックスタート

次のノートブックは、Kinesisを使用した構造化ストリーミングを使用して WordCount を実行する方法を示しています。

Kinesis WordCount と構造化ストリーミングノートブック

ノートブックを新しいタブで開く

Kinesis オプションの設定

Databricks Runtime 13.3 LTS 以降では、Kinesis で Trigger.AvailableNow を使用できます。 「 Kinesis レコードを増分バッチとして取り込む」を参照してください。

Databricks Runtime 16.1以降では、 streamARNを使用してKinesisソースを識別できます。 すべての Databricks Runtime バージョンにおいて、 streamNameまたはstreamARNいずれかを指定する必要がありますが、両方を指定することはできません。

警告

アクティブなストリーミングクエリでは、 streamNamestreamARNを切り替えないでください。Databricks 、ストリームの途中でのこれらのオプションの変更をサポートしていません。 クエリを再起動すると、重複レコードが発生したり、データが失われたりする可能性があります。streamNameからstreamARNに切り替えるには、新しいチェックポイントディレクトリを使用して新しいストリーミングクエリを開始します。

Kinesisデータソースの一般的な設定を次に示します。

オプション

デフォルト

説明

streamName

ストリーム名のコンマ区切りリスト

なし

サブスクライブするストリーム名

streamARN

KinesisストリームARNsのコンマ区切りリスト。たとえば、 "arn:aws:kinesis:myarn1,arn:aws:kinesis:myarn2".

なし

サブスクライブするストリームの ARNs 。 Databricks Runtime 16.1 以降で使用できます。

region

指定するストリームのリージョン

ローカルで解決されたリージョン

ストリームが定義されているリージョン

endpoint

Kinesis データストリームのリージョン

ローカルで解決されたリージョン

Kinesis Data Streams のリージョンエンドポイント

initialPosition

latest, trim_horizon, earliest (trim_horizonの別名), at_timestamp.

latest

ストリーム内のどこから読み取りを開始するか。

タイムスタンプの Java デフォルト形式 ( {"at_timestamp": "06/25/2020 10:23:45 PDT"}など) を使用して、at_timestamp を JSON 文字列として指定します。ストリーミング クエリは、指定されたタイムスタンプ (両端を含む) 以降のすべての変更を読み取ります。 形式を明示的に指定するには、JSON 文字列に追加のフィールド ( {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}など) を指定します。

maxRecordsPerFetch

正の整数

10,000

KinesisへのAPIリクエストごとに読み込むレコード数。Kinesis Producerを使用してサブレコードが単一のレコードに集約されたかどうかによって、返されるレコードの数が増える可能性があります。 。

maxFetchRate

データレートを表す正の 10 進数(MB/秒単位)

1.0(最大 = 2.0)

シャードごとにデータをプリフェッチする速度。このオプションはフェッチのレートを制限し、Kinesisのスロットリングを回避します。Kinesisが許容する最大転送速度は2.0MB/秒です。

minFetchPeriod

期間を表す文字列。例:1 秒の場合は 1s

400ms(最小 = 200ms)

連続するプリフェッチ試行間の待機時間。このオプションはフェッチの頻度を制限し、Kinesisのスロットリングを回避します。Kinesisでは最大5回のフェッチ/秒しか許可されないため、200msが最小値となります。

maxFetchDuration

期間を表す文字列。例:1 分の場合は 1m

10s

プリフェッチされた新しいデータを処理に利用可能にする前にバッファリングする時間。

fetchBufferSize

バイトを表す文字列。例:2gb10mb

20gb

次のトリガーのためにバッファリングするデータの量。このオプションは停止条件であり、厳密な上限ではありません。この値で指定されているよりも多くのデータがバッファリングされる可能性があります。

shardsPerTask

正の整数

5

Sparkタスクごとに並行してプリフェッチするKinesisシャードの数。 理想的には、クエリのレイテンシを最小限に抑え、リソースの使用量を最大化するために、 # cores in cluster >= # Kinesis shards / shardsPerTaskあるべきです。

shardFetchInterval

期間を表す文字列。例:2 分の場合は 2m

1s

Kinesisのリシャーディングをポーリングする間隔。

serviceCredential

文字列

デフォルトはありません。

Databricks サービスの資格情報の名前。「サービス資格情報の作成」を参照してください。

awsAccessKey

文字列

デフォルトはありません。

AWS アクセスキー。

awsSecretKey

文字列

デフォルトはありません。

アクセスキーに対応する AWS シークレットアクセスキー。

roleArn

文字列

デフォルトはありません。

Kinesis にアクセスするときに引き受けるロールの Amazon リソース名(ARN)。

roleExternalId

文字列

デフォルトはありません。

AWS アカウントへのアクセスを委任する場合に使用できる任意の値。「外部 ID の使用方法」を参照してください。

roleSessionName

文字列

デフォルトはありません。

同じロールが異なるプリンシパルによって、または異なる理由で引き受けられる場合に、セッションを一意に識別するための、引き受けられるロールセッションの識別子。

coalesceThresholdBlockSize

正の整数

10,000,000

自動結合が発生するしきい値。平均ブロックサイズがこの値より小さい場合、プリフェッチされたブロックが coalesceBinSize になるように結合されます。

coalesceBinSize

正の整数

128,000,000

結合後のおおよそのブロックサイズ。

consumerMode

polling または efo

polling

ストリーミング クエリを実行するコンシューマーの種類。 「 ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定」を参照してください。 Databricks Runtime 11.3 LTS 以降で使用できます。

requireConsumerDeregistration

true または false

false

クエリの終了時に拡張ファンアウト コンシューマーを登録解除するかどうか。 consumerModeにはefoが必要です。Databricks Runtime 11.3 LTS 以降で使用できます。

maxShardsPerDescribe

正の整数 (最大 10000)。

100

シャードの一覧表示中に API 呼び出しごとに読み取るシャードの最大数。

consumerName

すべてのストリームで使用する単一のコンシューマー名、またはカンマ区切りのコンシューマー名のリスト。

ストリーミング クエリ ID

EFO モードで Kinesis サービスにクエリを登録するために使用されるコンシューマー名。 Databricks Runtime 11.3 LTS 以降で使用できます。

consumerNamePrefix

空の文字列またはカスタムプレフィックス文字列。

databricks_

EFO モードで Kinesis サービスにコンシューマーを登録するために consumerName と共に使用されるプレフィックス。 Databricks Runtime 16.0 以降で使用できます。

consumerRefreshInterval

デュレーション文字列 (例: 1 秒 (最大 3600 秒) の "1s")。

300番台

Kinesis EFO コンシューマー登録がチェックされ、更新される間隔。 Databricks Runtime 11.3 LTS 以降で使用できます。

registeredConsumerId

コンシューマ名または ARNsのカンマ区切りリスト。

なし

EFO モードの既存のコンシューマーの識別子。 Databricks Runtime 16.1 以降で使用できます。

registeredConsumerIdType

name または ARN

なし

コンシューマ ID が名前か ARNsかを指定します。 Databricks Runtime 16.1 以降で使用できます。

デフォルトのオプション値を使用すると、2 つのリーダー (Spark またはその他のリーダー) が Kinesis ストリームを同時に消費しても、Kinesis のレート制限に達することはありません。顧客数が増えた場合は、それに応じてオプションを調整してください。例えば、 maxFetchRateを減らしてminFetchPeriodを増やす必要があるかもしれません。

低遅延モニタリングとアラート

アラート機能のユースケースでは、低遅延が求められる。遅延を最小限に抑えるには:

  • ストリーミングクエリがKinesisストリームの唯一のコンシューマであることを確認して、フェッチパフォーマンスを最適化し、 Kinesisレート制限を回避します。
  • 取得したデータを可能な限り高速に処理するには、オプションmaxFetchDurationに200msなどの小さな値を設定してください。このオプションはトレードオフの関係にあります。各バッチで最新のレコードが確実に処理されることを保証するよりも、バッチごとの処理速度を優先するからです。例えば、 Trigger.AvailableNow使用する場合、値が小さいとクエリが Kinesis ストリームの最新レコードに追いつかなくなる可能性があります。
  • オプション minFetchPeriod を 210ms に設定して、できるだけ頻繁にフェッチします。
  • オプションshardsPerTaskを設定するか、クラスターを# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTaskとなるように構成してください。これにより、バックグラウンドでのプリフェッチタスクとストリーミングクエリタスクが同時に実行されることが保証されます。

クエリが5秒ごとにデータを受信している場合、 Kinesisのレート制限を超える可能性があります。設定を確認してください。

Kinesisのメトリクスを監視する

Kinesisは、ワークスペースごとに、コンシューマーがストリームの開始時点からどれだけ遅れているかをミリ秒単位で報告します。avgMsBehindLatestmaxMsBehindLatest 、およびminMsBehindLatestメトリクスは、ストリーミング クエリ プロセスにおけるすべてのワークスペースの平均、最小、最大のミリ秒を提供します。 Databricksのモニタリング構造化ストリーミング クエリ」を参照してください。

ノートブックでストリームを実行している場合は、ストリーミングクエリの進行状況ダッシュボードの 「生データ」 タブにあるメトリクスを確認してください。以下に例を示します。

JSON
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}

Kinesis レコードを増分バッチとして取り込む

Databricks Runtime 13.3 LTS 以降では、Databricks はインクリメンタル バッチ セマンティクスのための Trigger.AvailableNow Kinesis データソースの使用をサポートしています。次に、基本設定について説明します。

  1. マイクロバッチの読み取りが available now モードでトリガーされると、Databricks クライアントによって現在の時刻が記録されます。
  2. Databricks は、この記録時刻と前のチェックポイントの間のタイムスタンプを持つすべてのレコードについて、ソースシステムにポーリングします。
  3. Databricks は、Trigger.AvailableNow セマンティクスを使用してこれらのレコードを読み込みます。

Databricks 、ベストエフォート型メカニズムを使用して、ストリーミング クエリ実行時にKinesisストリームに存在するすべてのレコードを試行して消費します。 タイムスタンプのわずかな差異やデータソースにおける順序の保証の欠如により、トリガーされたバッチには一部のレコードが含まれない可能性があります。省略されたレコードは、次にトリガーされるマイクロバッチで処理されます。

注記

レコードがあってもクエリが Kinesis ストリームからのレコードのフェッチに引き続き失敗する場合は、 maxFetchDuration 値を増やしてみてください。

AvailableNowを参照してください: 増分バッチ処理

Kinesis への書き込み

次のコードスニペットは、Kinesis にデータを書き込むための ForeachSink として使用できます。これには、Dataset[(String, Array[Byte])] が必要です。

注記

以下のコードスニペットは、「厳密に 1 回」ではなく、「 最低 1 回 」のセマンティクスを提供しています。

Kinesis Foreach シンクノートブック

ノートブックを新しいタブで開く

Kinesisでレイテンシを削減するための推奨事項

このセクションでは、 Kinesisストリームの遅延に関するさまざまな原因をトラブルシューティングするための推奨事項を示します。

Kinesisソースはバックグラウンド スレッドでSparkジョブを実行し、 Kinesisデータを定期的にプリフェッチし、そのデータをSparkエグキューゼター メモリにキャッシュします。 各プリフェッチ ステップが完了すると、ストリーミング クエリでキャッシュされたデータを処理できます。 プリフェッチのステップは、観測されるエンドツーエンドの遅延とスループットに大きな影響を与える。

プリフェッチのレイテンシーを短縮

クエリの待機時間を最小限に抑え、リソース使用量を最大化するように最適化するには、次の計算を使用します。

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.

重要

minFetchPeriod Kinesisシャードに対して、 ReadProvisionedThroughputExceededに達するまで複数のGetRecords API呼び出しを作成できます。例外が発生した場合でも、コネクタがKinesisシャードの利用率を最大化するため、問題にならない可能性があります。

レート制限エラーが多すぎることによる速度低下を回避

コネクタは、レート制限エラーが発生するたびに Kinesis から読み取られるデータ量を半分に減らし、このイベントをメッセージとともにログに記録します。 "Hit rate limit. Sleeping for 5 seconds."

ストリームが追いついてくる過程で、これらのエラーが表示されることがあります。ストリームが追いついた後にこれらのエラーが表示される場合は、AWS の Kinesis の容量を増やすか、Spark のプリフェッチオプションを調整することで、ワークロードを調整する必要があるかもしれません。

データをディスクにあふれさせないようにする

Kinesisストリームのデータ量が急激に増加すると、割り当てられたバッファ容量がいっぱいになり、新しいデータを追加するのに十分な速さで空き容量が確保されない可能性があります。 Sparkバッファからディスクにデータを書き出すと、ストリーム処理が遅くなり、ログに次のようなメッセージを含むイベントが表示されます。

Bash
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

スピルを回避するには、ノードを追加するかノードあたりのメモリを増やすことでクラスタ メモリ容量を増やすか、構成を減らしますfetchBufferSize

中断されたS3書き込みタスク

Sparkの投機的実行を有効にして、ストリーム処理の進行を妨げる可能性のある中断されたタスクを終了させます。タスクが過度に強制終了されないようにするには、この設定の分位数と乗数を慎重に調整してください。Databricksでは、 spark.speculation.multiplier3に、 spark.speculation.quantile0.95に設定し、必要に応じて調整することを推奨しています。

ステートフルストリームにおけるチェックポイント処理によるレイテンシを削減する

Databricks では、ステートフル ストリーミング クエリの変更ログ チェックポイント処理と共に RocksDB を使用することをお勧めします。 「変更ログのチェックポイント設定を有効にする」を参照してください。

ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定

Databricks Runtime 11.3以上では、Databricks Runtime KinesisコネクタがAmazon Kinesisの拡張ファンアウト(EFO)機能の使用をサポートします。

Kinesisの強化されたファンアウト機能は、コンシューマーごとにシャードあたり2MB/秒の専用スループットを提供し(ストリームあたり最大20個のコンシューマー)、プルモードではなくプッシュモードでレコードを配信します。

EFO モードで設定された構造化ストリーミングクエリは、専用のスループットと一意のコンシューマ名とコンシューマARN ( Amazonリソース名) をもつコンシューマとして、 Kinesis Data Stream に登録します。

デフォルトでは、Databricks はストリーミングクエリ ID にdatabricks_という接頭辞を付けて新しいコンシューマーに名前を付けます。必要に応じて、 consumerNamePrefixまたはconsumerNameオプションを指定することで、この動作を上書きできます。consumerNameは、文字、数字、および特殊文字_ . -のみを含む文字列でなければなりません。

クエリの再起動時に、 Kinesisソースはポーリング モードを使用して、コミットされていない最新のバッチが存在する場合はそれを再生します。 ストリームが未コミットのバッチを再生した後、ソースは以降の読み取りのためにEFOモードに戻ります。

重要

登録済みの EFO コンシューマーは、 Amazon Kinesis で追加料金が発生します。 クエリのティアダウン時にコンシューマーを自動的に登録解除するには、 requireConsumerDeregistration オプションを trueに設定します。 Databricks は、ドライバーのクラッシュやノード障害などのイベントに対する登録解除を保証できません。 ジョブが失敗した場合、Databricks では、Kinesis の過剰な料金を防ぐために、登録済みのコンシューマーを直接管理することをお勧めします。

高度なコンシューマー構成オプション

Databricks Runtime 16.1以降では、オプションregisteredConsumerIdregisteredConsumerIdTypeを使用して、既存のコンシューマーのEFOモードでの読み取りを設定します。

Python
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)

既存のコンシューマは、名前または ARN を使用して指定できます。 設定で指定された Kinesis ストリームソースごとに 1 つのコンシューマー ID を指定する必要があります。

Databricks ノートブックを使用したオフラインの消費者管理

AWSアカウント コンソールでコンシューマを手動で構成するのではなく、 AWSKinesisConsumerManagerユーティリティを使用して、 Kinesisデータ ストリームのコンシューマをプログラムで登録、一覧表示、または登録解除します。 例えば、このユーティリティを使用して新しいストリームのコンシューマーを作成したり、ストリームを完全に停止する場合は、このユーティリティを使用してAWS上のコンシューマーを削除したりできます。

コンシューマ マネージャ ユーティリティは、コンピュートが専用アクセス モードに設定されているScalaでのみ使用できます。 アクセスモードを参照してください。

Databricksノートブックでこのユーティリティを使用するには:

  1. アクティブなクラスターに接続された新しい Databricks ノートブックで、必要な認証情報を使用してAWSKinesisConsumerManagerを作成します。

    Scala
    import com.databricks.sql.kinesis.AWSKinesisConsumerManager

    val manager = AWSKinesisConsumerManager.newManager()
    .option("serviceCredential", serviceCredentialName)
    .option("region", kinesisRegion)
    .create()
  2. コンシューマーをリストして表示します。

    Scala
    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
  3. 指定されたストリームのコンシューマを登録します。

    Scala
    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
  4. 指定されたストリームのコンシューマーの登録を解除します。

    Scala
    manager.deregisterConsumer("<stream name>", "<consumer name>")