この記事は CyberAgent Developers Advent Calendar 2025 の2日目の記事です。

AI Shift で AI Worker の開発・運用に携わっています、鈴木 (@amata1219) と申します。

今年度に配属されて以降、AI Worker の非同期タスク実行基盤としての Redis Streams を利用したキューコンシューマーの開発・運用に主として取り組んできました。この経験を踏まえ、直近の業務ではキューコンシューマーの信頼性を向上するための抜本的な改修を行い、以降のここ1ヶ月は実行基盤に起因する確認された障害は0件になりました。

本記事では Redis Streams をベースに、耐障害性と優先度制御を備えた、スケーラブルかつ高信頼性のキューコンシューマーの設計方法を紹介します。

Redis Streams ベースのキューコンシューマー?

Redis Streams は、単調増加する ID を持つエントリ(メッセージ)[1]が時系列順にソートされた append-only なログであり、range read やコンシューマーグループによる配信管理、Pending Entries List を通じた再配信が可能な Redis のネイティブデータ型です。Apache Kafka や RabbitMQ Quorum Queues と比較されるようなメッセージブローカーとして機能します。AI Worker は、この Redis Streams をキューイングシステムとして利用し、効率的にタスクを非同期処理する仕組みを実現しています。

なぜキューコンシューマーが必要なんだ?

このような非同期タスク実行基盤が必要な背景としては、サービスからの次のような要求があります。

第一に、システムを疎結合化しメンテナンス性を高く保ちたい意図があります。AI Worker はサービスの特徴として、主要な機能で LLM API を頻繁に呼び出します。LLM API はリクエストの内容次第で応答時間が数秒から数分にまで伸びる可能性があります。このような外部 API 呼び出しを含むタスク実行の際にキューを経由することで、呼び出し側はタスクがどのように実行されるかを気にせず、裏側の都合をロジックに滲ませる必要が無くなります。

第二に、負荷を時間的に均したいとの要求があります。タスクを同期的に実行していると、突発的にトラフィックが増加した場合にサーバーが負荷に耐えきれず、タイムアウトを頻発したりダウンしたりする可能性があります。代わりにリクエストをキューに積むことで、タスクを一定のペースで処理しながらもトラフィックの増加に耐えることができます。[2]

第三に、サービスの信頼性を高く保つ必要性があります。タイムアウトやネットワークエラー、一時的なサービス障害など、様々な理由でタスクは失敗する可能性があります。アプリケーション側からワーカー側の可用性を把握することは難しく、同期的実行のリトライは投機的になりがちです。キューイングシステムを利用することで、ワーカーがタスクを処理できるタイミングでのリトライが可能になり、処理の成功率が高まります。

この他にも、ワーカーのスケール可能性を高めたい、重い処理でも素早いレスポンスで体感のユーザー体験を改善したい、といった背景からタスクを非同期的に実行したいニーズがあります。

わかった、でもそれってどうやって作るのさ?

Redis Streams も、それを利用したキューコンシューマーも、タスクの同期的実行にまつわる様々な問題を解決しますが、それだけで十分ではありません。スケーラブルで高信頼性のコンシューマーには、主に以下の処理や機構を実装しなければなりません。

まず、水平スケールを可能にするためには、分散・並列処理が欠かせません。Redis Streams はキューから複数の分散されたコンシューマーへのメッセージ配信を請け負うため、分散処理についてはあまり気にしなくて問題ありません。それ以降のメッセージの並列処理を別の手段で用意する必要があります。

また、コンシューマーレベルでメッセージの再配信を容易にする操作や仕組みが提供されます。これを活用してリトライ機構を搭載することになります。

そして、Redis Streams がサポートする読み取り操作は、時系列の読み取りとランダムアクセスに限られます。これらの操作では、優先度に応じて読み取るメッセージを変えることができません。もし一部のメッセージを優先的に処理する必要がある場合は、追加の機構を備え付ける必要があります。

他にもコンシューマーの終了時や Thundering Herd のような特定のケースにおいても、サービスの信頼性を低下させない工夫が要求されます。

OK, やってみよう!

以降はスケーラブルで信頼性の高いキューコンシューマーの具体的な実装方法を説明します。

実際のキューコンシューマーのコードは TypeScript で記述されています。確実に動作する説明・例示のために TypeScript を用いますが、他のプログラミング言語でも基本的に通用する内容です。

Fire-And-Forget、制限付きの並列性、コンシューマーグループ

打ちっぱなし

分かりやすさのため Redis Streams から逐次的にメッセージを読み取る処理から始めます。ここに Fire-And-Forget, 制限付きの並列性, そしてコンシューマーグループといった概念を一つひとつ導入し、処理の特性の変化を観察しながら理解を深めます。

while (true) {
  const messages = await queueConsumer.readMessages()
  for (const message of messages) {
    await handleMessage(message)
  }
}

この処理はキューから複数のメッセージを読み取り、メッセージ一つひとつを逐次処理しています。もし await handleMessage() が秒単位で時間を要する処理を行うと、何が起きるでしょうか? 処理の完了まで、次の message の処理が目に見えて遅延します。

では、Promise.all() を使い、並列化するのはどうでしょうか?

while (true) {
  const messages = await queueConsumer.readMessages()
  await Promise.all(messages.map(async (message) => await handleMessage(message)))
}

少なくとも一度に読み取った messages は上手く処理されそうです。しかし、その中の1件でも時間を要する処理を行うものがあれば、while ループが進まず次のメッセージの読み取りが遅れます。1件ずつの逐次処理が k 件ずつの並列処理に拡張されただけです。

