メインコンテンツまでスキップ

Amazon Kinesis に接続する

このページでは、構造化ストリーミングを使用してAmazon Kinesisにデータを読み書きする方法について説明します。

Databricksは、すべてのS3トラフィックがAWSネットワーク上でルーティングされるように、S3 VPCエンドポイントを有効にすることを推奨します。

注記

Kinesis ストリームを削除して再作成すると、既存のチェックポイント ディレクトリを再利用してストリーミング クエリを再開することはできません。チェックポイント ディレクトリを削除し、クエリを最初から開始する必要があります。ストリームを中断したり再開したりせずにシャードの数を増やすことで、構造化ストリーミングで再シャーディングできます。

「Kinesis の使用に関する推奨事項」を参照してください。

Amazon Kinesis で認証する

Databricks では、Databricks サービス資格情報を使用して Kinesis への接続を管理することをお勧めします。「サービス資格情報の作成」を参照してください。Databricks サービスの資格情報には、Databricks Runtime 16.1 以降が必要です。

サービス資格情報を使用するには、次の手順を実行します。

  1. ロールを使用して、 Databricksにアクセスするために必要な権限を持つ サービスの資格情報を作成します。IAMKinesis「ステップ 1: IAMロールを作成する」を参照してください。
  2. ストリーミング読み取りを定義するときに、 serviceCredentialオプションを使用してサービス資格情報の名前を指定します。
注記

Kinesisソースには、 ListShardsGetRecords 、およびGetShardIterator権限が必要です。 Amazon: Access Denied例外が発生した場合は、 IAMロールにこれらの権限があることを確認してください。 IAMを使用したAmazon Kinesisデータストリームリソースへのアクセスの制御」を参照してください。

代替認証方法

Databricksのサービス認証情報が利用できない場合、Databricksには以下の代替認証方法があります。

インスタンスプロファイル

コンピュートの設定中にインスタンスプロファイルをアタッチします。 「インスタンスプロファイル」を参照してください。

インスタンスプロファイルは、標準アクセスモード (以前の共有アクセスモード) ではサポートされていません。 標準コンピュートの要件と制限を参照してください。

キーを直接使用する

awsAccessKeyawsSecretKeyオプションを設定してください。

キーを使用する場合は、Databricks シークレットを使用してキーを保存します。「秘密管理」を参照してください。

ロール IAM引き受ける

一部のコンピュート構成では、 roleArnオプションを使用してIAMロールを引き受けることができます。 役割を引き受けるには、役割を引き受ける権限を持ってクラスターを起動するか、 awsAccessKeyawsSecretKeyを通じてアクセスキーを提供します。

この方法は、アカウント間認証をサポートしています。詳細については、 IAMを使用したAWSアカウント間のアクセスの委任」を参照してください。

オプションで、外部 ID を roleExternalId で指定し、セッション名を roleSessionNameで指定できます。

スキーマ

Kinesis は、次のスキーマのレコードを返します。

タイプ

partitionKey

string

data

binary

stream

string

shardId

string

sequenceNumber

string

approximateArrivalTimestamp

タイムスタンプ

data列のデータを逆シリアル化するには、そのフィールドを文字列にキャストします。

クイックスタート

次のノートブックは、Kinesisを使用した構造化ストリーミングを使用して WordCount を実行する方法を示しています。

Kinesis WordCount と構造化ストリーミングノートブック

ノートブックを新しいタブで開く

Kinesis オプションの設定

重要

Databricks Runtime 13.3 LTS 以降では、Kinesis で Trigger.AvailableNow を使用できます。 「 Kinesis レコードを増分バッチとして取り込む」を参照してください。

注記

Databricks Runtime 16.1 以降では、 streamARN を使用して Kinesis ソースを識別できます。

すべての Databricks Runtime バージョンで、 streamName または streamARNを指定する必要があります。 これらのオプションのうち 1 つだけを指定できます。

警告

アクティブなストリーミングクエリでは、 streamNamestreamARNを切り替えないでください。Databricks 、これらのオプションを処理途中で変更することをサポートしておらず、クエリを再開すると、重複レコードが発生したり、データが失われたりする可能性があります。 streamNameからstreamARNに変更する必要がある場合は、新しいチェックポイントディレクトリを使用して新しいストリーミングクエリを開始してください。

Kinesisデータソースの一般的な設定を次に示します。

オプション

デフォルト

説明

streamName

ストリーム名のコンマ区切りリスト

なし

サブスクライブするストリーム名

ストリームARN

KinesisストリームARNsのコンマ区切りリスト。たとえば、 "arn:aws:kinesis:myarn1,arn:aws:kinesis:myarn2".

なし

サブスクライブするストリームの ARNs 。 Databricks Runtime 16.1 以降で使用できます。

region

指定するストリームのリージョン

ローカルで解決されたリージョン

ストリームが定義されているリージョン

endpoint

Kinesis データストリームのリージョン

ローカルで解決されたリージョン

Kinesis Data Streams のリージョンエンドポイント

initialPosition

latest, trim_horizon, earliest (trim_horizonの別名), at_timestamp.

latest

ストリーム内のどこから読み取りを開始するか。

タイムスタンプの Java デフォルト形式 ( {"at_timestamp": "06/25/2020 10:23:45 PDT"}など) を使用して、at_timestamp を JSON 文字列として指定します。ストリーミング クエリは、指定されたタイムスタンプ (両端を含む) 以降のすべての変更を読み取ります。 形式を明示的に指定するには、JSON 文字列に追加のフィールド ( {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}など) を指定します。

maxRecordsPerFetch

正の整数

10,000

KinesisへのAPIリクエストごとに読み込むレコード数。Kinesis Producerを使用してサブレコードが単一のレコードに集約されたかどうかによって、返されるレコードの数が増える可能性があります。 。

maxFetchRate

データレートを表す正の 10 進数(MB/秒単位)

1.0(最大 = 2.0)

シャードごとにデータをプリフェッチする速度。このオプションはフェッチのレートを制限し、Kinesisのスロットリングを回避します。Kinesisが許容する最大転送速度は2.0MB/秒です。

minFetchPeriod

期間を表す文字列。例:1 秒の場合は 1s

400ms(最小 = 200ms)

連続するプリフェッチ試行の間隔をどれくらい空けるか。このオプションはフェッチの頻度を制限し、Kinesisのスロットリングを回避します。Kinesisでは最大5回のフェッチ/秒しか許可されないため、200msが最小値となります。

maxFetchDuration

期間を表す文字列。例:1 分の場合は 1m

10s

プリフェッチされた新しいデータが処理可能になるまでのバッファー時間。

fetchBufferSize

バイトを表す文字列。例:2gb10mb

20gb

次のトリガーのためにバッファリングするデータの量。このオプションは停止条件であり、厳密な上限ではありません。この値で指定されているよりも多くのデータがバッファリングされる可能性があります。

shardsPerTask

正の整数

5

Sparkごとに並列でプリフェッチするKinesisシャードの数。 理想的には、クエリのレイテンシを最小限に抑え、リソースの使用量を最大化するために、 # cores in cluster >= # Kinesis shards / shardsPerTaskあるべきです。

shardFetchInterval

期間を表す文字列。例:2 分の場合は 2m

1s

リシャーディングのために Kinesis をポーリングする頻度。

サービスクレデンシャル

文字列

デフォルトはありません。

Databricks サービスの資格情報の名前。「サービス資格情報の作成」を参照してください。

awsAccessKey

文字列

デフォルトはありません。

AWS アクセスキー。

awsSecretKey

文字列

デフォルトはありません。

アクセスキーに対応する AWS シークレットアクセスキー。

roleArn

文字列

デフォルトはありません。

Kinesis にアクセスするときに引き受けるロールの Amazon リソース名(ARN)。

roleExternalId

文字列

デフォルトはありません。

AWS アカウントへのアクセスを委任する場合に使用できる任意の値。「外部 ID の使用方法」を参照してください。

roleSessionName

文字列

デフォルトはありません。

同じロールが異なるプリンシパルによって、または異なる理由で引き受けられる場合に、セッションを一意に識別するための、引き受けられるロールセッションの識別子。

