ABEMA バックエンドエンジニアの大真です。 ABEMAのサブスクリプションシステムをリファクタリングするにあたり、Google Cloud Workflowsを採用しました。本記事では、導入に至った経緯や具体的なシステム構成について詳しく解説します。

従来のサブスクリプションシステムが抱えていた課題

ABEMAのサブスクリプション処理は、決済代行会社からのWebhook通知をトリガーに実行されます。従来、この一連のフローはモノリシックな処理として実装されていました。

従来のアーキテクチャ

この構成には、主に3つの課題がありました。

  1. 拡張性の低下: 決済に伴う付随処理(メール送信、クーポン消費、統計更新など)が増えるたびにコードが肥大化し、依存関係や影響範囲の特定が困難になっていました。

  2. リトライの柔軟性の欠如: 処理の途中でエラーが発生した場合、特定のステップ(例:メール送信のみ)から再開することが難しく、整合性を保ちながらの手動リカバリが必要でした。

  3. 状態把握の困難さ: どのステップでエラーが発生したのか把握するためにはログを追うしかなく、調査に時間がかかっていました。

Google Cloud Workflowsの導入

これらの課題を解決するため、各処理を独立したコンポーネントとして切り出し、実行順序や依存関係を制御する「オーケストレーション層」の導入を検討しました。選定にあたっては、Temporal  WorkflowArgo Workflows なども候補に挙がりましたが以下の理由から見送りました。Temporalはセルフホスト、Cloud版とも運用負荷やコストが見合わないと判断しました。Argo Workflows についても、当時の ABEMA の基盤状況とArgo CD との親和性といった観点から、最適ではないという結論に至りました。最終的には、既存基盤である Google Cloudとの親和性が極めて高く、フルマネージドサービスとして運用負荷を最小限に抑えられる点を評価し、Google Cloud Workflows を採用しました。

導入後の処理の流れ

Google Cloud Workflows導入後の処理の流れについて解説します。

  1. 決済代行サービスから通知(Webhook)を受けると、Subscription-Serviceは通知内容に含まれる一意な値をベースに WorkflowEventIDを発行します。このIDを用いてWorkflowEventテーブルに「未処理」状態でレコードを作成することで、同じ通知が仮に複数回届いても、重複を排除できる設計にしています。
  2. イベントの作成完了をトリガーとして Google Cloud Workflows を起動し、引数としてWorkflowEventIDを渡します。Workflows の各ステップでは、この ID をリクエストに含めてSubscription-Serviceを呼び出します。このように、各ステップの実行に必要なコンテキストを ID で引き回す設計にすることで、Workflows 自体はビジネスロジックの状態を保持せず、純粋なオーケストレーションのみに専念できる構成を実現しています。
  3. Workflowから呼び出された Subscription-Serviceは、受け取ったIDをキーにデータベースを参照し、対象のステップの実行状態を事前に確認した上で処理を開始します。これにより二重実行の防止を行い、冪等性を担保した構成を実現しています。

新アーキテクチャ

Workflows定義(YAML) 

YAML で定義された Workflow は、処理順序の制御やリトライ、エラーハンドリングといった「オーケストレーション」の責務を担います。

運用の工夫として、障害発生時の再実行を柔軟に行えるよう、入力引数の skipEventTypesに特定のeventType指定することで、任意のステップをスキップできる設計にしました。これにより、すでに完了した処理を飛ばし、実行したいステップのみ再実行することが可能です。また、分散された各コンポーネントの処理を横断的に追跡できるよう、HTTP ヘッダーにtraceparent を含めて伝播させ、オブザーバビリティを確保しています。