私たちが求めているのは、それではありません。一度のメッセージ読み取り件数に依らない並列性です。await handleMessage()に左右されず、メッセージを並列に処理することを求めています。

そこで、Fire-And-Forget[3] を導入します。ざっくり言えば、await を外すだけです。

while (true) {
  const messages = await queueConsumer.readMessages()
  for (const message of messages) {
    // 非同期関数を `await` せずに呼ぶ場合は、戻り値を使用しないことを `void` で明示すると親切です[4]
    void handleMessage(message)
  }
}

handleMessage() 自体は Promise を返す関数です。awaitPromise が解決されるまで待機するため、ループを止める原因となっていました。それを取り除くことで handleMessage() に左右されずメッセージ読み取りループを回し続けられるようになりました。

Fire-And-Forget は処理の開始のみ行い、その完了は待たない非同期処理パターンです。現在の状態でも handleMessage() 内の処理は正常に実行されています。ただ、関数の実行完了を待たなくなるため、try / catch で例外を呼び出し元で捕捉できなくなったり、finally 内の処理が関数の終了前に実行されたりしてしまいます。このパターンでは Promise チェーンを記述し、その中で例外処理をしなければなりません。

見やすさのためコード全体は割愛しますが、handleMessage().finally(() => finalize()) のようになります。Node.js では未ハンドルの Promise rejection が警告やプロセス終了に繋がるため、.catch() によるエラーハンドリングも行ってください。

これでキューコンシューマーに並列処理を導入できました。

用法・用量に制限あり

でも、待ってください。そのようなことをすれば、メッセージの読み取りを高速ループで試行し続け、計算資源を枯渇させるのではないでしょうか? その通りで、このままの実装では大量のメッセージ処理によるメモリ不足が発生し得ます。また、不必要なメッセージ読み取りは、Redis やネットワークの負荷を高めます。

while (true) {
  const messages = await queueConsumer.readMessages({
    blockTimeoutMs: 1000 // キューが空でも最大 1000ms は Redis 側で待機します
  })
  for (const message of messages) {
    void handleMessage(message)
  }
}

import * as redis from 'redis'
export class QueueConsumer {
  async readMessages({ blockTimeoutMs }: { blockTimeoutMs: number }) {
    return this.client.xRead(..., { BLOCK: blockTimeoutMs })
  }
}

このように読み取り処理をブロッキング化するのは有効な解決策でしょうか? これは有益ですが、十分ではありません。

過剰な読み取り処理は解消されました。ただ、同時に実行可能なタスクの数を制限していません。もしリクエストが絶え間なくキューイングされている中で、ブロックのタイムアウト時間内にタスクが実行完了しきらないことがほとんどの場合、同時に実行されるタスクは膨れ上がります。いずれ計算資源を使い果たし、コンシューマーの可用性を損なうことが予想されます。

では、実行中のタスクの数を測定し、それを通じて並列性を制御することはできないでしょうか?

const MAX_INFLIGHT_TASKS = 100

let inFlightTasks = 0
while (true) {
  if (inFlightTasks >= MAX_INFLIGHT_TASKS) {
    await sleep(50)
    continue
  }
  const messages = await queueConsumer.readMessages({ blockTimeoutMs: 1000 })
  for (const message of messages) {
    inFlightTasks++
    void handleMessage(message)
      .catch(() => {
        // 適切なエラーハンドリングをします
      })
      .finally(() => {
        inFlightTasks--
      })
  }
}

実行中のタスクのカウンターを設け、タスクの非同期実行の直前にインクリメント、終了後にデクリメントします。毎ループの冒頭で実行中のタスクが上限に達している場合は、メッセージの読み取りを行わず一定時間待機します。簡素ですが実際にこれは上手く機能します。

並列性を厳格に制限する場合は、読み取り件数の動的な調整も行う必要があります。

const MAX_INFLIGHT_TASKS = 100
const BATCH_SIZE = 50

let inFlightTasks = 0
while (true) {
  if (inFlightTasks >= MAX_INFLIGHT_TASKS) {
    await sleep(50)
    continue
  }
  const batchSize = Math.min(BATCH_SIZE, MAX_INFLIGHT_TASKS - inFlightTasks)
  const messages = await queueConsumer.readMessages({ blockTimeoutMs: 1000, batchSize })
  for (const message of messages) {
    inFlightTasks++
    void handleMessage(message)
      .catch(() => {
        // 適切なエラーハンドリングをします
      })
      .finally(() => {
        inFlightTasks--
      })
  }
}

async readMessages({ blockTimeoutMs, batchSize }: { blockTimeoutMs: number, batchSize: number }) {
  return this.client.xRead(..., { BLOCK: blockTimeoutMs, COUNT: batchSize })
}

この制限が機能する前提条件として、inFlightTasks が常に現在実行中のタスク数を正確に表している必要があります。つまり、ここでは inFlightTasks の増減が正しく行われていなければなりません。実際のプログラムでは、タスク数の追跡ロジックを一つの関数として切り出し、責任の境界を明確化したりテスト可能性を高めたりすることで、プログラムの正当性を保証しやすくしています。

const MAX_INFLIGHT_TASKS = 100
const BATCH_SIZE = 50

const { inFlightTasks, runTask } = createConcurrencyLimiter()
while (true) {
  if (inFlightTasks() >= MAX_INFLIGHT_TASKS) {
    await sleep(50)
    continue
  }
  const batchSize = Math.min(BATCH_SIZE, MAX_INFLIGHT_TASKS - inFlightTasks())
  const messages = await queueConsumer.readMessages({ blockTimeoutMs: 1000, batchSize })
  for (const message of messages) {
    runTask(async () => await handleMessage(message))
  }
}