coalesceThresholdBlockSize

正の整数

10,000,000

自動結合が発生するしきい値。平均ブロックサイズがこの値より小さい場合、プリフェッチされたブロックが coalesceBinSize になるように結合されます。

coalesceBinSize

正の整数

128,000,000

結合後のおおよそのブロックサイズ。

consumerMode

polling または efo

polling

ストリーミング クエリを実行するコンシューマーの種類。 「 ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定」を参照してください。 Databricks Runtime 11.3 LTS 以降で使用できます。

requireConsumerDeregistration

true または false

false

クエリの終了時に拡張ファンアウト コンシューマーを登録解除するかどうか。 consumerModeにはefoが必要です。Databricks Runtime 11.3 LTS 以降で使用できます。

maxShardsPerDescribeの

正の整数 (最大 10000)。

100

シャードの一覧表示中に API 呼び出しごとに読み取るシャードの最大数。

消費者の名前

すべてのストリームで使用する単一のコンシューマ名、またはコンマ区切りのコンマ名のリスト。

ストリーミング クエリ ID

EFO モードで Kinesis サービスにクエリを登録するために使用されるコンシューマー名。 Databricks Runtime 11.3 LTS 以降で使用できます。

consumerNameプレフィックス

空の文字列またはカスタムプレフィックス文字列。

databricks_

EFO モードで Kinesis サービスにコンシューマーを登録するために consumerName と共に使用されるプレフィックス。 Databricks Runtime 16.0 以降で使用できます。

consumerRefreshInterval(消費者リフレッシュ間隔)

デュレーション文字列 (例: 1 秒 (最大 3600 秒) の "1s")。

300番台

Kinesis EFO コンシューマー登録がチェックされ、更新される間隔。 Databricks Runtime 11.3 LTS 以降で使用できます。

registeredConsumerId (登録された消費者ID)

コンシューマ名または ARNsのカンマ区切りリスト。

なし

EFO モードの既存のコンシューマーの識別子。 Databricks Runtime 16.1 以降で使用できます。

registeredConsumerIdType (登録済み消費者 IDTYPE)

name または ARN

なし

コンシューマ ID が名前か ARNsかを指定します。 Databricks Runtime 16.1 以降で使用できます。

注記

もちろんオプション値は、2 つのリーダー ( Sparkまたはその他) がKinesisレート制限に達することなくKinesisストリームを同時に消費できるように構成されています。 顧客数が増えれば、それに応じて選択肢を調整する必要があります。例えば、 maxFetchRateを減らしてminFetchPeriodを増やす必要があるかもしれません。

低遅延モニタリングとアラート

アラート機能のユースケースでは、低遅延が求められる。遅延を最小限に抑えるには:

  • ストリーミングクエリがKinesisストリームの唯一のコンシューマであることを確認して、フェッチパフォーマンスを最適化し、 Kinesisレート制限を回避します。
  • 取得したデータを可能な限り高速に処理するには、オプションmaxFetchDurationに小さな値(例えば200ms)を設定してください。このオプションはトレードオフの関係にあります。各バッチで最新のレコードが確実に処理されることを保証するよりも、バッチごとの処理速度を優先するからです。例えば、 Trigger.AvailableNow使用する場合、値が小さいとクエリが Kinesis ストリームの最新レコードに追いつかなくなる可能性があります。
  • オプション minFetchPeriod を 210ms に設定して、できるだけ頻繁にフェッチします。
  • オプションshardsPerTaskを設定するか、クラスターを# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTaskとなるように構成してください。これにより、バックグラウンドでのプリフェッチタスクとストリーミングクエリタスクが同時に実行されることが保証されます。

クエリが5秒ごとにデータを受信している場合、 Kinesisのレート制限に達する可能性があります。設定を確認してください。

Kinesis はどのようなメトリクスをレポートしますか?

Kinesisは、各ワークスペースにおいて、コンシューマーがストリームの開始時点から何ミリ秒遅れているかを報告します。avgMsBehindLatestmaxMsBehindLatest 、およびminMsBehindLatestメトリクスは、ストリーミング クエリ プロセスにおけるすべてのワークスペースの平均、最小、最大のミリ秒を提供します。 Databricksのモニタリング構造化ストリーミング クエリ」を参照してください。

