Google アナリティクス 4 からデータを取り込む
LakeFlow Connectと Google BigQueryを使用して Google アナリティクス 4 データをDatabricksに取り込む方法を学びます。
要件
-
取り込みパイプラインを作成するには、まず次の要件を満たす必要があります。
-
ワークスペースでUnity Catalogが有効になっている必要があります。
-
サーバレス コンピュートは、ワークスペースで有効にする必要があります。 サーバレス コンピュートの要件を参照してください。
-
新しい接続を作成する場合:メタストアに対して
CREATE CONNECTION権限が必要です。Unity Catalogの「権限の管理」を参照してください。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する予定の場合: 接続オブジェクトに対する
USE CONNECTION権限またはALL PRIVILEGESが必要です。 -
ターゲット・カタログに対する
USE CATALOG権限が必要です。 -
既存のスキーマに対する
USE SCHEMA権限とCREATE TABLE権限、またはターゲット・カタログに対するCREATE SCHEMA権限が必要です。
-
-
BigQuery を使用して GA4 から取り込むには、 Databricks 取り込み用に Google アナリティクス 4 と Google BigQuery を設定するをご覧ください。
ネットワークを構成する
サーバレス エグレス コントロールが有効になっている場合は、次の URL を許可リストに登録します。 それ以外の場合は、この手順をスキップします。サーバレス エグレス 制御のためのネットワークポリシーの管理を参照してください。
bigquery.googleapis.comoauth2.googleapis.combigquerystorage.googleapis.comgoogleapis.com
取り込みパイプラインを作成する
取り込まれた各テーブルはストリーミング テーブルに書き込まれます。
ベータ版
取り込み中に行をフィルタリングすることで、パフォーマンスを向上させ、データの重複を減らすことができます。「取り込む行を選択する」を参照してください。
- Databricks UI
- Databricks Asset Bundles
- Databricks notebook
-
Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。
-
データの追加 ページの Databricks コネクタ で、 Google アナリティクス 4 をクリックします。
-
取り込みウィザードの 「接続」 ページで、Google アナリティクス 4 のアクセス認証情報を保存する接続を選択します。メタストアに
CREATE CONNECTION権限がある場合は、クリックして「接続の作成」では 、 Databricksインジェスト用の Google アナリティクス 4 と Google BigQueryのセットアップ」の認証の詳細を使用して新しい接続を作成します。
Databricks UI では、GA4 接続の OAuth のみがサポートされます。ただし、 を使用して接続を作成することで、代わりに基本認証を使用できます Databricks APIs。 Google アナリティクスの生データをご覧ください。
-
次へ をクリックします。
-
インジェスチョン設定 ページで、パイプラインの一意の名前を入力します。
-
イベント ログを書き込むカタログとスキーマを選択します。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
-
パイプラインの作成および続行 をクリックします。
-
[ソース] ページで、取り込むテーブルを選択します。
-
保存して続行 をクリックします。
-
[宛先] ページで、データをロードするカタログとスキーマを選択します。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
-
保存して続行 をクリックします。
-
(オプション) スケジュールと通知 ページで、
スケジュールを作成します 。宛先テーブルを更新する頻度を設定します。
-
(オプション)クリック
通知を追加して パイプライン操作の成功または失敗に関する電子メール通知を設定し、 [保存してパイプラインを実行] をクリックします。
Declarative Automation Bundle を使用して、Google アナリティクス 4 パイプラインをコードとして管理します。 バンドルにはジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理でき、さまざまなターゲット ワークスペース (開発、ステージング、本番運用など) で共有して実行できます。 詳細については、 「宣言的オートメーション バンドルとは何ですか?」を参照してください。 。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init -
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (例:
resources/ga4_pipeline.yml)。パイプライン.ingestion_定義を参照してください。 および例。 - データ取り込みの頻度を制御するジョブ定義ファイル (例:
resources/ga4_job.yml)。
- パイプライン定義ファイル (例:
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
- 次のノートブックを Databricks ワークスペースにインポートします。
-
セル 1 はそのままにしておきます。
-
パイプライン構成の詳細に合わせてセル 3 を変更します。パイプライン.ingestion_定義を参照してください。 および例。
-
「 すべて実行 」をクリックします。
例
これらの例を使用してパイプラインを構成します。
単一のソーステーブルを取り込む
- Databricks Asset Bundles
- Databricks notebook
次のパイプライン定義ファイルは、単一のソース テーブルを取り込みま す。
variables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for ga4_dab
resources:
pipelines:
pipeline_ga4:
name: ga4_pipeline
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: <ga4-connection>
objects:
# An array of objects to ingest from GA4. This example ingests the events table from the analytics_XXXXXXXXX dataset.
- table:
source_schema: analytics_XXXXXXXXX
source_table: events
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
次のパイプライン仕様は、単一のソース テーブルを取り込みます。
pipeline_spec = """
{
"name": "<pipeline-name>",
"ingestion_definition": {
"connection_name": "<ga4-connection>",
"objects": [
{
"table": {
"source_schema": "analytics_XXXXXXXXX",
"source_table": "events",
"destination_catalog": "main",
"destination_schema": "ingest_destination_schema"
}
}
]
}
}
"""
create_pipeline(pipeline_spec)
複数のソーステーブルを取り込む
- Databricks Asset Bundles
- Databricks notebook
次のパイプライン定義ファイルは、複数のソース テーブルを取り込みま す。
variables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for ga4_dab
resources:
pipelines:
pipeline_ga4:
name: ga4_pipeline
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: <ga4-connection>
objects:
# An array of objects to ingest from GA4. This example ingests the events and events_intraday tables.
- table:
source_schema: analytics_XXXXXXXXX
source_table: events
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: analytics_XXXXXXXXX
source_table: events_intraday
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
次のパイプライン仕様は、複数のソース テーブルを取り込みま す。
pipeline_spec = """
{
"name": "<pipeline-name>",
"ingestion_definition": {
"connection_name": "<ga4-connection>",
"objects": [
{
"table": {
"source_schema": "analytics_XXXXXXXXX",
"source_table": "events",
"destination_catalog": "main",
"destination_schema": "ingest_destination_schema"
}
},
{
"table": {
"source_schema": "analytics_XXXXXXXXX",
"source_table": "events_intraday",
"destination_catalog": "main",
"destination_schema": "ingest_destination_schema"
}
}
]
}
}
"""
create_pipeline(pipeline_spec)
バンドルジョブ定義ファイル
以下は、宣言型自動化バンドルで使用するジョブ定義ファイルの例です。ジョブは毎日、最後の実行からちょうど 1 日後に実行されます。
resources:
jobs:
ga4_dab_job:
name: ga4_dab_job
trigger:
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_ga4.id}
一般的なパターン
高度なパイプライン構成については、 「管理された取り込みパイプラインの一般的なパターン」を参照してください。
次のステップ
パイプラインを開始、スケジュールし、アラートを設定します。一般的なパイプラインメンテナンスタスクを参照してください。