こんにちは、AI事業本部 オペレーションテクノロジーDiv.で「AdPos」というプロダクトのバックエンドエンジニア をしている細田来夢です。

この記事では、Django(およびDjangoRestFramework)で非同期処理を実現する方法として、AdPosでも利用しているCeleryとAmazon SQSを使った設定内容を紹介します!
「この設定でとりあえず動く状態」をゴールにして、初心者でも迷わないよう丁寧に解説していますので、ぜひ参考にしてください!

1. はじめに: 非同期処理の基本とこの記事のゴール

非同期処理は、アプリケーションのパフォーマンスを向上させるための重要な技術です。
特に、DjangoのようなWebフレームワークでは、非同期処理を導入することで、ユーザーのリクエストに対する応答時間を短縮し、システム全体の効率を向上させることができます。

この記事を読むメリット

初心者でもDjango(DjangoRestFramework)で非同期処理をCeleryとSQSを使って実現するための設定内容がわかります
CeleryはPythonで広く使われている非同期ライブラリで、SQS(Simple Queue Service)はAWSが提供するメッセージキューサービスです。
これらを組み合わせることで、スケーラブルで信頼性の高い非同期処理を実現できます。

この記事の対象読者

  • Django(DjangoRestFramework)で非同期処理を実現したいと考えている方
  • CeleryとSQSを使う方法をすでに決めているが、具体的な設定方法が分からない方
  • SQSとCeleryの初心者で、まずは動く環境を構築したい方

この記事で書かないこと

  • そもそもPythonで非同期処理を実現する手法・ライブラリには何があるか、どのようなケースでCeleryやSQSを使うべきか
  • SQS以外のブローカーを使った方法(例: RedisやRabbitMQ)
  • Django・DjangoRestFrameworkの環境構築、設定方法
  • Celeryの仕組みを深掘りする解説
  • 本番のAWS環境の構築とデプロイ方法

この記事のゴール

CeleryとSQSを使った非同期処理を、Djangoプロジェクト上で構築し、ローカルで動作させること
具体的な設定手順を通じて、Djangoプロジェクトに非同期処理を組み込む方法を学びます

この記事の流れ

  1. 前提となる技術知識を解説
  2. 【基礎編】DjangoプロジェクトでCeleryの設定を行い、非同期処理が最低限動作する状態を構築する
  3. 【発展編】ローカル環境での開発効率を高めるために、localstackを使ってローカルでSQSを動かす状態を構築する
  4. 【応用編】本番運用のtipsや注意点、Celeryのチューニング方法などを紹介

2. 前提となる技術知識

非同期処理のかんたん用語解説とイメージ

用語 説明 イメージ
メッセージ システム間でやりとりする情報の媒体 手紙
タスク 非同期で実行したい処理やデータの内容。メッセージにはどのタスクを実行するかの情報が書いてある 手紙に書いてある文章
キュー メッセージを貯める入れ物 手紙を入れる筒
ブローカー キューの管理システム。キューにメッセージを入れたり、キューに入っているメッセージをワーカーに渡したりする(ex SQS、Redis、RabbitMQなど) 差出人から手紙を受け取って、一旦筒に入れておき、受取人(ワーカー)が現れたら筒から手紙を出して渡す郵便局員さん
プロデューサー ブローカーにメッセージを渡すシステム ワーカー宛に手紙を書いて、郵便局員さんに渡す人
ワーカー ブローカーからメッセージを受取り、メッセージの内容に従って具体的な処理を行うシステム 郵便局員さんから手紙を受け取って、手紙を読み、その内容に従って何かをする人

非同期処理の流れのイメージ

システムAからシステムBへキューを介して情報を送り、システムBがその情報を受信して非同期処理を実行する

  1. システムA「メッセージをキューに入れたい」:キューへメッセージを送信する=プロデューサー
  2. システムB「キューに入っているメッセージを取得したい」:キューからメッセージを取得する(「受信する」といいます)=ワーカー
    • POINT
      • ただメッセージを受信するだけではメッセージはなくならない
      • メッセージが不要になったらメッセージを明示的に削除する必要がある
  3. システムB:受信したメッセージの内容は確認できたので、キューに残っているメッセージはもういらない→メッセージを削除する

