Today I'll walk through setting up a two-node Celery task queue in a Django project (one master node and one worker node: the master publishes tasks and the worker receives and executes them; scaling to three or more nodes works the same way), using Celery and RabbitMQ. I won't cover Celery or RabbitMQ internals separately here.
## Environment

Two Ubuntu 18.04 virtual machines, Python 3.6.5, Django 2.0.4, Celery 3.1.26.post2
## Project structure

```
proj
- celery1
  - admin.py
  - apps.py
  - models.py
  - tasks.py
  - tests.py
  - urls.py
  - views.py
- proj
  - celery.py
  - settings.py
  - urls.py
  - wsgi.py
- manage.py
```
## Celery configuration

The Celery-related settings in settings.py:
```python
import djcelery
from kombu import Queue, Exchange

djcelery.setup_loader()

BROKER_URL = 'amqp://test:test@192.168.43.6:5672/testhost'
CELERY_RESULT_BACKEND = 'amqp://test:test@192.168.43.6:5672/testhost'
CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_DEFAULT_EXCHANGE = 'train'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_IMPORTS = ("proj.celery1.tasks",)
CELERY_QUEUES = (
    Queue('train', routing_key='train'),
    Queue('predict', routing_key='predict'),
)
```
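With the queues declared above, tasks can also be routed statically instead of passing queue= and routing_key= on every apply_async() call. A minimal sketch of that alternative (my addition, not part of the original settings; the dictionary key must match the task's registered name, which depends on how the module ends up being imported):

```python
# Optional static routing (illustrative): every do_train call is then sent to
# the train queue automatically, with no per-call queue=/routing_key= needed.
CELERY_ROUTES = {
    'celery1.tasks.do_train': {'queue': 'train', 'routing_key': 'train'},
}
```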
Configuration in proj/celery.py:
```python
from __future__ import absolute_import
import os

from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
```
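To confirm that config_from_object() really picked up the settings, a quick look in a Django shell can help. This is just an illustrative sanity check, not part of the original project:

```python
# Run inside `python manage.py shell`: inspect the Celery app's effective config.
from proj.celery import app

print(app.conf.BROKER_URL)       # expect amqp://test:test@192.168.43.6:5672/testhost
print(sorted(app.tasks.keys()))  # registered task names; debug_task should appear
```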
Configuration in proj/__init__.py:
```python
from __future__ import absolute_import
from .celery import app as celery_app
```
celery1/tasks.py (the task body on the master node is never executed; only the implementation on the worker node runs — the master just needs a matching signature to publish the task):
```python
from __future__ import absolute_import
from celery import task


@task
def do_train(x, y):
    return x + y
```
celery1/views.py:
```python
# APIView and Response come from Django REST Framework.
from rest_framework.views import APIView
from rest_framework.response import Response

from .tasks import do_train


class Test1View(APIView):
    def get(self, request):
        try:
            ret = do_train.apply_async(args=[4, 2], queue="train", routing_key="train")
            data = ret.get()
        except Exception as e:
            return Response(dict(msg=str(e), code=10001))
        return Response(dict(msg="OK", code=10000, data=data))
```
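Note that ret.get() blocks the HTTP request until the worker finishes, which is fine for a quick demo but defeats the point of a queue for long-running training jobs. A common alternative is to return the task id immediately and poll for the result in a second view. The sketch below is my addition (the view names and status codes are hypothetical, following the conventions above):

```python
from celery.result import AsyncResult
from rest_framework.views import APIView
from rest_framework.response import Response

from .tasks import do_train


class TrainStartView(APIView):
    """Hypothetical non-blocking variant: enqueue the task and return its id."""
    def get(self, request):
        ret = do_train.apply_async(args=[4, 2], queue="train", routing_key="train")
        return Response(dict(msg="OK", code=10000, task_id=ret.id))


class TrainResultView(APIView):
    """Hypothetical polling view: look the task up by id later."""
    def get(self, request, task_id):
        res = AsyncResult(task_id)
        if not res.ready():
            return Response(dict(msg="PENDING", code=10002))
        return Response(dict(msg="OK", code=10000, data=res.get()))
```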
## Worker node

Directory structure:

```
celery1
- celery.py
- config.py
- tasks.py
```
Configuration of celery1/celery.py on the worker node:
```python
from __future__ import absolute_import
from celery import Celery

CELERY_IMPORTS = ("celery1.tasks",)

app = Celery('myapp',
             broker='amqp://test:test@192.168.43.6:5672/testhost',
             backend='amqp://test:test@192.168.43.6:5672/testhost',
             include=['celery1.tasks'])
app.config_from_object('celery1.config')

if __name__ == '__main__':
    app.start()
```
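If you would rather start the worker from Python than from the CLI (for example under a process manager), the app object can do it via worker_main. A minimal sketch of that option (my addition, not from the original post; argv handling may differ slightly across Celery versions):

```python
# Illustrative: start the same worker programmatically, consuming only the
# train queue; equivalent to `celery -A celery1 worker -l info -Q train`.
from celery1.celery import app

if __name__ == '__main__':
    app.worker_main(argv=['worker', '-l', 'info', '-Q', 'train'])
```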
celery1/config.py on the worker node:
```python
from __future__ import absolute_import
from kombu import Queue, Exchange
from datetime import timedelta

CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'msgpack', 'yaml']
CELERY_DEFAULT_EXCHANGE = 'train'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_QUEUES = (
    Queue('train', exchange='train', routing_key='train'),
)
```
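kombu accepts a plain string for exchange=, but declaring the exchange as an object makes the direct-exchange binding more explicit. An equivalent, slightly more verbose form (illustrative):

```python
# Same queue declaration with an explicit direct Exchange object.
from kombu import Queue, Exchange

train_exchange = Exchange('train', type='direct')

CELERY_QUEUES = (
    Queue('train', exchange=train_exchange, routing_key='train'),
)
```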
celery1/tasks.py on the worker node (this is the task that actually executes; each node can have its own implementation):
```python
from __future__ import absolute_import
import time

from celery1.celery import app
from celery import task


@task
def do_train(x, y):
    """
    Train.
    :param x:
    :param y:
    :return:
    """
    time.sleep(3)
    return dict(data=str(x + y), msg="train")
```
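To test the task logic itself without a broker or a running worker, the task can be executed eagerly in-process with apply(). A small illustrative check (not part of the original post):

```python
# .apply() runs the task synchronously in the current process, so no RabbitMQ
# connection is needed; useful for unit-testing the task body.
from celery1.tasks import do_train

result = do_train.apply(args=[4, 2])
print(result.get())  # {'data': '6', 'msg': 'train'}
```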
## Starting the project

Start the worker node (here celery1 is the project, and -Q train means this worker consumes tasks from the train queue):
```bash
celery -A celery1 worker -l info -Q train
```
Start the master node:
```bash
python manage.py runserver
```
Use Postman to request the corresponding view:
Request URL: http://127.0.0.1:8000/api/v1/celery1/test/

The response:

```json
{
    "msg": "OK",
    "code": 10000,
    "data": {
        "data": "6",
        "msg": "train"
    }
}
```
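The same check can be scripted instead of done through Postman; a sketch assuming the requests package is installed (my addition):

```python
# Illustrative equivalent of the Postman call.
import requests

resp = requests.get('http://127.0.0.1:8000/api/v1/celery1/test/')
print(resp.json())  # expect {"msg": "OK", "code": 10000, "data": {...}}
```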
## Possible problems

1) Celery queue error: AttributeError: 'str' object has no attribute 'items'.
Fix: downgrade the redis library from 3.0 to 2.10 with pip install redis==2.10. Reference: https://stackoverflow.com/questions/53322425/celery-critical-mainprocess-unrecoverable-error-attributeerrorfloat-object