Salesforce からデータを取り込む

プレビュー

LakeFlow Connect はゲート付きパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。

この記事では、LakeFlow Connect を使用して Salesforce からデータを取り込み、Databricks にロードする方法について説明します。 結果として得られる取り込みパイプラインはUnity Catalogによって管理され、サーバレス コンピュートとDelta Live Tablesによって強化されます。

Salesforce インジェスト コネクタは、次のソースをサポートしています。

  • Salesforce Sales Cloud

始める前に

取り込みパイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースは Unity Catalog に対して有効になっています。

  • サーバレス コンピュートは、ノートブック、ワークフロー、およびDelta Live Tablesで有効です。 「サーバレス コンピュートを有効にする」を参照してください。

  • 接続を作成するには: メタストアに CREATE CONNECTION があります。

    既存の接続を使用するには: 接続オブジェクトに USE CONNECTION または ALL PRIVILEGES があります。

  • USE CATALOG ターゲットカタログ上。

  • USE SCHEMA ターゲットカタログ上の既存のスキーマまたはCREATE SCHEMACREATE TABLE

Salesforce 接続を作成する

必要なアクセス許可: メタストア CREATE CONNECTION 。 これを許可するには、メタストア管理者に連絡してください。

代わりに既存の接続を使用するには、接続に USE CONNECTION または ALL PRIVILEGES が必要です。

Salesforce 接続を作成するには、次の手順を実行します。

  1. Databricksワークスペースで、 [カタログ] > [外部ロケーション] > [接続] > [接続の作成] をクリックします。

  2. [ Connection name] で、Salesforce 接続の一意の名前を入力します。

  3. [接続タイプ] で [Salesforce] をクリックします。

  4. 認証タイプOAuthに設定し、次のフィールドに入力します。

    • クライアント ID を Salesforce から取得したコンシューマキーに設定します。

    • クライアントシークレットを、Salesforce から取得したコンシューマシークレットに設定します。

    • OAuth スコープをリテラル文字列api  refresh_tokenに設定します。

  5. Salesforce サンドボックス アカウントから取り込んでいる場合は、サンドボックスをtrueに設定します。

  6. 「Salesforce でログイン」をクリックします。

    注:

    接続テスト は、ホストが到達可能であることをテストします。 ユーザ クレデンシャルが正しいユーザ名とパスワードの値であるかどうかはテストされません。

    Salesforce ログイン
  7. Salesforce ユーザー アカウントでログインします。 ログインして認証すると、 refresh_token が自動的に作成されます。

  8. (オプション) Salesforce サンドボックスを使用している場合は、 「カスタム ドメインの使用」をクリックし、サンドボックスの URL を指定してログインに進みます。

    カスタムドメインボタンを使用する
    サンドボックスのURLを入力
  9. [接続の作成] ページに戻ったら、[作成] をクリックします。

Delta Live Tablesパイプラインの作成

このステップでは、インジェスト パイプラインの作成方法を説明します。 取り込まれた各テーブルは、明示的に名前を変更しない限り、デフォルトで宛先内の同じ名前 (ただしすべて小文字) を持つストリーミング テーブルに対応します。

  1. 個人的なアクセスポイントを生成します

  2. 次のコードを Python ノートブックのセルに貼り付けます。

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  3. 次のコードをノートブックの 2 番目のセルに貼り付けます。

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    
    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
    
  4. 次のコードをノートブックの 3 番目のセルに貼り付けます。

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "table": {
             "source_schema": "objects",
             "source_table": "<YOUR_FIRST_TABLE>",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>"
             }
          },
          {
             "table": {
             "source_schema": "objects",
             "source_table": "YOUR_SECOND_TABLE",
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>"
             }
           }
         ]
       }
    }
    """
    
    
    create_pipeline(pipeline_spec)
    
  5. 最初のノートブック セルを変更せずに実行します。

  6. テンプレート ノートブックの 2 番目のセルをパイプラインの詳細に合わせて変更します。 たとえば、取り込むテーブルと送信先などです。

  7. テンプレート ノートブックの 2 番目のセルを実行します。 これはcreate_pipelineを実行します。

    1. list_pipeline を実行すると、パイプライン ID とその詳細を表示できます

    2. edit_pipeline を実行してパイプライン定義を編集できます。

    3. delete_pipeline を実行してパイプラインを削除できます。

パイプラインを作成するには:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

パイプラインを更新するには:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

パイプライン定義を取得するには:

databricks pipelines get "<pipeline-id>"

パイプラインを削除するには:

databricks pipelines delete "<pipeline-id>"

詳細については、次のコマンドを実行してください。

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

パイプラインを開始、スケジュール、アラートを設定する

  1. パイプラインが作成されたら、Databricks ワークスペースに戻り、 [Delta Live Tables]をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 [開始]をクリックしてパイプラインを実行します。 [スケジュール]をクリックすると、パイプラインをスケジュールできます。

  4. パイプラインにアラートを設定するには、 [スケジュール]をクリックし、 [その他のオプション]をクリックして、通知を追加します。

  5. インジェストが完了したら、テーブルに対してクエリを実行できます。

注:

パイプラインを実行すると、特定のテーブルに対して 2 つのソース ビューが表示される場合があります。 1 つのビューには、数式フィールドのスナップショットが含まれます。 もう一方のビューには、数式以外のフィールドの増分データ取得が含まれています。 これらのビューは、宛先テーブルで結合されます。