はじめに

こんにちは!
FANTECH 本部 所属の川口です。FANTECH本部 では技術発信を強化中です!

https://developers.cyberagent.co.jp/blog/archives/tag/fantech/

近年、マイクロサービスやサーバーレスでクラウドネイティブなアプリケーションの開発において、イベントドリブンアーキテクチャ(EDA)が注目を集めています。
EDA は、システムの疎結合性や拡張性、耐障害性に優れたアーキテクチャの一つです。

今回は、その EDA を実現するための技術として CloudEvents によるイベントデータの標準化、Google Cloud の Eventarc によるイベント配信の仕組みから、EDA の概要から実際の活用方法までをお話します。

 

イベントドリブンアーキテクチャ(EDA)

イベントドリブンアーキテクチャ(EDA) とは、アプリケーションやサービス間の通信をイベントを使って行うアーキテクチャのことです。

EDAではシステム内で出来事、いわゆる イベント が発生すると、そのイベントがシステム内の別の部分にイベントとして送信され、そのイベントに対して適切な処理が行われます。
このように EDA の中核をなすのはイベントという概念です。

このとき、イベントを発行する側を プロデューサー、イベントを受け取って処理する側を コンシューマー と呼びます。
また、プロデューサーとコンシューマーはイベントチャネルと呼ばれる ブローカー やメッセージングシステムを介して疎結合に連携をします。
このようなアーキテクチャスタイルは、従来のリクエスト・レスポンススタイルとは全く異なるアプローチです。

EDA にはどのようなメリットがあるのか整理します。

  • プロデューサーとコンシューマーが疎結合に連携することで、システム全体の拡張性が高まる。
  • イベント処理が並列で行われるため、複数のイベントが同時に発生しても、それぞれのイベントに対する処理が同時に実行される。これにより、システムの性能や応答性が向上し、スケーラビリティが高まる。
  • 障害の影響範囲がイベントの受け手に限定されるため、耐障害性が高くなる。

一方で、EDA には次のようなデメリットもあります。

  • イベントの順序を保証するのが難しい。
  • システム全体を把握するのが複雑になる。
  • 適切なイベントの粒度を決めるのが難しい。

最初にも述べましたが、ここまででマイクロサービスアーキテクチャなど疎結合で構成された分散システムであれば EDA を適用することで、よりメリットが活かせそうだということがわかるかと思います。

ただし、モノリシックアーキテクチャの場合には活かせないかというとそんなことはありません。
実際に、我々のチームはモノリシックアーキテクチャでアプリケーション開発を行っていますが、EDA の導入もおこなっています。
というのも、モノリシックであっても何かしらの PaaS などの外部サービスを利用していることがほとんどだからです。
このような場合であれば、同様に外部サービスとイベントベースで疎結合に連携することができ、メリットを享受することができます。

具体的には、以下のようなユースケースがあります。

  • Cloud Storage, S3 などのストレージサービスにて画像ファイルが追加されたイベントから、最も古い画像ファイルを削除する。
  • Amazon DynamoDB, Datastore などの DB サービスにてデータが修正されたイベントから他のプロダクトにデータを同期する処理を行う。

CloudEvents

簡単に EDA についてお話をさせていただきました。

このようなイベントドリブンな設計を実現する上で、プラットフォーム横断的なイベントの相互運用性を高めるため、標準化の取り組みが重要になります。それが CloudEvents です。

CloudEventsは、プラットフォーム中立的なイベントデータ構造と転送方式を定義する仕様です。
CloudEventsに準拠したイベントデータには、イベントのメタデータ、データコンテンツ、データ形式などが規定されており、様々なイベントプロバイダからのイベントを統一的な形式で扱えるようになります。

また、イベントエンベロープ形式に関しても標準化されており、JSON、Avro、Protoバッファなど、多様なイベントデータ形式をサポートしています。
さらに HTTP(S) によるイベント転送のプロトコルも規定されており、システム構成を問わず CloudEventsに準拠していればイベントデータをシームレスに受け渡しできるようになっています。

Go の SDK も存在しており、イベントの作成やイベントの取得まで SDK 上で行うことができます。
以下のようにして、簡単にイベントを HTTP リクエストから取得することができます。

import (
  "log/slog"
  "net/http"
  "cloudevents github.com/cloudevents/sdk-go/v2"
  // ...
)

// ...

http.HandleFunc("//google/cloud/storage/object/v1/finalized/default", func(w http.ResponseWriter, r *http.Request) {
  event, err := cloudevents.NewEventFromHTTPRequest(r)
  if err != nil {
    slog.Error("failed to parse cloudevents", err)
      return
    }
  return
})

// ...

このイベントの型は以下ですが、この DataEncoded の部分に実際のイベントの内容が入ってきます。

// <https://github.com/cloudevents/sdk-go/blob/f97061a7e72b17f63120aa553123edb28c49e490/v2/event/event.go#L15>
type Event struct {
  Context     EventContext
  DataEncoded []byte
  // DataBase64 indicates if the event, when serialized, represents
  // the data field using the base64 encoding.
  // In v0.3, this field is superseded by DataContentEncoding
  DataBase64  bool
  FieldErrors map[string]error
}