*システムBが3でメッセージを削除しなければ、システムBは同じメッセージを何度も受信することになる
*メッセージが受信された回数はSQSがカウントしている
→「受信回数が一定数をこえたら、デッドレターキューへメッセージを移動させる」という設定もできる:Amazon SQS Dead Letter Queueの挙動を確認したメモ

SQSって何?

  • SQSとは、AWSが提供するメッセージングサービスです(「Simple Queue Service」の略):AWS SQS
  • 上記の「ブローカー」の1種
  • SQSのキューにはFIFOキューと標準キューの2つがあります
    • 厳格さを求めるならFIFOキューなので本記事ではFIFOキューを使用します
  • この2つのキューの違いは、いくつかありますが、ここでは「メッセージを取り出すときに、入れた順番を守るかどうか」と「メッセージが1度しか配信されないか」について理解しておきましょう
    • FIFOキュー
      • メッセージを入れた順に取り出します:筒に玉A,玉B,玉Cを入れたら、素直にそのまま玉A,玉B,玉Cの順に玉が出てきます
      • 1回しか配信されません(何度も配信されてしまう心配がありません)
    • 標準キュー
      • メッセージを入れた順番通りに取り出すことを保証しません(玉A,玉B,玉Cの順で入れたら、玉A,玉B,玉Cの順で出てくるとは限らない)
      • メッセージは少なくとも 1回は確実に配信されますが、1回以上配信されてしまうこともあります

Celeryとは

  • Celeryとは、Pythonの非同期タスクキューライブラリです
  • Celeryは、上記の「プロデューサー」「ワーカー」の役割を果たしてくれます
    • プロデューサー:delay()apply_async()といったメソッドを使って、メッセージをブローカー(SQSなど)に送信します
    • ワーカー:Celeryワーカーの常駐プロセスがキューを監視し、キューにメッセージが入ったら即座にメッセージを受信してタスクを実行します

Celeryを使うと何がうれしいか

Celeryを使うと、上記の「メッセージの送信、受信、削除」といった処理が簡単にできること、これが嬉しいポイントです
「Celeryを使わないケース=boto3(AWSのサービスをPythonで操作するためのライブラリ)を使うケース」と比較するとわかりやすいので見てみましょう。

項目 Celeryを使用する場合 boto3を使用する場合
キューへの送信 delay() や apply_async() といったcelery独自メソッドで簡単にメッセージをブローカーに送信可能 boto3 の send_message を使用してキューにメッセージを送信する処理を自前で用意する必要がある
ワーカーによるメッセージ受信・タスクの実行 Celeryワーカーがキューを監視し、自動でメッセージを受信し、タスクを実行 ワーカーを自分で設計・実装する必要あり(例: メッセージポーリングの仕組みを自前で作る)
メッセージの削除 タスク完了後、Celeryワーカーが自動でメッセージを削除 タスク完了後、明示的に deleteメソッド を使用してメッセージを削除する必要あり
最適化の容易さ ワーカーのプロセス数やポーリング間隔、メッセージ削除のタイミングなど、専用の設定値を設定・変更するだけでメッセージの処理を簡単に最適化できる メッセージ処理全体を自分で管理する必要がある

3. 【基礎編】CeleryとSQSの基本設定

ここからのセクションでは、DjangoプロジェクトでCeleryとSQSを使用するための基本設定の大枠について説明します。

まずは、プロジェクトの簡単なディレクトリ構造図を示し、主要ファイルの位置を記載します。
以下のディレクトリ構造図は、この記事のゴールを達成するために必要なファイルのみを示しています。

