Skip to main content

Ingest data from RabbitMQ

Beta

This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.

This page shows how to create a managed RabbitMQ ingestion pipeline using Databricks Lakeflow Connect.

Requirements

  • To create an ingestion pipeline, you must first meet the following requirements:

    • Your workspace must be enabled for Unity Catalog.

    • Serverless compute must be enabled for your workspace. See Serverless compute requirements.

    • If you plan to create a new connection: You must have CREATE CONNECTION privileges on the metastore. See Manage privileges in Unity Catalog.

      If the connector supports UI-based pipeline authoring, an admin can create the connection and the pipeline at the same time by completing the steps on this page. However, if the users who create pipelines use API-based pipeline authoring or are non-admin users, an admin must first create the connection in Catalog Explorer. See Connect to managed ingestion sources.

    • If you plan to use an existing connection: You must have USE CONNECTION privileges or ALL PRIVILEGES on the connection object.

    • You must have USE CATALOG privileges on the target catalog.

    • You must have USE SCHEMA and CREATE TABLE privileges on an existing schema or CREATE SCHEMA privileges on the target catalog.

  • To ingest from RabbitMQ, you must first complete the steps in Connect to RabbitMQ.

Create an ingestion pipeline

Each RabbitMQ classic queue is ingested into a streaming table. For a list of supported data and limitations, see Supported data.

note

UI-based pipeline authoring is not supported for the RabbitMQ connector in Beta. Use Declarative Automation Bundles or a Databricks notebook to create your pipeline.

Use Declarative Automation Bundles to manage RabbitMQ pipelines as code. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see What are Declarative Automation Bundles?.

  1. Create a new bundle using the Databricks CLI:

    Bash
    databricks bundle init
  2. Add a pipeline definition file to the bundle (for example, resources/rabbitmq_pipeline.yml). See pipeline.ingestion_definition and Examples.

  3. Deploy the bundle using the Databricks CLI:

    Bash
    databricks bundle deploy

Examples

Use these examples to configure your pipeline.

Pipeline with single queue

This example ingests a single RabbitMQ classic queue into a streaming table:

YAML
variables:
connection_name:
default: my-rabbitmq-connection
dest_catalog:
default: main
dest_schema:
default: rabbitmq_ingest
resources:
pipelines:
rabbitmq_pipeline:
name: rabbitmq-ingestion-pipeline
serverless: true
continuous: true
channel: PREVIEW
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
objects:
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: orders
connector_options:
rabbitmq_options:
queue: orders

Pipeline with multiple queues

This example ingests two RabbitMQ classic queues, each into its own destination table. Define one table entry per queue:

YAML
variables:
connection_name:
default: my-rabbitmq-connection
dest_catalog:
default: main
dest_schema:
default: rabbitmq_ingest
resources:
pipelines:
rabbitmq_pipeline:
name: rabbitmq-ingestion-pipeline
serverless: true
continuous: true
channel: PREVIEW
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
objects:
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: orders
connector_options:
rabbitmq_options:
queue: orders
- table:
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: shipments
connector_options:
rabbitmq_options:
queue: shipments

Common patterns

For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.

Next steps

The RabbitMQ connector runs continuous pipelines. Next, start your pipeline and set alerts on it. See Common pipeline maintenance tasks.

Additional resources