このイベントの内容に関しては、CloudEvents の範疇ではなくなり、個々のイベントプロバイダでどのような定義がなされているかを確認する必要があります。
例えば Google Cloud 関連のイベントであれば、以下のようにイベントの型が定義されています。

Cloud Storage での finalized イベント(オブジェクトがファイナライズされた時のイベント)に焦点を当てると以下のようになっています。
当該オブジェクトの ID やバケット、Content Type なども取得することができることがわかります。

// <https://github.com/googleapis/google-cloudevents/blob/main/proto/google/events/cloud/storage/v1>

// The CloudEvent raised when an object is finalized in Google Cloud Storage.
message ObjectFinalizedEvent {
  option (google.events.cloud_event_type) =
      "google.cloud.storage.object.v1.finalized";
  option (google.events.cloud_event_extension_name) = "bucket";

  // The data associated with the event.
  StorageObjectData data = 1;
}

// An object within Google Cloud Storage.
message StorageObjectData {
  // Describes the customer-specified mechanism used to store the data at rest.
  message CustomerEncryption {
    // The encryption algorithm.
    string encryption_algorithm = 1;

    // SHA256 hash value of the encryption key.
    string key_sha256 = 2;
  }

  // Content-Encoding of the object data, matching
  // [<https://tools.ietf.org/html/rfc7231#section-3.1.2.2>][RFC 7231 §3.1.2.2]
  string content_encoding = 1;

  // ...
}
// <https://github.com/googleapis/google-cloudevents-go/blob/main/cloud/storagedata/data.pb.go>

// An object within Google Cloud Storage.
type StorageObjectData struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

  // Content-Encoding of the object data, matching
  // [<https://tools.ietf.org/html/rfc7231#section-3.1.2.2>][RFC 7231 §3.1.2.2]
  ContentEncoding string `protobuf:"bytes,1,opt,name=content_encoding,json=contentEncoding,proto3" json:"content_encoding,omitempty"`
  
  // ...
}

Eventarc

CloudEvents による標準化により、異なるシステム間でもイベントのポータビリティと相互運用性が確保されることがわかりました。

ここでは、クラウドベンダーが提供するマネージドイベントブローカーサービスや、オープンソースのイベントブローカー基盤など、CloudEventsに準拠した製品・サービスの話をします。

そのひとつが、Google Cloud の Eventarc です。他にも以下のようなサービスがあります。

Eventarc を用いると以下のようことが簡単に実現できるようになります。

  • イベントソースの管理
    • CloudEvents 規格のイベントに対応
    • Google Cloud サービス(Cloud Storage、Cloud Run、Cloud Build など)からのイベントの収集
    • サードパーティのイベントソースからのイベントの収集
  • イベント配信
    • 収集したイベントを Cloud Run、 GKE(プレビュー)、Workflows などの Google Cloud サービスに配信
    • イベントのルーティング
    • イベントのフィルタリング
  • 運用とモニタリング
    • IAM によるアクセス制御
    • コンシューマーにて適切に消費されているかの確認
    • レイテンシがどれくらいかの確認

それでは今回実際に、Cloud Storage のオブジェクトのファイナライズイベントをもとに Cloud Run からそのイベントの内容を処理できるようにしたいと思います。
全体の構成としては以下のようになります。

全体の構成

 

Eventarc の作成

はじめに、Eventarc の作成から行います。Eventarc は、terraform から作成できるようになっています。

まず Eventarc には、適切な権限を付与する必要があり、こちら を参考にしています。

また今回は Cloud Storage や、Cloud Run の設定方法については省略します。
以下のようなコードで Eventarc と Eventarc で扱うサービスアカウントが作成できるはずです。

resource "google_eventarc_trigger" "default" {
  name = "default"

  location                = local.region
  event_data_content_type = "application/json"
  service_account         = google_service_account.eventarc-invoker.email

  matching_criteria {
    attribute = "type"
    value     = "google.cloud.storage.object.v1.finalized"
  }
  matching_criteria {
    attribute = "bucket"
    value     = google_storage_bucket.default.name
  }
  destination {
    cloud_run_service {
      service = google_cloud_run_service.default.name
      region  = local.region
      path    = "/google/cloud/storage/object/v1/finalized/default"
    }
  }
}

// Eventarc で使用するためのサービスアカウント
resource "google_service_account" "eventarc-invoker" {
  account_id   = "eventarc-invoker"
  display_name = "eventarc-invoker"
}

resource "google_project_iam_member" "eventarc-invoker-run-invoker" {
  role    = "roles/run.invoker"
  member  = "serviceAccount:${google_service_account.eventarc-invoker.email}"
}

resource "google_project_iam_member" "eventarc-invoker-event-receiver" {
  role    = "roles/eventarc.eventReceiver"
  member  = "serviceAccount:${google_service_account.eventarc-invoker.email}"
}

