イベントフックを使用してDelta Live Tablesパイプラインのカスタム モニタリングを定義する

プレビュー

イベント フックのサポートは パブリック プレビュー段階です。

イベント フックを使用して、 イベント が Delta Live Tables パイプラインの イベント ログに永続化されるときに実行されるカスタム Python コールバック関数を追加できます。 イベントフックを使用して、カスタムのモニタリングとアラートのソリューションを実装できます。 たとえば、イベント フックを使用して、特定のイベントが発生したときに電子メールを送信したり、ログに書き込んだり、サードパーティのソリューションと統合してパイプライン イベントを監視したりできます。

イベントフックは、単一の引数を受け入れる Python 関数で定義し、引数はイベントを表すディクショナリです。 次に、イベントフックをパイプラインのソースコードの一部として含めます。 パイプラインで定義されたイベントフックは、各パイプラインの更新中に生成されたすべてのイベントの処理を試みます。 パイプラインが複数のソース コード成果物 (複数のノートブックなど) で構成されている場合、定義済みのイベント フックがパイプライン全体に適用されます。 イベントフックはパイプラインのソースコードに含まれていますが、パイプライングラフには含まれません。

イベントフックは、 Hive metastore または Unity Catalogに発行するパイプラインで使用できます。

注:

  • Python は、イベントフックの定義でサポートされている唯一の言語です。 SQL インターフェイスを使用して実装されたパイプライン内のイベントを処理するカスタム Python 関数を定義するには、パイプラインの一部として実行される別の Python ノートブックにカスタム関数を追加します。 Python 関数は、パイプラインの実行時にパイプライン全体に適用されます。

  • イベントフックは、 maturity_levelSTABLEのイベントに対してのみトリガーされます。

  • イベントフックは、パイプラインの更新から非同期的に実行されますが、他のイベントフックとは同期的に実行されます。 つまり、一度に実行されるイベントフックは 1 つだけで、他のイベントフックは現在実行中のイベントフックが完了するまで実行を待機します。 イベントフックが無期限に実行されると、他のすべてのイベントフックがブロックされます。

  • Delta Live Tables は、パイプラインの更新中に生成されたすべてのイベントに対して各イベント フックの実行を試みます。 遅行イベント フックがキューに置かれたすべてのイベントを処理する時間を確保するために、Delta Live Tables は、パイプラインを実行しているコンピュートを終了する前に、構成不可能な固定期間待機します。 ただし、コンピュートが終了する前に、すべてのイベントですべてのフックがトリガーされることは保証されません。

イベントフック処理の監視

Delta Live Tables イベント ログの hook_progress イベントの種類を使用して、更新プログラムのイベント フックの状態を監視します。 循環依存関係を防ぐために、 hook_progress イベントに対してイベントフックはトリガーされません。

イベントフックを定義する

イベントフックを定義するには、 on_event_hook デコレータを使用します。

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

この max_allowable_consecutive_failures は、イベントフックが無効になるまでに失敗できる最大回数を示します。 イベントフックの失敗は、イベントフックが例外をスローするたびに定義されます。 イベントフックが無効になっている場合、パイプラインが再起動されるまで新しいイベントは処理されません。

max_allowable_consecutive_failures0 または None以上の整数でなければなりません。 値 None (デフォルトで割り当てられる) は、イベントフックに許可される連続した失敗の数に制限がなく、イベントフックが無効にならないことを意味します。

イベント フックの失敗とイベント フックの無効化は、 hook_progress イベントとしてイベント ログで監視できます。

イベントフック関数は、このイベントフックをトリガーしたイベントのディクショナリ表現である 1 つのパラメーターのみを受け取る Python 関数である必要があります。 イベントフック関数からの戻り値は無視されます。

例: 処理する特定のイベントを選択する

次の例は、処理する特定のイベントを選択するイベント フックを示しています。 具体的には、この例では、パイプライン STOPPING イベントが受信されるまで待機し、ドライバー ログ stdoutにメッセージを出力します。

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

例: すべてのイベントを Slack チャンネルに送信する

次の例では、受信したすべてのイベントを Slack API を使用して Slack チャンネルに送信するイベントフックを実装します。

この例では、Databricks シークレット を使用して、Slack API への認証に必要なトークンを安全に格納します。

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

例: 4 回連続して失敗した後に無効にするようにイベントフックを設定する

次の例は、4 回連続して失敗した場合に無効になるイベント フックを構成する方法を示しています。

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')


# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

例:Delta Live Tables イベントフック を持つ パイプライン

次の例は、パイプラインのソース コードにイベント フックを追加する方法を示しています。 これは、パイプラインでイベントフックを使用するシンプルですが完全な例です。

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)


# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })