Amazon Kinesis に接続する

この記事では、構造化ストリーミングを使用して Amazon Kinesis に対してデータの読み取りと書き込みを行う方法について説明します。

Databricks では、S3 VPC エンドポイントを有効にして、すべての S3 トラフィックが AWS ネットワークでルーティングされるようにすることをお勧めします。

注:

Kinesis ストリームを削除して再作成した場合、既存のチェックポイントディレクトリを再利用してストリーミングクエリーを再開することはできません。 チェックポイントディレクトリを削除し、クエリーを最初から開始する必要があります。 構造化ストリーミングでは、ストリームを中断または再起動せずにシャードの数を増やすことで、リシャーディングを行うことができます。

Kinesis の使用に関する推奨事項」を参照してください。

Amazon Kinesis による認証

Databricks では、インスタンスプロファイルを使用して Kinesis への接続を管理することを推奨しています。 「 インスタンスプロファイル」を参照してください。

警告

場合によっては、共有アクセス モードではサポートされません。 シングル・ユーザー・アクセス・モード、または共有アクセス・モードでの代替認証方法を使用します。 Unity Catalogのコンピュート アクセス モード制限を参照してください。

アクセスにキーを使用する場合は、オプションawsAccessKeyおよびawsSecretKeyを使用してキーを指定できます。

roleArnオプションを使用してIAMロールを引き受けることもできます。オプションで、roleExternalIdで外部IDを指定し、roleSessionNameでセッション名を指定できます。ロールを引き受けるには、そのロールを引き受ける権限でクラスターを起動するか、awsAccessKeyおよびawsSecretKeyを介してアクセスキーを提供します。クロスアカウント認証の場合、Databricksでは、roleArnを使用して引き受けたロールを保持することをお勧めします。このロールは、Databricks AWSアカウントを通じて引き受けることができます。クロスアカウント認証の詳細については、IAMロールを使用した AWSアカウント間のアクセス権限の委譲を参照してください。

注:

Kinesis ソースには、 ListShardsGetRecords 、および GetShardIterator 権限が必要です。Amazon: Access Denied 例外が発生した場合は、ユーザーまたはプロファイルにこれらの権限があることを確認してください。詳細については、「IAM を使用した Amazon Kinesis Data Streams リソースへのアクセスの制御」を参照してください。

スキーマ

Kinesis は、次のスキーマを持つレコードを返します。

タイプ

partitionKey

string

data

binary

stream

string

ShardID

string

sequenceNumber

string

approximateArrivalTimestamp

timestamp

data列のデータを逆シリアル化するには、フィールドを文字列にキャストします。

クイックスタート

次のノートブックは、Kinesisを使用した構造化ストリーミングを使用して WordCount を実行する方法を示しています。

構造化ストリーミング・ノートブックによる Kinesis WordCount

ノートブックを新しいタブで開く

Kinesis オプションの設定

重要

Databricks Runtime 13.3 LTS 以降では、Kinesis でTrigger.AvailableNowを使用できます。 「Kinesis レコードを増分バッチとして取り込む」を参照してください。

Kinesisデータソースの一般的な設定を次に示します。

オプション

デフォルト

説明

streamName

ストリーム名のコンマ区切りリスト

なし(必須パラメータ)

サブスクライブするストリーム名

region

指定するストリームのリージョン

ローカルで解決されたリージョン

ストリームが定義されているリージョン

endpoint

Kinesis データストリームのリージョン

ローカルで解決されたリージョン

Kinesis Data Streams のリージョンエンドポイント

initialPosition

latest, trim_horizon, earliest (trim_horizonの別名), at_timestamp.

latest

ストリーム内のどこから読み取りを開始するか。

at_timestamp は、タイムスタンプの Java デフォルト形式 ({"at_timestamp": "06/25/2020 10:23:45 PDT"}など) を使用して JSON 文字列として指定します。ストリーミングクエリーは、指定されたタイムスタンプ以降のすべての変更を読み取ります。 形式を明示的に指定するには、JSON 文字列に {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}などの追加フィールドを指定します。

maxRecordsPerFetch

正の整数

10,000

Kinesis への API リクエストごとに読み取られるレコードの数。Kinesis Producer Library を使用してサブレコードを 1 つのレコードに集約しているかどうかに応じて、返されるレコードの数が実際にはさらに多くなる場合があります。

maxFetchRate

