メインコンテンツまでスキップ

RabbitMQからデータを取り込む

備考

ベータ版

この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。

このページでは、Databricks Lakeflow Connect を使用して、managed RabbitMQ インジェスト パイプラインを作成する方法について説明します。

要件

  • 取り込みパイプラインを作成するには、まず、次の要件を満たす必要があります。

    • ワークスペースでUnity Catalogが有効になっている必要があります。

    • サーバレス コンピュートがワークスペースで有効になっている必要があります。サーバレス コンピュートの要件を参照してください。

    • 新しい接続を作成する予定がある場合は、メタストアに対する CREATE CONNECTION 特権が必要です。Unity Catalog での特権の管理を参照してください。

      コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。

    • 既存の接続を使用する場合:接続オブジェクトに対してUSE CONNECTIONまたはALL PRIVILEGESの権限が必要です。

    • ターゲットカタログに対するUSE CATALOG権限が必要です。

    • 既存のスキーマに対するUSE SCHEMAおよびCREATE TABLE権限、またはターゲットカタログに対するCREATE SCHEMA権限を持っている必要があります。

  • RabbitMQから取り込むには、まず「RabbitMQに接続する」のステップを完了する必要があります。

取り込みパイプラインを作成

各 RabbitMQ クラシックキューはストリーミングテーブルに取り込まれます。サポートされているデータと制限については、サポートされているデータを参照してください。

注記

UI ベースのパイプライン オーサリングは、RabbitMQ コネクタのベータ版ではサポートされていません。宣言型オートメーションバンドル、またはDatabricksノートブックを使用してパイプラインを作成します。

宣言型オートメーションバンドルを使用して、RabbitMQ パイプラインをコードとして管理します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、Databricks CLI を使用して管理され、異なるターゲット ワークスペース(開発、ステージング、本番運用など)で共有および実行できます。詳細については、「宣言型オートメーションバンドルとは」をご覧ください。

  1. Databricks CLI を使用した新しいバンドルの作成

    Bash
    databricks bundle init
  2. バンドルにパイプライン定義ファイルを追加してください (たとえば、resources/rabbitmq_pipeline.yml)。「パイプライン.ingestion_definition」を参照してください。と

  3. Databricks CLI を使用してバンドルをデプロイする:

    Bash
    databricks bundle deploy

これらの例を使用してパイプラインを構成してください。

単一キューのパイプライン

この例では、単一のRabbitMQクラシックキューをストリーミングテーブルに取り込みます。

YAML
variables:
connection_name:
default: my-rabbitmq-connection
dest_catalog:
default: main
dest_schema:
default: rabbitmq_ingest
resources:
pipelines:
rabbitmq_pipeline:
name: rabbitmq-ingestion-pipeline
serverless: true
continuous: true
channel: PREVIEW
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
objects:
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: orders
connector_options:
rabbitmq_options:
queue: orders

複数のキューを持つパイプライン

この例では、2つのRabbitMQクラシックキューをそれぞれ独自の宛先テーブルに読み込みます。キューごとにテーブルエントリを1つ定義します。

YAML
variables:
connection_name:
default: my-rabbitmq-connection
dest_catalog:
default: main
dest_schema:
default: rabbitmq_ingest
resources:
pipelines:
rabbitmq_pipeline:
name: rabbitmq-ingestion-pipeline
serverless: true
continuous: true
channel: PREVIEW
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
objects:
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: orders
connector_options:
rabbitmq_options:
queue: orders
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: shipments
connector_options:
rabbitmq_options:
queue: shipments

一般的なパターン

高度なパイプライン構成については、「マネージド取り込みパイプラインの一般的なパターン」を参照してください。

次のステップ

RabbitMQコネクタは継続的なパイプラインを実行します。次に、パイプラインを開始し、アラートを設定してください。一般的なパイプラインメンテナンスタスクを参照してください。

その他のリソース