Celery 结合 Django 做异步任务
一、前言
Django原生是单线程的,如果遇到执行时间过长的,只能干等着页面返回,而且不能做别的事情。为了解决这种状况,决定采用异步任务(Celery)的方式。
官方文档 | Using Celery with Django
准备工作
1、安装celery
如果还没安装celery,则先安装 celery
pip3 install celery
2、安装RabbitMQ
截图中的这两个标签版本,management 表示带 web 管理界面的版本,也就是可视化操作的,所以镜像相对大个10M左右,我们通常都是使用这个版本。
拉取镜像:
docker pull rabbitmq:3.8.9-management
# docker pull rabbitmq:3.9.21-management
创建并启动镜像:
docker run -d \
-v /Users/kaiyi/dev-data/rabbitmq/data:/var/lib/rabbitmq \
-p 15672:15672 -p 5672:5672 \
--name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--hostname my-rabbitmq rabbitmq:3.8.9-management
新方法:
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest --hostname myRabbit --name rabbitmq rabbitmq:3.8.9-management
参数说明:
- -d:表示在后台运行容器;
- -p:将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;
- -e:指定环境变量:
- RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
- RABBITMQ_DEFAULT_USER:默认的用户名;
- RABBITMQ_DEFAULT_PASS:默认的用户密码;
- --hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);
- --name rabbitmq:设置容器名称;
- rabbitmq:容器使用的镜像名称;
docker安装rabbitmq
docker安装RabbitMQ详细步骤
- rabbitmq管理界面
浏览器访问http://127.0.0l1:15672
初始账号密码 guest guest
二、使用
Celery之前需要一个单独的库(djcelery)与Django一起使用,但是从3.1.x(ps:3.1之后就是4.x版本)以后就不再使用了。只需要安装好Celery,Django就可以直接使用了。经过测试发现Celery4.x版本依然可以是和djcelery一起使用,但真的没必要。
要将Celery与Django项目一起使用,必须首先定义Celery库的实例(称为"app")。
如果已经有了一个Django项目,例如:
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
那么建议的方法是创建一个新的 proj/proj/celery.py
模块,该模块定义Celery实例:
文件: proj/proj/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the defalut Django settings module for the 'celery' program
# 为"celery"程序设置默认的Django settings 模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# Using a string here means the worker dosen't have to serialize the configuration object to child processes.
# 在这里使用字符串意味着worker不必将配置对象序列化为子进程。
# - namespace='CELERY' means all celery-related configuration keys should have a 'CELERY_' prefix
# namespace="CELERY"表示所有与Celery相关的配置keys均应该带有'CELERY_'前缀。
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
# 从所有注册的Django app 配置中加载 task模块。
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
settings.py文件:
"""
Django settings for djtest project.
Generated by 'django-admin startproject' using Django 3.2.5.
For more information on this file, see
https://docs.djangoproject.com/en/3.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/3.2/ref/settings/
"""
from pathlib import Path
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/3.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-jd5&k^k++^%50_y!gn239!t&h=hr&yh!9anxxsl(-au8((%c-v'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'app1',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'djtest.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [BASE_DIR / 'templates']
,
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'djtest.wsgi.application'
# Database
# https://docs.djangoproject.com/en/3.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# Password validation
# https://docs.djangoproject.com/en/3.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/3.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_L10N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/3.2/howto/static-files/
STATIC_URL = '/static/'
# Default primary key field type
# https://docs.djangoproject.com/en/3.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# Celery
CELERY_BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672//' # Broker配置,使用mq作为消息中间件
CELERY_RESULT_BACKEND = 'rpc://guest:guest@127.0.0.1:5672//' # Backend设置,使用mq后端结果存储
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERY_WORKER_CONCURRENCY = 99 # 并发的worker数量
CELERY_ACKS_LATE = True
CELERY_WORKER_MAX_TASKS_PER_CHILD = 5 # 每个worker最多执行的任务数, 可防止内存泄漏
CELERY_TASK_TIME_LIMIT = 15 * 60 # 任务超时时间
broker参数指定代理URL,对于RabbitMQ,传输是amqp。
后端参数指定后端URL。Celery中的后端用于存储任务结果。因此,如果需要在任务完成时访问任务的结果,应该为Celery设置一个后端。rpc
意味着将结果作为AMQP消息发送回去,这对本次演示来说是一种可接受的格式include
参数指定了在Celery工作程序启动时要导入的模块列表。我们在这里添加了tasks模块,以便找到我们的任务。
在 tasks.py
这个文件中,定义了我们的任务:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
启动celery worker:
celery -A djtest worker --loglevel=INFO
输出打印:
(djtest) ➜ djtest celery -A djtest worker -l info
-------------- celery@MacdeMacBook-Pro.local v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- macOS-10.16-x86_64-i386-64bit 2022-07-31 04:12:54
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: djtest:0x7fe0f2ffb0d0
- ** ---------- .> transport: amqp://guest:**@127.0.0.1:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 99 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. app1.tasks.add
. app1.tasks.mul
. djtest.celery.debug_task
[2022-07-31 04:12:59,366: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2022-07-31 04:12:59,386: INFO/MainProcess] mingle: searching for neighbors
[2022-07-31 04:13:00,437: INFO/MainProcess] mingle: all alone
[2022-07-31 04:13:00,477: WARNING/MainProcess] /Users/kaiyi/.conda/envs/djtest/lib/python3.8/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2022-07-31 04:13:00,477: INFO/MainProcess] celery@MacdeMacBook-Pro.local ready.
在终端进行调用测试:
from app1.tasks import add
add.delay(8, 10)
<AsyncResult: 1c73a428-e40a-4cce-ac6f-9400459a772e>
...
[2022-07-31 04:12:59,366: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2022-07-31 04:12:59,386: INFO/MainProcess] mingle: searching for neighbors
[2022-07-31 04:13:00,437: INFO/MainProcess] mingle: all alone
[2022-07-31 04:13:00,477: WARNING/MainProcess] /Users/kaiyi/.conda/envs/djtest/lib/python3.8/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2022-07-31 04:13:00,477: INFO/MainProcess] celery@MacdeMacBook-Pro.local ready.
[2022-07-31 04:14:35,203: INFO/MainProcess] Task app1.tasks.add[51b7c305-ceb5-4ab9-8999-e415e139356a] received
[2022-07-31 04:14:35,231: INFO/ForkPoolWorker-64] Task app1.tasks.add[51b7c305-ceb5-4ab9-8999-e415e139356a] succeeded in 0.024190212000007705s: 6
消息队列:
相关文章:
Celery 学习与实践
官方文档 | Using Celery with Django
Celery 与 Django
celery结合django使用
Github|Django结合Celery案例
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)