Salesforce からデータを取り込む
プレビュー
LakeFlow Connect はゲート付きパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。
この記事では、LakeFlow Connect を使用して Salesforce からデータを取り込み、Databricks にロードする方法について説明します。 結果として得られる取り込みパイプラインはUnity Catalogによって管理され、サーバレス コンピュートとDelta Live Tablesによって強化されます。
Salesforce インジェスト コネクタは、次のソースをサポートしています。
Salesforce Sales Cloud
始める前に
取り込みパイプラインを作成するには、次の要件を満たす必要があります。
ワークスペースは Unity Catalog に対して有効になっています。
サーバレス コンピュートは、ノートブック、ワークフロー、およびDelta Live Tablesで有効です。 「サーバレス コンピュートを有効にする」を参照してください。
接続を作成するには: メタストアに
CREATE CONNECTION
があります。既存の接続を使用するには: 接続オブジェクトに
USE CONNECTION
またはALL PRIVILEGES
があります。USE CATALOG
ターゲットカタログ上。USE SCHEMA
ターゲットカタログ上の既存のスキーマまたはCREATE SCHEMA
CREATE TABLE
。
Salesforce 接続を作成する
必要なアクセス許可: メタストア CREATE CONNECTION
。 これを許可するには、メタストア管理者に連絡してください。
代わりに既存の接続を使用するには、接続に USE CONNECTION
または ALL PRIVILEGES
が必要です。
Salesforce 接続を作成するには、次の手順を実行します。
Databricksワークスペースで、 [カタログ] > [外部ロケーション] > [接続] > [接続の作成] をクリックします。
[ Connection name] で、Salesforce 接続の一意の名前を入力します。
[接続タイプ] で [Salesforce] をクリックします。
認証タイプをOAuthに設定し、次のフィールドに入力します。
クライアント ID を Salesforce から取得したコンシューマキーに設定します。
クライアントシークレットを、Salesforce から取得したコンシューマシークレットに設定します。
OAuth スコープをリテラル文字列
api refresh_token
に設定します。
Salesforce サンドボックス アカウントから取り込んでいる場合は、サンドボックスを
true
に設定します。「Salesforce でログイン」をクリックします。
注:
接続テスト は、ホストが到達可能であることをテストします。 ユーザ クレデンシャルが正しいユーザ名とパスワードの値であるかどうかはテストされません。
Salesforce ユーザー アカウントでログインします。 ログインして認証すると、
refresh_token
が自動的に作成されます。(オプション) Salesforce サンドボックスを使用している場合は、 「カスタム ドメインの使用」をクリックし、サンドボックスの URL を指定してログインに進みます。
[接続の作成] ページに戻ったら、[作成] をクリックします。
Delta Live Tablesパイプラインの作成
このステップでは、インジェスト パイプラインの作成方法を説明します。 取り込まれた各テーブルは、明示的に名前を変更しない限り、デフォルトで宛先内の同じ名前 (ただしすべて小文字) を持つストリーミング テーブルに対応します。
次のコードを Python ノートブックのセルに貼り付けます。
# SHOULD MODIFY # This step sets up a PAT to make API calls to the Databricks service. api_token = "<personal-access-token>"
次のコードをノートブックの 2 番目のセルに貼り付けます。
# DO NOT MODIFY # This step sets up a connection to make API calls to the Databricks service. import requests import json notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() workspace_url = notebook_context.apiUrl().get() api_url = f"{workspace_url}/api/2.0/pipelines" headers = { 'Authorization': 'Bearer {}'.format(api_token), 'Content-Type': 'application/json' } def check_response(response): if response.status_code == 200: print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False))) else: print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}") # DO NOT MODIFY # These are API definition to be used. def create_pipeline(pipeline_definition: str): response = requests.post(url=api_url, headers=headers, data=pipeline_definition) check_response(response) def edit_pipeline(id: str, pipeline_definition: str): response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition) check_response(response) def delete_pipeline(id: str): response = requests.delete(url=f"{api_url}/{id}", headers=headers) check_response(response) def get_pipeline(id: str): response = requests.get(url=f"{api_url}/{id}", headers=headers) check_response(response) def list_pipeline(filter: str = ""): body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}""" response = requests.get(url=api_url, headers=headers, data=body) check_response(response)
次のコードをノートブックの 3 番目のセルに貼り付けます。
# SHOULD MODIFY # Update this notebook to configure your ingestion pipeline. pipeline_spec = """ { "name": "<YOUR_PIPELINE_NAME>", "ingestion_definition": { "connection_name": "<YOUR_CONNECTON_NAME>", "objects": [ { "table": { "source_schema": "objects", "source_table": "<YOUR_FIRST_TABLE>", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_TABLE>" } }, { "table": { "source_schema": "objects", "source_table": "YOUR_SECOND_TABLE", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>" } } ] } } """ create_pipeline(pipeline_spec)
最初のノートブック セルを変更せずに実行します。
テンプレート ノートブックの 2 番目のセルをパイプラインの詳細に合わせて変更します。 たとえば、取り込むテーブルと送信先などです。
テンプレート ノートブックの 2 番目のセルを実行します。 これは
create_pipeline
を実行します。list_pipeline を実行すると、パイプライン ID とその詳細を表示できます
edit_pipeline を実行してパイプライン定義を編集できます。
delete_pipeline を実行してパイプラインを削除できます。
パイプラインを作成するには:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
パイプラインを更新するには:
databricks pipelines update --json "<<pipeline-definition | json-file-path>"
パイプライン定義を取得するには:
databricks pipelines get "<pipeline-id>"
パイプラインを削除するには:
databricks pipelines delete "<pipeline-id>"
詳細については、次のコマンドを実行してください。
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
パイプラインを開始、スケジュール、アラートを設定する
パイプラインが作成されたら、Databricks ワークスペースに戻り、 [Delta Live Tables]をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
パイプラインの詳細を表示するには、パイプライン名をクリックします。
パイプラインの詳細ページで、 [開始]をクリックしてパイプラインを実行します。 [スケジュール]をクリックすると、パイプラインをスケジュールできます。
パイプラインにアラートを設定するには、 [スケジュール]をクリックし、 [その他のオプション]をクリックして、通知を追加します。
インジェストが完了したら、テーブルに対してクエリを実行できます。
注:
パイプラインを実行すると、特定のテーブルに対して 2 つのソース ビューが表示される場合があります。 1 つのビューには、数式フィールドのスナップショットが含まれます。 もう一方のビューには、数式以外のフィールドの増分データ取得が含まれています。 これらのビューは、宛先テーブルで結合されます。