こんにちは。AI事業本部の協業リテールメディアdivの河野です。

チームでは機械学習システムのバックエンドにVertex AIを使用しており、MLパイプラインをVertex AI Pipeliens上で構築しています。このとき、パイプラインが正常に終了したか、途中でエラーが起きているかをいち早く知れるようにSlack通知を飛ばす機能を作成したので、その方法を紹介しようと思います。

Vertex AI Pipelinesにはパイプラインジョブが終了したときに特定のタスクを実行する方法が存在するのですが、終了状態の情報のやり取りについてドキュメントで書かれていない部分が多かったのでブログに書き残すことにしました。

チーム内でのMLOpsについて

私たちのチームではユーザーの購買予測に機械学習を活用しています。購買予測について説明すると、小売企業のPOSデータを使用し、あるユーザーが次の1ヶ月の間に特定の商品カテゴリについて購買するかどうかを予測しています。

この予測は毎月1日に実行するようにしており、その都度モデルを学習させ新しく作り直しています。また、予測値は商品カテゴリごとに作成しており10数個のMLパイプラインを並列実行させています。

1つのMLパイプラインは完了するまでに約5時間半ほどかかり、パイプラインが成功するまでを監視し続けるのはコストがかかっていました。

そこで、終了状態を通知するようにし、成功したかもしくはエラーで落ちたかをコストをかけずに認知できるようにする必要がありました。

ExitHandlerを使用してSlack通知を行う方法

Vertex AI Pipelinesにおいて実行の終了状態を通知するには、Kubeflow SDK API[1]の ExitHandler を使用することで簡単に利用できるので便利になっています。

ちなみになぜSlack通知にしたのかについてはチームの通知情報がSlackに集約されているためです。ただ、Vertex AI Pipelinesの通知をEmailで行うためのコンポーネントをGoogleが用意しているので、Slackじゃなくていいという方はそちらを参考にするのも良いかと思います。メール通知については最後におまけの章で紹介しています。

ExitHandlerの使い方

ExitHandlerは以下のように使用できます。ExitHandlerの引数に指定しているslack_notificationは自作したコンポーネントで、詳細については後ほど説明します。

from kfp import dsl

slack_notification_task = slack_notification(
    project_id=PROJECT_ID,
    model_name=model,
    category=category,
    version=version,
    alert_slack_channel_id=ALERT_SLACK_CHANNEL_ID,
    notification_slack_channel_id=NOTIFICATION_SLACK_CHANNEL_ID,
)
with dsl.ExitHandler(exit_task=slack_notification_task):
		# Pipelineのタスク定義

Vertex AI Pipelinesのタスク定義をwith文の中に入れてあげることで使用できます。

これにより、パイプラインジョブが終了したときに、 ExitHandler のexit_taskに指定したタスクが実行されるようになります。

タスクとはVertex AI Pipelines(kubeflow pipelines)のパイプラインを構成するコンポーネントを定義したものです。Vertex AI Pipelinesではこのコンポーネントをつなぎこむことで1つのパイプラインを作成します。

上記のコードのように実装すると、GUI上のグラフでは画像のように表示されます。パイプラインジョブが終了したとき、slack-notificationというタスクが実行されていることが見て取れます。

exithanler_ui

ExitHandlerで実行するタスク

exit_taskに指定している slack_notification_task は以下のように書かれたslack-notificationコンポーネントによってタスク定義されています。

@dsl.component(
    packages_to_install=["slack_sdk==3.27.1", "google-cloud-secret-manager==2.20.0"],
    base_image="python:3.11",
)
def slack_notification(
    project_id: str,
    model_name: str,
    category: str,
    version: str,
    alert_slack_channel_id: str,
    notification_slack_channel_id: str,
    status: dsl.PipelineTaskFinalStatus,
) -> None:
    from google.cloud import secretmanager
    from slack_sdk.errors import SlackApiError
    from slack_sdk.web import WebClient

    def get_secret_from_secret_manager(
        secret_id: str,
        project_id: str,
        version: str,
    ) -> str:
        secret_client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{project_id}/secrets/{secret_id}/versions/{version}"
        response = secret_client.access_secret_version(request={"name": name})
        password = response.payload.data.decode("UTF-8")
        return password

    def get_experiment_url(
        project_id: str, model_name: str, category: str, version: str
    ) -> str:
        run_name = (
            f"train-sales-forecast-pipeline-{model_name}-{category}-{version}".replace(
                "_", "-"
            )
        )
        return f"<https://console.cloud.google.com/vertex-ai/experiments/locations/asia-northeast1/experiments/{run_name}/runs&project={project_id}>"

    pipeline_job_name = status.pipeline_job_resource_name.split("/")[-1]
    pipeline_final_status = status.state
    error_message = status.error_message
    if error_message is None:
        slack_channel_id = notification_slack_channel_id
        experiment_url = get_experiment_url(project_id, model_name, category, version)
        main_message = f"✅カテゴリ予測({category})のMLパイプラインが完了しました✅\\n<{experiment_url}|*学習結果はこちら*>"
    else:
        slack_channel_id = alert_slack_channel_id
        main_message = f"⚠カテゴリ予測({category})のMLパイプラインが失敗しました⚠"
    slack_api_token = get_secret_from_secret_manager(
        secret_id="slack_vertex_ai_notification_bot_user_oauth_token",
        project_id=project_id,
        version="latest",
    )
    slack_client = WebClient(token=slack_api_token)
    
    # 以下Slack通知用のメッセージ

上から順に実装について解説します。

@dsl.component の使用

Kubeflow pipelines(以下kfp)ではコンポーネントを定義する方法がいくつかあります。ドキュメント[3]を見ると以下の2つの方法が推奨されているようです。

  • @dsl.component デコレータを使用しPython関数で定義する方法
  • @dsl.container_component デコレータを使用し、返り値に dsl.ContainerSpec でコンテナを指定する方法

他にも以前推奨されていたyamlファイルでコンポーネントを定義する方法もあります。

保持しているデータ 説明
state タスクの状態(SUCCEEDED, FAILED, CANCELLED)
pipeline_job_resource_name パイプラインジョブのリソース名
pipeline_task_name 最後に実行されたタスク名
error_code エラーコード(使うことはないかも)
error_message エラーメッセージ

パイプラインの終了状態をSlackメッセージに乗せて通知するために必ず必要になるものなので、今回の実装の中心を担っていました。

実装の中身

関数の実装自体については、ステータス情報によってSlackチャンネルとメッセージ内容を変更しています。Slack通知自体は特に工夫をしていないですが、パイプラインの終了状態がわかりやすいようにすることと、対象のパイプラインへ1クリックで飛べるようにURLを乗せるようにしてあります。

実際のメッセージは以下のようにしました。

slack_notify

実装の肝

Vertex AI Pipelinesの終了状態をSlack通知させるのに肝心だったのは、 dsl.PipelineTaskFinalStatus を使用することと、そのためのVertex AIの内部の仕様を理解することでした。

flow_engineering

今回は割り切ってPythonの関数で定義して利用する方法を採用しましたが、コンポーネントのコードが分散すること、関数へのベタ書きで可読性が落ちること、など設計面であまり嬉しくないことが起きてしまうため、今後のアップデートでコンテナでもステータス情報を取得できるようになると個人的には嬉しいなと思っています。

【おまけ】VertexNotificationEmailOpを利用して、メール通知を行う方法

Google CloudはVertex AI Pipelinesの通知方法をメール通知に限り事前に用意してくれています。こちらのドキュメントを読むと、Google Cloud Pipeline Components[2]にてメール通知用の事前定義されたコンポーネント( VertexNotificationEmailOp )を使って簡単にメール通知ができる方法が記述されています。

こちらの VertexNotificationEmailOp を使用するメリットはコンポーネントの実装を必要とせず数行で書けることだと思います。ドキュメントでは以下のようなコードが紹介されています。

from kfp import dsl
from kfp import compiler
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp

@dsl.pipeline(
    name="PIPELINE_NAME",
    pipeline_root=PIPELINE_ROOT_PATH,
)
def notification_email_pipeline():
    notify_email_task = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)

    with dsl.ExitHandler(notify_email_task):
        # Add your pipeline tasks here.

compiler.Compiler().compile(pipeline_func=notification_email_pipeline,
        package_path="notification_email_pipeline.yaml")

notify_email_task = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST) の1行でタスクの定義ができるようになっており、簡単に記述できることがわかります。

デメリットとしては、通知内容を柔軟に変更できないことと、通知を受け取るメールリスト( RECIPENTS_LIST )が最大3つしか指定できないことです。

通知内容は以下のようなメールが届きます。

件名: Vertex Pipelines ジョブ "PIPELINE_NAME" タスク "TASK_NAME"
From: Google 通知 <notify-noreply@google.com>

Vertex AI をご利用のお客様

Vertex Pipelines ジョブ "PIPELINE_NAME" タスク "TASK_NAME" は、ステータス {status} で終了しました。

追加情報:
- プロジェクト: {project}
- パイプライン名: PIPELINE_NAME
- パイプライン ジョブ ID: {pipeline_job_id}
- 開始時間: {start_time}

Cloud コンソールでこのパイプライン ジョブを表示するには、こちらのリンク {console_link} をお使いください。

よろしくお願いいたします。
Google Cloud AI チーム

パイプラインジョブのメタデータ情報と、対象のパイプラインジョブのリンクがメールに載せられるようです。

参考

 

 

現在、協業リテールメディアdivでは積極的に採用活動を行っております。以下のフォームからカジュアル面談を申し込めますので興味のある方はお気軽にお問い合わせください。

ご応募お待ちしております!

アバター画像
2023年新卒入社のデータサイエンティストです。現在はAI事業本部のリテールメディア事業にて機械学習活用の推進などをしています。