メインコンテンツまでスキップ

Salesforce からデータを取り込む

このページでは、Salesforce からデータを取り込み、Databricks を使用してLakeFlow Connect に読み込む方法について説明します。

始める前に

インジェスト パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースが Unity Catalog に対して有効になっています。
  • サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの有効化を参照してください。
  • 接続を作成する予定の場合: メタストアに対する CREATE CONNECTION 権限があります。
  • 既存の接続を使用する予定の場合: 接続オブジェクトに対する USE CONNECTION 権限または ALL PRIVILEGES があります。
  • ターゲット・カタログに対する USE CATALOG 権限があります。
  • 既存のスキーマに対する USE SCHEMA 権限と CREATE TABLE 権限、またはターゲット カタログに対する CREATE SCHEMA 権限を持っている。

Salesforce から取り込むには、次のことをお勧めします。

  • Databricks がデータを取得するために使用できる Salesforce ユーザーを作成します。ユーザーが API アクセスと、取り込む予定のすべてのオブジェクトにアクセスできることを確認します。

インジェスト パイプラインを作成する

必要な権限: 接続 USE CONNECTION または ALL PRIVILEGES

この手順では、インジェスト パイプラインを作成する方法について説明します。取り込まれた各テーブルは、同じ名前 (ただしすべて小文字) でストリーミングテーブルに書き込まれます。

  1. Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。

  2. データの追加 ページの Databricks コネクタ で、 Salesforce をクリックします。

    Salesforce インジェスト ウィザードが開きます。

  3. ウィザードの パイプライン ページで、インジェスト パイプラインの一意の名前を入力します。

  4. 宛先カタログ ドロップダウンで、カタログを選択します。取り込まれたデータとイベント ログは、このカタログに書き込まれます。

  5. Salesforce データへのアクセスに必要な資格情報を保存する Unity Catalog 接続を選択します。

    Salesforce 接続がない場合は、 接続の作成 をクリックします。 メタストアに対する CREATE CONNECTION 特権が必要です。

  6. パイプラインの作成および続行 をクリックします。

  7. ソース 」ページで、取り込むテーブルを選択し、「 次へ 」をクリックします。

    [ すべてのテーブル ] を選択した場合、Salesforce インジェスト コネクタは、ソース スキーマ内のすべての既存および将来のテーブルを送信先スキーマに書き込みます。パイプラインごとに最大 250 個のオブジェクトがあります。

  8. 宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。

    既存のスキーマを使用しない場合は、[ スキーマの作成 ] をクリックします。親カタログに対する USE CATALOG 権限と CREATE SCHEMA 権限が必要です。

  9. パイプラインを保存と続行 をクリックします。

  10. (オプション) 設定 ページで、 スケジュールの作成 をクリックします。宛先テーブルを更新する頻度を設定します。

  11. (オプション)パイプライン操作の成功または失敗のEメール 通知を設定します。

  12. パイプラインの保存と実行 をクリックします。

JSON パイプライン定義の例

JSON
"ingestion_definition": {

"connection_name": "<connection-name>",

"objects": [

{

"table": {

"source_schema": "<source-schema>",

"source_table": "<source-table>",

"destination_catalog": "<destination-catalog>",

"destination_schema": "<destination-schema>",

"table_configuration": {

"include_columns": ["<column-a>", "<column-b>", "<column-c>"]

}

}

}

]

}

パイプラインの開始、スケジュール設定、アラートの設定

パイプラインの詳細ページでパイプラインのスケジュールを作成できます。

  1. パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。

  4. パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。

パイプラインに追加するスケジュールごとに、 LakeFlow Connect によってそのジョブが自動的に作成されます。 インジェスト パイプラインは、ジョブ内のタスクです。オプションで、ジョブにタスクを追加できます。

注記

パイプラインを実行すると、特定のテーブルに対して 2 つのソース ビューが表示される場合があります。1 つのビューには、数式フィールドのスナップショットが含まれています。もう一方のビューには、数式以外のフィールドの増分データ取得が含まれています。これらのビューは、宛先テーブルで結合されます。

例: 2 つの Salesforce オブジェクトを別々のスキーマに取り込む

このセクションのパイプライン定義の例では、2 つの Salesforce オブジェクトを別々のスキーマに取り込みます。マルチデスティネーションパイプラインのサポートはAPIのみです。

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table

例: 1 つの Salesforce オブジェクトを 3 回取り込む

このセクションのパイプライン定義の例では、Salesforce オブジェクトを 3 つの異なる宛先テーブルに取り込みます。マルチデスティネーションパイプラインのサポートはAPIのみです。

オプションで、取り込むテーブルの名前を変更できます。パイプライン内のテーブルの名前を変更すると、そのテーブルは API のみのパイプラインになり、UI でパイプラインを編集できなくなります。

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename

追加のリソース