MENU

Celery入門:Pythonで非同期タスクを効率的に実行しよう!

こんにちは。今日は、Pythonの強力なタスクキューライブラリ「Celery」について詳しく解説していきます。

Webアプリケーションを開発していると、メール送信やデータ処理など、時間のかかる処理が出てくることがありますよね。これらの処理をリクエストと同期的に実行してしまうと、レスポンスが返ってくるまで待たされ、ユーザーエクスペリエンスが損なわれてしまいます。

そこで活躍するのが、非同期タスク実行ライブラリのCeleryです。Celeryを使えば、重い処理をバックグラウンドで非同期に実行でき、アプリケーションのレスポンス性能を大幅に向上させることができるのです。

本記事では、Celeryの特徴や使い方、アーキテクチャ、使用例などを詳しく見ていきます。Django、Flask、FastAPIなどのウェブフレームワークと組み合わせて使う方法もご紹介しますので、ぜひ最後までお付き合いください。

目次

Celeryの主な特徴

Celeryには以下のような特徴があります。

  • メッセージブローカー(RabbitMQ、Redisなど)を介してタスクを非同期に実行できる
  • 複数のワーカープロセスを使ってタスクを並列処理できる
  • 定期的なタスクのスケジューリングもサポート
  • Django、Flask、Pyramidなどの主要なPythonウェブフレームワークと統合しやすい
  • タスクの実行状況や結果をモニタリング・管理できる(Celery flower)

特に、メッセージブローカーを介した非同期実行と、ワーカープロセスによる並列処理が強力です。これにより、アプリケーションのレスポンス性能を損なうことなく、重い処理を効率的に実行できるようになります。

Celeryの基本的な使い方

Celeryの使い方は、以下のようにシンプルです。

  1. Celeryアプリケーションを作成:

from celery import Celery app = Celery('tasks', broker='redis://localhost:6379')

  1. タスクを定義:

@app.task def add(x, y): return x + y

  1. ワーカーを起動:

celery -A tasks worker --loglevel=info

  1. タスクを呼び出し:

result = add.delay(4, 4) result.get() # 8

上記の例では、Redisをメッセージブローカーとして使用していますが、RabbitMQなど他のブローカーも使用可能です。@app.taskデコレータを使ってタスクを定義し、delay()メソッドを使ってタスクを非同期に呼び出しています。

Celeryのアーキテクチャ

Celeryは以下のコンポーネントから構成されます。

  • クライアント:タスクを発行するアプリケーション(Webアプリケーションなど)
  • メッセージブローカー:タスクを受け取るキュー(RabbitMQ、Redisなど)
  • ワーカー:タスクを実行するプロセス
  • バックエンド:タスクの結果を保存するストレージ(Redis、RDBMSなど)

クライアントがタスクをブローカーに送信し、ワーカーがブローカーからタスクを受け取って実行します。タスクの実行結果はバックエンドに保存され、クライアントから参照できます。

このアーキテクチャにより、クライアントとワーカーを分離し、スケーラビリティと信頼性を高めることができます。

Celeryの主な使用例

Celeryは以下のようなケースで活用できます。

  • メール送信などの時間のかかる処理の非同期化
  • 大量のデータ処理やファイル変換などのバックグラウンドジョブ
  • 定期的なデータのクロールやクリーンアップタスクのスケジューリング
  • 機械学習モデルの非同期推論

例えば、ユーザー登録時にウェルカムメールを送信する処理があるとします。この処理をリクエストと同期的に実行すると、メール送信が完了するまでレスポンスが返せません。しかし、Celeryを使ってメール送信を非同期タスクとして実行することで、レスポンスをすぐに返せるようになります。

ウェブフレームワークとの統合

CeleryはDjango、Flask、FastAPIなどの主要なPythonウェブフレームワークと簡単に統合できます。

Django との統合

Djangoでは、django-celery-resultsパッケージを使ってCeleryを簡単に統合できます。

  1. django-celery-resultsをインストール:

pip install django-celery-results

  1. Djangoの設定ファイルに以下を追加:

INSTALLED_APPS = [ ... 'django_celery_results', ] CELERY_RESULT_BACKEND = 'django-db'

  1. タスクを定義:

from celery import shared_task @shared_task def add(x, y): return x + y

  1. タスクを呼び出し:

from .tasks import add result = add.delay(4, 4)

FastAPI との統合

FastAPIでは、fastapi-celeryパッケージを使ってCeleryを簡単に統合できます。

  1. fastapi-celeryをインストール:

pip install fastapi-celery

  1. FastAPIアプリケーションにCeleryを追加:

from fastapi import FastAPI from fastapi_celery import CeleryClient app = FastAPI() celery_client = CeleryClient( broker="redis://localhost:6379", result_backend="redis://localhost:6379" ) @app.post("/tasks") async def run_task(name): # Run a task asynchronously via Celery task = celery_client.send_task(name) return {"task_id": task.id}

このように、各フレームワーク用のパッケージを使えば、Celeryとの統合は非常に簡単です。

まとめ

本記事では、Pythonの非同期タスク実行ライブラリであるCeleryについて詳しく解説しました。Celeryを使えば、重い処理をバックグラウンドで非同期に実行でき、アプリケーションのレスポンス性能を大幅に向上させることができます。

Celeryはメッセージブローカーを介した非同期実行と、ワーカープロセスによる並列処理が特徴です。Django、Flask、FastAPIなどの主要なウェブフレームワークとの統合も簡単で、様々な用途で活用できます。

長時間かかる処理を非同期化し、アプリケーションのパフォーマンスを高めたい場合は、ぜひCeleryを検討してみてください。

Celeryを始めるためのTips

項目 説明 メッセージブローカー RabbitMQ、Redis、Amazon SQSなどから選択。RabbitMQが一般的 バックエンド タスクの結果を保存するために使用。Redis、RDBMSなどを使用可能 モニタリング Celery flowerを使ってタスクの実行状況をモニタリング エラーハンドリング タスクの再試行やエラー通知の設定が重要 スケーリング ワーカー数を増やすことで並列処理能力を向上可能

参考リンク

ぜひ、これらのリソースも参考にしながら、Celeryを使ったPythonでの非同期タスク実行にチャレンジしてみてください。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

目次