メインコンテンツまでスキップ

Amazon Kinesis に接続する

この記事では、構造化ストリーミングを使用して Amazon Kinesis に対してデータを読み書きする方法について説明します。

Databricks では、S3 VPC エンドポイントを有効にして、すべての S3 トラフィックが AWS ネットワーク上でルーティングされるようにすることをお勧めします。

注記

Kinesis ストリームを削除して再作成すると、既存のチェックポイントディレクトリを再利用してストリーミングクエリを再開することはできません。 チェックポイント・ディレクトリを削除して、それらのクエリを最初から開始する必要があります。 構造化ストリーミングでリシャーディングするには、ストリームを中断したり再起動したりせずにシャードの数を増やします。

「Kinesis の使用に関する推奨事項」を参照してください。

Amazon Kinesis で認証する

Databricks では、Databricks サービス資格情報を使用して Kinesis への接続を管理することをお勧めします。 「サービス資格情報を使用して外部クラウド サービスへのアクセスを管理する」を参照してください。Databricks サービスの資格情報には、Databricks Runtime 16.2 以降が必要です。

サービス資格情報を使用するには、次の手順を実行します。

  1. ロールを使用して、 Databricksにアクセスするために必要な権限を持つ サービスの資格情報を作成します。IAMKinesis「ステップ 1: IAMロールを作成する」を参照してください。
  2. ストリーミング読み取りを定義するときに、 serviceCredential オプションを使用して、サービス資格情報の名前を指定します。
注記

Kinesis ソースには、 ListShardsGetRecords、および GetShardIterator のアクセス許可が必要です。 Amazon: Access Denied例外が発生した場合は、 IAMロールにこれらの権限があることを確認してください。 詳細については 、「AmazonKinesisを使用した Data ストリーム リソースへのアクセスの制御IAM 」を参照してください。

代替認証方法

Databricks サービスの資格情報が使用できない場合、Databricks には次の代替認証方法が用意されています。

インスタンスプロファイル

コンピュートの設定中にインスタンスプロファイルをアタッチします。 「インスタンスプロファイル」を参照してください。

インスタンスプロファイルは、標準アクセスモード (以前の共有アクセスモード) ではサポートされていません。 Unity Catalogのコンピュート アクセス モードの制限を参照してください。

キーを直接使用する

アクセスにキーを使用するには、読み取りの設定中にオプション awsAccessKeyawsSecretKeyを使用してキーを指定します。

キーを使用する場合は、Databricks シークレットを使用してキーを格納する必要があります。 シークレット管理を参照してください。

ロール IAM引き受ける

一部のコンピュート構成では、roleArnオプションを使用してIAMロールを引き受けることができます。ロールを引き受けるには、ロールを引き受ける権限でクラスターを起動するか、 awsAccessKeyawsSecretKeyを使用してアクセス キーを提供します。

この方法では、クロスアカウント認証をサポートできます。 クロスアカウント認証の詳細については、「ロールを使用した AWSアカウント間でのアクセスの委任IAM 」を参照してください。

注記

オプションで、外部 ID を roleExternalId で指定し、セッション名を roleSessionNameで指定できます。

スキーマ

Kinesis は、次のスキーマのレコードを返します。

タイプ

partitionKey

string

data

binary

stream

string

ShardID

string

sequenceNumber

string

approximateArrivalTimestamp

タイムスタンプ

data列のデータを逆シリアル化するには、フィールドを文字列にキャストします。

クイックスタート

次のノートブックは、Kinesisを使用した構造化ストリーミングを使用して WordCount を実行する方法を示しています。

Kinesis WordCount と構造化ストリーミングノートブック

Open notebook in new tab

Kinesis オプションの設定

important

Databricks Runtime 13.3 LTS 以降では、Kinesis で Trigger.AvailableNow を使用できます。 「 Kinesis レコードを増分バッチとして取り込む」を参照してください。

注記

Databricks Runtime 16.1 以降では、 streamARN を使用して Kinesis ソースを識別できます。

