Salesforce からデータを取り込む
この記事では、LakeFlow Connect を使用して Salesforce からデータを取り込み、Databricks に読み込む方法について説明します。結果として得られるインジェスト パイプラインは Unity Catalog によって制御され、サーバレス コンピュートと DLT によって駆動されます。
Salesforce インジェスト コネクタは、次のソースをサポートしています。
- Salesforce Sales Cloud
始める前に
インジェスト パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースが Unity Catalog に対して有効になっています。
-
サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの有効化を参照してください。
-
接続を作成する予定の場合: メタストアに対する
CREATE CONNECTION
権限があります。既存の接続を使用する予定の場合: 接続オブジェクトに対する
USE CONNECTION
権限またはALL PRIVILEGES
があります。 -
ターゲット・カタログに対する
USE CATALOG
権限があります。 -
既存のスキーマに対する
USE SCHEMA
権限とCREATE TABLE
権限、またはターゲット カタログに対するCREATE SCHEMA
権限を持っている。
Salesforce Sales Cloud から取り込むには、次のことをお勧めします。
- Databricks がデータを取得するために使用できる Salesforce ユーザーを作成します。ユーザーが API アクセスと、取り込む予定のすべてのオブジェクトにアクセスできることを確認します。
Salesforce 接続を作成する
必要なアクセス許可: メタストア CREATE CONNECTION
。これを許可するには、メタストア管理者に連絡してください。
既存の接続を使用してインジェストパイプラインを作成する場合は、次のセクションに進んでください。接続には USE CONNECTION
または ALL PRIVILEGES
が必要です。
Salesforce 接続を作成するには、次の手順を実行します。
-
Databricksワークスペースで、 カタログ > 外部ロケーション > 接続 > 接続の作成 をクリックします。
-
接続名 で、Salesforce 接続の一意の名前を入力します。
-
接続タイプ で Salesforce をクリックします。
-
Salesforce サンドボックス アカウントから取り込む場合は、 サンドボックスである を
true
に設定します。 -
Salesforce でログイン をクリックします。
-
Salesforce サンドボックスから取り込む場合は、 カスタムドメインを使用 をクリックします。 サンドボックスのURLを入力し、ログインに進みます。 Databricks では、Databricks インジェスト専用の Salesforce ユーザーとしてログインすることをお勧めします。
-
接続の作成 ページに戻ったら、 作成 をクリックします。
インジェスト パイプラインを作成する
必要な権限: 接続 USE CONNECTION
または ALL PRIVILEGES
。
この手順では、インジェスト パイプラインを作成する方法について説明します。取り込まれた各テーブルは、明示的に名前を変更しない限り、デフォルトによって宛先に同じ名前 (ただしすべて小文字) のストリーミングテーブルに対応します。
- Databricks UI
- Databricks Asset Bundles
- Databricks CLI
-
Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。
-
データの追加 ページの Databricks コネクタ で、 Salesforce をクリックします。
Salesforce インジェスト ウィザードが開きます。
-
ウィザードの パイプライン ページで、インジェスト パイプラインの一意の名前を入力します。
-
宛先カタログ ドロップダウンで、カタログを選択します。取り込まれたデータとイベント ログは、このカタログに書き込まれます。
-
Salesforce データへのアクセスに必要な資格情報を保存する Unity Catalog 接続を選択します。
Salesforce 接続がない場合は、 接続の作成 をクリックします。 メタストアに対する
CREATE CONNECTION
特権が必要です。 -
パイプラインの作成および続行 をクリックします。
-
「 ソース 」ページで、取り込むテーブルを選択し、「 次へ 」をクリックします。
[ すべてのテーブル ] を選択した場合、Salesforce インジェスト コネクタは、ソース スキーマ内のすべての既存および将来のテーブルをマネージド テーブル Unity Catalog 書き込みます。
-
宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。
既存のスキーマを使用しない場合は、 スキーマの作成 をクリックします。 親カタログに対する
USE CATALOG
権限とCREATE SCHEMA
権限が必要です。 -
パイプラインを保存と続行 をクリックします。
-
設定 ページで、 スケジュールの作成 をクリックします。宛先テーブルを更新する頻度を設定します。
-
オプションで、パイプライン操作の成功または失敗に関するEメール 通知を設定します。
-
パイプラインの保存と実行 をクリックします。
このタブでは、Databricks Asset Bundle を使用してインジェスト パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「アセットバンドルDatabricks」を参照してください。
パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init
-
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (
resources/sfdc_pipeline.yml
)。 - データ取り込みの頻度を制御するワークフロー ファイル (
resources/sfdc_job.yml
)。
次に、
resources/sfdc_pipeline.yml
ファイルの例を示します。YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for sfdc_dab
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: <salesforce-connection>
objects:
# An array of objects to ingest from Salesforce. This example
# ingests the AccountShare, AccountPartner, and ApexPage objects.
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
table_configuration:
include_columns: # This can be exclude_columns instead
- <column_a>
- <column_b>
- <column_c>
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: ApexPage
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}次に、
resources/sfdc_job.yml
ファイルの例を示します。YAMLresources:
jobs:
sfdc_dab_job:
name: sfdc_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sfdc.id} - パイプライン定義ファイル (
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
パイプライン定義で次のテーブル設定プロパティを使用して、取り込む特定の列を選択または選択解除できます。
include_columns
: 必要に応じて、インジェストに含める列のリストを指定します。このオプションを使用して列を明示的に含めると、パイプラインは将来ソースに追加される列を自動的に除外します。将来の列を取り込むには、それらをリストに追加する必要があります。exclude_columns
: 必要に応じて、インジェストから除外する列のリストを指定します。このオプションを使用して列を明示的に除外すると、パイプラインには、今後ソースに追加される列が自動的に含まれます。将来の列を取り込むには、それらをリストに追加する必要があります。
パイプラインを作成するには:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
パイプラインを更新するには:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
パイプライン定義を取得するには:
databricks pipelines get "<pipeline-id>"
パイプラインを削除するには:
databricks pipelines delete "<pipeline-id>"
詳細については、以下を実行できます。
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
JSON パイプライン定義の例:
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"table": {
"source_schema": "<source-schema>",
"source_table": "<source-table>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
パイプラインの開始、スケジュール設定、アラートの設定
-
パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
-
パイプラインの詳細を表示するには、パイプライン名をクリックします。
-
パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。
-
パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。
パイプラインを実行すると、特定のテーブルに対して 2 つのソース ビューが表示される場合があります。1 つのビューには、数式フィールドのスナップショットが含まれています。もう一方のビューには、数式以外のフィールドの増分データ取得が含まれています。これらのビューは、宛先テーブルで結合されます。
例: 2 つの Salesforce オブジェクトを別々のスキーマに取り込む
このセクションのパイプライン定義の例では、2 つの Salesforce オブジェクトを別々のスキーマに取り込みます。マルチデスティネーションパイプラインのサポートはAPIのみです。
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table
例: 1 つの Salesforce オブジェクトを 3 回取り込む
このセクションのパイプライン定義の例では、Salesforce オブジェクトを 3 つの異なる宛先テーブルに取り込みます。マルチデスティネーションパイプラインのサポートはAPIのみです。
オプションで、取り込むテーブルの名前を変更できます。パイプライン内のテーブルの名前を変更すると、そのテーブルは API のみのパイプラインになり、UI でパイプラインを編集できなくなります。
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename