こんにちは、AI事業本部 オペレーションテクノロジーDiv.で「AdPos」というプロダクトのバックエンドエンジニア をしている細田来夢です。
この記事では、Django(およびDjangoRestFramework)で非同期処理を実現する方法として、AdPosでも利用しているCeleryとAmazon SQSを使った設定内容を紹介します!
「この設定でとりあえず動く状態」をゴールにして、初心者でも迷わないよう丁寧に解説していますので、ぜひ参考にしてください!
1. はじめに: 非同期処理の基本とこの記事のゴール
非同期処理は、アプリケーションのパフォーマンスを向上させるための重要な技術です。
特に、DjangoのようなWebフレームワークでは、非同期処理を導入することで、ユーザーのリクエストに対する応答時間を短縮し、システム全体の効率を向上させることができます。
この記事を読むメリット
初心者でもDjango(DjangoRestFramework)で非同期処理をCeleryとSQSを使って実現するための設定内容がわかります
CeleryはPythonで広く使われている非同期ライブラリで、SQS(Simple Queue Service)はAWSが提供するメッセージキューサービスです。
これらを組み合わせることで、スケーラブルで信頼性の高い非同期処理を実現できます。
この記事の対象読者
- Django(DjangoRestFramework)で非同期処理を実現したいと考えている方
- CeleryとSQSを使う方法をすでに決めているが、具体的な設定方法が分からない方
- SQSとCeleryの初心者で、まずは動く環境を構築したい方
この記事で書かないこと
- そもそもPythonで非同期処理を実現する手法・ライブラリには何があるか、どのようなケースでCeleryやSQSを使うべきか
- SQS以外のブローカーを使った方法(例: RedisやRabbitMQ)
- Django・DjangoRestFrameworkの環境構築、設定方法
- Celeryの仕組みを深掘りする解説
- 本番のAWS環境の構築とデプロイ方法
この記事のゴール
CeleryとSQSを使った非同期処理を、Djangoプロジェクト上で構築し、ローカルで動作させること
具体的な設定手順を通じて、Djangoプロジェクトに非同期処理を組み込む方法を学びます
この記事の流れ
- 前提となる技術知識を解説
- 【基礎編】DjangoプロジェクトでCeleryの設定を行い、非同期処理が最低限動作する状態を構築する
- 【発展編】ローカル環境での開発効率を高めるために、localstackを使ってローカルでSQSを動かす状態を構築する
- 【応用編】本番運用のtipsや注意点、Celeryのチューニング方法などを紹介
2. 前提となる技術知識
非同期処理のかんたん用語解説とイメージ
用語 | 説明 | イメージ |
---|---|---|
メッセージ | システム間でやりとりする情報の媒体 | 手紙 |
タスク | 非同期で実行したい処理やデータの内容。メッセージにはどのタスクを実行するかの情報が書いてある | 手紙に書いてある文章 |
キュー | メッセージを貯める入れ物 | 手紙を入れる筒 |
ブローカー | キューの管理システム。キューにメッセージを入れたり、キューに入っているメッセージをワーカーに渡したりする(ex SQS、Redis、RabbitMQなど) | 差出人から手紙を受け取って、一旦筒に入れておき、受取人(ワーカー)が現れたら筒から手紙を出して渡す郵便局員さん |
プロデューサー | ブローカーにメッセージを渡すシステム | ワーカー宛に手紙を書いて、郵便局員さんに渡す人 |
ワーカー | ブローカーからメッセージを受取り、メッセージの内容に従って具体的な処理を行うシステム | 郵便局員さんから手紙を受け取って、手紙を読み、その内容に従って何かをする人 |
非同期処理の流れのイメージ
システムAからシステムBへキューを介して情報を送り、システムBがその情報を受信して非同期処理を実行する
- システムA「メッセージをキューに入れたい」:キューへメッセージを送信する=プロデューサー
- システムB「キューに入っているメッセージを取得したい」:キューからメッセージを取得する(「受信する」といいます)=ワーカー
- POINT
- ただメッセージを受信するだけではメッセージはなくならない
- メッセージが不要になったらメッセージを明示的に削除する必要がある
- POINT
- システムB:受信したメッセージの内容は確認できたので、キューに残っているメッセージはもういらない→メッセージを削除する
*システムBが3でメッセージを削除しなければ、システムBは同じメッセージを何度も受信することになる
*メッセージが受信された回数はSQSがカウントしている
→「受信回数が一定数をこえたら、デッドレターキューへメッセージを移動させる」という設定もできる:Amazon SQS Dead Letter Queueの挙動を確認したメモ
SQSって何?
- SQSとは、AWSが提供するメッセージングサービスです(「Simple Queue Service」の略):AWS SQS
- 上記の「ブローカー」の1種
- SQSのキューにはFIFOキューと標準キューの2つがあります
- 厳格さを求めるならFIFOキューなので本記事ではFIFOキューを使用します
- この2つのキューの違いは、いくつかありますが、ここでは「メッセージを取り出すときに、入れた順番を守るかどうか」と「メッセージが1度しか配信されないか」について理解しておきましょう
- FIFOキュー
- メッセージを入れた順に取り出します:筒に玉A,玉B,玉Cを入れたら、素直にそのまま玉A,玉B,玉Cの順に玉が出てきます
- 1回しか配信されません(何度も配信されてしまう心配がありません)
- 標準キュー
- メッセージを入れた順番通りに取り出すことを保証しません(玉A,玉B,玉Cの順で入れたら、玉A,玉B,玉Cの順で出てくるとは限らない)
- メッセージは少なくとも 1回は確実に配信されますが、1回以上配信されてしまうこともあります
- FIFOキュー
Celeryとは
- Celeryとは、Pythonの非同期タスクキューライブラリです
- Celeryは、上記の「プロデューサー」「ワーカー」の役割を果たしてくれます
- プロデューサー:
delay()
やapply_async()
といったメソッドを使って、メッセージをブローカー(SQSなど)に送信します - ワーカー:Celeryワーカーの常駐プロセスがキューを監視し、キューにメッセージが入ったら即座にメッセージを受信してタスクを実行します
- プロデューサー:
Celeryを使うと何がうれしいか
Celeryを使うと、上記の「メッセージの送信、受信、削除」といった処理が簡単にできること、これが嬉しいポイントです
「Celeryを使わないケース=boto3(AWSのサービスをPythonで操作するためのライブラリ)を使うケース」と比較するとわかりやすいので見てみましょう。
項目 | Celeryを使用する場合 | boto3を使用する場合 |
---|---|---|
キューへの送信 | delay() や apply_async() といったcelery独自メソッドで簡単にメッセージをブローカーに送信可能 |
boto3 の send_message を使用してキューにメッセージを送信する処理を自前で用意する必要がある |
ワーカーによるメッセージ受信・タスクの実行 | Celeryワーカーがキューを監視し、自動でメッセージを受信し、タスクを実行 | ワーカーを自分で設計・実装する必要あり(例: メッセージポーリングの仕組みを自前で作る) |
メッセージの削除 | タスク完了後、Celeryワーカーが自動でメッセージを削除 | タスク完了後、明示的に delete メソッド を使用してメッセージを削除する必要あり |
最適化の容易さ | ワーカーのプロセス数やポーリング間隔、メッセージ削除のタイミングなど、専用の設定値を設定・変更するだけでメッセージの処理を簡単に最適化できる | メッセージ処理全体を自分で管理する必要がある |
3. 【基礎編】CeleryとSQSの基本設定
ここからのセクションでは、DjangoプロジェクトでCeleryとSQSを使用するための基本設定の大枠について説明します。
まずは、プロジェクトの簡単なディレクトリ構造図を示し、主要ファイルの位置を記載します。
以下のディレクトリ構造図は、この記事のゴールを達成するために必要なファイルのみを示しています。
project_root/
│
├── apps/
│ ├── hoge_app/
│ │ └── tasks.py
│ └── ...
│
├── config/
│ ├── settings/
│ │ └── base.py
│ │ └── local.py
│ │ └── prd.py
│ └── celery.py
│
├── docker/
│ ├── app/
│ │ └── Dockerfile
│ └── worker/
│ └── Dockerfile
│ └── entrypoint.sh
└── docker-compose.yml
└── docker-compose.local.yml
└── poetry.lock
└── pyproject.toml
└── .env
ここからは、上記のディレクトリ構造図をもとに、主要ファイルのざっくり紹介をします。
以下に各ファイルの設定や記載内容について、より詳しい説明を追記します。
pyproject.toml: インストールするパッケージを記載
pipやrequirements.txtでceleryをインストールしてもいいんですが、本番運用時には何かしらのパッケージ管理ツールを使うのが一般的かと思います。
今回はローカルとAWS環境でパッケージや設定を分けたいので、パッケージはpoetry
を使ってインストールします。
poetryについて詳しくはこちらをご覧ください。
※ちなみにpipでインストールするなら→pip3 install celery[sqs]==5.2.4
celery = {version = "5.2.4", extras = ["sqs"]}
解説
celery
のバージョン5.2.4をインストールします。- 2024年12月時点で
celery
は5.4.0がリリースされていますが、5.2.4以上のバージョンでは動作確認をしていないため、5.2.4を指定しています。
- 2024年12月時点で
extras = ["sqs"]
は、CeleryがAmazon SQSをブローカーとして使用するための追加機能をインストールするためのオプションです。- celeryは、ブローカーとしてSQS以外にもRedisやRabbitMQなどを使用できますが、今回はSQSを使用するために
extras = ["sqs"]
を指定しています。
- celeryは、ブローカーとしてSQS以外にもRedisやRabbitMQなどを使用できますが、今回はSQSを使用するために
.env: 環境変数を定義するファイル
QUEUE_NAME=hoge-queue.fifo
QUEUE_URL="https://sqs.ap-northeast-1.amazonaws.com/11111111111/hoge-queue.fifo"
解説
-
QUEUE_NAME
は、使用するSQSキューの名前を指定します。.fifo
はFIFO(First-In-First-Out)キューを示しています。 -
QUEUE_URL
は、使用するSQSキューのURLを指定します。上記2つの値はAWSのSQSコンソール画面で確認してセットしてください。
config/settings/base.py: CeleryとSQSの基本設定を含むファイル
import os
... # Django関係の諸々な必要な設定を記載
QUEUE_NAME = os.getenv("QUEUE_NAME")
QUEUE_URL = os.getenv("QUEUE_URL")
CELERY_BROKER_URL = "sqs://"
CELERY_BROKER_TRANSPORT_OPTIONS = {
"region": "ap-northeast-1",
"polling_interval": 20,
"predefined_queues": {
QUEUE_NAME: {
"url": QUEUE_URL,
},
},
}
CELERY_TASK_ACKS_LATE = True
解説
CELERY_BROKER_URL
は、ブローカーのURLを指定します。sqs://
というURLを指定することで、SQSをブローカーとして指定できます
CELERY_BROKER_TRANSPORT_OPTIONS
は、SQSの設定オプションを指定しますregion
はAWSリージョンpolling_interval
はポーリングの間隔predefined_queues
は、キューの設定を指定しますQUEUE_NAME
は、使用するSQSキューの名前を指定しますurl
は、使用するSQSキューのURLを指定します
CELERY_TASK_ACKS_LATE = True
は、「タスクが完了した後のタイミングでワーカーがメッセージを削除する設」という設定です。
config/celery.py: Celeryの設定を行うファイル
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery("config")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
解説
- この
celery.py
ファイルを設置しておくことで、本システム内でCeleryを使用できるようになりますCelery
インスタンスを作成し、Djangoの設定からCeleryの設定を読み込みます。
Celery("config")
では、このcelery.py
ファイルの名前空間を指定します(「config
ディレクトリに設置しているよ」という意味)app.autodiscover_tasks()
は、Djangoアプリケーション内のtasks.py
ファイルを自動的に検出し、タスクを登録します。
ここまでで、Celeryの基本設定は完了です!
ここからは、SQSへメッセージを送信する「プロデューサー」と、SQSからメッセージを受信して非同期処理を行う「ワーカー」の設定を行います。
tasks.py: 非同期処理の「タスク」を定義するファイル
from celery import shared_task
from django.conf import settings # 環境変数を読み込むために必要
from apps.hoge_app.models import HogeModel
@shared_task(queue=QUEUE_NAME)
def insert_hoge_task(x: int, y: int) -> None:
result = x + y
HogeModel.objects.create(hoge=result)
解説
- 任意の関数に
@shared_task
デコレーターをつけると、その関数が「タスク」としてCeleryに認識・登録されます - この「タスク」関数こそが非同期処理として実行される具体的な処理内容になります
queue=QUEUE_NAME
:どのキューからメッセージを受信するかを指定します- 関数名や関数の中身は何でもOKです
insert_hoge_task
関数は、2つの数値を加算してデータベースに保存するシンプルなタスクの例です
SQSへメッセージを投げる(プロデューサー): 任意のファイル内に以下を記載
import uuid
from django.conf import settings
from apps.hoge_app.tasks import insert_hoge_task
# apply_asyncはインポート不要
def send_message_to_sqs() -> None:
insert_hoge_task.apply_async(
kwargs={"x": 10, "y": 20},
queue=settings.QUEUE_NAME,
**{"MessageGroupId": str(uuid.uuid4()), "MessageDeduplicationId": str(uuid.uuid4())}
)
# delayというメソッドもありますが、fifoキューを使う場合はapply_asyncを使うことを推奨します
解説
apply_async
は、タスクを非同期に実行するためのceleryのメソッドで、tasks.pyで定義したタスク関数名にapply_async
メソッドをつけるだけで(exinsert_hoge_task.apply_async()
)、SQSへ簡単にメッセージを送信できますkwargs
は、タスクに渡す引数を指定しますx
とy
は、insert_hoge_task
関数に渡す引数ですargs
で、タスクに渡す引数をリストで指定することも可能:ex:args=[10, 20]
queue
は、「どのキューへメッセージを送信するか」をキュー名で指定しますMessageGroupId
とMessageDeduplicationId
は、fifoキューを使う場合に使用します(任意項目)MessageGroupId
は、メッセージをグループ分けするためのIDです- 同じMessageGroupIdを持つメッセージは、送信された順に処理されます(詳しくはこちら)
MessageDeduplicationId
は、メッセージの重複排除を行うためのIDです- SQSは5分の間に同じMessageDeduplicationIdを持つメッセージが2回以上送られてきたら、重複とみなして2回目以降は処理しないようにします
- このIDをメッセージごとに一意に設定することで、万が一、同じメッセージを2回送信してしまった場合でも、重複を防ぐことができます
まとめると、このsend_message_to_sqs
関数を実行することで、
- SQSの
hoge-queue.fifo
キューへメッセージが送信されます - メッセージには「
insert_hoge_task
タスクを実行せよ。引数はx=10, y=20
」という内容が含まれています
docker/worker/Dockerfile: workerイメージの設定ファイル
# Celeryワーカーを起動
ENTRYPOINT ["/entrypoint.sh"]
解説
ENTRYPOINT
は、Dockerコンテナが起動した際に実行されるスクリプトを指定しますENTRYPOINT ["/entrypoint.sh"]
でdocker/worker/entrypoint.sh
を実行しています
docker/worker/entrypoint.sh: workerイメージのエントリーポイントスクリプト
#!/bin/sh
exec celery -A config worker -l INFO -Q "$QUEUE_NAME"
解説
exec celery -A config worker -l INFO -Q "$QUEUE_NAME"
で、Celeryワーカーを起動し、config
ディレクトリに設置したcelery.py
ファイルを使用して設定を読み込みます-l INFO
は、ログレベルをINFO
に設定します-Q "$QUEUE_NAME"
は、「どのキューからメッセージを受信するか」を指定しています- この
QUEUE_NAME
は、.env
ファイルで定義した環境変数の値を使用しています -Q
の後に複数のキュー名を指定することで、複数のキューからメッセージを受信することも可能です
- この
docker-compose.yml: コンテナを起動するための設定ファイル
version: "3"
services:
... # appコンテナの設定
worker:
image: hoge-worker
build:
context: .
dockerfile: ./docker/worker/Dockerfile
target: production
volumes:
- .:/usr/local/worker
- ~/.aws:/root/.aws:ro
解説
volumes
/.:/usr/local/worker
- ローカルの
worker
ディレクトリをコンテナ内にマウントしています - これにより、ローカルでの変更がコンテナ内に反映されるようになり、ローカルでの開発が行えるようになります
- ローカルの
volumes
/~/.aws:/root/.aws:ro
- ローカルの
~/.aws
ディレクトリをコンテナ内にマウントしています - これにより、AWSの認証情報がコンテナ内でも使用できるようになり、SQSへのアクセスができるようになります
- ローカルの
docker-compose.local.yml: ローカル環境でコンテナを起動するための設定ファイル
version: "3"
services:
... # worker以外のコンテナの設定
worker:
platform: linux/x86_64
build:
target: development
解説
platform: linux/x86_64
- M1チップのMacでもコンテナを動かせるようにするために指定
build: target: development
- poetryでdevelopment環境用のパッケージをインストールするために指定
4.コンテナを起動し、非同期処理を実行する
ここまでで、CeleryワーカーとDjangoアプリケーションを起動し、非同期処理を実行する最低限の準備は整いました。
さあ、Docker Composeを使用してCeleryワーカーとDjangoアプリケーションを起動し、非同期処理を試してみましょう。
手順
-
AWSの認証情報を設定する
ローカルからSQSを操作するためには、AWSの認証情報が必要です。
具体的にはローカルの~/.aws
ディレクトリにcredentials
ファイルを作成し、以下の内容を記載する必要があります。[default] aws_access_key_id = YOUR_ACCESS_KEY aws_secret_access_key = YOUR_SECRET_KEY
AWSのアクセスキーの具体的な作成方法はこちらを参考にしてみてください。
:[初心者向け] IAMユーザーにアクセスキーを払い出す手順を確認してみた | DevelopersIO -
Docker Composeでコンテナを起動する
以下のコマンドを使用して、Docker Composeでコンテナを起動します。
docker-compose -f docker-compose.yml build docker-compose -f docker-compose.yml up -d
-
コンテナの状態を確認する
コンテナが正しく起動しているかを確認するために、以下のコマンドを実行します。
docker-compose -f docker-compose.yml ps
このコマンドは、現在実行中のコンテナのリストを表示します。
app
とworker
のステータスがUp
であることを確認してください。 -
ログを確認する
CeleryワーカーやDjangoアプリケーションのログを確認することで、非同期処理が正しく実行されているかを確認できます。
以下のコマンドを使用して、ログを確認します。docker-compose -f docker-compose.yml logs -f worker
docker-compose -f docker-compose.yml logs -f app
-f
オプションを使用することで、リアルタイムでログを追跡できます。 -
非同期タスクを実行する
Djangoアプリケーション内で非同期タスクを実行するために、適切なエンドポイントを呼び出すか、管理画面からタスクをトリガーします。
タスクが正常にキューに追加され、Celeryワーカーによって処理が実行される
workerコンテナのログを確認して、タスクが正常にキューに追加され、Celeryワーカーによって処理が実行されることを確認してください。
これで、Docker Composeを使用してコンテナを起動し、非同期処理を実行する準備が整いました。
これらの手順を通じて、Djangoプロジェクトでの非同期処理の実行を確認できます。
5. ローカル環境でSQSを模擬する
さて、ここまででとりあえずAWS環境のSQSを使って、Celeryの非同期処理を動かす設定はできました。
しかし、開発効率を上げるためにも、SQSも含めて全て各自のローカル環境で完結させる方が望ましいでしょう。
なので、ここからは「localstack」というツールを使って、ローカル環境でSQSをエミュレートするための設定を行います。
設定ファイルを少し変更する
【変更点】
docker
ディレクトリにlocalstack/start.sh
を作成しますdocker-compose.local.yml
にlocalstackの設定を追加しますsettings/local.py
にlocalstackの設定を追加します
※「3.【基礎編】CeleryとSQSの基本設定」の時点でlocalstackを使用しておいてももちろん良いのですが、本記事では説明をわかりやすくするためにここでlocalstackの設定を追加しています。
project_root/
│
├── apps/
│ ├── hoge_app/
│ │ └── tasks.py
│ └── ...
│
├── config/
│ ├── settings/
│ │ └── base.py
│ │ └── local.py
│ │ └── prd.py
│ └── celery.py
│
├── docker/
│ ├── app/
│ │ └── Dockerfile
│ └── worker/
│ │ └── Dockerfile
│ │ └── entrypoint.sh
│ └── localstack/ # localstackのディレクトリ
│ └── start.sh # localstackの起動スクリプト
└── docker-compose.yml
└── docker-compose.local.yml
└── poetry.lock
└── pyproject.toml
└── .env
ここから設定ファイルの内容を解説していきますが、その前にlocalstackについて少し解説します。
localstackとは
localstackは、AWSサービスをローカルで模擬するためのエミュレータツールで、開発やテストに便利です。
localstack公式サイトはこちら:localstack
Dockerイメージが提供されているので、Dockerをインストールしていればすぐに使えます。
今回は、localstackを使用してSQSをエミュレートします。
.env: ローカルで環境変数を定義するファイル
QUEUE_NAMEとQUEUE_URLの値を変更します。
QUEUE_NAME=local-queue.fifo
QUEUE_URL="http://localstack:4566/000000000000/local-queue.fifo"
解説
QUEUE_NAME
はローカル環境でのSQSキューの名前を指定します- 任意の名前にしてOKですが、
.fifo
はつけてください - この名称のキューがlocalstack上に作成されます
- 任意の名前にしてOKですが、
QUEUE_URL
は、ローカル環境でのSQSキューのURLを指定しますhttp://localstack:4566/000000000000/
がlocalstackのデフォルトのURLですので、このまま指定してくださいQUEUE_NAME
の値がlocal-queue.fifo
なので、http://localstack:4566/000000000000/local-queue.fifo
となります
docker/localstack/start.sh: localstackの起動スクリプト
#!/bin/sh
awslocal sqs create-queue --queue-name ${QUEUE_NAME} --region ap-northeast-1 --attributes "FifoQueue=true,ContentBasedDeduplication=true,VisibilityTimeout=15"
- 解説
awslocal sqs create-queu"
- SQSキューを作成するためのコマンドです
--queue-name ${QUEUE_NAME}
QUEUE_NAME
は、.env
ファイルで定義した環境変数の値を使用しています
--region ap-northeast-1
- リージョンを指定しています
--attributes "FifoQueue=true,ContentBasedDeduplication=true,VisibilityTimeout=15"
- FIFOキューを作成するためのオプションです
FifoQueue=true
は、FIFOキューを作成するためのオプションですContentBasedDeduplication=true
は、メッセージ重複排除設定を有効にするためのオプションですVisibilityTimeout=15
は、メッセージの可視性タイムアウトを設定するためのオプションです(単位は秒)- ここの秒数は非同期タスクの実行時間よりも大きく設定してください
docker-compose.local.yml: ローカル環境でコンテナを起動するための設定ファイル
version: "3"
services:
... # worker以外のコンテナの設定
worker:
build:
target: development
depends_on: # 追加
localstack:
condition: service_healthy
localstack: # 追加
environment:
QUEUE_NAME: ${QUEUE_NAME}
image: localstack/localstack:3.7.0
ports:
- "4566:4566"
volumes:
- ./docker/localstack/start.sh:/etc/localstack/init/ready.d/start.sh
解説
-
workerコンテナ
depends_on
/localstack: condition: service_healthy
- localstackが起動していることを確認してからworkerを起動するようにしています
- この設定により、localstackコンテナよりも先にworkerコンテナが起動して、workerがSQSを見つけられずにエラーになることを防ぐことができます
-
localstackコンテナ
environment
/QUEUE_NAME: ${QUEUE_NAME}
QUEUE_NAME
は、.env
ファイルで定義した環境変数の値を参照します- start.sh内でQUEUE_NAMEを使用するために環境変数として渡しています
image: localstack/localstack:3.7.0
- localstackのイメージを指定しています
ports
/4566:4566
- localstackのポートをローカルのポートにマッピングしています
volumes
/./docker/localstack/start.sh:/etc/localstack/init/ready.d/start.sh
start.sh
はlocalstackの起動スクリプトです- ローカルの
docker/localstack/start.sh
をコンテナ内の/etc/localstack/init/ready.d/start.sh
にマウントしています - これにより、localstackコンテナの起動時に
start.sh
が実行され、SQSが作成されるようになります
config/settings/local.py: ローカル環境での設定
import os
from .base import * # noqa
... # Django関係の諸々な必要な設定を記載
CELERY_BROKER_URL = "sqs://{aws_access_key}:{aws_secret_key}@localstack:4566".format(
aws_access_key=safequote("hoge", safe=""),
aws_secret_key=safequote("huge", safe=""),
)
解説
- base.pyで定義していた
CELERY_BROKER_URL
を、local.pyで上書きしています sqs://{aws_access_key}:{aws_secret_key}@localstack:4566
は、localstackのSQSのURLを指定していますlocalstack
はlocalstackのコンテナ名です4566
はlocalstackのデフォルトのポートです{aws_access_key}
と{aws_secret_key}
は、AWSの認証情報ですが、localstackの場合はダミーの値を使用しています
ローカル環境でコンテナを起動する
ここまでの設定を行ったら、
docker compose build
→docker compose -f docker-compose.local.yml up -d
でローカル環境でコンテナを起動します。
以下のコマンドでlocalstackのコンテナも起動していることが確認できると思います。
docker-compose -f docker-compose.local.yml ps
あとは非同期処理が実行されるように、Djangoアプリケーション内で非同期タスクを実行し、ログで非同期タスクが実行されていることを確認できれば、ローカル環境での非同期処理の設定はひとまず完了です!
6. 応用編 もっとCeleryを活用する
ここまで、CeleryとSQSを使った非同期処理の基本の設定・実行方法について解説しました。
ここからは、これらの設定を応用して、より複雑な非同期処理を実現する方法について解説します。
※追記予定
複数のworkerを起動したい
アプリごとに非同期タスクを作成したい
delayとapply_asyncの違いと使い分け方
delayとapply_asyncの違い
SQSのメッセージには何が入っているの?
workerのプロセス数を調整したい
メッセージサイズが大きすぎてSQSの上限をオーバーしてしまう
効率的にタスクを処理したい
重たいタスクをバックグラウンドで実行したい
ユニットテストの注意点は?
AWSのサービス構成はどうする?
本番での耐障害性を高くするには
デプロイ時にバグを生むケース
まとめ
この記事では、DjangoプロジェクトにおけるCeleryとSQSを活用した非同期タスクの設定と実行方法について詳しく解説しました。
この記事を通じて、Djangoプロジェクトでの非同期処理の基本的な設定と実行方法を理解していただけたと思います。
非同期タスクを活用することで、アプリケーションのスケーラビリティと効率性を大幅に向上させることができます。今後のプロジェクトで、これらの知識を活用して、より高度な非同期処理を実現してください。