google_eventarc_trigger リソースに関する項目をいくつか抜粋して説明します。

  • location: 言わずもがな location です。ただし、この location はイベントのプロデューサー(今回は、Cloud Storage)とコンシューマー(今回は、Cloud Run)と同一でなければなりません。
  • event_data_content_type: event のデータの Content Type です。 “application/json” と “application/protobuf” などが指定できます。
  • matching_criteria: こちらでどのような event を処理するかを指定します。また event の種類によっては、パスパターン を用いることでより柔軟に設定が行えます。2024-05 時点でプレビューですが、terraform も対応しており operator を用いればこのパスパターンを利用することができます。
  • destination: イベントのコンシューマーを指定します。今回は Cloud Run を指定しており、path を指定することによりルーティングまで制御できます。path には、Cloud Storage の default バケットのファイナライズドイベントをハンドリングするという意味で上記のようにしています。

また今回は、Cloud Storage のイベントを処理しますがこちらを行うにあたって別途、Google サービスエージェントに権限を加える必要があります。以下のようなコマンドを打ちます。

$ gcloud projects add-iam-policy-binding PROJECT_ID \\
    --member=serviceAccount:service-PROJECT_NUMBER@gs-project-accounts.iam.gserviceaccount.com \\
    --role=roles/pubsub.publisher

作成できれば、以下のようにコンソールから確認できるはずです。

Eventarc コンソール画面

 

Eventarc ハンドラーの実装

実際に、Eventarc が作成できたので今度は、Cloud Run にて Eventarc ハンドラーの実装を行います。

基本的には、単純に HTTP リクエストを処理できる機構があればよく、先述の通り HTTP リクエストからイベントを取得することができます。

import (
  "log/slog"
  "net/http"
  "cloudevents github.com/cloudevents/sdk-go/v2"
  // ...
)

// ...

http.HandleFunc("//google/cloud/storage/object/v1/finalized/default", func(w http.ResponseWriter, r *http.Request) {
  event, err := cloudevents.NewEventFromHTTPRequest(r)
  if err != nil {
    slog.Error("failed to parse cloudevents", err)
      return
    }
  return
})

// ...

また、イベント内のデータを取得するには以下のように Unmarshal を行います。

import (
  cloudevents "github.com/cloudevents/sdk-go/v2"
  "github.com/googleapis/google-cloudevents-go/cloud/storagedata"
  "google.golang.org/protobuf/encoding/protojson"
)

func UnmarshalEventData(event *cloudevents.Event) (*storagedata.StorageObjectData, error) {
  var data storagedata.StorageObjectData
  err := protojson.Unmarshal(event.Data(), &data)
  if err != nil {
    return nil, err
  }
  return &data, nil
}

※ ちなみに今回は、イベント内のデータの Content Type が application/json でしたが(上記の terraform にて記述。) application/protobuf を指定している場合にはここのデータの Unmarshal には、 proto.Unmarshal を用います。

ここまでで、イベントとイベント内のデータとともに型情報が持てていると思うので、あとは好きなように実装ができるはずです。

Eventarc のメトリクス

最後に Eventarc のメトリクスについて説明をします。

Eventarc では内部で Cloud Pub/Sub を用いて実装が行われているため、上記で Eventarc を作っていればこの時点で Cloud Pub/Sub のトピックやサブスクリプションといった各種リソースが作成されています。
したがって、Eventarc のメトリクスには Cloud Pub/Sub の各種メトリクスを扱うことができます。

※ 本記事では触れませんが、Cloud Pub/Sub を用いていることから同等の再試行制御が行えます。

以下のように、Metrics Explorer から各種 Cloud Pub/Sub の各種メトリクスが確認できるはずです。

すでに Cloud Pub/Sub を利用している場合には、Eventarc 用に再度モニタリングの整備をしなくてよいので助かりますね!

Metrics Explorer のサンプル画面

 

おわりに

今回は、Google Cloud の Eventarc を用いてイベントドリブンアーキテクチャを実現するための方法についてお話しました。

今までもイベントドリブンアーキテクチャのように、イベントをトリガーに他サービス間との通信を非同期で行う試みは行われてきたかと思います。
ただしその場合、「イベント発生後のメッセージの作成やそのメッセージのパブリッシュまでをアトミックに行うこと。」・「それらのメッセージの管理。」など、考慮するところが多くありました。

それらが今回のこの CloudEvents や、Eventarc を用いることにより簡素な実装となりうるかもしれません。

また今回は触れませんでしたが Google Cloud のいくつかの NoSQL サービスのイベントでは、変更前後のデータをイベント内のデータとして持ち合わせているために変更データキャプチャ(CDC)としても応用することができます。

近頃でも、Eventarc でサポートされるサービスが続々と GA になってきているので今後の展開も楽しみです。

最後までご覧いただきありがとうございました!

2023年9月中途入社のサーバーサイドエンジニアです。現在はメディア事業部の FanTech にて、複数プロダクトの開発を行っています。