すべての Databricks Runtime バージョンで、 streamName または streamARNを指定する必要があります。 これらのオプションのうち 1 つだけを指定できます。

Kinesisデータソースの一般的な設定を次に示します。

オプション

デフォルト

説明

streamName

ストリーム名のコンマ区切りリスト

なし

サブスクライブするストリーム名

ストリームARN

KinesisストリームARNsのコンマ区切りリスト。たとえば、 "arn:aws:kinesis:myarn1,arn:aws:kinesis:myarn2".

なし

サブスクライブするストリームの ARNs 。 Databricks Runtime 16.1 以降で使用できます。

region

指定するストリームのリージョン

ローカルで解決されたリージョン

ストリームが定義されているリージョン

endpoint

Kinesis データストリームのリージョン

ローカルで解決されたリージョン

Kinesis Data Streams のリージョンエンドポイント

initialPosition

latest, trim_horizon, earliest (trim_horizonの別名), at_timestamp.

latest

ストリーム内のどこから読み取りを開始するか。

タイムスタンプの Java デフォルト形式 ( {"at_timestamp": "06/25/2020 10:23:45 PDT"}など) を使用して、at_timestamp を JSON 文字列として指定します。ストリーミング クエリは、指定されたタイムスタンプ (両端を含む) 以降のすべての変更を読み取ります。 形式を明示的に指定するには、JSON 文字列に追加のフィールド ( {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}など) を指定します。

maxRecordsPerFetch

正の整数

10,000

Kinesis への API リクエストごとに読み取られるレコードの数。Kinesis Producer Library を使用してサブレコードを 1 つのレコードに集約しているかどうかに応じて、返されるレコードの数が実際にはさらに多くなる場合があります。

maxFetchRate

データレートを表す正の 10 進数(MB/秒単位)

1.0(最大 = 2.0)

シャードごとのデータのプリフェッチ速度。これは、フェッチのレートを制限し、Kinesis のスロットリングを回避するためのオプションです。2.0MB/s は Kinesis で許可される最大レートです。

minFetchPeriod

期間を表す文字列。例:1 秒の場合は 1s

400ms(最小 = 200ms)

連続するプリフェッチ試行間の待ち時間。これは、フェッチの頻度を制限し、Kinesis のスロットリングを回避するためのオプションです。Kinesis では 1 秒あたり最大 5 回のフェッチが許可されるため、最小値は 200 ミリ秒となります。

maxFetchDuration

期間を表す文字列。例:1 分の場合は 1m

10s

プリフェッチされた新しいデータが処理可能になるまでのバッファー時間。

fetchBufferSize

バイトを表す文字列。例:2gb10mb

20gb

次のトリガーのためにバッファーするデータの量。これは停止条件として使用されるものであり、厳密な上限ではないため、この値に指定された値よりも多くのデータがバッファーされる可能性があります。

shardsPerTask

正の整数

5

1 つの Spark タスクで並行してプリフェッチする Kinesis シャードの数。クエリーのレイテンシーを最小限に抑え、リソースの使用量を最大化するには、# cores in cluster >= # Kinesis shards / shardsPerTask となるようにするのが理想的です。

shardFetchInterval

期間を表す文字列。例:2 分の場合は 2m

1s

リシャーディングのために Kinesis をポーリングする頻度。

サービスクレデンシャル

文字列

デフォルトはありません。

Databricks サービスの資格情報の名前。 「サービス資格情報を使用して外部クラウド サービスへのアクセスを管理する」を参照してください。

awsAccessKey

文字列

デフォルトはありません。

AWS アクセスキー。

awsSecretKey

文字列

デフォルトはありません。

アクセスキーに対応する AWS シークレットアクセスキー。

roleArn

文字列

デフォルトはありません。

Kinesis にアクセスするときに引き受けるロールの Amazon リソース名(ARN)。

roleExternalId

文字列

デフォルトはありません。

AWS アカウントへのアクセスを委任する場合に使用できる任意の値。「外部 ID の使用方法」を参照してください。

roleSessionName

文字列

デフォルトはありません。

同じロールが異なるプリンシパルによって、または異なる理由で引き受けられる場合に、セッションを一意に識別するための、引き受けられるロールセッションの識別子。

coalesceThresholdBlockSize

正の整数

10,000,000

自動結合が発生するしきい値。平均ブロックサイズがこの値より小さい場合、プリフェッチされたブロックが coalesceBinSize になるように結合されます。

coalesceBinSize

正の整数

128,000,000

結合後のおおよそのブロックサイズ。

consumerMode

polling または efo

polling

ストリーミング クエリを実行するコンシューマーの種類。 「 ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定」を参照してください。 Databricks Runtime 11.3 LTS 以降で使用できます。

requireConsumerDeregistration

true または false

false

クエリの終了時に拡張ファンアウト コンシューマーを登録解除するかどうか。 consumerModeにはefoが必要です。Databricks Runtime 11.3 LTS 以降で使用できます。

maxShardsPerDescribeの

正の整数 (最大 10000)。

100

シャードの一覧表示中に API 呼び出しごとに読み取るシャードの最大数。

消費者の名前

すべてのストリームで使用する単一のコンシューマ名、またはコンマ区切りのコンマ名のリスト。

ストリーミング クエリ ID

EFO モードで Kinesis サービスにクエリを登録するために使用されるコンシューマー名。 Databricks Runtime 11.3 LTS 以降で使用できます。

consumerNameプレフィックス

空の文字列またはカスタムプレフィックス文字列。

databricks_

EFO モードで Kinesis サービスにコンシューマーを登録するために consumerName と共に使用されるプレフィックス。 Databricks Runtime 16.0 以降で使用できます。

consumerRefreshInterval(消費者リフレッシュ間隔)

デュレーション文字列 (例: 1 秒 (最大 3600 秒) の "1s")。

300番台

Kinesis EFO コンシューマー登録がチェックされ、更新される間隔。 Databricks Runtime 11.3 LTS 以降で使用できます。

registeredConsumerId (登録された消費者ID)

コンシューマ名または ARNsのカンマ区切りリスト。

なし

EFO モードの既存のコンシューマーの識別子。 Databricks Runtime 16.1 以降で使用できます。

registeredConsumerIdType (登録済み消費者 IDTYPE)

name または ARN

なし

コンシューマ ID が名前か ARNsかを指定します。 Databricks Runtime 16.1 以降で使用できます。

注記

オプションのデフォルト値には、2 つのリーダー(Spark またはその他)が Kinesis のレート制限に達することなく同時に Kinesis ストリームを消費できるような値が選択されています。コンシューマーの数が多い場合は、それに応じてオプションを調整する必要があります。例えば、maxFetchRate を減らし、minFetchPeriod を増やす必要がある場合があります。

低遅延モニタリングとアラート

アラートのユースケースでは、レイテンシーを小さくすることが望まれます。これを実現するには:

  • Kinesis ストリームのコンシューマーが 1 つだけ(つまり、コンシューマーがストリーミングクエリーのみであり、他のコンシューマーは存在しないこと)になるようにしてください。これにより、Kinesis のレート制限に達することなく、可能な限り迅速にフェッチできるように最適化できます。
  • オプション maxFetchDuration を小さい値 (たとえば 200ms) に設定すると、フェッチされたデータの処理をできるだけ早く開始できます。 Trigger.AvailableNowを使用している場合、Kinesis ストリームの最新のレコードに追いつくことができない可能性が高くなります。
  • オプション minFetchPeriod を 210ms に設定して、できるだけ頻繁にフェッチします。
  • オプション shardsPerTask を設定するか、# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask となるようにクラスターを設定します。これにより、バックグラウンドのプリフェッチタスクとストリーミングクエリータスクを同時に実行できるようになります。

クエリーが 5 秒ごとにデータを受信していることが観測された場合、Kinesis のレート制限に達している可能性があります。設定を見直してください。

Kinesis はどのようなメトリクスをレポートしますか?

Kinesis は、各ワークスペースのストリームの開始からコンシューマーが遅れたミリ秒数を報告します。 ストリーミング クエリ プロセス内のすべてのワークスペースの平均、最小、および最大ミリ秒数を avgMsBehindLatestmaxMsBehindLatest、および minMsBehindLatest メトリクスとして取得できます。 「ソースメトリクスオブジェクト (Kinesis)」を参照してください。

ノートブックでストリームを実行している場合は、次の例のように、ストリーミング クエリの進行状況ダッシュボードの [Raw Data] (生データ ) タブにメトリクスが表示されます。

JSON
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}

Kinesis レコードを増分バッチとして取り込む

Databricks Runtime 13.3 LTS 以降では、Databricks はインクリメンタル バッチ セマンティクスのための Trigger.AvailableNow Kinesis データソースの使用をサポートしています。次に、基本設定について説明します。

  1. マイクロバッチの読み取りが available now モードでトリガーされると、Databricks クライアントによって現在の時刻が記録されます。
  2. Databricks は、この記録時刻と前のチェックポイントの間のタイムスタンプを持つすべてのレコードについて、ソースシステムにポーリングします。
  3. Databricks は、Trigger.AvailableNow セマンティクスを使用してこれらのレコードを読み込みます。

Databricks は、ベストエフォート メカニズムを使用して、ストリーミング クエリの実行時に Kinesis ストリームに存在するすべてのレコードを使用しようとします。 タイムスタンプにわずかな違いがあり、データソースでの順序付けには保証がないため、一部のレコードがトリガーされたバッチに含まれない場合があります。 省略されたレコードは、次にトリガーされるマイクロバッチの一部として処理されます。

注記

レコードがあってもクエリが Kinesis ストリームからのレコードのフェッチに引き続き失敗する場合は、 maxFetchDuration 値を増やしてみてください。

増分バッチ処理の構成を参照してください。

Kinesis への書き込み

次のコードスニペットは、Kinesis にデータを書き込むための ForeachSink として使用できます。これには、Dataset[(String, Array[Byte])] が必要です。

注記

以下のコードスニペットは、「厳密に 1 回」ではなく、「 最低 1 回 」のセマンティクスを提供しています。

Kinesis Foreach シンクノートブック

Open notebook in new tab

Kinesis の使用に関する推奨事項

Kinesis クエリでは、さまざまな理由でレイテンシーが発生する可能性があります。 このセクションでは、遅延のトラブルシューティングに関する推奨事項について説明します。

Kinesis ソースは、バックグラウンドスレッドで Spark ジョブを実行して、Kinesis データを定期的にプリフェッチし、Spark エグゼキューターのメモリにキャッシュします。 ストリーミング クエリは、各プリフェッチ ステップが完了した後にキャッシュされたデータを処理し、データを処理に使用できるようにします。 プリフェッチ ステップは、観測されるエンドツーエンドのレイテンシとスループットに大きく影響します。

プリフェッチのレイテンシーを短縮

クエリの待機時間を最小限に抑え、リソース使用量を最大化するように最適化するには、次の計算を使用します。

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.

important

minFetchPeriod は、Kinesis シャードが にヒットするまで、複数の GetRecords API コールを作成できますReadProvisionedThroughputExceeded例外が発生した場合、コネクタが Kinesis シャードを最大限に活用するため、問題を示すものではありません。

レート制限エラーが多すぎることによる速度低下を回避

コネクタは、レート制限エラーが発生するたびに Kinesis から読み取られるデータ量を半分に減らし、このイベントをメッセージとともにログに記録します。 "Hit rate limit. Sleeping for 5 seconds."

ストリームが追いついているときにこれらのエラーが表示されるのは一般的ですが、追いついた後は、これらのエラーは表示されなくなります。 その場合は、Kinesis 側からチューニングする (容量を増やす) か、プリフェッチオプションを調整する必要があります。

データをディスクにあふれさせないようにする

Kinesis ストリームが突然急増すると、割り当てられたバッファ容量がいっぱいになり、新しいデータを追加するのに十分な速さでバッファが空にならない可能性があります。

