イベント フックを使用して Lakeflow 宣言型パイプラインのカスタム モニタリングを定義する
プレビュー
イベント フックのサポートはパブリック プレビュー段階です。
イベント フック を使用すると、イベントがパイプラインのイベント ログに保持されるときに実行されるカスタム Python コールバック関数を追加できます。イベント フックを使用して、カスタムのモニタリングおよびアラート ソリューションを実装できます。 たとえば、イベント フックを使用して、特定のイベントが発生したときに電子メールを送信したり、ログに書き込んだり、サードパーティのソリューションと統合してパイプライン イベントを監視したりできます。
単一の引数を受け入れる Python 関数を使用してイベント フックを定義します。引数は、イベントを表す辞書です。次に、イベント フックをパイプラインのソース コードの一部として含めます。パイプラインで定義されたイベント フックは、パイプラインの更新ごとに生成されたすべてのイベントを処理しようとします。パイプラインが複数のソース コード ファイルで構成されている場合、定義されたイベント フックはパイプライン全体に適用されます。イベント フックはパイプラインのソース コードに含まれていますが、パイプライン グラフには含まれていません。
Hive metastoreまたはUnity Catalogに公開するパイプラインでイベント フックを使用できます。
- イベント フックの定義にサポートされている言語は Python のみです。SQL インターフェースを使用して実装されたパイプライン内のイベントを処理するカスタム Python 関数を定義するには、パイプラインの一部として実行される別の Python ソース ファイルにカスタム関数を追加します。パイプラインの実行時に、Python 関数がパイプライン全体に適用されます。
- イベントフックは、 maturity_level が
STABLE
であるイベントに対してのみトリガーされます。 - イベント フックはパイプラインの更新とは非同期に実行されますが、他のイベント フックとは同期して実行されます。つまり、一度に実行できるイベント フックは 1 つだけで、現在実行中のイベント フックが完了するまで他のイベント フックは実行を待機します。 イベント フックが無期限に実行されると、他のすべてのイベント フックがブロックされます。
- Lakeflow 宣言型パイプラインは、パイプラインの更新中に出力されるすべてのイベントに対して、各イベント フックの実行を試みます。 遅延しているイベント フックがキューに登録されたすべてのイベントを処理する時間を確保するために、Lakeflow宣言型パイプラインはパイプラインを実行しているコンピュートを終了する前に、構成不可能な固定期間を待機します。 ただし、コンピュートが終了する前に、すべてのイベントですべてのフックがトリガーされる保証はありません。
イベントフック処理を監視する
Lakeflow 宣言型パイプライン イベント ログの hook_progress
イベントの種類を使用して、更新のイベント フックの状態を監視します。依存関係の循環を防ぐために、 hook_progress
イベントに対してイベント フックはトリガーされません。
イベントフックを定義する
イベント フックを定義するには、 on_event_hook
デコレータを使用します。
@dp.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
max_allowable_consecutive_failures
、イベント フックが無効になるまでに連続して失敗できる最大回数を表します。イベント フックの失敗は、イベント フックが例外をスローしたときと定義されます。イベント フックが無効になっている場合、パイプラインが再起動されるまで新しいイベントは処理されません。
max_allowable_consecutive_failures
0
またはNone
以上の整数である必要があります。値None
(デフォルトで割り当てられる) は、イベント フックに許可される連続失敗の数に制限がなく、イベント フックが無効になることはないことを意味します。
イベント フックの失敗とイベント フックの無効化は、イベント ログでhook_progress
イベントとして監視できます。
イベント フック関数は、このイベント フックをトリガーしたイベントの辞書表現である 構造体 を 1 つだけ受け入れるPython関数である必要があります。 イベント フック関数からの戻り値はすべて無視されます。
例: 処理する特定のイベントを選択する
次の例は、処理する特定のイベントを選択するイベント フックを示しています。具体的には、この例では、パイプラインSTOPPING
イベントが受信されるまで待機し、その後、ドライバー ログstdout
にメッセージを出力します。
@dp.on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
例: すべてのイベントを Slack チャンネルに送信する
次の例では、Slack APIを使用して、受信したすべてのイベントを Slack チャンネルに送信するイベントフックを実装します。
この例では、Databricksシークレットを使用して、Slack API への認証に必要なトークンを安全に保存します。
from pyspark import pipelines as dp
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@dp.on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
例: 4回連続して失敗した後に無効にするイベントフックを構成する
次の例は、連続して 4 回失敗した場合に無効になるイベント フックを構成する方法を示しています。
from pyspark import pipelines as dp
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@dp.on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
例: Lakeflow イベント フックを持つ宣言型パイプライン
次の例は、パイプラインのソース コードにイベント フックを追加する方法を示しています。これは、パイプラインでイベント フックを使用するシンプルだが完全な例です。
from pyspark import pipelines as dp
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@dp.table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@dp.on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})