function createConcurrencyLimiter() {
  let inFlightTasks = 0
  const runTask = (task: () => Promise<void>) => {
    inFlightTasks++
    void (async () => {
      try {
        await task()
      } finally {
        inFlightTasks--
      }
    })()
  }
  return {
    inFlightTasks: () => inFlightTasks,
    runTask
  }
}

これで並列性に制限を加え、安全に大量のメッセージを処理できるようになりました。

協調せねば

キューコンシューマー内でのメッセージ処理は効率化されましたが、それでも性能の限界があります。大量のメッセージを捌くには、複数のコンシューマーを稼働させなければなりません。ただ、複数のコンシューマーがキューからメッセージを競合せずに読み取る保証はありません。以下の読み取り処理を行うコンシューマーは、他のコンシューマーと協調せず、既に処理中のメッセージを読み取る可能性があります。

import * as redis from 'redis'
export class QueueConsumer {
  async readMessages() {
    return this.client.xRead(...)
  }
}

Redis Streams のコンシューマーグループは、同じグループに属するコンシューマー間で通常は読み取りが競合しない保証を作り出します。冒頭に触れた Pending Entries List (PEL) を通じて、どのメッセージがどのコンシューマーに配信済みであるかを管理します。

import * as redis from 'redis'

// 1. 指定のストリームの中で一意なコンシューマーグループを作成します (前提条件)
client.xCreateGroup(streamName, consumerGroupName, '0') // ストリームの存在が前提のため、必要に応じて MKSTREAM (make stream) を指定します

// (2. 当該コンシューマーグループ内に新たにコンシューマーを作成します)
// 注: 読み取り時にコンシューマーが存在しない場合は暗黙的に作成されるため、この操作はオプションです
client.xGroupCreateConsumer(streamName, consumerGroupName, consumerName)

export class QueueConsumer {
  async readMessages() {
    // 3. `XREADGROUP` の際に、その読み取りがどのグループのどのコンシューマーのものであるか設定します
    // `batchSize` の指定も必要です
    return this.client.xReadGroup(this.consumerGroupName, this.consumerName)
  }
}

// 4. グループに不要なコンシューマーが滞留しないよう、コンシューマー終了時は削除するのが望ましいです
client.xGroupDelConsumer(streamName, consumerGroupName, consumerName)

これで複数のコンシューマーが同じメッセージを処理することは基本的に無くなりました。PEL にある (XACK されていない) メッセージに対し XCLAIMXAUTOCLAIM を行うような、所有権が移転し得るロジックが含まれている場合はその限りではありません。これについては次節で説明します。

コンシューマー名は一意であることが推奨されています。コンシューマー毎の生存状況や XACK されていないメッセージの数などを把握しやすくするという主に運用上の理由から、コンシューマー毎に異なる名前を与えることが望ましいです。

function generateConsumerNameWith(prefix: string): string {
  const suffix = uuidv4().slice(0, 16)
  return `${prefix}-${suffix}`
}

const consumerName = generateConsumerNameWith(`myqueue-${hostname()}`)

キューコンシューマーの分散・並列化への対応が完了しました。

const MYQUEUE_CONSUMER_GROUP_NAME = 'mq-group'
const consumerName = generateConsumerNameWith(`myqueue-${hostname()}`)
const queueConsumer = QueueConsumer.prepared(MYQUEUE_CONSUMER_GROUP_NAME, consumerName)

const MAX_INFLIGHT_TASKS = 100
const BATCH_SIZE = 50

const { inFlightTasks, runTask } = createConcurrencyLimiter()
while (true) {
  if (inFlightTasks() >= MAX_INFLIGHT_TASKS) {
    await sleep(50)
    continue
  }
  const batchSize = Math.min(BATCH_SIZE, MAX_INFLIGHT_TASKS - inFlightTasks())
  const messages = await queueConsumer.readMessages({ blockTimeoutMs: 1000, batchSize })
  for (const message of messages) {
    runTask(async () => await handleMessage(message))
  }
}

XAUTOCLAIM は補助、XCLAIM ハートビート、リトライの使い分け

失敗なんてなかった?

分かりやすさのため、プログラムを途中まで巻き戻します。

while (true) {
  const messages = await queueConsumer.readMessages()
  for (const message of messages) {
    void handleMessage(message)
  }
}

私たちは QueueConsumer#readMessages() の中で XREADGROUP 操作を介してキューからメッセージを読み取ってきました。これは読み取ったメッセージが完全に処理され、常に XACK されることが保証されているのなら問題ありません。しかし現実にはコンシューマーのハングやクラッシュが起きるかもしれません。メッセージは XACK されない限り PEL に残り続けます。

そこでどのコンシューマーにも認知されず PEL に滞留しているメッセージを定期的に回収する必要があります。XAUTOCLAIM はそれを可能にする便利な操作です。

async readPendingMessages() {
  const minIdleTimeMs = 60000
  const start = '0-0'
  // `batchSize` の指定や、設計次第では全件ループのために戻り値の `nextStartId` の参照も必要です
  return this.client.xAutoClaim(..., minIdleTimeMs, start)
}

XAUTOCLAIM は、指定されたアイドルタイムより長く PEL にあり、指定されたもの以上の ID を持つメッセージの所有権を取得します。上記の例では、60000 ミリ秒より長く PEL に残っている全てのメッセージが対象になります。原理的には XPENDINGXCLAIM の併用による所有権の移転と同じですが、それを Redis 側で原子的に行います。これをループに組み込むことで、メッセージの再処理が可能になります。

while (true) {
  const messages = await queueConsumer.readMessages()
  for (const message of messages) {
    void handleMessage(message)
  }
  const pendingMessages = await queueConsumer.readPendingMessages()
  for (const pendingMessage of pendingMessages) {
    void handleMessage(pendingMessage)
  }
}

毎ループに通常の読み取りと PEL にあるメッセージの読み取りを行うようにしました。PEL に残り続けているメッセージの有無を常に確認し、必要があれば再処理します。これでリトライ機構を整備できたように見えます。

実際にはこのロジックは様々な問題を抱えています。第一に処理に成功するまでリトライが無制限に行われます。第二にループの度にメッセージの回収を試みるのは無駄な操作になりがちです。もし毎ループの回収が必要なほど PEL に滞留メッセージが生じるのなら、多くの場合そのシステムは何かが間違っているかもしれません。そして、第三に XAUTOCLAIM にフィルタとして指定されるアイドルタイムより長い時間を要するタスクのメッセージは、タスクの実行中に再配信され多重実行が発生します。

1つ目のリトライが無制限に行われる問題は、メッセージの配信回数を確認し、閾値を超えている場合にメッセージを削除することで解決します。

const MAX_HANDLE_ATTEMPTS_PER_MESSAGE = 5

while (true) {
  // 省略

  const pendingMessages = await queueConsumer.readPendingMessages()
  for (const pendingMessage of pendingMessages) {
    const deliveriesCount = await getDeliveriesCountOf(pendingMessage.id)
    // 配信回数は読み取り時点で加算されるため、`MAX_HANDLE_ATTEMPTS_PER_MESSAGE` より大きければ最大試行回数に達したと見做せます
    if (deliveriesCount > MAX_HANDLE_ATTEMPTS_PER_MESSAGE) {
      await settleMessage(pendingMessage.id)
      continue
    }

    void handleMessage(pendingMessage)
  }
}

async getDeliveriesCountOf(messageId: string) {
  const start = messageId
  const end = messageId
  const count = 1
  return this.client.xPendingRange(..., start, end, count)[0].deliveriesCounter
}

async settleMessage(messageId: string) {
  await this.client.xAck(..., messageId)
  // メッセージをキューから物理的に削除したい場合は、`XDEL` もしなければなりません
  await this.client.xDel(..., messageId)
}

2つ目のループの度に回収が試みられる問題は、XAUTOCLAIM の実行を条件付きにすることで解決します。規定のアイドルタイム以上のメッセージしか回収しないことから、最後に回収した時刻からアイドルタイム分の時間経過後に再び回収を行います。XAUTOCLAIM はあくまで再処理のための補助的な操作として利用します。

メッセージの回収頻度が大幅に下がるため、`batchSize` の指定を通じて一度の読み取り件数を増やすか、常に全件処理するか等を実際のプログラムでは検討してください。

const MESSAGE_REDELIVERY_MIN_IDLE_MS = 60000
let lastPendingMessagesReadAtMs = Date.now()
while (true) {
  // 省略

  const now = Date.now()
  if (now - lastPendingMessagesReadAtMs > MESSAGE_REDELIVERY_MIN_IDLE_MS) {
    const pendingMessages = await queueConsumer.readPendingMessages()
    lastPendingMessagesReadAtMs = now
    for (const pendingMessage of pendingMessages) {
      void handleMessage(pendingMessage)
    }
  }
}

私は、ここにいる

もう一つ、多重実行の問題を解決しましょう。行うタスクに冪等性が無ければ、システムが意図しない状態に陥る可能性があります。たとえメッセージ処理に冪等性があったとしても、計算資源を空費しコンシューマーの性能を悪化させる要因になり得ます。不要な処理は行わないのが望ましいです。

どうすればタスクが実行中であることを Redis に示せるでしょうか?

ここで良い知らせがあります。PEL にあるメッセージのアイドルタイムはリセットできます。コンシューマーは自身が処理中のメッセージに対して XCLAIM を行うことで、そのアイドルタイムを 0 に上書きできます。タスクの実行中にこれを定期的に行うことで、XAUTOCLAIM による再配信を防ぎ多重実行の可能性を大きく減じることができます。Redis Streams を利用する以上、基本的に at-least-once になることには注意してください。

while (true) {
  const messages = await queueConsumer.readMessages()
  for (const message of messages) {
    void handleMessage(message)
  }
}

const MESSAGE_REDELIVERY_MIN_IDLE_MS = 60000

// ハートビートの間隔は `XAUTOCLAIM` の実行頻度に基づき決定されなければなりません
// アイドルタイムのリセットが常に間に合うと言えるような値を設定します
const HEARTBEAT_INTERVAL_MS = MESSAGE_REDELIVERY_MIN_IDLE_MS / 2

const handleMessage = async (message) => {
  const stopController = new AbortController()
  const loop = async () => {
    while (!stopController.signal.aborted) {
      await sleep(HEARTBEAT_INTERVAL_MS)
      await queueConsumer.keepAlive(message.id)
    }
  }
  void loop()

  // タスク本体の処理を実行しています
  await doTask()

  stopController.abort()
}

async keepAlive(messageId: string) {
 const newMinIdleTime = 0

  // `minIdleTime` に `0` を指定することで、アイドルタイムにかかわらず常に対象メッセージを claim します
  // claim の副作用として、アイドルタイムがリセットされます
  // `JUSTID` オプションを使い、ハートビートにより配信回数がインクリメントされるのを防ぎます
  await this.client.xClaimJustId(newMinIdleTime, messageId)
}

タスクを開始する前に XCLAIM をハートビートとして発するループを起動します。周期は XAUTOCLAIM より短くします。

XCLAIM が失敗する可能性を考慮して、数回程度の再試行ロジックを入れるとより安全です。それも踏まえたハートビート自体の処理は以下の通りです。

const MESSAGE_REDELIVERY_MIN_IDLE_MS = 60000
const HEARTBEAT_INTERVAL_MS = MESSAGE_REDELIVERY_MIN_IDLE_MS / 2
function startHeartbeat(messageId: string): () => void {
  const stopController = new AbortController()
  const loop = async () => {
    while (!stopController.signal.aborted) {
      await sleep(HEARTBEAT_INTERVAL_MS)
      const maxAttempts = 5
      const retryIntervalMs = 1000
      for (let attempts = 0; attempts < maxAttempts; attempts++) {
        try {
          await queueConsumer.keepAlive(messageId)
          break
        } catch (err) {
          // 未知のエラーでない限りは無視してしまいます
        }
      }
    }
  }
  void loop()
  return () => stopController.abort()
}

async function handleMessage(message) {
  let stopHeartbeat: (() => void) | undefined
  try {
    stopHeartbeat = startHeartbeat(message.id)
    await doTask()
  } catch (err) {
    // タスクの種類に応じた適切なエラーハンドリングをします
  } finally {
    stopHeartbeat?.()
  }
}

安全なリトライ機構ができました。

最後の切り札

メッセージ処理の失敗時にリトライする仕組みが安全に搭載されたため、安定的にタスクが実行される可能性が高まりました。それでもこのリトライ機構には更なる信頼性向上の余地があります。

今まで失敗の内容を曖昧なままにしていました。それはネットワークエラー、タイムアウト、コンシューマーのクラッシュ、あるいは他の何かかもしれません。この中にはコンシューマーレベルで再試行せずとも、アプリケーションレベルの再試行ロジックの方が適切なものもあります。ネットワークエラーやタイムアウトはそれに該当します。反対にコンシューマーのクラッシュについては、アプリケーション側でそれを把握する術が無いため対応することができません。

コンシューマーレベルとアプリケーションレベルのリトライを使い分けることで、コンシューマーレベルのリトライを温存できます。より効率的に再試行できる点でも望ましいです。

async function handleMessage(message) {
  await doTask()
}

const doTask = async () => {
  const maxAttempts = 5
  for (let attempts = 0; attempts < maxAttempts; attempts++) {
    const response = await myApi.call()
    if (response.status.isOk()) return
    await sleep(1000)
  }
  // エラーを返します
}

キューコンシューマーの信頼性を高めるリトライ機構の実装が完了しました。前節の分散・並列処理も加えると以下のようになります。下記のコードには含めていませんが、常に処理に失敗する Poison Message は再処理を試みないようにするとより効率的です。Poison 判定したメッセージは Dead Letter Queue に退避し、正しく処理できるようにする仕組みを備えると更に堅牢です。

const MYQUEUE_CONSUMER_GROUP_NAME = 'mq-group'
const consumerName = generateConsumerNameWith(`myqueue-${hostname()}`)
const queueConsumer = QueueConsumer.prepared(MYQUEUE_CONSUMER_GROUP_NAME, consumerName)

const MAX_INFLIGHT_TASKS = 100
const { inFlightTasks, runTask } = createConcurrencyLimiter()

const BATCH_SIZE = 50

const MESSAGE_REDELIVERY_MIN_IDLE_MS = 60000
let lastPendingMessagesReadAtMs = Date.now()

const MAX_HANDLE_ATTEMPTS_PER_MESSAGE = 5

while (true) {
  if (inFlightTasks() >= MAX_INFLIGHT_TASKS) {
    await sleep(50)
    continue
  }
  const batchSize = Math.min(BATCH_SIZE, MAX_INFLIGHT_TASKS - inFlightTasks())
  const messages = await queueConsumer.readMessages({ blockTimeoutMs: 1000, batchSize })
  for (const message of messages) {
    runTask(async () => await handleMessage(message))
  }

  const now = Date.now()
  if (now - lastPendingMessagesReadAtMs > MESSAGE_REDELIVERY_MIN_IDLE_MS) {
    const pendingBatchSize = Math.min(BATCH_SIZE, MAX_INFLIGHT_TASKS - inFlightTasks())
    const pendingMessages = await queueConsumer.readPendingMessages({ minIdleTimeMs: MESSAGE_REDELIVERY_MIN_IDLE_MS, batchSize: pendingBatchSize })
    lastPendingMessagesReadAtMs = now
    for (const pendingMessage of pendingMessages) {
      const deliveriesCount = await getDeliveriesCountOf(pendingMessage.id)
      if (deliveriesCount <= MAX_HANDLE_ATTEMPTS_PER_MESSAGE) {
        runTask(async () => await handleMessage(pendingMessage))
      }
    }
  }
}

const handleMessage = async (message) => {
  let stopHeartbeat: (() => void) | undefined
  try {
    stopHeartbeat = startHeartbeat(message.id)
    await doTask()
    await settleMessage(message.id)
  } catch (err) {
    // 省略
  } finally {
    stopHeartbeat?.()
  }
}

const HEARTBEAT_INTERVAL_MS = MESSAGE_REDELIVERY_MIN_IDLE_MS / 2
function startHeartbeat(messageId: string): () => void {
  const stopController = new AbortController()
  const loop = async () => {
    while (!stopController.signal.aborted) {
      await sleep(HEARTBEAT_INTERVAL_MS)
      const maxAttempts = 5
      const retryIntervalMs = 1000
      for (let attempts = 0; attempts < maxAttempts; attempts++) {
        try {
          await queueConsumer.keepAlive(messageId)
          break
        } catch (err) {
          // 省略
        }
      }
    }
  }
  void loop()
  return () => stopController.abort()
}

応用編1: 複数のキュー + ラウンドロビン による優先度制御

これでキューコンシューマーはスケーラブルで信頼性の高いものになりました。本当でしょうか?

上記の実装は多くの場面でよく機能します。ただ、キューの中に複数の種類のメッセージが混在し、その比率に偏りがある場合はそうであるとは限りません。特定の種類のメッセージが膨大な数でタスク実行枠を埋め尽くし、他の種類のメッセージの処理が進まない可能性があります。実際に AI Worker はこの問題に直面しました。

type Message = {
  type: 'realtime' | 'batch'
  payload: {}
}

例えば、リアルタイムに行う必要のある処理が大量のバッチ処理に圧迫されるケースを考えます。type: 'batch' なメッセージが大量にキューイングされ、読み取り続けてもなかなか type: 'realtime' なメッセージを得られない状況です。

それならば type: 'realtime' なメッセージが type: 'batch' なメッセージに優先して読み取り結果に含まれれば良さそうです。ただ、 Redis Streams はネイティブでそのような読み取り操作をサポートしていません。

発想を転換します。読み取りはメッセージの種類毎に行います。種別の読み取りにより、常に 'type': 'realtime' なメッセージに読み取りチャンスが与えられることを保証します。

残念ながら、これも Redis Streams で効率的に行うことはできません。サポートされている読み取り操作は、時系列の読み取りかランダムアクセスの性質を持つもののみです。これらを利用・組み合わせて、種類別の効率的な読み取りを実現することはできません。

でもその発想は活かすことができます。

[
  'myqueue:realtime',
  'myqueue:batch',
]

このように、メッセージの種類に応じてキューを分ければ、事実上の種類別の読み取りを実現できます。それぞれのキューに対し読み取り操作を行うだけです。

while (true) {
  const messages = []
  messages.push(...(await queueConsumer.readMessagesFrom('myqueue:realtime')))
  messages.push(...(await queueConsumer.readMessagesFrom('myqueue:realtime'))) // 読み取り回数を増やすことで、実質的な優先度制御も可能です
  messages.push(...(await queueConsumer.readMessagesFrom('myqueue:batch')))
  for (const message of messages) {
    void handleMessage(message)
  }
}

async readMessagesFrom(queueId: string) {
  return this.client.xReadGroup(..., [{ key: queueId, id: '>' }])
}

ラウンドロビンを取り入れるとより綺麗になります。

const queueIds = [
  'myqueue:realtime',
  'myqueue:realtime',
  'myqueue:batch'
]

let queueIndex = 0
while (true) {
  const queueId = queueIds[queueIndex]
  queueIndex = (queueIndex + 1) % queueIds.length

  const messages =  await queueConsumer.readMessagesFrom(queueId)
  for (const message of messages) {
    void handleMessage(queueId, message)
  }
}

応用編2: タイムアウト付き Graceful Shutdown

キューコンシューマーはシャットダウンすることがあります。デプロイや環境変数変更に伴うローリングアップデートの際に、kubelet がコンテナに SIGTERM を送るようなケースが挙げられます。SIGTERM 受信時点でコンシューマーはまだタスクを実行しているかもしれません。

そのままシャットダウンすればタスクが強制的に中断され失敗する可能性があります。リトライ機構により他のコンシューマーが再実行する見込みはあります。ただそれに要した時間だけタスクの完了が遅れてしまいます。これはリリース頻度を高める上での障害になり得ます。ホットフィックスの際に考慮すべき事項を増やすことにもなります。

この問題は、タイムアウト付きの Graceful Shutdown を導入することで解決できます。もちろん残存タスクがある状態でタイムアウトとなれば、そのタスクは中断されます。これは仕方ないものとして別のコンシューマーに引き継いでしまいます。

また、SIGTERM を受信しても、ブロッキング読み取りが終わるまではループから脱出できないことに留意してください。

const GRACEFUL_SHUTDOWN_TIMEOUT_MS = 300000 // 5分

const consumerStopper = new AbortController()

process.once('SIGTERM', () => consumerStopper.abort())

const { inFlightTasks } = createConcurrencyLimiter()
while (!consumerStopper.signal.aborted) {
  const messages = await queueConsumer.readMessages()
  for (const message of messages) {
    void handleMessage(message)
  }
}

async function waitForInFlightTasksOrTimeout() {
  const start = Date.now()
  while (Date.now() - start < GRACEFUL_SHUTDOWN_TIMEOUT_MS) {
    if (inFlightTasks() === 0) break
    await sleep(100)
  }
}

await waitForInFlightTasksOrTimeout()

応用編3: 指数バックオフ + Jitter

環境にもよりますが、もし複数のコンシューマーを稼働させる場合は、各コンシューマーの振る舞いがシンクロしないよう注意する必要があるかもしれません。固定間隔のポーリングがほぼ同期的に行われたり、デプロイ直後や障害復旧後にコマンドが同時に実行されたりすると、キューイングシステムへのアクセスがスパイクします。現実にこれは障害の原因になります。

まず、ポーリングの頻度は落とすことができそうです。キューが空の場合はブロッキング読み取りのタイムアウトを指数関数的に増加させるようにします。さらに、各コンシューマーの動作周期に揺らぎを与えることで、アクセスのタイミングを分散させます。このようなランダム性を付加する手法が Jitter です。

その一例として、以下では こちらの記事で紹介されている “Decorrelated Jitter” をそのまま実装しています。

const MIN_BLOCK_TIMEOUT_MS = 50
const MAX_BLOCK_TIMEOUT_MS = 1000

let blockTimeoutMs = MIN_BLOCK_TIMEOUT_MS

while (true) {
  const messages = await queueConsumer.readMessages({ blockTimeoutMs })
  if (messages.length === 0) {
    blockTimeoutMs = Math.min(MAX_BLOCK_TIMEOUT_MS, Math.floor(Math.random() * (blockTimeoutMs * 3 - MIN_BLOCK_TIMEOUT_MS) + MIN_BLOCK_TIMEOUT_MS))
    continue
  }

  blockTimeoutMs = MIN_BLOCK_TIMEOUT_MS

  for (const message of messages) {
    void handleMessage(message)
  }
}

本当にこれでいいのか?

応用編も含め、ここまでに取り上げた概念を総ざらいしたプログラムは以下のようになります。

並列・分散処理、安全なリトライ機構、メッセージ間の公平性・優先度制御、タイムアウト付きの Graceful Shutdown、指数バックオフの Jitter などによる、スケーラブルで高信頼性のキューコンシューマーに辿り着きました。

const GRACEFUL_SHUTDOWN_TIMEOUT_MS = 300000
const consumerStopper = new AbortController()

const queueIds = [
  'myqueue:realtime',
  'myqueue:realtime',
  'myqueue:batch'
]
const MYQUEUE_CONSUMER_GROUP_NAME = 'mq-group'
const consumerName = generateConsumerNameWith(`myqueue-${hostname()}`)
const queueConsumer = QueueConsumer.prepared(queueIds, MYQUEUE_CONSUMER_GROUP_NAME, consumerName)

const MAX_INFLIGHT_TASKS = 100
const { inFlightTasks, runTask } = createConcurrencyLimiter()

const BATCH_SIZE = 50

const MESSAGE_REDELIVERY_MIN_IDLE_MS = 60000
const queueLastPendingMessagesReadAtMs = new Map(queueIds.map(queueId => [queueId, Date.now()]))

const MAX_HANDLE_ATTEMPTS_PER_MESSAGE = 5

let queueIndex = 0

const MIN_BLOCK_TIMEOUT_MS = 50
const MAX_BLOCK_TIMEOUT_MS = 1000

const queueBlockTimeoutMs = new Map(queueIds.map(queueId => [queueId, MIN_BLOCK_TIMEOUT_MS]))

process.once('SIGTERM', () => consumerStopper.abort())

while (!consumerStopper.signal.aborted) {
  if (inFlightTasks() >= MAX_INFLIGHT_TASKS) {
    await sleep(50)
    continue
  }

  const queueId = queueIds[queueIndex]
  queueIndex = (queueIndex + 1) % queueIds.length

  const blockTimeoutMs = queueBlockTimeoutMs.get(queueId)!
  const batchSize = Math.min(BATCH_SIZE, MAX_INFLIGHT_TASKS - inFlightTasks())
  const messages = await queueConsumer.readMessagesFrom({ queueId, blockTimeoutMs, batchSize })
  if (messages.length > 0) {
    for (const message of messages) {
      runTask(async () => await handleMessage(queueId, message, consumerStopper.signal))
    }
    queueBlockTimeoutMs.set(queueId, MIN_BLOCK_TIMEOUT_MS)
  } else {
    queueBlockTimeoutMs.set(queueId, decorrelatedJitter(blockTimeoutMs, MIN_BLOCK_TIMEOUT_MS, MAX_BLOCK_TIMEOUT_MS))
  }

  const now = Date.now()
  if (now - queueLastPendingMessagesReadAtMs.get(queueId)! > MESSAGE_REDELIVERY_MIN_IDLE_MS) {
    const pendingBatchSize = Math.min(BATCH_SIZE, MAX_INFLIGHT_TASKS - inFlightTasks())
    const pendingMessages = await queueConsumer.readPendingMessagesFrom({ queueId, minIdleTimeMs: MESSAGE_REDELIVERY_MIN_IDLE_MS, batchSize: pendingBatchSize })
    queueLastPendingMessagesReadAtMs.set(queueId, now)
    for (const pendingMessage of pendingMessages) {
      const deliveriesCount = await getDeliveriesCountOf(queueId, pendingMessage.id)
      if (deliveriesCount <= MAX_HANDLE_ATTEMPTS_PER_MESSAGE) {
        runTask(async () => await handleMessage(queueId, pendingMessage, consumerStopper.signal))
      }
    }
  }
}

async function handleMessage(message, abortSignal) {
  let stopHeartbeat: (() => void) | undefined
  try {
    stopHeartbeat = startHeartbeat(queueId, message.id, abortSignal)
    await doTask()
    await settleMessage(queueId, message.id)
  } catch (err) {
    // 省略
  } finally {
    stopHeartbeat?.()
  }
}

