Ingest data from RabbitMQ
This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.
This page shows how to create a managed RabbitMQ ingestion pipeline using Databricks Lakeflow Connect.
Requirements
-
To create an ingestion pipeline, you must first meet the following requirements:
-
Your workspace must be enabled for Unity Catalog.
-
Serverless compute must be enabled for your workspace. See Serverless compute requirements.
-
If you plan to create a new connection: You must have
CREATE CONNECTIONprivileges on the metastore. See Manage privileges in Unity Catalog.If the connector supports UI-based pipeline authoring, an admin can create the connection and the pipeline at the same time by completing the steps on this page. However, if the users who create pipelines use API-based pipeline authoring or are non-admin users, an admin must first create the connection in Catalog Explorer. See Connect to managed ingestion sources.
-
If you plan to use an existing connection: You must have
USE CONNECTIONprivileges orALL PRIVILEGESon the connection object. -
You must have
USE CATALOGprivileges on the target catalog. -
You must have
USE SCHEMAandCREATE TABLEprivileges on an existing schema orCREATE SCHEMAprivileges on the target catalog.
-
-
To ingest from RabbitMQ, you must first complete the steps in Connect to RabbitMQ.
Create an ingestion pipeline
Each RabbitMQ classic queue is ingested into a streaming table. For a list of supported data and limitations, see Supported data.
UI-based pipeline authoring is not supported for the RabbitMQ connector in Beta. Use Declarative Automation Bundles or a Databricks notebook to create your pipeline.
- Declarative Automation Bundles
- Databricks notebook
Use Declarative Automation Bundles to manage RabbitMQ pipelines as code. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see What are Declarative Automation Bundles?.
-
Create a new bundle using the Databricks CLI:
Bashdatabricks bundle init -
Add a pipeline definition file to the bundle (for example,
resources/rabbitmq_pipeline.yml). See pipeline.ingestion_definition and Examples. -
Deploy the bundle using the Databricks CLI:
Bashdatabricks bundle deploy
-
Import the following notebook into your Databricks workspace:
-
Leave cell one as-is. Do not modify the
channelfield — it must remainPREVIEW. -
Modify the cell with your pipeline configuration details, including your queue name. See pipeline.ingestion_definition and Examples.
-
Click Run all.
Examples
Use these examples to configure your pipeline.
Pipeline with single queue
This example ingests a single RabbitMQ classic queue into a streaming table:
- Declarative Automation Bundles
variables:
connection_name:
default: my-rabbitmq-connection
dest_catalog:
default: main
dest_schema:
default: rabbitmq_ingest
resources:
pipelines:
rabbitmq_pipeline:
name: rabbitmq-ingestion-pipeline
serverless: true
continuous: true
channel: PREVIEW
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
objects:
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: orders
connector_options:
rabbitmq_options:
queue: orders
Pipeline with multiple queues
This example ingests two RabbitMQ classic queues, each into its own destination table. Define one table entry per queue:
- Declarative Automation Bundles
variables:
connection_name:
default: my-rabbitmq-connection
dest_catalog:
default: main
dest_schema:
default: rabbitmq_ingest
resources:
pipelines:
rabbitmq_pipeline:
name: rabbitmq-ingestion-pipeline
serverless: true
continuous: true
channel: PREVIEW
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
objects:
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: orders
connector_options:
rabbitmq_options:
queue: orders
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: shipments
connector_options:
rabbitmq_options:
queue: shipments
Common patterns
For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.
Next steps
The RabbitMQ connector runs continuous pipelines. Next, start your pipeline and set alerts on it. See Common pipeline maintenance tasks.