Ingest data from Google Ads
This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.
Learn how to create a managed ingestion pipeline to ingest data from Google Ads into Databricks.
Requirements
-
To create an ingestion pipeline, you must meet the following requirements:
-
Your workspace must be enabled for Unity Catalog.
-
Serverless compute must be enabled for your workspace. See Serverless compute requirements.
-
If you plan to create a new connection: You must have
CREATE CONNECTIONprivileges on the metastore. See Manage privileges in Unity Catalog.If the connector supports UI-based pipeline authoring, an admin can create the connection and the pipeline at the same time by completing the steps on this page. However, if the users who create pipelines use API-based pipeline authoring or are non-admin users, an admin must first create the connection in Catalog Explorer. See Connect to managed ingestion sources.
-
If you plan to use an existing connection: You must have
USE CONNECTIONprivileges orALL PRIVILEGESon the connection object. -
You must have
USE CATALOGprivileges on the target catalog. -
You must have
USE SCHEMAandCREATE TABLEprivileges on an existing schema orCREATE SCHEMAprivileges on the target catalog.
-
-
To ingest from Google Ads, you must complete the steps in Configure OAuth for Google Ads ingestion.
Create an ingestion pipeline
- Databricks Asset Bundles
- Databricks notebook
This tab describes how to deploy an ingestion pipeline using Declarative Automation Bundles. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see What are Declarative Automation Bundles?.
-
Create a new bundle using the Databricks CLI:
Bashdatabricks bundle init -
Add two new resource files to the bundle:
- A pipeline definition file (for example,
resources/google_ads_pipeline.yml). - A job definition file that controls the frequency of data ingestion (for example,
resources/google_ads_job.yml).
See pipeline.ingestion_definition and Examples.
- A pipeline definition file (for example,
-
Deploy the pipeline using the Databricks CLI:
Bashdatabricks bundle deploy
-
Import the following notebook into your Databricks workspace:
-
Leave cell one as-is.
-
Modify cell two or three with your pipeline configuration details, depending on your use case. See pipeline.ingestion_definition and Examples.
-
Click Run all.
Examples
- Databricks Asset Bundles
- Databricks notebook
The following pipeline definition file ingests all current and future tables from one account:
resources:
pipelines:
pipeline_google_ads:
name: <pipeline>
catalog: <destination-catalog>
target: <destination-schema>
ingestion_definition:
connection_name: <connection>
objects:
- schema:
source_schema: <account-id>
destination_catalog: <destination-catalog>
destination_schema: <destination-schema>
google_ads_options:
manager_account_id: <manager-account-id>
lookback_window_days: <lookback-window-days>
sync_start_date: <sync-start-date>
The following pipeline definition file selects specific tables from an account to ingest:
resources:
pipelines:
pipeline_google_ads:
name: <pipeline-name>
catalog: <destination-catalog>
target: <destination-schema>
ingestion_definition:
connection_name: <connection-name>
objects:
- table:
source_schema: <customer-account-id>
source_table: <table1>
destination_catalog: <destination-catalog>
destination_schema: <destination-schema>
destination_table: <destination-table>
google_ads_options:
manager_account_id: <manager-account-id>
lookback_window_days: <lookback-window-days>
sync_start_date: <sync-start-date>
- table:
source_schema: <customer-account-id>
source_table: table2
destination_catalog: <destination-catalog>
destination_schema: <destination-schema>
destination_table: <destination-table>
google_ads_options:
manager_account_id: <manager-account-id>
lookback_window_days: <lookback-window-days>
sync_start_date: <sync-start-date>
The following is an example job definition file:
resources:
jobs:
google_ads_dab_job:
name: google_ads_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: <pipeline-id>
The following pipeline specification ingests all current and future tables from one account:
pipeline_spec = {
"name": <pipeline>,
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"connection_name": <connection>,
"objects": [
{
"schema": {
"source_schema": "<account-id>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"google_ads_options": {
"manager_account_id": "<manager-account-id>",
"lookback_window_days": <lookback-window-days>,
"sync_start_date": "<sync-start-date>"
}
}
}
]
}
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
The following pipeline specification selects specific tables from an account to ingest:
pipeline_spec = {
"name": <pipeline>,
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"connection_name": <connection>,
"objects": [
{
"table": {
"source_schema": "<customer-account-id>",
"source_table": "<table1>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"destination_table": "<destination-table>",
"google_ads_options": {
"manager_account_id": "<manager-account-id>",
"lookback_window_days": <lookback-window-days>,
"sync_start_date": "<sync-start-date>"
}
}
},
{
"table": {
"source_schema": "<customer-account-id>",
"source_table": "table2",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"destination_table": "<destination-table>",
"google_ads_options": {
"manager_account_id": "<manager-account-id>",
"lookback_window_days": <lookback-window-days>,
"sync_start_date": "<sync-start-date>"
}
}
}
]
}
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
Common patterns
For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.
Next steps
Start, schedule, and set alerts on your pipeline. See Common pipeline maintenance tasks.