データレートを表す正の 10 進数(MB/秒単位)

1.0(最大 = 2.0)

シャードごとのデータのプリフェッチ速度。これは、フェッチのレートを制限し、Kinesis のスロットリングを回避するためのオプションです。2.0MB/s は Kinesis で許可される最大レートです。

minFetchPeriod

期間を表す文字列。例:1 秒の場合は 1s

400ms(最小 = 200ms)

連続するプリフェッチ試行間の待ち時間。これは、フェッチの頻度を制限し、Kinesis のスロットリングを回避するためのオプションです。Kinesis では 1 秒あたり最大 5 回のフェッチが許可されるため、最小値は 200 ミリ秒となります。

maxFetchDuration

期間を表す文字列。例:1 分の場合は 1m

10s

プリフェッチされた新しいデータが処理可能になるまでのバッファー時間。

fetchBufferSize

バイトを表す文字列。例:2gb10mb

20gb

次のトリガーのためにバッファーするデータの量。これは停止条件として使用されるものであり、厳密な上限ではないため、この値に指定された値よりも多くのデータがバッファーされる可能性があります。

shardsPerTask

正の整数

5

1 つの Spark タスクで並行してプリフェッチする Kinesis シャードの数。クエリーのレイテンシーを最小限に抑え、リソースの使用量を最大化するには、# cores in cluster >= # Kinesis shards / shardsPerTask となるようにするのが理想的です。

shardFetchInterval

期間を表す文字列。例:2 分の場合は 2m

1s

リシャーディングのために Kinesis をポーリングする頻度。

awsAccessKey

文字列

デフォルトはありません。

AWS アクセスキー。

awsSecretKey

文字列

デフォルトはありません。

アクセスキーに対応する AWS シークレットアクセスキー。

roleArn

文字列

デフォルトはありません。

Kinesis にアクセスするときに引き受けるロールの Amazon リソース名(ARN)。

roleExternalId

文字列

デフォルトはありません。

AWS アカウントへのアクセスを委任する場合に使用できる任意の値。「外部 ID の使用方法」を参照してください。

roleSessionName

文字列

デフォルトはありません。

同じロールが異なるプリンシパルによって、または異なる理由で引き受けられる場合に、セッションを一意に識別するための、引き受けられるロールセッションの識別子。

coalesceThresholdBlockSize

正の整数

10,000,000

自動結合が発生するしきい値。平均ブロックサイズがこの値より小さい場合、プリフェッチされたブロックが coalesceBinSize になるように結合されます。

coalesceBinSize

正の整数

128,000,000

結合後のおおよそのブロックサイズ。

consumerMode

polling または efo

polling

ストリーミングクエリーを実行するコンシューマタイプ。 「 ストリーミングクエリー読み取りのための Kinesis 拡張ファンアウト (EFO) の設定」を参照してください。

requireConsumerDeregistration

true または false

false

クエリー終了時に拡張ファンアウトコンシューマーの登録を解除するかどうか。consumerModeにはefoが必要です。

注:

オプションのデフォルト値には、2 つのリーダー(Spark またはその他)が Kinesis のレート制限に達することなく同時に Kinesis ストリームを消費できるような値が選択されています。コンシューマーの数が多い場合は、それに応じてオプションを調整する必要があります。例えば、maxFetchRate を減らし、minFetchPeriod を増やす必要がある場合があります。

低レイテンシーのモニタリングとアラート

アラートのユースケースでは、レイテンシーを小さくすることが望まれます。これを実現するには:

  • Kinesis ストリームのコンシューマーが 1 つだけ(つまり、コンシューマーがストリーミングクエリーのみであり、他のコンシューマーは存在しないこと)になるようにしてください。これにより、Kinesis のレート制限に達することなく、可能な限り迅速にフェッチできるように最適化できます。

  • オプション maxFetchDuration を小さい値 (200 ミリ秒など) に設定して、フェッチされたデータの処理をできるだけ早く開始します。 Trigger.AvailableNowを使用している場合、これにより、Kinesis ストリームの最新のレコードに追いつかない可能性が高くなります。

  • オプション minFetchPeriod を 210 ミリ秒に設定して、できるだけ頻繁にフェッチします。

  • オプション shardsPerTask を設定するか、# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask となるようにクラスターを設定します。これにより、バックグラウンドのプリフェッチタスクとストリーミングクエリータスクを同時に実行できるようになります。

