メインコンテンツまでスキップ

イベントフックを使用したDLTパイプラインのカスタムモニタリングの定義

備考

プレビュー

イベント フックのサポートは パブリック プレビュー段階です。

イベント フック を使用して、イベントが DLT パイプラインのイベント ログに永続化されるときに実行されるカスタム Python コールバック関数を追加できます。イベント フックを使用して、カスタム モニタリングおよびアラート ソリューションを実装できます。 たとえば、イベント フックを使用して、特定のイベントが発生したときに Eメール を送信したり、ログに書き込んだり、サードパーティのソリューションと統合してパイプライン イベントを監視したりできます。

イベントフックは、単一の引数を受け入れるPython関数を使用して定義します。引数はイベントを表すディクショナリです。 その後、イベント フックをパイプラインのソース コードの一部として含めます。 パイプラインで定義されているイベント フックは、各パイプラインの更新中に生成されたすべてのイベントの処理を試みます。 パイプラインが複数のソース コード アーティファクト (複数のノートブックなど) で構成されている場合、定義されたイベント フックはパイプライン全体に適用されます。 イベント フックはパイプラインのソース コードに含まれていますが、パイプライン グラフには含まれていません。

イベント フックは、 Hive metastore または Unity Catalogに発行するパイプラインで使用できます。

注記
  • Python は、イベントフックの定義でサポートされている唯一の言語です。 SQL インターフェイスを使用して実装されたパイプライン内のイベントを処理するカスタム Python 関数を定義するには、パイプラインの一部として実行される別の Python ノートブックにカスタム関数を追加します。 Python 関数は、パイプラインの実行時にパイプライン全体に適用されます。
  • イベントフックは、 maturity_levelSTABLEされているイベントに対してのみトリガーされます。
  • イベント フックは、パイプラインの更新から非同期的に実行されますが、他のイベント フックと同期的に実行されます。 つまり、一度に実行されるイベント フックは 1 つだけで、他のイベント フックは現在実行中のイベント フックが完了するまで実行を待機します。 イベント フックが無期限に実行されると、他のすべてのイベント フックがブロックされます。
  • DLT は、パイプラインの更新中に出力されるすべてのイベントに対して各イベント フックの実行を試みます。遅延しているイベント フックがキューに登録されたすべてのイベントを処理する時間を確保するために、DLT は構成不可能な固定期間を待機してから、パイプラインを実行しているコンピュートを終了します。 ただし、コンピュートが終了する前に、すべてのイベントですべてのフックがトリガーされる保証はありません。

イベント・フック処理の監視

DLT イベント ログの hook_progress イベントの種類を使用して、更新プログラムのイベント フックの状態を監視します。依存関係の循環を防ぐために、 hook_progress イベントに対してイベント フックはトリガーされません。

イベントフックを定義する

イベントフックを定義するには、 on_event_hook デコレータを使用します。

Python
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook

この max_allowable_consecutive_failures は、イベント フックが無効になるまでに連続して失敗する最大回数を示します。 イベント フックの失敗は、イベント フックが例外をスローする任意の時間として定義されます。 イベント フックが無効になっている場合、パイプラインが再起動されるまで、イベント フックは新しいイベントを処理しません。

max_allowable_consecutive_failures 0 または None以上の整数である必要があります。値 None (デフォルトで割り当て) は、イベント フックに許可される連続する失敗の数に制限がなく、イベント フックが無効にならないことを意味します。

イベント フックの失敗とイベント フックの無効化は、イベント ログで hook_progress イベントとして監視できます。

イベントフック関数は、このイベントフックをトリガーしたイベントのディクショナリ表現である 1 つのパラメーターのみを受け入れる Python 関数である必要があります。 イベント フック関数からの戻り値はすべて無視されます。

例: 処理する特定のイベントを選択する

次の例は、処理する特定のイベントを選択するイベント フックを示しています。 具体的には、この例では、パイプライン STOPPING イベントが受信されるまで待機し、ドライバー ログ stdoutにメッセージを出力します。

Python
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)

例:すべてのイベントをSlackチャンネルに送信する

次の例では、受信したすべてのイベントを Slack チャンネルに送信するイベント フックを Slack APIで実装しています。

この例では、Databricks シークレット を使用して、Slack API への認証に必要なトークンを安全に格納します。

Python
from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)

例: 4 回連続して失敗した後に無効にするようにイベント フックを構成する

次の例は、4 回連続して失敗した場合に無効になるイベント フックを構成する方法を示しています。

Python
from dlt import on_event_hook
import random

def run_failing_operation():
raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()

例: イベント フックを持つ DLT パイプライン

次の例は、パイプラインのソース コードにイベント フックを追加する方法を示しています。 これは、パイプラインでイベント フックを使用する単純ですが完全な例です。

Python
from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})