RabbitMQからデータを取り込む
ベータ版
この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
このページでは、Databricks Lakeflow Connect を使用して、managed RabbitMQ インジェスト パイプラインを作成する方法について説明します。
要件
-
取り込みパイプラインを作成するには、まず、次の要件を満たす必要があります。
-
ワークスペースでUnity Catalogが有効になっている必要があります。
-
サーバレス コンピュートがワークスペースで有効になっている必要があります。サーバレス コンピュートの要件を参照してください。
-
新しい接続を作成する予定がある場合は、メタストアに対する
CREATE CONNECTION特権が必要です。Unity Catalog での特権の管理を参照してください。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する場合:接続オブジェクトに対して
USE CONNECTIONまたはALL PRIVILEGESの権限が必要です。 -
ターゲットカタログに対する
USE CATALOG権限が必要です。 -
既存のスキーマに対する
USE SCHEMAおよびCREATE TABLE権限、またはターゲットカタログに対するCREATE SCHEMA権限を持っている必要があります。
-
-
RabbitMQから取り込むには、まず「RabbitMQに接続する」のステップを完了する必要があります。
取り込みパイプラインを作成
各 RabbitMQ クラシックキューはストリーミングテーブルに取り込まれます。サポートされているデータと制限については、サポートされているデータを参照してください。
UI ベースのパイプライン オーサリングは、RabbitMQ コネクタのベータ版ではサポートされていません。宣言型オートメーションバンドル、またはDatabricksノートブックを使用してパイプラインを作成します。
- Declarative Automation Bundles
- Databricks notebook
宣言型オートメーションバンドルを使用して、RabbitMQ パイプラインをコードとして管理します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、Databricks CLI を使用して管理され、異なるターゲット ワークスペース(開発、ステージング、本番運用など)で共有および実行できます。詳細については、「宣言型オートメーションバンドルとは」をご覧ください。
-
Databricks CLI を使用した新しいバンドルの作成
Bashdatabricks bundle init -
バンドルにパイプライン定義ファイルを追加してください (たとえば、
resources/rabbitmq_pipeline.yml)。「パイプライン.ingestion_definition」を参照してください。と例 -
Databricks CLI を使用してバンドルをデプロイする:
Bashdatabricks bundle deploy
- 次のノートブックをDatabricksワークスペースにインポートしてください:
-
セル1をそのままにしてください。
channelフィールドは変更しないでください —PREVIEWのままにする必要があります。 -
キュー名を含むパイプラインの構成詳細を使用して、セルを変更します。「パイプライン.ingestion_definition」を参照してください。と例。
-
「 すべて実行 」をクリックします。
例
これらの例を使用してパイプラインを構成してください。
単一キューのパイプライン
この例では、単一のRabbitMQクラシックキューをストリーミングテーブルに取り込みます。
- Declarative Automation Bundles
variables:
connection_name:
default: my-rabbitmq-connection
dest_catalog:
default: main
dest_schema:
default: rabbitmq_ingest
resources:
pipelines:
rabbitmq_pipeline:
name: rabbitmq-ingestion-pipeline
serverless: true
continuous: true
channel: PREVIEW
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
objects:
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: orders
connector_options:
rabbitmq_options:
queue: orders
複数のキューを持つパイプライン
この例では、2つのRabbitMQクラシックキューをそれぞれ独自の宛先テーブルに読み込みます。キューごとにテーブルエントリを1つ定義します。
- Declarative Automation Bundles
variables:
connection_name:
default: my-rabbitmq-connection
dest_catalog:
default: main
dest_schema:
default: rabbitmq_ingest
resources:
pipelines:
rabbitmq_pipeline:
name: rabbitmq-ingestion-pipeline
serverless: true
continuous: true
channel: PREVIEW
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
objects:
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: orders
connector_options:
rabbitmq_options:
queue: orders
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: shipments
connector_options:
rabbitmq_options:
queue: shipments
一般的なパターン
高度なパイプライン構成については、「マネージド取り込みパイプラインの一般的なパターン」を参照してください。
次のステップ
RabbitMQコネクタは継続的なパイプラインを実行します。次に、パイプラインを開始し、アラートを設定してください。一般的なパイプラインメンテナンスタスクを参照してください。