Pular para o conteúdo principal

Definir o monitoramento personalizado do pipeline DLT com ganchos de eventos

info

Visualização

O suporte para ganchos de eventos está na versão prévia pública.

O senhor pode usar ganchos de eventos para adicionar Python funções de retorno de chamada personalizadas que são executadas quando os eventos são persistidos em um pipeline evento log da DLT. O senhor pode usar ganchos de eventos para implementar soluções personalizadas de monitoramento e alerta. Por exemplo, o senhor pode usar ganchos de eventos para enviar e-mails ou gravar em log quando ocorrerem eventos específicos ou para integrar-se a soluções de terceiros para monitorar eventos de pipeline.

O senhor define um gancho de evento com uma função Python que aceita um único argumento, em que o argumento é um dicionário que representa um evento. Em seguida, o senhor inclui os ganchos de evento como parte do código-fonte de um pipeline. Todos os ganchos de eventos definidos em um pipeline tentarão processar todos os eventos gerados durante cada atualização do pipeline. Se o seu pipeline for composto de vários artefatos de código-fonte, por exemplo, vários Notebooks, todos os ganchos de eventos definidos serão aplicados a todo o pipeline. Embora os ganchos de eventos estejam incluídos no código-fonte do seu pipeline, eles não estão incluídos no gráfico do pipeline.

O senhor pode usar ganchos de eventos com pipelines que são publicados em Hive metastore ou Unity Catalog.

nota
  • Python é a única linguagem compatível com a definição de ganchos de eventos. Para definir funções personalizadas do Python que processam eventos em um pipeline implementado usando a interface SQL, adicione as funções personalizadas em um Python Notebook separado que é executado como parte do pipeline. As funções do Python são aplicadas a todo o pipeline quando o pipeline é executado.
  • Os ganchos de eventos são acionados somente para eventos em que o maturity_level é STABLE.
  • Os hooks de eventos são executados de forma assíncrona em relação às atualizações do pipeline, mas de forma síncrona com outros hooks de eventos. Isso significa que apenas um único gancho de evento é executado por vez, e outros ganchos de evento aguardam a execução até que o gancho de evento em execução no momento seja concluído. Se um gancho de evento for executado indefinidamente, ele bloqueará todos os outros ganchos de evento.
  • A DLT tenta executar cada gancho de evento em cada evento emitido durante uma atualização do site pipeline. Para ajudar a garantir que os ganchos de eventos atrasados tenham tempo para processar todos os eventos em fila, a DLT aguarda um período fixo não configurável antes de encerrar o compute executando o pipeline. No entanto, não é garantido que todos os hooks sejam acionados em todos os eventos antes que o site compute seja encerrado.

Monitore o processamento do gancho de eventos

Use o tipo de evento hook_progress no log de eventos da DLT para monitorar o estado dos ganchos de eventos de uma atualização. Para evitar dependências circulares, os ganchos de eventos não são acionados para eventos hook_progress.

Defina um gancho de eventos

Para definir um gancho de eventos, use o decorador on_event_hook:

Python
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook

O max_allowable_consecutive_failures descreve o número máximo de vezes consecutivas em que um gancho de eventos pode falhar antes de ser desativado. Uma falha no gancho de eventos é definida como sempre que o gancho de eventos lança uma exceção. Se um gancho de evento estiver desativado, ele não processará novos eventos até que o pipeline seja reiniciado.

max_allowable_consecutive_failures deve ser um número inteiro maior ou igual a 0 ou None. Um valor de None (atribuído por default) significa que não há limite para o número de falhas consecutivas permitidas para o gancho de eventos, e o gancho de eventos nunca é desativado.

As falhas e a desativação dos ganchos de eventos podem ser monitoradas no log de eventos como eventos hook_progress.

A função de gancho de evento deve ser uma função Python que aceita exatamente um parâmetro, uma representação de dicionário do evento que acionou esse gancho de evento. Qualquer valor de retorno da função de gancho de eventos é ignorado.

Exemplo: Selecione eventos específicos para processamento

O exemplo a seguir demonstra um gancho de eventos que seleciona eventos específicos para processamento. Especificamente, esse exemplo aguarda até que os eventos do pipeline STOPPING sejam recebidos e, em seguida, envia uma mensagem para os logs de driver stdout.

Python
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)

Exemplo: Enviar todos os eventos para um canal do Slack

O exemplo a seguir implementa um gancho de evento que envia todos os eventos recebidos para um canal do Slack usando a API do Slack.

Este exemplo usa um Databricks segredo para armazenar com segurança os tokens necessários para autenticar no API Slack.

Python
from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)

Exemplo: configurar um gancho de eventos para desativar após quatro falhas consecutivas

O exemplo a seguir demonstra como configurar um gancho de eventos que é desativado se ele falhar consecutivamente quatro vezes.

Python
from dlt import on_event_hook
import random

def run_failing_operation():
raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()

Exemplo: Um pipeline DLT com um gancho de evento

O exemplo a seguir demonstra como adicionar um gancho de evento ao código-fonte de um pipeline. Este é um exemplo simples, mas completo, do uso de ganchos de eventos com um pipeline.

Python
from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})