Celery 结合 Django 做异步任务

一、前言

Django原生是单线程的,如果遇到执行时间过长的,只能干等着页面返回,而且不能做别的事情。为了解决这种状况,决定采用异步任务(Celery)的方式。

官方文档 | Using Celery with Django

准备工作

1、安装celery

如果还没安装celery,则先安装 celery

pip3 install celery

2、安装RabbitMQ

file

截图中的这两个标签版本,management 表示带 web 管理界面的版本,也就是可视化操作的,所以镜像相对大个10M左右,我们通常都是使用这个版本。

拉取镜像:

docker pull rabbitmq:3.8.9-management
# docker pull rabbitmq:3.9.21-management

创建并启动镜像:

docker run -d \
-v /Users/kaiyi/dev-data/rabbitmq/data:/var/lib/rabbitmq \
-p 15672:15672 -p 5672:5672 \
--name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
 --hostname my-rabbitmq rabbitmq:3.8.9-management

新方法:

docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest --hostname myRabbit --name rabbitmq  rabbitmq:3.8.9-management

参数说明:

  • -d:表示在后台运行容器;
  • -p:将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;
  • -e:指定环境变量:
  • RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
  • RABBITMQ_DEFAULT_USER:默认的用户名;
  • RABBITMQ_DEFAULT_PASS:默认的用户密码;
  • --hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);
  • --name rabbitmq:设置容器名称;
  • rabbitmq:容器使用的镜像名称;

docker安装rabbitmq
docker安装RabbitMQ详细步骤

  1. rabbitmq管理界面
    浏览器访问http://127.0.0l1:15672
    初始账号密码 guest guest

file

二、使用

Celery之前需要一个单独的库(djcelery)与Django一起使用,但是从3.1.x(ps:3.1之后就是4.x版本)以后就不再使用了。只需要安装好Celery,Django就可以直接使用了。经过测试发现Celery4.x版本依然可以是和djcelery一起使用,但真的没必要。
要将Celery与Django项目一起使用,必须首先定义Celery库的实例(称为"app")。
如果已经有了一个Django项目,例如:

- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py

那么建议的方法是创建一个新的 proj/proj/celery.py 模块,该模块定义Celery实例:
文件: proj/proj/celery.py

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the defalut Django settings module for the 'celery' program
# 为"celery"程序设置默认的Django settings 模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker dosen't have to serialize the configuration object to child processes.
# 在这里使用字符串意味着worker不必将配置对象序列化为子进程。
# - namespace='CELERY' means all celery-related configuration keys should have a 'CELERY_' prefix
# namespace="CELERY"表示所有与Celery相关的配置keys均应该带有'CELERY_'前缀。
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 从所有注册的Django app 配置中加载 task模块。
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

settings.py文件:

"""
Django settings for djtest project.

Generated by 'django-admin startproject' using Django 3.2.5.

For more information on this file, see
https://docs.djangoproject.com/en/3.2/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/3.2/ref/settings/
"""

from pathlib import Path

# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent

# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/3.2/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-jd5&k^k++^%50_y!gn239!t&h=hr&yh!9anxxsl(-au8((%c-v'

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

ALLOWED_HOSTS = []

# Application definition

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app1',
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'djtest.urls'

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [BASE_DIR / 'templates']
        ,
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

WSGI_APPLICATION = 'djtest.wsgi.application'

# Database
# https://docs.djangoproject.com/en/3.2/ref/settings/#databases

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': BASE_DIR / 'db.sqlite3',
    }
}

# Password validation
# https://docs.djangoproject.com/en/3.2/ref/settings/#auth-password-validators

AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]

# Internationalization
# https://docs.djangoproject.com/en/3.2/topics/i18n/

LANGUAGE_CODE = 'en-us'

TIME_ZONE = 'UTC'

USE_I18N = True

USE_L10N = True

USE_TZ = True

# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/3.2/howto/static-files/

STATIC_URL = '/static/'

# Default primary key field type
# https://docs.djangoproject.com/en/3.2/ref/settings/#default-auto-field

DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'

# Celery
CELERY_BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672//'  # Broker配置,使用mq作为消息中间件
CELERY_RESULT_BACKEND = 'rpc://guest:guest@127.0.0.1:5672//'  # Backend设置,使用mq后端结果存储
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERY_WORKER_CONCURRENCY = 99  # 并发的worker数量
CELERY_ACKS_LATE = True
CELERY_WORKER_MAX_TASKS_PER_CHILD = 5  # 每个worker最多执行的任务数, 可防止内存泄漏
CELERY_TASK_TIME_LIMIT = 15 * 60  # 任务超时时间

broker参数指定代理URL,对于RabbitMQ,传输是amqp。

后端参数指定后端URL。Celery中的后端用于存储任务结果。因此,如果需要在任务完成时访问任务的结果,应该为Celery设置一个后端。
rpc意味着将结果作为AMQP消息发送回去,这对本次演示来说是一种可接受的格式
include参数指定了在Celery工作程序启动时要导入的模块列表。我们在这里添加了tasks模块,以便找到我们的任务。

tasks.py 这个文件中,定义了我们的任务:

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

启动celery worker:

celery -A djtest worker --loglevel=INFO

输出打印:

(djtest) ➜  djtest celery -A djtest worker -l info 

 -------------- celery@MacdeMacBook-Pro.local v5.2.7 (dawn-chorus)
--- ***** ----- 
-- ******* ---- macOS-10.16-x86_64-i386-64bit 2022-07-31 04:12:54
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         djtest:0x7fe0f2ffb0d0
- ** ---------- .> transport:   amqp://guest:**@127.0.0.1:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 99 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . app1.tasks.add
  . app1.tasks.mul
  . djtest.celery.debug_task

[2022-07-31 04:12:59,366: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2022-07-31 04:12:59,386: INFO/MainProcess] mingle: searching for neighbors
[2022-07-31 04:13:00,437: INFO/MainProcess] mingle: all alone
[2022-07-31 04:13:00,477: WARNING/MainProcess] /Users/kaiyi/.conda/envs/djtest/lib/python3.8/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
            leak, never use this setting in production environments!
  warnings.warn('''Using settings.DEBUG leads to a memory

[2022-07-31 04:13:00,477: INFO/MainProcess] celery@MacdeMacBook-Pro.local ready.

在终端进行调用测试:

from app1.tasks import add
add.delay(8, 10)
<AsyncResult: 1c73a428-e40a-4cce-ac6f-9400459a772e>

file

...

[2022-07-31 04:12:59,366: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2022-07-31 04:12:59,386: INFO/MainProcess] mingle: searching for neighbors
[2022-07-31 04:13:00,437: INFO/MainProcess] mingle: all alone
[2022-07-31 04:13:00,477: WARNING/MainProcess] /Users/kaiyi/.conda/envs/djtest/lib/python3.8/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
            leak, never use this setting in production environments!
  warnings.warn('''Using settings.DEBUG leads to a memory

[2022-07-31 04:13:00,477: INFO/MainProcess] celery@MacdeMacBook-Pro.local ready.
[2022-07-31 04:14:35,203: INFO/MainProcess] Task app1.tasks.add[51b7c305-ceb5-4ab9-8999-e415e139356a] received
[2022-07-31 04:14:35,231: INFO/ForkPoolWorker-64] Task app1.tasks.add[51b7c305-ceb5-4ab9-8999-e415e139356a] succeeded in 0.024190212000007705s: 6

file

消息队列:
file

file


相关文章:
Celery 学习与实践
官方文档 | Using Celery with Django
Celery 与 Django
celery结合django使用
Github|Django结合Celery案例

为者常成,行者常至