project_root/
│
├── apps/
│ ├── hoge_app/
│ │ └── tasks.py
│ └── ...
│
├── config/
│ ├── settings/
│ │ └── base.py
│ │ └── local.py
│ │ └── prd.py
│ └── celery.py
│
├── docker/
│ ├── app/
│ │ └── Dockerfile
│ └── worker/
│   └── Dockerfile
│   └── entrypoint.sh
└── docker-compose.yml
└── docker-compose.local.yml
└── poetry.lock
└── pyproject.toml
└── .env

ここからは、上記のディレクトリ構造図をもとに、主要ファイルのざっくり紹介をします。

以下に各ファイルの設定や記載内容について、より詳しい説明を追記します。

pyproject.toml: インストールするパッケージを記載

pipやrequirements.txtでceleryをインストールしてもいいんですが、本番運用時には何かしらのパッケージ管理ツールを使うのが一般的かと思います。
今回はローカルとAWS環境でパッケージや設定を分けたいので、パッケージはpoetryを使ってインストールします。

poetryについて詳しくはこちらをご覧ください。

※ちなみにpipでインストールするなら→pip3 install celery[sqs]==5.2.4

celery = {version = "5.2.4", extras = ["sqs"]}

解説

  • celeryのバージョン5.2.4をインストールします。
    • 2024年12月時点でceleryは5.4.0がリリースされていますが、5.2.4以上のバージョンでは動作確認をしていないため、5.2.4を指定しています。
  • extras = ["sqs"]は、CeleryがAmazon SQSをブローカーとして使用するための追加機能をインストールするためのオプションです。
    • celeryは、ブローカーとしてSQS以外にもRedisやRabbitMQなどを使用できますが、今回はSQSを使用するためにextras = ["sqs"]を指定しています。

.env: 環境変数を定義するファイル

QUEUE_NAME=hoge-queue.fifo
QUEUE_URL="https://sqs.ap-northeast-1.amazonaws.com/11111111111/hoge-queue.fifo"

解説

  • QUEUE_NAMEは、使用するSQSキューの名前を指定します。.fifoはFIFO(First-In-First-Out)キューを示しています。

  • QUEUE_URLは、使用するSQSキューのURLを指定します。

    上記2つの値はAWSのSQSコンソール画面で確認してセットしてください。

config/settings/base.py: CeleryとSQSの基本設定を含むファイル

import os

... # Django関係の諸々な必要な設定を記載

QUEUE_NAME = os.getenv("QUEUE_NAME")
QUEUE_URL = os.getenv("QUEUE_URL")

CELERY_BROKER_URL = "sqs://"
CELERY_BROKER_TRANSPORT_OPTIONS = {
    "region": "ap-northeast-1",
    "polling_interval": 20,
    "predefined_queues": {
        QUEUE_NAME: {
            "url": QUEUE_URL,
        },
    },
}
CELERY_TASK_ACKS_LATE = True

解説

  • CELERY_BROKER_URLは、ブローカーのURLを指定します。
    • sqs://というURLを指定することで、SQSをブローカーとして指定できます
  • CELERY_BROKER_TRANSPORT_OPTIONSは、SQSの設定オプションを指定します
    • regionはAWSリージョン
    • polling_intervalはポーリングの間隔
    • predefined_queuesは、キューの設定を指定します
      • QUEUE_NAMEは、使用するSQSキューの名前を指定します
      • urlは、使用するSQSキューのURLを指定します
  • CELERY_TASK_ACKS_LATE = Trueは、「タスクが完了した後のタイミングでワーカーがメッセージを削除する設」という設定です。

config/celery.py: Celeryの設定を行うファイル

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery("config")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

解説

  • このcelery.pyファイルを設置しておくことで、本システム内でCeleryを使用できるようになります
    • Celeryインスタンスを作成し、Djangoの設定からCeleryの設定を読み込みます。
  • Celery("config")では、このcelery.pyファイルの名前空間を指定します(「configディレクトリに設置しているよ」という意味)
  • app.autodiscover_tasks()は、Djangoアプリケーション内のtasks.pyファイルを自動的に検出し、タスクを登録します。

ここまでで、Celeryの基本設定は完了です!

