メインコンテンツまでスキップ

チュートリアル: Gmail メール送信者オペレーター

備考

プレビュー

この機能は パブリック プレビュー段階です。

このチュートリアルでは、Lakeflow Designer 用のpython-run-function演算子を作成して、DataFrame の内容を CSV 添付ファイルとして Gmail 経由で送信する手順を説明します。この例を使用して、通知の送信や外部システムへの書き込みなどの副作用を実行するYAMLベースの演算子を作成する方法を学びましょう。詳細については、 「Lakeflow Designer のユーザー定義演算子」を参照してください。

要件

  • シークレットスコープを作成するためのアクセス権を備えたDatabricksワークスペース。
  • Google アプリ パスワードが設定された Gmail アカウント (多要素認証 (MFA) が有効な場合に必要)。
  • ローカル開発マシンにDatabricks CLIがインストールされていること。

ステップ 1: シークレットを設定する

Gmail 資格情報をDatabricksシークレットスコープに保存して、オペレーターがランタイム時に取得できるようにします。

  1. Databricks CLIを使用してシークレットスコープを作成します。

    Bash
    databricks secrets create-scope my_email_scope
  2. Gmailアプリのパスワードを以下の範囲に保存してください。

    Bash
    databricks secrets put-secret my_email_scope gmail_app_password

    秘密の値を入力するように求められます。Gmailアプリのパスワードを貼り付けて保存してください。

ステップ 2: run()関数を作成します

python-run-function演算子型には、次のシグネチャを持つrun()関数が必要です。

Python
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config : Lakeflow Designer UI でユーザーが指定した設定値。
  • inputs : ポート名をキーとする入力DataFrames 。
  • spark : アクティブな Spark セッション。

この関数は、出力ポート名をキーとする出力DataFramesの辞書を返さなければなりません。

ノートブックのセル内で関数を定義し、テストします。

Python
from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]

# Skip side effects during Designer preview
if config.get("is_preview", False):
return {"data": input_df}

import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders

sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")

if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")

recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")

# Retrieve password from Databricks secrets
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

# Convert DataFrame to CSV
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)

# Send email to each recipient
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))

with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)

with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)

# Clean up temp file
if os.path.exists(file_path):
os.remove(file_path)

return {"data": input_df}

ステップ 3: 機能をテストする

サンプルDataFrameを使用して関数をテストしてください。

Python
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)

# Test in preview mode (no email sent)
result = run(
config={
"is_preview": True,
"sender_email": "you@gmail.com",
"secret_scope": "my_email_scope",
"secret_key": "gmail_app_password",
"recipients": "alice@example.com",
"subject": "Test",
"body": "Test body"
},
inputs={"data": test_df},
spark=spark
)

result["data"].show()
# Expected: the original DataFrame, unchanged
注記

設定内のsecret_scopesecret_key値は、ステップ 1 で作成したシークレットスコープとキーの 名前 であり、実際のパスワードではありません。 オペレーターはこれらの名前を使用して、実行時にDatabricksシークレットからパスワードを取得します。

重要

まずis_preview Trueに設定してテストし、Eメールを送信せずにパススルー動作を確認してください。 実際のEメールをテストする準備ができたら、 is_previewFalseに設定してください。

ステップ 4: YAML 定義を構築する

以下の内容でgmail_email_sender.yamlという名前のファイルを作成してください。

YAML
schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.

config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
sender_email:
type: string
title: Sender Email
default: ''
examples:
- 'you@gmail.com'
x-ui:
widget: input
secret_scope:
type: string
title: Secret Scope
default: ''
examples:
- 'my_email_scope'
x-ui:
widget: input
secret_key:
type: string
title: Secret Key
default: ''
examples:
- 'gmail_app_password'
x-ui:
widget: input
recipients:
type: string
title: Recipients
default: ''
examples:
- 'alice@example.com, bob@example.com'
x-ui:
widget: textarea
rows: 2
subject:
type: string
title: Subject
default: ''
examples:
- 'Designer Output Data'
x-ui:
widget: input
body:
type: string
title: Email Body
default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
x-ui:
widget: textarea
rows: 6
required:
- sender_email
- secret_scope
- secret_key
- recipients
- subject
additionalProperties: false

ports:
input:
- name: data
title: Input Data
mime: application/vnd.databricks.dataframe
output:
- name: data
title: Output Data
mime: application/vnd.databricks.dataframe

run_function:
type: inline
code: |
from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]

if config.get("is_preview", False):
return {"data": input_df}

import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders

sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")

if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")

recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")

from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)

for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))

with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)

with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)

if os.path.exists(file_path):
os.remove(file_path)

return {"data": input_df}

ステップ 5: オペレーターを保存して登録する

  1. YAML ファイルをDatabricksに保存します。 例えば:

    /Workspace/Users/<user-name>/gmail_email_sender.yaml
  2. .user_defined_operators.yamlファイルに演算子を追加してください。

    YAML
    operators:
    - /Workspace/Users/<user-name>/gmail_email_sender.yaml

登録オプションの詳細については、 「オペレーターを検出可能にする」を参照してください。

権限

このオペレーターを含むワークフローを実行するユーザーは、シークレットスコープへのアクセス権がREAD必要であるか、オペレーター構成で独自のシークレットスコープとキー値を指定できます。ユーザーはワークスペース内のYAMLファイルへの読み取りアクセス権も必要です。

シークレットスコープへのアクセスを許可するには:

Bash
databricks secrets put-acl my_email_scope <user-or-group> READ

Lakeflow Designer での演算子の使用

登録後、 Lakeflow Designerにオペレーターが表示され、データソースの入力ポートと、送信者の電子メール、シークレットスコープ、シークレットキー、受信者、件名、本文の設定フィールドが表示されます。

ワークフロー実行時、オペレーターは入力されたDataFrameをCSVに変換し、メールに添付して各受信者に送信します。 DataFrameは変更されずにそのまま出力ポートに渡されるため、下流で追加の演算子を連結できます。ワークフローのプレビュー中は、電子メールは送信されません。