Pular para o conteúdo principal

Tutorial: Operador de envio email do Gmail

info

Visualização

Este recurso está em Pré-visualização Pública.

Este tutorial mostra como criar um operador python-run-function para o Lakeflow Designer que envia o conteúdo de um DataFrame como um anexo CSV via Gmail. Use este exemplo para aprender como criar operadores baseados em YAML que executam efeitos colaterais, como enviar notificações ou gravar em sistemas externos. Para saber mais, consulte Operadores definidos pelo usuário no Lakeflow Designer.

Requisitos

  • Um workspace Databricks com acesso para criar Escopo Secreto.
  • Uma account do Gmail com uma senha de aplicativo do Google (necessária quando a autenticação multifator (MFA) está ativada).
  • A CLI do Databricks instalada em sua máquina de desenvolvimento local.

o passo 1: Configurar segredos

Armazene suas credenciais do Gmail em um Escopo Secreto Databricks para que o operador possa recuperá-las em tempo de execução.

  1. Criar um escopo secreto usando a CLI Databricks :

    Bash
    databricks secrets create-scope my_email_scope
  2. Armazene a senha do seu aplicativo Gmail no seguinte escopo:

    Bash
    databricks secrets put-secret my_email_scope gmail_app_password

    Você será solicitado a inserir o valor secreto. Cole a senha do seu aplicativo Gmail e salve.

o passo 2: Escreva a função run()

O tipo de operador python-run-function requer uma função run() com esta assinatura:

Python
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config : Valores de configuração fornecidos pelo usuário na interface do Lakeflow Designer.
  • inputs : DataFrames de entrada indexados pelo nome da porta.
  • spark : A sessão Spark ativa.

A função deve retornar um dicionário de DataFrames de saída indexados pelo nome da porta de saída.

Defina e teste a função em uma célula do Notebook:

Python
from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]

# Skip side effects during Designer preview
if config.get("is_preview", False):
return {"data": input_df}

import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders

sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")

if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")

recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")

# Retrieve password from Databricks secrets
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

# Convert DataFrame to CSV
pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)

# Send email to each recipient
for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))

with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)

with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)

# Clean up temp file
if os.path.exists(file_path):
os.remove(file_path)

return {"data": input_df}

o passo 3: Teste a função

Teste a função com um DataFrame de exemplo:

Python
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)

# Test in preview mode (no email sent)
result = run(
config={
"is_preview": True,
"sender_email": "you@gmail.com",
"secret_scope": "my_email_scope",
"secret_key": "gmail_app_password",
"recipients": "alice@example.com",
"subject": "Test",
"body": "Test body"
},
inputs={"data": test_df},
spark=spark
)

result["data"].show()
# Expected: the original DataFrame, unchanged
nota

Os valores secret_scope e secret_key na configuração são os nomes do Escopo Secreto e key que você criou na etapa 1 -- não a senha real. O operador usa esses nomes para recuperar a senha dos segredos do Databricks em tempo de execução.

importante

Teste primeiro com is_preview definido como True para verificar o comportamento de passagem sem enviar nenhum email. Quando você estiver pronto para testar o email real, defina is_preview para False.

o passo 4: Construir a definição YAML

Crie um arquivo chamado gmail_email_sender.yaml com o seguinte conteúdo:

YAML
schema: user-defined-operator-v0.1.0
id: gmail_email_sender
type: python-run-function
version: '1.0.0'
name: Gmail Email Sender
description: Sends the input DataFrame as a CSV attachment via Gmail SMTP to one or more recipients.

config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
sender_email:
type: string
title: Sender Email
default: ''
examples:
- 'you@gmail.com'
x-ui:
widget: input
secret_scope:
type: string
title: Secret Scope
default: ''
examples:
- 'my_email_scope'
x-ui:
widget: input
secret_key:
type: string
title: Secret Key
default: ''
examples:
- 'gmail_app_password'
x-ui:
widget: input
recipients:
type: string
title: Recipients
default: ''
examples:
- 'alice@example.com, bob@example.com'
x-ui:
widget: textarea
rows: 2
subject:
type: string
title: Subject
default: ''
examples:
- 'Designer Output Data'
x-ui:
widget: input
body:
type: string
title: Email Body
default: "Hello,\n\nAttached is the latest data.\n\nBest,\nDatabricks Workflow"
x-ui:
widget: textarea
rows: 6
required:
- sender_email
- secret_scope
- secret_key
- recipients
- subject
additionalProperties: false

ports:
input:
- name: data
title: Input Data
mime: application/vnd.databricks.dataframe
output:
- name: data
title: Output Data
mime: application/vnd.databricks.dataframe

run_function:
type: inline
code: |
from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
input_df = inputs["data"]

if config.get("is_preview", False):
return {"data": input_df}

import smtplib
import os
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders

sender_email = config.get("sender_email", "")
secret_scope = config.get("secret_scope", "")
secret_key = config.get("secret_key", "")
recipients_raw = config.get("recipients", "")
subject = config.get("subject", "")
body = config.get("body", "")

if not sender_email:
raise ValueError("Sender Email is required.")
if not secret_scope or not secret_key:
raise ValueError("Secret Scope and Secret Key are required.")
if not recipients_raw:
raise ValueError("At least one recipient is required.")

recipients = [r.strip() for r in recipients_raw.split(",") if r.strip()]
if not recipients:
raise ValueError("At least one valid recipient email is required.")

from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
sender_password = dbutils.secrets.get(scope=secret_scope, key=secret_key)

pdf = input_df.toPandas()
file_path = "/tmp/designer_email_attachment.csv"
pdf.to_csv(file_path, index=False)

for recipient in recipients:
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = recipient
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))

with open(file_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
"Content-Disposition",
f"attachment; filename={os.path.basename(file_path)}",
)
msg.attach(part)

with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(sender_email, sender_password)
server.send_message(msg)

if os.path.exists(file_path):
os.remove(file_path)

return {"data": input_df}

o passo 5: Salvar e registrar a operadora

  1. Salve o arquivo YAML em seu workspace Databricks . Por exemplo:

    /Workspace/Users/<user-name>/gmail_email_sender.yaml
  2. Adicione o operador ao seu arquivo .user_defined_operators.yaml :

    YAML
    operators:
    - /Workspace/Users/<user-name>/gmail_email_sender.yaml

Para obter mais informações sobre as opções de registro, consulte Torne sua operadora detectável.

Permissões

Usuários que executam um fluxo de trabalho contendo este operador precisam de acesso READ ao Escopo Secreto, ou podem fornecer seu próprio Escopo Secreto e valores key na configuração do operador. Os usuários também precisam de acesso de leitura ao arquivo YAML no workspace.

Para conceder acesso ao Escopo Secreto:

Bash
databricks secrets put-acl my_email_scope <user-or-group> READ

Utilizando o operador no Lakeflow Designer

Após o cadastro, o operador aparece no Lakeflow Designer com uma porta de entrada para sua fonte de dados e campos de configuração para email do remetente, Escopo Secreto, chave secreta, destinatários, assunto e corpo da mensagem.

Quando o fluxo de trabalho é executado, o operador converte o DataFrame de entrada em CSV, anexa-o a um email e o envia para cada destinatário. O DataFrame passa sem alterações para a porta de saída, permitindo que você encadeie operadores adicionais posteriormente. Durante a visualização do fluxo de trabalho, nenhum email é enviado.