クエリーが 5 秒ごとにデータを受信していることが観測された場合、Kinesis のレート制限に達している可能性があります。設定を見直してください。

Kinesis はどのようなメトリクスを報告しますか?

Kinesis は、各ワークスペースのストリームの開始からコンシューマーが遅れたミリ秒数を報告します。 ストリーミング クエリ プロセス内のすべてのワークスペース間のミリ秒数の平均、最小、最大は、 avgMsBehindLatestmaxMsBehindLatest、および minMsBehindLatest メトリクスとして取得できます。 「ソース metrics オブジェクト (Kinesis)」を参照してください。

ノートブックでストリームを実行している場合は、ストリーミングクエリー進行状況ダッシュボードの [ Raw Data ] タブに、次の例のようにメトリクスが表示されます。

{
  "sources" : [ {
    "description" : "KinesisV2[stream]",
    "metrics" : {
      "avgMsBehindLatest" : "32000.0",
      "maxMsBehindLatest" : "32000",
      "minMsBehindLatest" : "32000"
    },
  } ]
}

Kinesis レコードを増分バッチとして取り込む

Databricks Runtime 13.3 LTS以降では、 Databricks増分バッチ セマンティクスのためにKinesisデータソースで Trigger.AvailableNow を使用することをサポートしています。 次に、基本的な設定について説明します。

  1. マイクロバッチの読み取りが available now モードでトリガーされると、Databricks クライアントによって現在の時刻が記録されます。

  2. Databricks は、この記録時刻と前のチェックポイントの間のタイムスタンプを持つすべてのレコードについて、ソースシステムにポーリングします。

  3. Databricks は、Trigger.AvailableNow セマンティクスを使用してこれらのレコードを読み込みます。

Databricks は、ベストエフォート型メカニズムを使用して、ストリーミングクエリーの実行時に Kinesis ストリームに存在するすべてのレコードの使用を試みます。 タイムスタンプの潜在的な違いが小さく、データソースの順序付けが保証されていないため、一部のレコードはトリガーされたバッチに含まれない可能性があります。 省略されたレコードは、次にトリガーされるマイクロバッチの一部として処理されます。

注:

レコードがあってもクエリーが Kinesis ストリームからのレコードの取得に失敗し続ける場合は、 maxFetchDuration 値を増やしてみてください。

増分バッチ処理の構成を参照してください。

Kinesis への書き込み

次のコードスニペットは、Kinesis にデータを書き込むための ForeachSink として使用できます。これには、Dataset[(String, Array[Byte])] が必要です。

注:

以下のコードスニペットは、「厳密に 1 回」ではなく、「最低 1 回」のセマンティクスを提供しています。

Kinesis Foreach シンクノートブック

ノートブックを新しいタブで開く

Kinesis の使用に関する推奨事項

Kinesis クエリーでは、さまざまな理由でレイテンシーが発生する場合があります。 このセクションでは、遅延のトラブルシューティングに関する推奨事項について説明します。

Kinesis ソースは、バックグラウンドスレッドで Spark ジョブを実行して、Kinesis データを定期的にプリフェッチし、Spark エグゼキューターのメモリにキャッシュします。 ストリーミングクエリーは、各プリフェッチステップの完了後にキャッシュされたデータを処理し、データを処理できるようにします。 プリフェッチのステップは、観測されたエンドツーエンドのレイテンシとスループットに大きく影響します。

プリフェッチの待ち時間を短縮

クエリーのレイテンシーとリソース使用量の最大化を最適化するには、次の計算を使用します。

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.

重要

minFetchPeriod は、 ReadProvisionedThroughputExceededに達するまで、Kinesis シャードに対して複数の GetRecords API コールを作成できます。 例外が発生した場合、コネクタは Kinesis シャードの使用率を最大化するため、問題を示すものではありません。

レート制限エラーが多すぎることによる速度低下を回避

コネクタは、レート制限エラーが発生するたびに Kinesis から読み取られるデータ量を半分に減らし、このイベントをメッセージとともにログに記録します。 "Hit rate limit. Sleeping for 5 seconds."

ストリームが追いつかれているときにこれらのエラーが表示されるのが一般的ですが、キャッチアップ後はこれらのエラーは表示されなくなります。 その場合は、Kinesis 側から (容量を増やして) 調整するか、プリフェッチオプションを調整する必要があります。

ディスクへのデータの流出を防ぐ