main:
  params: [req]
  steps:
    - initVars:
        assign:
          - serviceUrlForPrivateAccess: "https://subscription.internal.abema.sample"
          # Service Directoryのリソースフルパス
          - serviceDirectoryFullName: "projects/YOUR_PROJECT_ID/locations/asia-northeast1/namespaces/subscription-internal/services/subscription-api"
          - workflowEventId: ${req.workflowEventId}
          - skipEventTypes: ${default(req.skipEventTypes, [])}
          - traceparent: ${default(req.traceparent, "")}

    - mainProcess:
        try:
          steps:
            # 1. サブスクリプションの同期
            - syncSubscription:
                call: http.post
                args:
                  url: ${serviceUrlForPrivateAccess}
                  private_service_name: ${serviceDirectoryFullName}
                  headers:
                    Content-Type: "application/json"
                    traceparent: ${traceparent}
                  body:
                    id: ${workflowEventId}
                    eventType: "SYNC_SUBSCRIPTION"
                    skipEventTypes: ${skipEventTypes}
                result: syncRes
            
            - assignPlanIds:
                assign:
                  - planId: ${default(syncRes.body.planId, "default_plan")}

            # 2. 並列実行セクション
            - parallelBranches:
                parallel:
                  shared: [serviceUrlForPrivateAccess, serviceDirectoryFullName, workflowEventId, skipEventTypes, planId, traceparent]
                  branches:
                    - sendEmail:
                        call: http.post
                        args:
                          url: ${serviceUrlForPrivateAccess}
                          private_service_name: ${serviceDirectoryFullName}
                          body: 
                            id: ${workflowEventId}
                            eventType: "SEND_EMAIL"
                            skipEventTypes: ${skipEventTypes}
                            planId: ${planId}
                    
                    - consumeCoupon:
                        call: http.post
                        args:
                          url: ${serviceUrlForPrivateAccess}
                          private_service_name: ${serviceDirectoryFullName}
                          body: 
                            id: ${workflowEventId}
                            eventType: "CONSUME_COUPON"
                            skipEventTypes: ${skipEventTypes}

        except:
          as: e
          steps:
            - failure:
                call: http.post
                args:
                  url: ${serviceUrlForPrivateAccess}
                  private_service_name: ${serviceDirectoryFullName}
                  body: { id: ${workflowEventId}, eventType: "WORKFLOW_FAILED", error: ${e} }
            - raiseOriginalError:
                raise: ${e}

※ 本記事で紹介するコードや構成、リソース名などは、解説のために簡略化・一般化したものです。実際のプロダクション環境で利用している実装とは一部異なりますのでご了承ください。

レシーバー側の実装(Go)

Google Cloud Workflowsからのリクエストを受けるSubscription-Serviceのエンドポイントも、役割を明確にするため刷新しました。リクエストボディの eventType に基づき、処理を各関数へディスパッチするシンプルな構造にしています。


// WorkflowRequest はGoogle Cloud Workflowsから送られてくる JSON構造を定義
type WorkflowRequest struct {
	ID             string   `json:"id"`             // Workflowsから引き回される workflowEventID
	EventType      string   `json:"eventType"`      // 実行する処理の種類
	PlanID         string   `json:"planId,omitempty"`
	SkipEventTypes []string `json:"skipEventTypes"` // リトライ時などにスキップしたい処理リスト
}

