HubSpotからデータを取り込む
ベータ版
この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。「Databricks プレビューの管理」を参照してください。
Databricks LakeFlow Connectを使用してマネージド HubSpot インジェスト パイプラインを作成する方法を学びます。
要件
-
取り込みパイプラインを作成するには、まず次の要件を満たす必要があります。
-
ワークスペースでUnity Catalogが有効になっている必要があります。
-
ワークスペースでサーバレスコンピュートを有効にする必要があります。 「サーバレス コンピュート要件」を参照してください。
-
新しい接続を作成する場合: メタストアに対する
CREATE CONNECTION権限が必要です。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する場合: 接続オブジェクトに対する
USE CONNECTION権限またはALL PRIVILEGESが必要です。 -
ターゲット カタログに対する
USE CATALOG権限が必要です。 -
既存のスキーマに対する
USE SCHEMAおよびCREATE TABLE権限、またはターゲット カタログに対するCREATE SCHEMA権限が必要です。
-
-
HubSpot から取り込むには、まず「Configure OAuth for HubSpot ingestion」のステップを完了する必要があります。
取り込みパイプラインを作成する
各ソース テーブルはストリーミング テーブルに取り込まれます。
- Databricks UI
- Databricks Asset Bundles
- Databricks notebook
- Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。
- 「データの追加」 ページの 「Databricks コネクタ」 で、 「HubSpot」 をクリックします。
- 取り込みウィザードの 「接続」 ページで、HubSpot アクセス資格情報を保存する接続を選択します。メタストアに
CREATE CONNECTION権限がある場合は、クリックして「接続を作成」をクリックして 、HubSpot 取り込み用の OAuth の構成の認証詳細を使用して新しい接続を作成します。
- 次へ をクリックします。
- インジェスチョン設定 ページで、パイプラインの一意の名前を入力します。
- イベント ログを書き込むカタログとスキーマを選択します。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
- パイプラインの作成および続行 をクリックします。
- [ソース] ページで、取り込むテーブルを選択します。
- 保存して続行 をクリックします。
- [宛先] ページで、データをロードするカタログとスキーマを選択します。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
- 保存して続行 をクリックします。
- (オプション) スケジュールと通知 ページで、
スケジュールを作成します 。宛先テーブルを更新する頻度を設定します。
- (オプション)クリック
通知を追加して パイプライン操作の成功または失敗に関する電子メール通知を設定し、 [保存してパイプラインを実行] をクリックします。
Databricks アセット バンドルを使用して、HubSpot パイプラインをコードとして管理します。バンドルにはジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理でき、さまざまなターゲット ワークスペース (開発、ステージング、本番運用など) で共有して実行できます。 詳細については、 Databricksアセット バンドルとは何ですか?」を参照してください。 。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init -
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (例:
resources/hubspot_pipeline.yml)。パイプライン.ingestion_定義を参照してください。 および例。 - データ取り込みの頻度を制御するジョブ定義ファイル (例:
resources/hubspot_job.yml)。
- パイプライン定義ファイル (例:
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
- 次のノートブックを Databricks ワークスペースにインポートします。
-
セル 1 はそのままにしておきます。
-
パイプライン構成の詳細に合わせてセル 3 を変更します。パイプライン.ingestion_定義を参照してください。 および例。
-
「 すべて実行 」をクリックします。
例
これらの例を使用してパイプラインを構成します。
単一のソーステーブルを取り込む
- Databricks Asset Bundles
- Databricks notebook
次のパイプライン定義ファイルは、単一のソース テーブルを取り込みま す。
variables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for hubspot_dab
resources:
pipelines:
pipeline_hubspot:
name: hubspot_pipeline
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: <hubspot-connection>
objects:
# An array of objects to ingest from HubSpot. This example ingests the contacts table.
- table:
source_schema: default
source_table: contacts
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
以下は、単一のソース テーブルを取り込むパイプライン仕様の例です。
pipeline_name = "hubspot_pipeline"
connection_name = "<hubspot-connection>"
pipeline_spec = {
"name": pipeline_name,
"ingestion_definition": {
"connection_name": connection_name,
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "contacts",
"destination_catalog": "main",
"destination_schema": "ingest_destination_schema"
}
}
]
}
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
複数のソーステーブルを取り込む
- Databricks Asset Bundles
- Databricks notebook
次のパイプライン定義ファイルは、複数のソース テーブルを取り込みま す。
variables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for hubspot_dab
resources:
pipelines:
pipeline_hubspot:
name: hubspot_pipeline
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: <hubspot-connection>
objects:
# An array of objects to ingest from HubSpot. This example ingests the contacts and companies tables.
- table:
source_schema: default
source_table: contacts
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: default
source_table: companies
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
以下は、複数のソース テーブルを取り込むパイプライン仕様の例です。
pipeline_name = "hubspot_pipeline"
connection_name = "<hubspot-connection>"
pipeline_spec = {
"name": pipeline_name,
"ingestion_definition": {
"connection_name": connection_name,
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "contacts",
"destination_catalog": "main",
"destination_schema": "ingest_destination_schema"
}
},
{
"table": {
"source_schema": "default",
"source_table": "companies",
"destination_catalog": "main",
"destination_schema": "ingest_destination_schema"
}
}
]
}
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
バンドルジョブ定義ファイル
以下は、Databricks Asset Bundle で使用するジョブ定義ファイルの例です。ジョブは毎日、最後の実行からちょうど 1 日後に実行されます。
resources:
jobs:
hubspot_dab_job:
name: hubspot_dab_job
trigger:
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_hubspot.id}
一般的なパターン
高度なパイプライン構成については、 「管理された取り込みパイプラインの一般的なパターン」を参照してください。
次のステップ
パイプラインを開始、スケジュールし、アラートを設定します。一般的なパイプラインメンテナンスタスクを参照してください。