ここからは、SQSへメッセージを送信する「プロデューサー」と、SQSからメッセージを受信して非同期処理を行う「ワーカー」の設定を行います。

tasks.py: 非同期処理の「タスク」を定義するファイル

from celery import shared_task
from django.conf import settings # 環境変数を読み込むために必要
from apps.hoge_app.models import HogeModel

@shared_task(queue=QUEUE_NAME)
def insert_hoge_task(x: int, y: int) -> None:
    result = x + y
    HogeModel.objects.create(hoge=result)

解説

  • 任意の関数に@shared_taskデコレーターをつけると、その関数が「タスク」としてCeleryに認識・登録されます
  • この「タスク」関数こそが非同期処理として実行される具体的な処理内容になります
  • queue=QUEUE_NAME:どのキューからメッセージを受信するかを指定します
  • 関数名や関数の中身は何でもOKです
    • insert_hoge_task関数は、2つの数値を加算してデータベースに保存するシンプルなタスクの例です

SQSへメッセージを投げる(プロデューサー): 任意のファイル内に以下を記載

import uuid
from django.conf import settings
from apps.hoge_app.tasks import insert_hoge_task
# apply_asyncはインポート不要

def send_message_to_sqs() -> None:
  insert_hoge_task.apply_async(
      kwargs={"x": 10, "y": 20},
      queue=settings.QUEUE_NAME,
      **{"MessageGroupId": str(uuid.uuid4()), "MessageDeduplicationId": str(uuid.uuid4())}
  )
  # delayというメソッドもありますが、fifoキューを使う場合はapply_asyncを使うことを推奨します

