Django使用Celery进行异步任务

发布于 2020-11-09  756 次阅读


异步任务介绍

在开发过程中经常会遇到一些耗时的任务, 比如:文件上传, 图像处理、文件上传, 图像处理大数据操作、发送邮件、发送短信等等~。这些操作如果都同步执行耗时长对用户体验不友好,在这种情况下就可以把任务放在后台异步执行

Celery

Celery是一个功能完备即插即用的异步任务队列系统。它适用于异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。

文档:http://docs.jinkan.org/docs/celery/getting-started/index.html

Celery的特点是:

  • 简单,易于使用和维护,有丰富的文档。
  • 高效,单个celery进程每分钟可以处理数百万个任务。
  • 灵活,celery中几乎每个部分都可以自定义扩展。
任务队列是一种跨线程、跨机器工作的一种机制.
任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.
celery通过消息进行通信,通常使用一个叫Broker(中间人)协同client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。

Celery的架构

Celery的架构由三部分组成,消息队列(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

Celery包含如下组件:

Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。

Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。

Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。

一个celery系统可以包含很多的worker和broker Celery本身不提供消息队列功能,但是可以很方便地和第三方提供的消息中间件进行集成,包括RabbitMQ,Redis,MongoDB等

安装

pip install django-celery celery-with-redis

使用

使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。

一般celery任务目录直接放在项目的根目录下即可,路径:

项目根目录/
├── danlan_backend_api
│   ├── __init__.py
│   ├── celery.py
│   ├── dev.py
│   ├── log.py
│   ├── production.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── definecode
│   ├── UserDefined.py
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── models.py
│   ├── serializers.py
│   ├── service.py
│   ├── tests.py
│   ├── urls.py
│   └── views.py

加载celery app(settings.py 文件中)

添加Celery全局配置(settings.py文件中)

# Celery
djcelery.setup_loader() # 加载djcelery
CELERY_TIMEZONE= TIME_ZONE
CELERY_ENABLE_UTC= True

# 允许的格式
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'yaml']
# 任务队列链接地址
BROKER_URL = 'redis://127.0.0.1:6379/0'    # redis作为中间件
# 结果队列的链接地址
result_backend = 'redis://127.0.0.1:6379/1'

BROKER_TRANSPORT = 'redis'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # backend的数据库

同步Celery表到数据库

python manage.py migrate

创建celery.py文件(与settings.py同级)

import os
import django
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'danlan_backend_api.settings')
django.setup()
# 创建celery实例对象
app = Celery('danlan_backend_api')
# 通过app对象加载配置
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

创建任务文件

在需要使用异步任务的app中创建tasks.py,写入对应的任务函数,博主喜欢把tasks放在对应的app下,其实放在其他目录下也可以的,看个人习惯

from danlan_backend_api.celery import app
from api.service import execute_all_testcase
from env.models import Env


@app.task(name='execute_all_env_case')
def execute_all_env_case(apiID, project, user, caseType):
    """执行项目下所有环境 api对应的case

    Args:
        apiID:
        project:
        caseType:

    Returns:

    """

    env_list = Env.objects.filter(project=project)
    for env in env_list:
        execute_all_testcase(apiID, env.id, project, user, caseType)

触发任务

在对应的视图中导入tasks中的任务函数调用即可

启动Celery

进入danlan_backend_api工程下,启动Celery

启动命令:

celery -A danlan_backend_api worker --loglevel=info

触发了异步任务就会在celery日志里看到任务信息

python 3.7 Celery 启动报错解决

报错”from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger”

这是因为在 python 3.7 中将 async 作为了关键字,所以当 py 文件中出现类似 from . import async, base 这类不符合python语法的语句时,Python会报错;解决方法: 在 celery 官方的提议下,建议将 kombu下的async.py 文件的文件名改成 asynchronous;然后把引用和这个文件的所有文件的里面的async改为asynchronous;

我把修改好的文件放在附件中了,解压后,替换到site-packages路径下;

或者哪儿async报错,就手动把文件里的async替换成asynchronous


一名测试工作者,专注接口测试、自动化测试、性能测试、Python技术。