异步任务介绍
在开发过程中经常会遇到一些耗时的任务, 比如:文件上传, 图像处理、文件上传, 图像处理大数据操作、发送邮件、发送短信等等~。这些操作如果都同步执行耗时长对用户体验不友好,在这种情况下就可以把任务放在后台异步执行
Celery
Celery是一个功能完备即插即用的异步任务队列系统。它适用于异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。
文档:http://docs.jinkan.org/docs/celery/getting-started/index.html
Celery的特点是:
- 简单,易于使用和维护,有丰富的文档。
- 高效,单个celery进程每分钟可以处理数百万个任务。
- 灵活,celery中几乎每个部分都可以自定义扩展。
任务队列是一种跨线程、跨机器工作的一种机制.
任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.
celery通过消息进行通信,通常使用一个叫Broker(中间人)协同client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。
Celery的架构
Celery的架构由三部分组成,消息队列(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
Celery包含如下组件:
Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
一个celery系统可以包含很多的worker和broker Celery本身不提供消息队列功能,但是可以很方便地和第三方提供的消息中间件进行集成,包括RabbitMQ,Redis,MongoDB等
安装
pip install django-celery celery-with-redis使用
使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。
一般celery任务目录直接放在项目的根目录下即可,路径:
项目根目录/
├── danlan_backend_api
│ ├── __init__.py
│ ├── celery.py
│ ├── dev.py
│ ├── log.py
│ ├── production.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── definecode
│ ├── UserDefined.py
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── models.py
│ ├── serializers.py
│ ├── service.py
│ ├── tests.py
│ ├── urls.py
│ └── views.py
加载celery app(settings.py 文件中)

添加Celery全局配置(settings.py文件中)
# Celery
djcelery.setup_loader() # 加载djcelery
CELERY_TIMEZONE= TIME_ZONE
CELERY_ENABLE_UTC= True
# 允许的格式
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'yaml']
# 任务队列链接地址
BROKER_URL = 'redis://127.0.0.1:6379/0' # redis作为中间件
# 结果队列的链接地址
result_backend = 'redis://127.0.0.1:6379/1'
BROKER_TRANSPORT = 'redis'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # backend的数据库同步Celery表到数据库
python manage.py migrate
创建celery.py文件(与settings.py同级)
import os
import django
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'danlan_backend_api.settings')
django.setup()
# 创建celery实例对象
app = Celery('danlan_backend_api')
# 通过app对象加载配置
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))创建任务文件
在需要使用异步任务的app中创建tasks.py,写入对应的任务函数,博主喜欢把tasks放在对应的app下,其实放在其他目录下也可以的,看个人习惯
from danlan_backend_api.celery import app
from api.service import execute_all_testcase
from env.models import Env
@app.task(name='execute_all_env_case')
def execute_all_env_case(apiID, project, user, caseType):
"""执行项目下所有环境 api对应的case
Args:
apiID:
project:
caseType:
Returns:
"""
env_list = Env.objects.filter(project=project)
for env in env_list:
execute_all_testcase(apiID, env.id, project, user, caseType)触发任务
在对应的视图中导入tasks中的任务函数调用即可

启动Celery
进入danlan_backend_api工程下,启动Celery
启动命令:
celery -A danlan_backend_api worker --loglevel=info

触发了异步任务就会在celery日志里看到任务信息
python 3.7 Celery 启动报错解决
报错”from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger”
这是因为在 python 3.7 中将 async 作为了关键字,所以当 py 文件中出现类似 from . import async, base 这类不符合python语法的语句时,Python会报错;解决方法: 在 celery 官方的提议下,建议将 kombu下的async.py 文件的文件名改成 asynchronous;然后把引用和这个文件的所有文件的里面的async改为asynchronous;
我把修改好的文件放在附件中了,解压后,替换到site-packages路径下;

或者哪儿async报错,就手动把文件里的async替换成asynchronous





Comments | NOTHING