読者です 読者をやめる 読者になる 読者になる

たなちの開発日誌

プログラミングのメモを中心に気になったことを書いていきます。

【Django】Celeryを試してみる

Celeryとは

Celeryを使えばタスクを別のスレッドやマシンに分散することができる。このタスクを分散させる仕組みをタスクキューといい、Celeryのプロセスは新しいタスクが入ってきていないかタスクキューを監視する。

タスクの実行は1つ以上のワーカーサーバー上でmultiprocessing(プロセスベースで並列処理を行えるPythonの標準ライブラリのパッケージ)等を使用して、同時に行うことができる。 タスクは、非同期に(バックグラウンドで)実行することも、同期して実行することもできる。一日に何百万ものタスクを処理することができる。

ワーカーとのやりとりについて、タスクを開始する際にクライアントはメッセージをキューに追加し、Redisなどのブローカーがそのメッセージをワーカーに持って行く。ブローカーというのは「仲介人」という意味通り、メッセージをクライアントとワーカー間で送受信させるソリューションのこと。(少しだけ後述)

参考文献:http://www.celeryproject.org/

Django First ステップ をやってみた

以下、Django First ステップ — Celery 3.1.18 ドキュメントを行ったメモ。

ソースはgitにあるとのこと
treeはこんな内容。

demoapp
├── __init__.py
├── models.py
├── tasks.py
├── tests.py
└── views.py
manage.py
proj
├── __init__.py
├── celery.py
├── settings.py
├── urls.py
└── wsgi.py

proj/celery.pyが追加されたもの。

Django と Celery の連携

proj/__init__.py

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

celery.pyからappがインポートされることによってDjangoの起動時にappがロードされ
tasks.pyの@shared_taskデコレータがcelery.pyのappを参照できるようになる。
※デコレータというのは、関数に付加機能をつけるようなもの。

proj/celery.py

from __future__ import absolute_import
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

from django.conf import settings

app = Celery('proj')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
app = Celery('proj')

appに対してDjangoの設定モジュールをCeleryの設定として使う。

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

この一文により、Celeryはtasks.pyの規約に則った再利用可能なアプリケーション内のタスクを自動的に検知する。

autodiscover_tasks(packages=None, related_name=u'tasks', force=False)
"tasks.py"モジュールのパッケージのリストを検索します(またはrelated_name引数を使用します)。

参照:
celery — Distributed processing — Celery 4.0.2 documentation

@app.task(bind=True)
def debug_task(self):

bindオプションにより、関数がバインドされtaskのインスタンス(の属性とメソッド)にアクセスできるようになる。
参照:http://docs.celeryproject.org/en/latest/userguide/tasks.html#example


demoapp/tasks.py

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

前述した通り、@shared_taskによってcelery.pyのapp = Celery('proj')を参照できるようになり、複数のタスクが同時に処理されるようになる。

proj/settings.py

#Celery設定箇所
from __future__ import absolute_import

BROKER_URL = 'redis://localhost:6379/0'#'amqp://guest:guest@localhost//'

#: Only add pickle to this list if your broker is secured
#: from unwanted access (see userguide/security.html)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

BROKERとは?

Celery はメッセージの送信と受信を行うソリューションを必要とします。通常これは メッセージブローカー と呼ばれる独立したサービスの形で提供されます。

Celery ファーストステップ — Celery 3.1.18 ドキュメント

BROKER_URLはRedis データベースのロケーションを意味している。コメントアウトしているのはRabbitMQの場合のもの。
URLのフォーマットは下記の通り。

redis://:password@hostname:port/db_number

スキーム(scheme)以下の全フィールドはオプションで、デフォルトは
・ホスト名:localhost
・ポート番号:6379
・データベース番号:0
UNIX ソケット接続を使う場合、URL のフォーマットは次のようになる

redis+socket:///path/to/redis.sock

今回実装していないが、タスクのステートや戻り値を Redis に保存したい場合は、次の設定を追加すべし。

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

Redis の使用 — Celery 3.1.18 ドキュメント
結果バックエンドの記事


その他の設定についてはこちらを参照
Configuration and defaults — Celery 3.1.18 ドキュメント

ワーカープロセスの起動

本番環境ではワーカーをデーモンとしてバックグラウンドで稼動させるだろうが、テストや開発時においてはcelery workerコマンドを使ってワーカーインスタンスを起動する方が便利だろうとのこと。
デーモンとは、Unix系のOSで使用される常駐ソフトのこと。

$ celery -A proj worker -l info

このコマンドでworkerプロセスを起動。Ctrl+C で停止する。-lはログレベルのオプション。
ログレベルでは、DEBUG、INFO、WARNING、ERROR、CRITICAL、またはFATALのいずれかを選択します。

ワーカーについての記事:ワーカー — Celery 3.1.18 ドキュメント
オプションについてはこのページ:celery.bin.worker — Celery 3.1.18 ドキュメント

実行結果

$ celery -A proj worker -l info

 -------------- celery@hoge-VirtualBox v4.0.2 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.4.0-57-generic-x86_64-with-Ubuntu-16.04-xenial 2016-12-27 10:55:36
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x7f86e2254590
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                
[tasks]
  . demoapp.tasks.add
  . demoapp.tasks.mul
  . demoapp.tasks.xsum
  . proj.celery.debug_task

[2016-12-27 10:55:36,647: INFO/MainProcess] Connected to redis://localhost:6379/0
[2016-12-27 10:55:36,658: INFO/MainProcess] mingle: searching for neighbors
[2016-12-27 10:55:37,684: INFO/MainProcess] mingle: all alone
/usr/local/lib/python2.7/dist-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '

[2016-12-27 10:55:39,233: WARNING/MainProcess] /usr/local/lib/python2.7/dist-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '

以下に実行結果に表示されたバナーのメッセージの説明を
ネクストステップ — Celery 3.1.18 ドキュメント
より抜粋する。

Concurrency

プリフォークされるワーカープロセスの数で、同時にタスクを処理する数になります。すべてのワーカープロセスが処理中の場合、新しいタスクは他のタスクが完了するまで待たなくてはいけません。デフォルトの concurrency 数はマシンの CPU (Core も含む) 数です。-c オプションでこの数を指定できます。
オプション:-c, --concurrency

Events

ワーカーで発生したアクションの監視メッセージ(イベント)を Celery に送信させるオプションです。イベントは、celery events や Flower - リアルタイムのCelery モニター - といった監視プログラムで使われます。
オプション:-E, --events

Queues

ワーカーがタスクを取得してくるキューのリストです。一度にいくつかのキューからタスクを取得するようにワーカーに指示することもできます。これは Quality of Service や separation of concerns、優先度のエミュレートを実現する手段として特定のワーカーにメッセージを送るのに使用されます。
オプション:-Q, --queues

複数のworkerをバックグラウンドで起動する方法

$ celery multi start w1 -A proj -l info

再起動

$ celery  multi restart w1 -A proj -l info

停止

$ celery multi stop w1 -A proj -l info
または
$ celery multi stopwait w1 -A proj -l info

stop コマンドは非同期なのでワーカーが停止するまで待ちません。stopwait コマンドを使えば現在実行中の全タスクが完了するのを待って終了します。

以下の記事も参考になりました。
DjangoでCeleryのメモ - Qiita