ServiceNowからデータを取り込む
LakeFlow Connectを使用してマネージド ServiceNow インジェスト パイプラインを作成する方法を学びます。
要件
-
取り込みパイプラインを作成するには、まず次の要件を満たす必要があります。
-
ワークスペースでUnity Catalogが有効になっている必要があります。
-
サーバレス コンピュートは、ワークスペースで有効にする必要があります。 サーバレス コンピュートの要件を参照してください。
-
新しい接続を作成する場合:メタストアに対して
CREATE CONNECTION権限が必要です。Unity Catalogの「権限の管理」を参照してください。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する予定の場合: 接続オブジェクトに対する
USE CONNECTION権限またはALL PRIVILEGESが必要です。 -
ターゲット・カタログに対する
USE CATALOG権限が必要です。 -
既存のスキーマに対する
USE SCHEMA権限とCREATE TABLE権限、またはターゲット・カタログに対するCREATE SCHEMA権限が必要です。
-
-
ServiceNow から取り込むには、まず「Configure ServiceNow for Databricks ingestion」のステップを完了する必要があります。
取り込みパイプラインを作成する
取り込まれた各テーブルはストリーミング テーブルに書き込まれます。
ベータ版
取り込み中に行をフィルタリングすることで、パフォーマンスを向上させ、データの重複を減らすことができます。「取り込む行を選択する」を参照してください。
- Databricks UI
- Databricks Asset Bundles
- Databricks notebook
- Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。
- [ データの追加 ] ページの [Databricks コネクタ ] で、[ ServiceNow ] をクリックします。
- インジェスト ウィザードの [ 接続] ページで、ServiceNow アクセス資格情報を保存する接続を選択します。メタストアに
CREATE CONNECTION権限がある場合は、クリックして「接続を作成」をクリックして 、「Databricks 取り込み用に ServiceNow を構成する」の認証詳細を使用して新しい接続を作成します。
- 次へ をクリックします。
- インジェスチョン設定 ページで、パイプラインの一意の名前を入力します。
- イベント ログを書き込むカタログとスキーマを選択します。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
- パイプラインの作成および続行 をクリックします。
- [ソース] ページで、取り込むテーブルを選択します。 [すべてのテーブル] を選択すると、コネクタはソース スキーマ内の既存および将来のすべてのテーブルを宛先スキーマに書き込みます。パイプラインあたりのテーブルの最大数は 250 です。
- 保存して続行 をクリックします。
- [宛先] ページで、データをロードするカタログとスキーマを選択します。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
- 保存して続行 をクリックします。
- (オプション) スケジュールと通知 ページで、
スケジュールを作成します 。宛先テーブルを更新する頻度を設定します。
- (オプション)クリック
通知を追加して パイプライン操作の成功または失敗に関する電子メール通知を設定し、 [保存してパイプラインを実行] をクリックします。
宣言型オートメーション バンドルを使用して、ServiceNow パイプラインをコードとして管理します。 バンドルにはジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理でき、さまざまなターゲット ワークスペース (開発、ステージング、本番運用など) で共有して実行できます。 詳細については、 「宣言的オートメーション バンドルとは何ですか?」を参照してください。 。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init -
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (例:
resources/servicenow_pipeline.yml)。パイプライン.ingestion_定義を参照してください。 および例。 - データ取り込みの頻度を制御するジョブ定義ファイル (例:
resources/servicenow_job.yml)。
- パイプライン定義ファイル (例:
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
- 次のノートブックを Databricks ワークスペースにインポートします。
-
セル 1 はそのままにしておきます。
-
パイプライン構成の詳細に合わせてセル 3 を変更します。パイプライン.ingestion_定義を参照してください。 および例。
-
「 すべて実行 」をクリックします。
例
これらの例を使用してパイプラインを構成します。
単一のソーステーブルを取り込む
- Databricks Asset Bundles
- Databricks notebook
次のパイプライン定義ファイルは、単一のソース テーブルを取り込みま す。
variables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for servicenow_dab
resources:
pipelines:
pipeline_servicenow:
name: servicenow_pipeline
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: <servicenow-connection>
objects:
# An array of objects to ingest from ServiceNow. This example ingests the incident table.
- table:
source_schema: default
source_table: incident
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
次のパイプライン仕様は、単一のソース テーブルを取り込みます。
pipeline_spec = """
{
"name": "<pipeline-name>",
"ingestion_definition": {
"connection_name": "<servicenow-connection>",
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "incident",
"destination_catalog": "main",
"destination_schema": "ingest_destination_schema"
}
}
]
}
}
"""
create_pipeline(pipeline_spec)
複数のソーステーブルを取り込む
- Databricks Asset Bundles
- Databricks notebook
次のパイプライン定義ファイルは、複数のソース テーブルを取り込みま す。
variables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for servicenow_dab
resources:
pipelines:
pipeline_servicenow:
name: servicenow_pipeline
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: <servicenow-connection>
objects:
# An array of objects to ingest from ServiceNow. This example ingests the incident and problem tables.
- table:
source_schema: default
source_table: incident
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: default
source_table: problem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
次のパイプライン仕様は、複数のソース テーブルを取り込みま す。
pipeline_spec = """
{
"name": "<pipeline-name>",
"ingestion_definition": {
"connection_name": "<servicenow-connection>",
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "incident",
"destination_catalog": "main",
"destination_schema": "ingest_destination_schema"
}
},
{
"table": {
"source_schema": "default",
"source_table": "problem",
"destination_catalog": "main",
"destination_schema": "ingest_destination_schema"
}
}
]
}
}
"""
create_pipeline(pipeline_spec)
バンドルジョブ定義ファイル
以下は、宣言型自動化バンドルで使用するジョブ定義ファイルの例です。ジョブは毎日、最後の実行からちょうど 1 日後に実行されます。
resources:
jobs:
servicenow_dab_job:
name: servicenow_dab_job
trigger:
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_servicenow.id}
一般的なパターン
高度なパイプライン構成については、 「管理された取り込みパイプラインの一般的なパターン」を参照してください。
次のステップ
パイプラインを開始、スケジュールし、アラートを設定します。一般的なパイプラインメンテナンスタスクを参照してください。