解説

  • apply_asyncは、タスクを非同期に実行するためのceleryのメソッドで、tasks.pyで定義したタスク関数名にapply_asyncメソッドをつけるだけで(ex insert_hoge_task.apply_async())、SQSへ簡単にメッセージを送信できます
  • kwargsは、タスクに渡す引数を指定します
    • xyは、insert_hoge_task関数に渡す引数です
    • argsで、タスクに渡す引数をリストで指定することも可能:ex: args=[10, 20]
  • queueは、「どのキューへメッセージを送信するか」をキュー名で指定します
  • MessageGroupIdMessageDeduplicationIdは、fifoキューを使う場合に使用します(任意項目)
    • MessageGroupIdは、メッセージをグループ分けするためのIDです
      • 同じMessageGroupIdを持つメッセージは、送信された順に処理されます(詳しくはこちら
    • MessageDeduplicationIdは、メッセージの重複排除を行うためのIDです
      • SQSは5分の間に同じMessageDeduplicationIdを持つメッセージが2回以上送られてきたら、重複とみなして2回目以降は処理しないようにします
      • このIDをメッセージごとに一意に設定することで、万が一、同じメッセージを2回送信してしまった場合でも、重複を防ぐことができます

まとめると、このsend_message_to_sqs関数を実行することで、

  • SQSのhoge-queue.fifoキューへメッセージが送信されます
  • メッセージには「insert_hoge_taskタスクを実行せよ。引数はx=10, y=20」という内容が含まれています

docker/worker/Dockerfile: workerイメージの設定ファイル

# Celeryワーカーを起動
ENTRYPOINT ["/entrypoint.sh"]

解説

  • ENTRYPOINTは、Dockerコンテナが起動した際に実行されるスクリプトを指定します
  • ENTRYPOINT ["/entrypoint.sh"]docker/worker/entrypoint.shを実行しています

docker/worker/entrypoint.sh: workerイメージのエントリーポイントスクリプト

#!/bin/sh

exec celery -A config worker -l INFO -Q "$QUEUE_NAME"

解説

  • exec celery -A config worker -l INFO -Q "$QUEUE_NAME"で、Celeryワーカーを起動し、configディレクトリに設置したcelery.pyファイルを使用して設定を読み込みます
  • -l INFOは、ログレベルをINFOに設定します
  • -Q "$QUEUE_NAME"は、「どのキューからメッセージを受信するか」を指定しています
    • このQUEUE_NAMEは、.envファイルで定義した環境変数の値を使用しています
    • -Q の後に複数のキュー名を指定することで、複数のキューからメッセージを受信することも可能です

docker-compose.yml: コンテナを起動するための設定ファイル

version: "3"
services:

  ... # appコンテナの設定

  worker:
    image: hoge-worker
    build:
      context: .
      dockerfile: ./docker/worker/Dockerfile
      target: production
    volumes:
      - .:/usr/local/worker
      - ~/.aws:/root/.aws:ro

解説

  • volumes/.:/usr/local/worker
    • ローカルのworkerディレクトリをコンテナ内にマウントしています
    • これにより、ローカルでの変更がコンテナ内に反映されるようになり、ローカルでの開発が行えるようになります
  • volumes/~/.aws:/root/.aws:ro
    • ローカルの~/.awsディレクトリをコンテナ内にマウントしています
    • これにより、AWSの認証情報がコンテナ内でも使用できるようになり、SQSへのアクセスができるようになります

docker-compose.local.yml: ローカル環境でコンテナを起動するための設定ファイル

version: "3"
services:

  ... # worker以外のコンテナの設定

  worker:
    platform: linux/x86_64
    build:
      target: development

解説

  • platform: linux/x86_64
    • M1チップのMacでもコンテナを動かせるようにするために指定
  • build: target: development
    • poetryでdevelopment環境用のパッケージをインストールするために指定

4.コンテナを起動し、非同期処理を実行する

ここまでで、CeleryワーカーとDjangoアプリケーションを起動し、非同期処理を実行する最低限の準備は整いました。
さあ、Docker Composeを使用してCeleryワーカーとDjangoアプリケーションを起動し、非同期処理を試してみましょう。

手順

  1. AWSの認証情報を設定する
    ローカルからSQSを操作するためには、AWSの認証情報が必要です。
    具体的にはローカルの~/.awsディレクトリにcredentialsファイルを作成し、以下の内容を記載する必要があります。

    [default]
    aws_access_key_id = YOUR_ACCESS_KEY
    aws_secret_access_key = YOUR_SECRET_KEY
    

    AWSのアクセスキーの具体的な作成方法はこちらを参考にしてみてください。
    [初心者向け] IAMユーザーにアクセスキーを払い出す手順を確認してみた | DevelopersIO

  2. Docker Composeでコンテナを起動する

    以下のコマンドを使用して、Docker Composeでコンテナを起動します。

    docker-compose -f docker-compose.yml build
    docker-compose -f docker-compose.yml up -d
    
  3. コンテナの状態を確認する

    コンテナが正しく起動しているかを確認するために、以下のコマンドを実行します。

    docker-compose -f docker-compose.yml ps
    

    このコマンドは、現在実行中のコンテナのリストを表示します。
    appworkerのステータスがUpであることを確認してください。

  4. ログを確認する

    CeleryワーカーやDjangoアプリケーションのログを確認することで、非同期処理が正しく実行されているかを確認できます。
    以下のコマンドを使用して、ログを確認します。

    docker-compose -f docker-compose.yml logs -f worker
    
    docker-compose -f docker-compose.yml logs -f app
    

    -fオプションを使用することで、リアルタイムでログを追跡できます。

  5. 非同期タスクを実行する

    Djangoアプリケーション内で非同期タスクを実行するために、適切なエンドポイントを呼び出すか、管理画面からタスクをトリガーします。

    タスクが正常にキューに追加され、Celeryワーカーによって処理が実行される
    workerコンテナのログを確認して、タスクが正常にキューに追加され、Celeryワーカーによって処理が実行されることを確認してください。

これで、Docker Composeを使用してコンテナを起動し、非同期処理を実行する準備が整いました。
これらの手順を通じて、Djangoプロジェクトでの非同期処理の実行を確認できます。

5. ローカル環境でSQSを模擬する

さて、ここまででとりあえずAWS環境のSQSを使って、Celeryの非同期処理を動かす設定はできました。

しかし、開発効率を上げるためにも、SQSも含めて全て各自のローカル環境で完結させる方が望ましいでしょう。

なので、ここからは「localstack」というツールを使って、ローカル環境でSQSをエミュレートするための設定を行います。

設定ファイルを少し変更する

【変更点】

  1. dockerディレクトリにlocalstack/start.shを作成します
  2. docker-compose.local.ymlにlocalstackの設定を追加します
  3. settings/local.pyにlocalstackの設定を追加します

※「3.【基礎編】CeleryとSQSの基本設定」の時点でlocalstackを使用しておいてももちろん良いのですが、本記事では説明をわかりやすくするためにここでlocalstackの設定を追加しています。

project_root/
│
├── apps/
│ ├── hoge_app/
│ │ └── tasks.py
│ └── ...
│
├── config/
│ ├── settings/
│ │ └── base.py
│ │ └── local.py
│ │ └── prd.py
│ └── celery.py
│
├── docker/
│ ├── app/
│ │ └── Dockerfile
│ └── worker/
│ │ └── Dockerfile
│ │ └── entrypoint.sh
│ └── localstack/ # localstackのディレクトリ
│  └── start.sh # localstackの起動スクリプト
└── docker-compose.yml
└── docker-compose.local.yml
└── poetry.lock
└── pyproject.toml
└── .env

ここから設定ファイルの内容を解説していきますが、その前にlocalstackについて少し解説します。

localstackとは

localstackは、AWSサービスをローカルで模擬するためのエミュレータツールで、開発やテストに便利です。
localstack公式サイトはこちら:localstack
Dockerイメージが提供されているので、Dockerをインストールしていればすぐに使えます。
今回は、localstackを使用してSQSをエミュレートします。

.env: ローカルで環境変数を定義するファイル

QUEUE_NAMEとQUEUE_URLの値を変更します。

QUEUE_NAME=local-queue.fifo
QUEUE_URL="http://localstack:4566/000000000000/local-queue.fifo"

解説

  • QUEUE_NAMEはローカル環境でのSQSキューの名前を指定します
    • 任意の名前にしてOKですが、.fifoはつけてください
    • この名称のキューがlocalstack上に作成されます
  • QUEUE_URLは、ローカル環境でのSQSキューのURLを指定します
    • http://localstack:4566/000000000000/がlocalstackのデフォルトのURLですので、このまま指定してください
    • QUEUE_NAMEの値がlocal-queue.fifoなので、http://localstack:4566/000000000000/local-queue.fifoとなります

docker/localstack/start.sh: localstackの起動スクリプト

#!/bin/sh
awslocal sqs create-queue --queue-name ${QUEUE_NAME} --region ap-northeast-1 --attributes "FifoQueue=true,ContentBasedDeduplication=true,VisibilityTimeout=15"
  • 解説
    • awslocal sqs create-queu"
      • SQSキューを作成するためのコマンドです
    • --queue-name ${QUEUE_NAME}
      • QUEUE_NAMEは、.envファイルで定義した環境変数の値を使用しています
    • --region ap-northeast-1
      • リージョンを指定しています
    • --attributes "FifoQueue=true,ContentBasedDeduplication=true,VisibilityTimeout=15"
      • FIFOキューを作成するためのオプションです
      • FifoQueue=trueは、FIFOキューを作成するためのオプションです
      • ContentBasedDeduplication=trueは、メッセージ重複排除設定を有効にするためのオプションです
      • VisibilityTimeout=15は、メッセージの可視性タイムアウトを設定するためのオプションです(単位は秒)
        • ここの秒数は非同期タスクの実行時間よりも大きく設定してください

docker-compose.local.yml: ローカル環境でコンテナを起動するための設定ファイル

version: "3"
services:

  ... # worker以外のコンテナの設定

  worker:
    build:
      target: development
    depends_on: # 追加
      localstack:
        condition: service_healthy
  localstack: # 追加
    environment:
      QUEUE_NAME: ${QUEUE_NAME}
    image: localstack/localstack:3.7.0
    ports:
      - "4566:4566"
    volumes:
      - ./docker/localstack/start.sh:/etc/localstack/init/ready.d/start.sh

解説

  • workerコンテナ

    • depends_on/localstack: condition: service_healthy
      • localstackが起動していることを確認してからworkerを起動するようにしています
      • この設定により、localstackコンテナよりも先にworkerコンテナが起動して、workerがSQSを見つけられずにエラーになることを防ぐことができます
  • localstackコンテナ

    • environment/QUEUE_NAME: ${QUEUE_NAME}
      • QUEUE_NAMEは、.envファイルで定義した環境変数の値を参照します
      • start.sh内でQUEUE_NAMEを使用するために環境変数として渡しています
    • image: localstack/localstack:3.7.0
      • localstackのイメージを指定しています
    • ports/4566:4566
      • localstackのポートをローカルのポートにマッピングしています
    • volumes/./docker/localstack/start.sh:/etc/localstack/init/ready.d/start.sh
      • start.shはlocalstackの起動スクリプトです
      • ローカルのdocker/localstack/start.shをコンテナ内の/etc/localstack/init/ready.d/start.shにマウントしています
      • これにより、localstackコンテナの起動時にstart.shが実行され、SQSが作成されるようになります

config/settings/local.py: ローカル環境での設定

import os

from .base import *  # noqa

... # Django関係の諸々な必要な設定を記載

CELERY_BROKER_URL = "sqs://{aws_access_key}:{aws_secret_key}@localstack:4566".format(
    aws_access_key=safequote("hoge", safe=""),
    aws_secret_key=safequote("huge", safe=""),
)

解説

  • base.pyで定義していたCELERY_BROKER_URLを、local.pyで上書きしています
  • sqs://{aws_access_key}:{aws_secret_key}@localstack:4566は、localstackのSQSのURLを指定しています
    • localstackはlocalstackのコンテナ名です
    • 4566はlocalstackのデフォルトのポートです
    • {aws_access_key}{aws_secret_key}は、AWSの認証情報ですが、localstackの場合はダミーの値を使用しています

ローカル環境でコンテナを起動する

ここまでの設定を行ったら、
docker compose builddocker compose -f docker-compose.local.yml up -d
でローカル環境でコンテナを起動します。

以下のコマンドでlocalstackのコンテナも起動していることが確認できると思います。

docker-compose -f docker-compose.local.yml ps

あとは非同期処理が実行されるように、Djangoアプリケーション内で非同期タスクを実行し、ログで非同期タスクが実行されていることを確認できれば、ローカル環境での非同期処理の設定はひとまず完了です!

6. 応用編 もっとCeleryを活用する

ここまで、CeleryとSQSを使った非同期処理の基本の設定・実行方法について解説しました。

ここからは、これらの設定を応用して、より複雑な非同期処理を実現する方法について解説します。

※追記予定

複数のworkerを起動したい

アプリごとに非同期タスクを作成したい

delayとapply_asyncの違いと使い分け方

delayとapply_asyncの違い

SQSのメッセージには何が入っているの?

workerのプロセス数を調整したい

メッセージサイズが大きすぎてSQSの上限をオーバーしてしまう

効率的にタスクを処理したい

重たいタスクをバックグラウンドで実行したい

ユニットテストの注意点は?

AWSのサービス構成はどうする?

本番での耐障害性を高くするには

デプロイ時にバグを生むケース

まとめ

この記事では、DjangoプロジェクトにおけるCeleryとSQSを活用した非同期タスクの設定と実行方法について詳しく解説しました。
この記事を通じて、Djangoプロジェクトでの非同期処理の基本的な設定と実行方法を理解していただけたと思います。
非同期タスクを活用することで、アプリケーションのスケーラビリティと効率性を大幅に向上させることができます。今後のプロジェクトで、これらの知識を活用して、より高度な非同期処理を実現してください。