func workflowHandler(w http.ResponseWriter, r *http.Request) {
	var req WorkflowRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		log.Printf("Failed to decode JSON: %v", err)
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	// 1. 指定されたイベントタイプのスキップ判定
	if contains(req.SkipEventTypes, req.EventType) {
		log.Printf("[Skip] EventType=%s (Requested by skipEventTypes)", req.EventType)
		w.WriteHeader(http.StatusOK)
		return
	}

	// 2. event テーブルを参照し、このステップの実行済みチェック(冪等性の担保)
	// すでに COMPLETED になっている場合は、処理をスキップ
	status, err := getEventStatus(req.ID, req.EventType)
	if err != nil {
		log.Printf("Failed to fetch event status: %v", err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	if status == "COMPLETED" {
		log.Printf("[Skip] EventType=%s (Already completed for ID=%s)", req.EventType, req.ID)
		w.WriteHeader(http.StatusOK)
		return
	}

	// 3. eventType に基づき処理を分岐
	var procErr error
	switch req.EventType {
	case "SYNC_SUBSCRIPTION":
		procErr = syncSubscription(req.ID)
	case "CONSUME_COUPON":
		procErr = consumeCoupon(req.ID, req.PlanID)
	case "SEND_EMAIL":
		procErr = sendEmail(req.ID, req.PlanID)
	case "WORKFLOW_FAILED":
		procErr = handleWorkflowFail(req.ID)
	default:
		log.Printf("Unknown event type: %s", req.EventType)
		http.Error(w, "Unknown event type", http.StatusBadRequest)
		return
	}

	// 4. 処理結果に応じたステータス更新
	if procErr != nil {
		log.Printf("[Error] Failed to process %s: %v", req.EventType, procErr)
		// Workflows側でリトライさせるため、エラーを返す
		http.Error(w, "Processing failed", http.StatusInternalServerError)
		return
	}

	// 成功したら event テーブルのステータスを COMPLETED に更新
	if err := updateEventStatus(req.ID, req.EventType, "COMPLETED"); err != nil {
		log.Printf("Failed to update event status: %v", err)
		// ステータス更新失敗もエラーとして扱い、Workflowsに再送を促す
		http.Error(w, "Status update failed", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, "Success")
}


func updateEventStatus(id, eventType, status string) error {
	// 実際には DB (eventテーブル) のステータスを更新する
	log.Printf("[DB Update] ID=%s, Event=%s, Status=%s", id, eventType, status)
	return nil
}

func syncSubscription(id string) error {
	log.Printf("[Processing] Sync Subscription: ID=%s", id)
	return nil
}

func consumeCoupon(id, planID string) error {
	log.Printf("[Processing] Consume Coupon: ID=%s, Plan=%s", id, planID)
	return nil
}

func sendEmail(id, planID string) error {
	log.Printf("[Processing] Send Email: ID=%s, Plan=%s", id, planID)
	return nil
}

func handleWorkflowFail(id string) error {
    log.Printf("[Failure Handler] Recording failure for ID=%s", id)
    return nil
}

処理の可視化

Google Cloud コンソール上で「どのステップで、どのようなエラーが出たか」をGUIから一目で把握できるため障害発生時の調査に短縮に役立てています。

Workflows GUI

セキュリティを考慮したインフラ構成

Google Cloud Workflowsからリクエストを受けるSubscription-Serviceは Google Kubernetes Engine(GKE)のprivate cluster 内で稼働しています。セキュリティを担保しつつ、Google Cloud Workflowsから安全に通信するために、こちらの記事を参考にしてService Directory と Internal Application Load Balancer (ALB) を組み合わせた構成を採用していますインフラ図

上記構成により、Workflows のyaml定義内で private_service_name に Service Directory のリソース名を指定するだけでVirtual Private Cloud内部での通信が完結します。また、予約済みの静的内部 IP を持つ Internal Application Load Balancerをエンドポイントとし、証明書をアタッチすることで、VPC 内の通信であっても HTTPS による暗号化をおこないパブリックなネットワークへの露出を一切排除した、安全な通信経路を実現しています。

Service Directoryの設定例(Terraform)


# 1. サービスをグループ化する「名前空間」の作成
resource "google_service_directory_namespace" "subscription" {
  namespace_id = "subscription-internal"
  location = "asia-northeast1"
}

# 2. 名前解決の対象となる「サービス」の定義
resource "google_service_directory_service" "api" {
  service_id = "subscription-api"
  namespace = google_service_directory_namespace.subscription.id
}

# 3. 実際の接続先(Internal Application Load Balancerの内部IP)を紐付ける「エンドポイント」
resource "google_service_directory_endpoint" "lb" {
  endpoint_id = "internal-lb-endpoint"
  service = google_service_directory_service.api.id

  # Internal ALBの静的内部IPとポートを指定
  address = google_compute_address.internal_lb_address.address
  port = 443
  network = data.google_compute_network.main_vpc.id
}

導入の効果と今後の展望

Google Cloud Workflowsの導入により、以下の成果が得られました。

  • 運用の可視化: Google Cloud コンソール上で「どのステップで、どのようなエラーが出たか」をGUIから一目で把握できるようになり、調査時間を短縮することができました。

  • 開発効率の向上: 各処理がコンポーネント化されたことで、テストコードの記述や新機能の追加が容易になりました。

  • システムの安定化: 二重実行の防止と冪等性の担保、そしてWorkflowsによるリトライ制御を組み合わせたことで、堅牢で安定したシステムを実現しました。

今後もこの基盤を活用し、ABEMAの根幹である決済システムを、より堅牢でスケーラブルなものへと進化させていきます。