Tutorial: Operador de envio email do Gmail
Visualização
Este recurso está em Pré-visualização Pública.
Este tutorial mostra como criar um operador python-run-function para o Lakeflow Designer que envia o conteúdo de um DataFrame como um anexo CSV via Gmail. Use este exemplo para aprender como criar operadores baseados em YAML que executam efeitos colaterais, como enviar notificações ou gravar em sistemas externos. Para saber mais, consulte Operadores definidos pelo usuário no Lakeflow Designer.
Requisitos
- Um workspace Databricks com acesso para criar Escopo Secreto.
- Uma account do Gmail com uma senha de aplicativo do Google (necessária quando a autenticação multifator (MFA) está ativada).
- A CLI do Databricks instalada em sua máquina de desenvolvimento local.
o passo 1: Configurar segredos
Armazene suas credenciais do Gmail em um Escopo Secreto Databricks para que o operador possa recuperá-las em tempo de execução.
-
Criar um escopo secreto usando a CLI Databricks :
Bashdatabricks secrets create-scope my_email_scope -
Armazene a senha do seu aplicativo Gmail no seguinte escopo:
Bashdatabricks secrets put-secret my_email_scope gmail_app_passwordVocê será solicitado a inserir o valor secreto. Cole a senha do seu aplicativo Gmail e salve.
o passo 2: Escreva a função run()
O tipo de operador python-run-function requer uma função run() com esta assinatura:
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
config: Valores de configuração fornecidos pelo usuário na interface do Lakeflow Designer.inputs: DataFrames de entrada indexados pelo nome da porta.spark: A sessão Spark ativa.
A função deve retornar um dicionário de DataFrames de saída indexados pelo nome da porta de saída.
Defina e teste a função em uma célula do Notebook:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
# Skip side effects during Designer preview
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
# Retrieve password from Databricks secrets
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
# Convert DataFrame to CSV
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
# Send email to each recipient
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
# Clean up temp file
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
o passo 3: Teste a função
Teste a função com um DataFrame de exemplo:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
# Test in preview mode (no email sent)
result = run(
config={
"is_preview": True,
"sender_email": "you@gmail.com",
"secret_scope": "my_email_scope",
"secret_key": "gmail_app_password",
"recipients": "alice@example.com",
"subject": "Test",
"body": "Test body"
},
inputs={"data": test_df},
spark=spark
)
result["data"].show()
# Expected: the original DataFrame, unchanged
Os valores secret_scope e secret_key na configuração são os nomes do Escopo Secreto e key que você criou na etapa 1 -- não a senha real. O operador usa esses nomes para recuperar a senha dos segredos do Databricks em tempo de execução.
Teste primeiro com is_preview definido como True para verificar o comportamento de passagem sem enviar nenhum email. Quando você estiver pronto para testar o email real, defina is_preview para False.
o passo 4: Construir a definição YAML
Crie um arquivo chamado gmail_email_sender.yaml com o seguinte conteúdo:
schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
sender_email:
type: string
title: Sender Email
default: ''
examples:
- 'you@gmail.com'
x-ui:
widget: input
secret_scope:
type: string
title: Secret Scope
default: ''
examples:
- 'my_email_scope'
x-ui:
widget: input
secret_key:
type: string
title: Secret Key
default: ''
examples:
- 'gmail_app_password'
x-ui:
widget: input
recipients:
type: string
title: Recipients
default: ''
examples:
- 'alice@example.com, bob@example.com'
x-ui:
widget: textarea
rows: 2
subject:
type: string
title: Subject
default: ''
examples:
- 'Designer Output Data'
x-ui:
widget: input
body:
type: string
title: Email Body
default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
x-ui:
widget: textarea
rows: 6
required:
- sender_email
- secret_scope
- secret_key
- recipients
- subject
additionalProperties: false
ports:
input:
- name: data
title: Input Data
mime: application/vnd.databricks.dataframe
output:
- name: data
title: Output Data
mime: application/vnd.databricks.dataframe
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]
if config.get("is_preview", False):
return {"data": input_df}
import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")
if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")
recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)
if os.path.exists(file_path):
os.remove(file_path)
return {"data": input_df}
o passo 5: Salvar e registrar a operadora
-
Salve o arquivo YAML em seu workspace Databricks . Por exemplo:
/Workspace/Users/<user-name>/gmail_email_sender.yaml -
Adicione o operador ao seu arquivo
.user_defined_operators.yaml:YAMLoperators:
- /Workspace/Users/<user-name>/gmail_email_sender.yaml
Para obter mais informações sobre as opções de registro, consulte Torne sua operadora detectável.
Permissões
Usuários que executam um fluxo de trabalho contendo este operador precisam de acesso READ ao Escopo Secreto, ou podem fornecer seu próprio Escopo Secreto e valores key na configuração do operador. Os usuários também precisam de acesso de leitura ao arquivo YAML no workspace.
Para conceder acesso ao Escopo Secreto:
databricks secrets put-acl my_email_scope <user-or-group> READ
Utilizando o operador no Lakeflow Designer
Após o cadastro, o operador aparece no Lakeflow Designer com uma porta de entrada para sua fonte de dados e campos de configuração para email do remetente, Escopo Secreto, chave secreta, destinatários, assunto e corpo da mensagem.
Quando o fluxo de trabalho é executado, o operador converte o DataFrame de entrada em CSV, anexa-o a um email e o envia para cada destinatário. O DataFrame passa sem alterações para a porta de saída, permitindo que você encadeie operadores adicionais posteriormente. Durante a visualização do fluxo de trabalho, nenhum email é enviado.