const HEARTBEAT_INTERVAL_MS = MESSAGE_REDELIVERY_MIN_IDLE_MS / 2
function startHeartbeat(queueId: string, messageId: string, abortSignal: AbortSignal): () => void {
  const stopController = new AbortController()
  const stopSignal = AbortSignal.any([abortSignal, stopController.signal])
  const loop = async () => {
    while (!stopSignal.aborted) {
      await sleep(HEARTBEAT_INTERVAL_MS)
      const maxAttempts = 5
      const retryIntervalMs = 1000
      for (let attempts = 0; attempts < maxAttempts; attempts++) {
        try {
          await queueConsumer.keepAlive(queueId, messageId)
          break
        } catch (err) {
          // 省略
        }
      }
    }
  }
  void loop()
  return () => stopController.abort()
}

function decorrelatedJitter(current: number, min: number, max: number): number {
  return Math.min(max, Math.floor(Math.random() * (current * 3 - min) + min))
}

async function waitForInFlightTasksOrTimeout() {
  const start = Date.now()
  while (Date.now() - start < GRACEFUL_SHUTDOWN_TIMEOUT_MS) {
    if (inFlightTasks() === 0) break
    await sleep(100)
  }
}

await waitForInFlightTasksOrTimeout()

現実にプログラムをここまで改修して以降は、キューコンシューマーの障害は確認されていません。

それでも潜在的な課題がまだあります。例えばテナント間の公平性を担保する仕組みがありません。特定の種類のメッセージが他の種類のメッセージの処理を圧迫したように、テナント間でも同じことが起こり得ます。また、メッセージスキーマのバージョニングをしていないため、スキーマ変更時は運用やマイグレーションによるカバーが必要になります。

他にも、認識できていない様々な問題が潜んでいるはずです。今後も運用を経ながら洗練させていくことになると思います。

まとめ

スケーラブルで信頼性の高いキューコンシューマーを設計するには、以下の要素が欠かせませんでした。

一番に分散・並列処理でスループットを引き出す必要があります。そのために Fire-And-Forget、制限付きの並列性、コンシューマーグループを利用します。

次に、耐障害性を付加するためにリトライ機構を導入します。XAUTOCLAIM は補助的な操作として扱うこと、XCLAIM のハートビートで再配信を防ぐこと、リトライレベルの使い分けをすることが重要でした。

さらに、複数のキューとラウンドロビンを活用したメッセージ間の公平性・優先度制御や、タイムアウト付きの Graceful Shutdown、指数バックオフの Jitter を取り入れることで、より高い信頼性のあるキューコンシューマーを実現できます。

それでもテナント公平性やスキーマバージョニング、認識できていないであろう改善の余地が残されています。

あとがき

キューイングシステムに初めて触れたのは AI Shift への配属後、今年の5月でした。インターンや技術系のアルバイトといった商業サービスの開発・運用に関わる機会が今までに無かったこともあり、信頼性に関する知識に乏しい状態からの始まりでした。

当初は非同期タスク実行基盤と呼べるものは無く、既存機能を非同期化するところから始まりました。実行基盤はその過程で形成されてきたサービスです。自分の実装・運用ミスで何度も障害を引き起こし、大変なご迷惑をおかけしてきました。それにもかかわらず、いつも安心して挑戦できるようなお声がけやお気遣いをしていただき、とてもありがたかったです。

特にリトライ機構は、その中で洗練されていった機能の最たる例です。最初は毎ループ XAUTOCLAIM を実行し、コンシューマーとアプリケーションレベルのリトライを使い分けてもいませんでした。途中で導入した分散ロックは相性が良くなく多重実行の温床となりました。そのような運用で得た知見をもとに試行錯誤し、今のところは本番で死なないと言えるキューコンシューマーに辿り着くことができました。

採用情報

この間、よりシンプルで低廉でクールな先端の情報技術、あるいは成熟したそれに漸近しているようで、業務としてのプレッシャーはありつつもとても楽しかったです。認識に誤りがあるかもしれませんが、背伸びをすれば少なくとも開発における技術的競争力で世界が見えるような環境があると感じています。自由と自己責任、そして人をエンパワメントする文化のもと、自身の技術力や想像力を遺憾無く発揮できる瞬間があります。

日本から世界的な影響力のあるソフトウェア・ITサービスが生まれるとすれば、ここはその可能性がある場所の一つだと思います。あなたも、21世紀を代表する会社 を一緒に創りませんか。

【新卒採用】
「27卒エントリー受付中」
https://www.cyberagent.co.jp/careers/special/students/tech/

【リキャリア採用】
https://www.cyberagent.co.jp/careers/special/students/tech_potential/

【中途採用】
https://hrmos.co/pages/cyberagent-group

Special Thanks

トレーナー、開発チーム、AI Shift の皆さん、その他にも支えてくださっている方々に感謝しています。いつも沢山のサポートや温かいお言葉をありがとうございます。
また、本記事の公開に当たりデザインや採用情報の掲載等でご協力いただいた皆様も、ご多忙の中本当にありがとうございました。
そして、この記事を読んでくださった、目の前のあなたにも感謝を申し上げます。


明日の記事は全社データ技術局に所属している同期の與田さんで「dbt×Snowflakeで構築するマルチカタログなIceberg Materializationの実装🧊」です。


1. 一般的なキューイング方法では XADDstreamID* を指定するため、ID が自動的に採番されます。明示的に ID を指定することも可能ですが、その場合でもストリーム内に存在する最大の ID より大きい ID を指定する必要があります。そのためストリーム内の ID 自体は常に単調増加します。
2. この方法は Queue-Based Load Leveling Pattern として知られています。
3. 打ちっぱなし、発射しっぱなしというニュアンスがあります。プログラミング言語が異なりますが、これでも何故動くのかを理解するには Rust の Future について が参考になると思います。
4. https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Operators/void