问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Django中使用Celery完成异步任务,并实时监控

创作时间:
作者:
@小白创作中心

Django中使用Celery完成异步任务,并实时监控

引用
CSDN
1.
https://blog.csdn.net/gbfeng123/article/details/100532825

本文将详细介绍如何在Django项目中使用Celery完成异步任务,并实时监控任务进程。通过本文,你将学会如何配置Celery、编写耗时任务、设置HTTP请求接口以及使用Flower进行任务监控。

环境准备

在开始之前,确保你的开发环境满足以下要求:

  • Python==3.6
  • Django==2.0(注意检查django-celery包的兼容性)
  • Celery==3.1.26.post2
  • django-celery==3.3.1
  • redis==2.10.6

如果在运行异步任务时遇到错误,请尝试降低redis包版本。

创建虚拟环境:

mkvirtualenv --python=python版本路径 虚拟环境名称

创建Django项目:

创建耗时任务并在请求中触发异步任务

tasks.py中创建耗时任务:

import time
from celery.task import Task

class UsersTask(Task):
    # 给任务命名
    name = 'user-task'
    # 给任务单独配置超时时间,也可通过CELERYD_TASK_TIME_LIMIT进行全局配置
    time_limit = 60
    # 其他属性配置可查看Task源码自行配置

    def run(self, *args, **kwargs):
        print('come in celery ...')
        time.sleep(10)
        print('out celery ...')

celery.py中设置Celery配置信息:

import djcelery
from celeryProject.settings import TIME_ZONE
djcelery.setup_loader()

# 导入celery任务
CELERY_IMPORTS = (
    'users.tasks',
)

# celery内容等消息的格式设置
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# 为定时任务和异步任务单独设置QUEUES
CELERY_QUEUES = {
    'beat_tasks': {
        'exchange': 'beat_tasks',
        'exchange_type': 'direct',
        'binding_key': 'beat_tasks'
    },
    'work_queue': {
        'exchange': 'work_queue',
        'exchange_type': 'direct',
        'binding_key': 'work_queue'
    },
}

# 默认使用队列
CELERY_DEFAULT_QUEUE = 'work_queue'

# celery时区设置,使用settings中TIME_ZONE同样的时区,使用定时任务时必须设置
CELERY_TIMEZONE = TIME_ZONE

# 某个程序中出现的队列,在broker中不存在,则立刻创建它
CELERY_CREATE_MISSING_QUEUES = True
CELERYD_PREFETCH_MULTIPLIER = 1

# 有些情况可以防止死锁
# CELERYD_FORCE_EXECV = True

# 根据服务器配合设置并发的worker数量
CELERYD_CONCURRENCY = 3

# 允许失败时重试
CELERYD_ACKS_LATE = True

# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD = 20

# 单个任务最大运行时间
CELERY_TASK_TIME_LIMIT = 60

编写HTTP请求接口:

from rest_framework.views import APIView, Response
from users.tasks import UsersTask

class UserCelery(APIView):
    def get(self, request):
        # 执行异步任务
        print('start request ...')
        UsersTask.delay()
        print('end request ...')
        return Response({"code":200, "status":"OK"})

设置路由:

from django.conf.urls import url
from users import views

urlpatterns = [
    url(r'^celeryTest$', views.UserCelery.as_view(),name=""),
]

settings配置文件中添加app和redis信息:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'djcelery',
    'users',
]

from .celery import *

# celery配置信息
# celery中间人 redis://:redis密码@redis服务所在的ip地址:端口/数据库号
# channels配置redis也是这样配置,如果没有密码,就可以把':redis密码@'省略
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://ip:password@ip:6379/1'

# celery结果返回,可用于跟踪结果
CELERY_RESULT_BACKEND = 'redis://ip:password@ip:6379/1'

启动Django服务:

python manage.py runserver

启动Celery服务:

python manage.py celery worker -l INFO

启动后可以看到类似以下的配置信息:

请求调用异步任务的HTTP接口看到如下:

到此Celery异步任务添加成功。

添加定时任务

除了添加异步任务,还可以添加定时任务。在tasks.py中添加定时执行任务:

class UserTaskBeat(Task):
    name = 'beat-task'

    def run(self, *args, **kwargs):
        print('执行时间:{}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
        time.sleep(5)
        print('完成时间:{}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

celery.py中添加配置:

CELERYBEAT_SCHEDULE = {
    'task-A':{
        'task':'beat-task',  # 需要执行的任务名
        'schedule':timedelta(seconds=5),  # 每分钟执行一次
        # 'args':''  # 给异步任务传递参数
        'options':{
            'queue': 'beat_tasks' # 给异步任务指定队列
        }
    }
}

另起控制台启动Celery定时任务:

python manage.py celery beat -l INFO

在两个控制台中分别看到:

安装监控工具Flower

安装Flower:

pip install flower

启动worker:

python manage.py celery worker -l INFO

启动Celery Flower:

python manage.py celery flower

启动定时任务:

python manage.py celery beat -l INFO

在浏览器中输入:

http://localhost:5555

如图:

现在即可实时监控所有任务。

注意在进入监控页面选择Broker时如果报错:

/broker (::1): Unable to get queues: ''str' object has no attribute 'name''

请查看是否Flower版本是否兼容。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号