ノートブックでストリームを実行している場合は、ストリーミングクエリの進行状況ダッシュボードの 「生データ」 タブにあるメトリクスを確認してください。以下に例を示します。

JSON
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}

Kinesis レコードを増分バッチとして取り込む

Databricks Runtime 13.3 LTS 以降では、Databricks はインクリメンタル バッチ セマンティクスのための Trigger.AvailableNow Kinesis データソースの使用をサポートしています。次に、基本設定について説明します。

  1. マイクロバッチの読み取りが available now モードでトリガーされると、Databricks クライアントによって現在の時刻が記録されます。
  2. Databricks は、この記録時刻と前のチェックポイントの間のタイムスタンプを持つすべてのレコードについて、ソースシステムにポーリングします。
  3. Databricks は、Trigger.AvailableNow セマンティクスを使用してこれらのレコードを読み込みます。

Databricks は、ベストエフォート メカニズムを使用して、ストリーミング クエリの実行時に Kinesis ストリームに存在するすべてのレコードを使用しようとします。 タイムスタンプにわずかな違いがあり、データソースでの順序付けには保証がないため、一部のレコードがトリガーされたバッチに含まれない場合があります。 省略されたレコードは、次にトリガーされるマイクロバッチの一部として処理されます。

注記

レコードがあってもクエリが Kinesis ストリームからのレコードのフェッチに引き続き失敗する場合は、 maxFetchDuration 値を増やしてみてください。

AvailableNowを参照してください: 増分バッチ処理

Kinesis への書き込み

次のコードスニペットは、Kinesis にデータを書き込むための ForeachSink として使用できます。これには、Dataset[(String, Array[Byte])] が必要です。

注記

以下のコードスニペットは、「厳密に 1 回」ではなく、「 最低 1 回 」のセマンティクスを提供しています。

Kinesis Foreach シンクノートブック

ノートブックを新しいタブで開く

Kinesis の使用に関する推奨事項

Kinesis クエリでは、さまざまな理由でレイテンシーが発生する可能性があります。 このセクションでは、遅延のトラブルシューティングに関する推奨事項について説明します。

Kinesis ソースは、バックグラウンドスレッドで Spark ジョブを実行して、Kinesis データを定期的にプリフェッチし、Spark エグゼキューターのメモリにキャッシュします。 ストリーミング クエリは、各プリフェッチ ステップが完了した後にキャッシュされたデータを処理し、データを処理に使用できるようにします。 プリフェッチ ステップは、観測されるエンドツーエンドのレイテンシとスループットに大きく影響します。

プリフェッチのレイテンシーを短縮

クエリの待機時間を最小限に抑え、リソース使用量を最大化するように最適化するには、次の計算を使用します。

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.

重要

minFetchPeriod は、Kinesis シャードが にヒットするまで、複数の GetRecords API コールを作成できますReadProvisionedThroughputExceeded例外が発生した場合、コネクタが Kinesis シャードを最大限に活用するため、問題を示すものではありません。

レート制限エラーが多すぎることによる速度低下を回避

コネクタは、レート制限エラーが発生するたびに Kinesis から読み取られるデータ量を半分に減らし、このイベントをメッセージとともにログに記録します。 "Hit rate limit. Sleeping for 5 seconds."

ストリームが追いついているときにこれらのエラーが表示されるのは一般的ですが、追いついた後は、これらのエラーは表示されなくなります。 その場合は、Kinesis 側からチューニングする (容量を増やす) か、プリフェッチオプションを調整する必要があります。

データをディスクにあふれさせないようにする

Kinesis ストリームが突然急増すると、割り当てられたバッファ容量がいっぱいになり、新しいデータを追加するのに十分な速さでバッファが空にならない可能性があります。

このような場合、Spark はバッファーからディスクにブロックをスピルし、処理を遅くし、ストリームのパフォーマンスに影響を与えます。 このイベントは、次のようなメッセージと共にログに表示されます。

Bash
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

