メインコンテンツまでスキップ

チュートリアル:リアルタイムストリーミングワークロードを実行する

リアルタイムモードでは、エンドツーエンドの遅延が 5 ミリ秒という超低遅延ストリーミングが可能になり、不正行為の検出や賭博のパーソナライゼーションなどの運用ワークロードに最適です。 このチュートリアルでは、簡単な例を使用して、最初のリアルタイム ストリーミング クエリを設定する手順を説明します。

ラケット モード、いつ使用するか、サポートされる機能に関する概念的な情報については、構造化ストリーミングのラケット モードを参照してください。 設定要件については、 「リアルタイムモードの設定」を参照してください。

要件

始める前に、 「セットアップ モード」で指定された構成を使用するクラシック コンピュート クラスターを作成する権限があることを確認してください。 または、ワークスペース管理者に連絡して、貯蓄モードのクラスターを作成してもらいます。

ステップ 1: ノートブックを作成する

ノートブックは、ストリーミングクエリの開発とテストを行うための対話型環境を提供する。このノートブックを使ってリアルタイムクエリを記述し、結果が継続的に更新される様子を確認できます。

ノートブックを作成するには:

  1. サイドバーの 「新規」 をクリックし、次にノートブックのアイコン。 ノートブック
  2. [コンピュート] ドロップダウン メニューで、手持ちモード クラスターを選択します。
  3. デフォルト言語として Python または Scalaを 選択してください。

ステップ 2: 保留モードのクエリを実行する

以下のコードをノートブックのセルにコピー&ペーストして実行してください。この例では、指定されたレートで行を生成し、結果をリアルタイムで表示するレートソースを使用しています。

注記

display関数とrealTimeトリガーは、Databricks Runtime 17.1以降で利用可能です。

Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

コードを実行すると、新しい行が生成されるたびにリアルタイムで更新される表が表示されます。この表には、 timestamp列と、行ごとに増加するvalue列が表示されます。

コードを理解する

上記のコードは、リアルタイムストリーミングクエリの基本的な構成要素を示しています。次の表は、キーとその制御内容を説明しています。

パラメーター

説明

format("rate")

レートソースを使用します。これは、設定可能なレートで行を生成する組み込みソースです。これは、外部依存関係なしでテストを行う場合に役立ちます。

numPartitions

生成されるデータのパーティション数を設定します。

rowsPerSecond

1秒あたりに生成される行数を制御します。

realTime="5 minutes"

リアルタイムモードを有効にします。この間隔は、クエリのチェックポイントが進行する頻度を指定します。間隔が長くなるとチェックポイントの頻度は減りますが、障害発生後の復旧時間が長くなる可能性があります。

outputMode="update"

リアルタイムモードでは、更新出力モードが必要です。

ステップ 3: 結果を検証する

クエリを実行すると、 display関数によってテーブルが作成され、レートソースが新しい行を生成するとリアルタイムで更新されます。各行には以下が含まれます。

  • レートソースによって行が生成された日時を示すタイムスタンプ。
  • 新しい行が追加されるたびに増加する、単調増加するカウンター。

この表は最小限の遅延で継続的に更新され、リアルタイムモードがデータが利用可能になり次第処理する様子を示しています。これがリアルタイムモードの最大のメリットです。つまり、バッチ処理を待つことなく、データを即座に確認し、それに基づいて行動できるということです。

その他のリソース

最初の学期クエリを実行したので、次のリソースを調べて、 Kafka 、 Kinesis 、およびその他のサポートされているソースを使用して本番運用ストリーミング アプリケーションを構築します。