Ingest data from HubSpot

Beta

This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.

Learn how to create a managed HubSpot ingestion pipeline using Databricks Lakeflow Connect.

Requirements

To create an ingestion pipeline, you must first meet the following requirements:

  • Your workspace must be enabled for Unity Catalog.

  • Serverless compute must be enabled for your workspace. See Serverless compute requirements.

  • If you plan to create a new connection: You must have CREATE CONNECTION privileges on the metastore.

    If the connector supports UI-based pipeline authoring, an admin can create the connection and the pipeline at the same time by completing the steps on this page. However, if the users who create pipelines use API-based pipeline authoring or are non-admin users, an admin must first create the connection in Catalog Explorer. See Connect to managed ingestion sources.

  • If you plan to use an existing connection: You must have USE CONNECTION privileges or ALL PRIVILEGES on the connection object.

  • You must have USE CATALOG privileges on the target catalog.

  • You must have USE SCHEMA and CREATE TABLE privileges on an existing schema or CREATE SCHEMA privileges on the target catalog.

To ingest from HubSpot, you must first complete the steps in Configure OAuth for HubSpot ingestion.

Create an ingestion pipeline

Each source table is ingested into a streaming table.

Databricks UI

  1. In the sidebar of the Databricks workspace, click Data Ingestion.
  2. On the Add data page, under Databricks connectors, click HubSpot.
  3. On the Connection page of the ingestion wizard, select the connection that stores your HubSpot access credentials. If you have the CREATE CONNECTION privilege on the metastore, you can click Create connection to create a new connection using the authentication details in Configure OAuth for HubSpot ingestion.
  4. Click Next.
  5. On the Ingestion setup page, enter a unique name for the pipeline.
  6. Select a catalog and a schema to write event logs to. If you have USE CATALOG and CREATE SCHEMA privileges on the catalog, you can click Create schema in the drop-down menu to create a new schema.
  7. Click Create pipeline and continue.
  8. On the Source page, select the tables to ingest.
  9. Click Save and continue.
  10. On the Destination page, select a catalog and a schema to load data into. If you have USE CATALOG and CREATE SCHEMA privileges on the catalog, you can click Create schema in the drop-down menu to create a new schema.
  11. Click Save and continue.
  12. (Optional) On the Schedules and notifications page, click Create schedule, and set the frequency for refreshing the destination tables.
  13. (Optional) Click Add notification to set email notifications for pipeline success or failure, then click Save and run pipeline.

Databricks Asset Bundles

Use Databricks Asset Bundles to manage HubSpot pipelines as code. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see What are Databricks Asset Bundles?.

  1. Create a new bundle using the Databricks CLI:

    Bash
    databricks bundle init
  2. Add two new resource files to the bundle:

    • A pipeline definition file (resources/hubspot_pipeline.yml). See Pipeline configuration properties and Examples.
    • A workflow file that controls the frequency of data ingestion (resources/hubspot_job.yml).
  3. Deploy the pipeline using the Databricks CLI:

    Bash
    databricks bundle deploy
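
    For reference, running `databricks bundle init` also generates a root `databricks.yml` file that ties the resource files together. A minimal sketch of that file follows; the bundle name, target, and workspace host are placeholder values, not the generated defaults:

    ```yaml
    # databricks.yml -- bundle root configuration (placeholder values)
    bundle:
      name: hubspot_dab

    # Pull in the pipeline and job definitions added in step 2.
    include:
      - resources/*.yml

    targets:
      dev:
        mode: development
        default: true
        workspace:
          host: https://<workspace-host>
    ```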

Databricks notebook

  1. Import the following notebook into your Databricks workspace:

    Open notebook in new tab
  2. Leave cell one as-is.

  3. Modify cell two or three with your pipeline configuration details. See Pipeline configuration properties and Examples.

  4. Click Run all.

Pipeline configuration properties

Applies to: ✓ Databricks Asset Bundles ✓ Databricks notebook

Set the following properties in your bundle's pipeline definition file:

  • name: A unique name for the pipeline.

  • connection_name: The name of the Unity Catalog connection that stores authentication details for HubSpot.

  • source_schema: The name of the schema that contains the data you want to ingest.

  • source_table: The name of the table you want to ingest.

  • destination_catalog: The name of the catalog you want to write to in Databricks.

  • destination_schema: The name of the schema you want to write to in Databricks.

  • destination_table (optional): A unique name for the table you want to write to in Databricks. If you don't provide this, the connector automatically uses the source table name.
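For example, a single table entry that renames the ingested table on write by setting the optional destination_table property could look like the following sketch; all catalog, schema, and table names here are illustrative placeholders:

```python
# One entry of the "objects" array, using the optional destination_table
# property. All names below are placeholders for illustration.
table_entry = {
    "table": {
        "source_schema": "default",
        "source_table": "contacts",
        "destination_catalog": "main",
        "destination_schema": "ingest_destination_schema",
        # Omit this key to reuse the source table name ("contacts").
        "destination_table": "hubspot_contacts",
    }
}
```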

Advanced configurations

For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.

Examples

Applies to: ✓ Databricks Asset Bundles ✓ Databricks notebook

Use these examples to configure your pipeline.

Ingest a single source table

The following resources/hubspot_pipeline.yml file ingests a single source table:

YAML
variables:
  dest_catalog:
    default: main
  dest_schema:
    default: ingest_destination_schema

# The main pipeline for hubspot_dab
resources:
  pipelines:
    pipeline_hubspot:
      name: hubspot_pipeline
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}
      ingestion_definition:
        connection_name: <hubspot-connection>
        objects:
          # An array of objects to ingest from HubSpot. This example ingests the contacts table.
          - table:
              source_schema: default
              source_table: contacts
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}

Databricks notebook

The following is an example pipeline spec that ingests a single source table:

Python
import json

pipeline_name = "hubspot_pipeline"
connection_name = "<hubspot-connection>"

pipeline_spec = {
    "name": pipeline_name,
    "ingestion_definition": {
        "connection_name": connection_name,
        "objects": [
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "contacts",
                    "destination_catalog": "main",
                    "destination_schema": "ingest_destination_schema"
                }
            }
        ]
    }
}

json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
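
The create_pipeline helper used above is assumed to be defined in cell one of the notebook. As a rough sketch of what such a helper could do, the following builds and submits the spec to the Pipelines REST API (POST /api/2.0/pipelines); the workspace host and token are placeholders, and the real notebook typically derives them from the workspace context:

```python
import json
import urllib.request


def build_create_pipeline_request(json_payload, host, token):
    """Build (but do not send) the POST request for the Pipelines API."""
    return urllib.request.Request(
        url=f"{host}/api/2.0/pipelines",
        data=json_payload.encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def create_pipeline(json_payload,
                    host="https://<workspace-host>",
                    token="<access-token>"):
    """Submit a pipeline spec. host and token are placeholder values."""
    req = build_create_pipeline_request(json_payload, host, token)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```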

Ingest multiple source tables

The following resources/hubspot_pipeline.yml file ingests multiple source tables:

YAML
variables:
  dest_catalog:
    default: main
  dest_schema:
    default: ingest_destination_schema

# The main pipeline for hubspot_dab
resources:
  pipelines:
    pipeline_hubspot:
      name: hubspot_pipeline
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}
      ingestion_definition:
        connection_name: <hubspot-connection>
        objects:
          # An array of objects to ingest from HubSpot. This example ingests the contacts and companies tables.
          - table:
              source_schema: default
              source_table: contacts
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
          - table:
              source_schema: default
              source_table: companies
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}

Databricks notebook

The following is an example pipeline spec that ingests multiple source tables:

Python
import json

pipeline_name = "hubspot_pipeline"
connection_name = "<hubspot-connection>"

pipeline_spec = {
    "name": pipeline_name,
    "ingestion_definition": {
        "connection_name": connection_name,
        "objects": [
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "contacts",
                    "destination_catalog": "main",
                    "destination_schema": "ingest_destination_schema"
                }
            },
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "companies",
                    "destination_catalog": "main",
                    "destination_schema": "ingest_destination_schema"
                }
            }
        ]
    }
}

json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)

Bundle workflow file

Applies to: ✓ Databricks Asset Bundles

The following is an example resources/hubspot_job.yml file. The job runs on a periodic trigger: each run starts one day after the previous run.

YAML
resources:
  jobs:
    hubspot_dab_job:
      name: hubspot_dab_job

      trigger:
        periodic:
          interval: 1
          unit: DAYS

      email_notifications:
        on_failure:
          - <email-address>

      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.pipeline_hubspot.id}
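
If you prefer a fixed time of day over a rolling one-day interval, the periodic trigger can be replaced with a cron schedule. The following sketch assumes a 06:00 UTC daily run; the cron expression and timezone are example values:

```yaml
resources:
  jobs:
    hubspot_dab_job:
      name: hubspot_dab_job

      # Run at 06:00 UTC every day instead of one day after the last run.
      schedule:
        quartz_cron_expression: "0 0 6 * * ?"
        timezone_id: UTC

      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.pipeline_hubspot.id}
```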

Next steps

Start, schedule, and set alerts on your pipeline. See Common pipeline maintenance tasks.

Additional resources