このような場合、Spark はバッファーからディスクにブロックをスピルし、処理を遅くし、ストリームのパフォーマンスに影響を与えます。 このイベントは、次のようなメッセージと共にログに表示されます。

Bash
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

この問題に対処するには、クラスター・メモリー容量を増やす (ノードを追加するか、ノードあたりのメモリーを増やす) か、構成パラメーター を調整します fetchBufferSize.

S3 書き込みタスクのハング

Spark 投機を有効にして、ストリーム処理の進行を妨げるハング タスクを終了できます。 タスクが過度に積極的に終了しないようにするには、この設定の分位数と乗数を慎重に調整します。 良い出発点は、 spark.speculation.multiplier3 に、 spark.speculation.quantile0.95に設定することです。

ステートフルストリームのチェックポイント設定に関連するレイテンシーを削減

Databricks では、ステートフル ストリーミング クエリの変更ログ チェックポイント処理と共に RocksDB を使用することをお勧めします。 「変更ログのチェックポイント設定を有効にする」を参照してください。

ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定

Databricks Runtime 11.3以上では、Databricks Runtime KinesisコネクタがAmazon Kinesisの拡張ファンアウト(EFO)機能の使用をサポートします。

Kinesis enhanced fan-outは、シャードあたり、コンシューマーあたり2MB/秒の専用スループット(Kinesisストリームあたり最大20コンシューマー)で拡張ファンアウトストリームコンシューマーのサポートし、プルモードではなくプッシュモードで配信を記録する機能です。

By デフォルト, EFO モードで構成された構造化ストリーミング クエリは、それ自体を専用のスループットを持つコンシューマーとして登録するARN AmazonKinesisData ストリーム内の一意のコンシューマー名とコンシューマー ( リソース番号.

デフォルトでは、 Databricks はストリーミング クエリ ID と databricks_ プレフィックスを使用して、新しいコンシューマーに名前を付けます。 必要に応じて、 consumerNamePrefix または consumerName オプションを指定して、この動作をオーバーライドできます。 consumerNameは、文字、数字、特殊文字で構成される文字列である必要があります_ . -

important

登録済みの EFO コンシューマーは、 Amazon Kinesis で追加料金が発生します。 クエリのティアダウン時にコンシューマーを自動的に登録解除するには、 requireConsumerDeregistration オプションを trueに設定します。 Databricks は、ドライバーのクラッシュやノード障害などのイベントに対する登録解除を保証できません。 ジョブが失敗した場合、Databricks では、Kinesis の過剰な料金を防ぐために、登録済みのコンシューマーを直接管理することをお勧めします。

高度なコンシューマー構成オプション

Databricks Runtime 16.1 以降では、次の例のように、オプション registeredConsumerIdregisteredConsumerIdTypeを使用して、既存のコンシューマーを使用して EFO モードで読み取りを構成できます。

Python
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)

既存のコンシューマは、名前または ARN を使用して指定できます。 設定で指定された Kinesis ストリームソースごとに 1 つのコンシューマー ID を指定する必要があります。

Databricks ノートブックを使用したオフラインの消費者管理

Databricksは、Kinesisデータストリームに関連するコンシューマーの登録、一覧表示、登録解除を行うコンシューマー管理ユーティリティを提供します。次のコードは、Databricksノートブックでこのユーティリティを使用する方法を示しています。

  1. アクティブなクラスターに接続された新しいDatabricksノートブックで、必要な認証情報を提供してAWSKinesisConsumerManagerを作成します。

    Scala
    import com.databricks.sql.kinesis.AWSKinesisConsumerManager

    val manager = AWSKinesisConsumerManager.newManager()
    .option("serviceCredential", serviceCredentialName)
    .option("region", kinesisRegion)
    .create()
  2. コンシューマーをリストして表示します。

    Scala
    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
  3. 特定のストリームのコンシューマーを登録します。

    Scala
    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
  4. 特定のストリームのコンシューマーの登録を解除します。

    Scala
    manager.deregisterConsumer("<stream name>", "<consumer name>")