リアルタイムモードを始める
プレビュー
この機能は パブリック プレビュー段階です。
呼びかけモードでは、エンドツーエンドの遅延が 5 ミリ秒という超低遅延ストリーミングが可能になり、不正行為の検出や賭博のパーソナライゼーションなどの運用ワークロードに最適です。 このチュートリアルでは、簡単な例を使用して、最初のリアルタイム ストリーミング クエリを設定する手順を説明します。
ラケット モード、いつ使用するか、サポートされる機能に関する概念的な情報については、構造化ストリーミングのラケット モードを参照してください。
要件
- クラシック コンピュートを作成する権限があります。
- Databricks Runtime 17.1 以上 (リアルタイム モードで
display関数を使用するために必要)。
クラシック コンピュートの作成権限がない場合は、ワークスペース管理者に連絡して、ステップ 1 の構成を使用して、ログイン モード クラスターを作成してもらいます。
ステップ 1: 貯蓄モード用のクラシックなコンピュートを作成する
超低レイテンシを実現するには、起動モードで特定のクラシック コンピュート構成が必要です。 これらの設定により、タスクはすべてのステージで同時に実行され、データはバッチではなく到着時に継続的に処理されます。
適切に設定されたクラシック コンピュートを作成するには:
-
Databricksワークスペースで、サイドバーの コンピュート をクリックします。
-
コンピュートを作成 をクリックします。
-
名前を入力してください。
-
Databricks Runtime 17.1 以上を選択します。
-
Photon アクセラレーション をクリアします (リアルタイム モードでは Photon はサポートされません)。
-
[オートスケールを有効にする] をオフにします (モードでは固定クラスター サイズが必要です)。
-
[詳細なパフォーマンス] で、 [スポット インスタンスを使用する] を オフにします (スポット インスタンスは中断を引き起こす可能性があります)。
-
追加の設定を展開するには、 「詳細オプション」 をクリックします。
-
[アクセス モード] で、 [専用 (旧称: シングル ユーザー)] を選択します。
-
Spark config の下に、次の構成を追加します。
Textspark.databricks.streaming.realTimeMode.enabled true -
コンピュートを作成 をクリックします。
ステップ 2: ノートブックを作成する
ノートブックは、ストリーミング クエリを開発およびテストするためのインタラクティブな環境を提供します。このノートブックを使用してリアルタイム クエリを記述し、結果が継続的に更新されるのを確認します。
ノートブックを作成するには:
- サイドバーの [新規] をクリックし、 [ノートブック] をクリックします。
- コンピュート ドロップダウン メニューで、ステップ 1 で作成したコンピュートを選択します。
- デフォルトの言語として Python または Scala を 選択します。
ステップ 3: 保留モードのクエリを実行する
次のコードをコピーしてノートブックのセルに貼り付け、実行します。この例では、指定されたレートで行を生成し、結果をリアルタイムで表示するレート ソースを使用します。
realTimeトリガーを持つdisplay関数は、Databricks Runtime 17.1 以降で使用できます。
- Python
- Scala
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
コードを実行すると、新しい行が生成されるとリアルタイムで更新されるテーブルが表示されます。テーブルには、行ごとに増加するtimestamp列とvalue列が表示されます。
コードを理解する
上記のコードは、リアルタイム ストリーミング クエリの重要なコンポーネントを示しています。次の表は、キーとその制御内容を説明しています。
- Python
- Scala
パラメーター | 説明 |
|---|---|
| 設定可能なレートで行を生成する組み込みソースであるレート ソースを使用します。これは外部依存関係なしでテストする場合に便利です。 |
| 生成されるデータのパーティション数を設定します。 |
| 1 秒あたりに生成される行数を制御します。 |
| リアルタイム モードを有効にします。間隔は、クエリ チェックポイントが進行する頻度を指定します。間隔が長いほどチェックポイントの頻度は少なくなりますが、障害後の回復時間は長くなる可能性があります。 |
| リアルタイム モードでは更新出力モードが必要です。 |
パラメーター | 説明 |
|---|---|
| 設定可能なレートで行を生成する組み込みソースであるレート ソースを使用します。これは外部依存関係なしでテストする場合に便利です。 |
| 生成されるデータのパーティション数を設定します。 |
| 1 秒あたりに生成される行数を制御します。 |
| デフォルトのチェックポイント間隔でリアルタイム モードを有効にします。間隔を指定することもできます (例: |
| リアルタイム モードでは更新出力モードが必要です。 |
あなたが見ているもの
クエリを実行すると、 display関数によって、レート ソースが新しい行を生成するとリアルタイムで更新されるテーブルが作成されます。各行には次の内容が含まれます。
- タイムスタンプ : レートソースによって行が生成された時刻
- 値 : 新しい行ごとに増加する単調に増加するカウンター
テーブルは最小限の遅延で継続的に更新され、データが利用可能になるとすぐにリアルタイム モードが処理する方法を示しています。これがリアルタイム モードの主な利点です。バッチ処理を待つことなく、データをすぐに確認して対応することができます。
学んだこと
最初のリアルタイム ストリーミング クエリの設定と実行が正常に完了しました。これで、次の方法がわかりました。
- 幣モードに必要な設定を使用してクラシック コンピュートを構成します (専用クラスター、 Photon無効、オートスケール無効、 Spark構成)
realTimeトリガーを使用してリアルタイム処理を有効にする- インタラクティブな開発とテストには
display関数を使用します - 継続的な更新を観察して、クエリがリアルタイムモードで実行されていることを確認します。
これで、 Kafka 、 Kinesis 、およびその他のサポートされているソースを使用して本番運用短期パイプラインを構築する準備が整いました。 構造化ストリーミングの詳細については、 「構造化ストリーミングの概念」を参照してください。
次のステップ
最初の短期クエリを実行したので、次のリソースを調べて本番運用ストリーミング アプリケーションを構築します。
- リアルタイム モードの例 - Kafka ソースとシンク、ステートフル クエリ、集計、カスタム シンクの実用的なコード例
- 中断モードのリファレンス - クラスターのサイジング、サポートされているオペレーター、モニタリング、および機能の制限について学びます
- ステートフル ストリーミング アプリケーション - 重複排除、集約、ウィンドウ処理のためのストリーミング クエリに状態管理を追加します。
- 高度な状態管理 - TTL(Time-to-Live)と複雑なロジックを使用したカスタムステートフル処理には
transformWithStateを使用します