Kinesis ストリームに急激なスパイクが発生すると、割り当てられたバッファ容量がいっぱいになり、新しいデータを追加するのに十分な速さでバッファが空にならない可能性があります。

このような場合、Spark はバッファーからディスクにブロックをあふれさせ、処理を遅くし、ストリームのパフォーマンスに影響を与えます。 このイベントは、次のようなメッセージとともにログに表示されます。

./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

この問題に対処するには、クラスターのメモリ容量を増やす (ノードを追加するか、ノードあたりのメモリを増やす) か、構成パラメーター fetchBufferSizeを調整します。

S3 書き込みタスクのハングアップ

Spark の投機を有効にして、ストリーム処理の進行を妨げるハング タスクを終了できます。 タスクが過度に積極的に終了しないようにするには、この設定の分位数と乗数を慎重に調整します。 良い出発点は、 spark.speculation.multiplier3 に設定し、 spark.speculation.quantile0.95に設定することです。

ステートフルストリームでのチェックポイント設定に関連するレイテンシーを削減

Databricks では、ステートフル ストリーミング クエリーに changelog チェックポイント処理で RocksDB を使用することを推奨しています。 「 変更ログのチェックポイントを有効にする」を参照してください。

ストリーミングクエリ読み取り用にKinesis Enhanced Fan-Out (EFO) を設定する

Databricks Runtime 11.3以上では、Databricks Runtime KinesisコネクタがAmazon Kinesisの拡張ファンアウト(EFO)機能の使用をサポートします。

Kinesis enhanced fan-outは、シャードあたり、コンシューマーあたり2MB/秒の専用スループット(Kinesisストリームあたり最大20コンシューマー)で拡張ファンアウトストリームコンシューマーのサポートし、プルモードではなくプッシュモードで配信を記録する機能です。

Structured StreamingクエリーがEFOモードで実行されている場合、専用のスループットを持つコンシューマーとして動作し、Kinesis Data Streamsに登録されます。Kinesis Data Streamsに登録するには、生成されたコンシューマーARN(Amazon Resource Number)を今後の操作に使用できるように、クエリーで一意のコンシューマ名を指定する必要があります。明示的にコンシューマ−名を指定することも、ストリーミングクエリーIDをコンシューマー名として再利用することもできます。Databricksソースによって登録されたすべてのコンシューマーには、「databricks_」というプレフィックスが付いています。以前に登録されたコンシューマーを参照する構造化ストリーミングクエリーは、describeStreamConsumerが返すconsumerARNを使用します。

consumerNameフィールドを使用すると、ストリーミングクエリーに一意の名前を指定できます。名前を指定しないことを選択した場合は、ストリーミングクエリーIDが使用されます。consumerNameは、文字、数字、および_ (アンダースコア)、 . (ドット)、 - (ハイフン) などの特殊文字で構成される文字列である必要があります。

重要

登録されたEFOコンシューマーには、Amazon Kinesisで追加料金が発生します。クエリーのティアダウン時にコンシューマーを自動的に登録解除するには、requireConsumerDeregistrationオプションをtrueに設定します。Databricksは、ドライバーのクラッシュやノードの障害などのイベントによる登録解除を保証できません。ジョブが失敗した場合、DatabricksはKinesisの過剰請求を防ぐために、登録されたコンシューマーを直接管理することを推奨します。

Databricksノートブックを使用したオフラインコンシューマー管理

Databricksは、Kinesisデータストリームに関連するコンシューマーの登録、一覧表示、登録解除を行うコンシューマー管理ユーティリティを提供します。次のコードは、Databricksノートブックでこのユーティリティを使用する方法を示しています。

  1. アクティブなクラスターに接続された新しいDatabricksノートブックで、必要な認証情報を提供してAWSKinesisConsumerManagerを作成します。

    import com.databricks.sql.kinesis.AWSKinesisConsumerManager
    
    val manager = AWSKinesisConsumerManager.newManager()
    .option("awsAccessKey", awsAccessKeyId)
    .option("awsSecretKey", awsSecretKey)
    .option("region", kinesisRegion)
    .create()
    
  2. コンシューマーをリストして表示します。

    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
    
  3. 特定のストリームのコンシューマーを登録します。

    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
    
  4. 特定のストリームのコンシューマーの登録を解除します。

    manager.deregisterConsumer("<stream name>", "<consumer name>")