この問題に対処するには、クラスター・メモリー容量を増やす (ノードを追加するか、ノードあたりのメモリーを増やす) か、構成パラメーター を調整します fetchBufferSize.

S3 書き込みタスクのハング

Spark 投機を有効にして、ストリーム処理の進行を妨げるハング タスクを終了できます。 タスクが過度に積極的に終了しないようにするには、この設定の分位数と乗数を慎重に調整します。 良い出発点は、 spark.speculation.multiplier3 に、 spark.speculation.quantile0.95に設定することです。

ステートフルストリームのチェックポイント設定に関連するレイテンシーを削減

Databricks では、ステートフル ストリーミング クエリの変更ログ チェックポイント処理と共に RocksDB を使用することをお勧めします。 「変更ログのチェックポイント設定を有効にする」を参照してください。

ストリーミングクエリ読み取りのための Kinesis 拡張ファンアウト (EFO) の設定

Databricks Runtime 11.3以上では、Databricks Runtime KinesisコネクタがAmazon Kinesisの拡張ファンアウト(EFO)機能の使用をサポートします。

Kinesis enhanced fan-outは、シャードあたり、コンシューマーあたり2MB/秒の専用スループット(Kinesisストリームあたり最大20コンシューマー)で拡張ファンアウトストリームコンシューマーのサポートし、プルモードではなくプッシュモードで配信を記録する機能です。

By デフォルト, EFO モードで構成された構造化ストリーミング クエリは、それ自体を専用のスループットを持つコンシューマーとして登録するARN AmazonKinesisData ストリーム内の一意のコンシューマー名とコンシューマー ( リソース番号.

デフォルトでは、 Databricks はストリーミング クエリ ID と databricks_ プレフィックスを使用して、新しいコンシューマーに名前を付けます。 必要に応じて、 consumerNamePrefix または consumerName オプションを指定して、この動作をオーバーライドできます。 consumerNameは、文字、数字、特殊文字で構成される文字列である必要があります_ . -

クエリの再起動時に、 Kinesisソースはポーリング モードを使用して、コミットされていない最新のバッチが存在する場合はそれを再生します。 ストリームが未コミットのバッチを再生した後、ソースは以降の読み取りのためにEFOモードに戻ります。

重要

登録済みの EFO コンシューマーは、 Amazon Kinesis で追加料金が発生します。 クエリのティアダウン時にコンシューマーを自動的に登録解除するには、 requireConsumerDeregistration オプションを trueに設定します。 Databricks は、ドライバーのクラッシュやノード障害などのイベントに対する登録解除を保証できません。 ジョブが失敗した場合、Databricks では、Kinesis の過剰な料金を防ぐために、登録済みのコンシューマーを直接管理することをお勧めします。

高度なコンシューマー構成オプション

Databricks Runtime 16.1 以降では、次の例のように、オプション registeredConsumerIdregisteredConsumerIdTypeを使用して、既存のコンシューマーを使用して EFO モードで読み取りを構成できます。

Python
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)

既存のコンシューマは、名前または ARN を使用して指定できます。 設定で指定された Kinesis ストリームソースごとに 1 つのコンシューマー ID を指定する必要があります。

Databricks ノートブックを使用したオフラインの消費者管理

Databricksは、Kinesisデータストリームに関連するコンシューマーの登録、一覧表示、登録解除を行うコンシューマー管理ユーティリティを提供します。次のコードは、Databricksノートブックでこのユーティリティを使用する方法を示しています。

  1. アクティブなクラスターに接続された新しいDatabricksノートブックで、必要な認証情報を提供してAWSKinesisConsumerManagerを作成します。

    Scala
    import com.databricks.sql.kinesis.AWSKinesisConsumerManager

    val manager = AWSKinesisConsumerManager.newManager()
    .option("serviceCredential", serviceCredentialName)
    .option("region", kinesisRegion)
    .create()
  2. コンシューマーをリストして表示します。

    Scala
    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
  3. 特定のストリームのコンシューマーを登録します。

    Scala
    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
  4. 特定のストリームのコンシューマーの登録を解除します。

    Scala
    manager.deregisterConsumer("<stream name>", "<consumer name>")