Django RQ¶
Django RQ 是 Django 对 RQ(Redis Queue)的集成封装,用 Redis 作为队列和结果存储,适合轻量异步任务、定时任务、后台处理和中小规模任务队列。以下内容以
django-rq 4.x+RQ 2.x为主,兼顾django-rq 3.x的常见用法;旧版接口在文中会特别说明。
目录¶
一、基础篇¶
1.1 RQ 与 Django RQ¶
RQ 是一个基于 Redis 的 Python 任务队列。
核心概念对比:
概念 |
RQ |
Django RQ |
|---|---|---|
队列管理 |
|
|
任务创建 |
|
|
延迟/定时任务 |
|
同 RQ 原生 API,Worker 需加 |
Cron/周期任务 |
|
|
Worker |
|
|
任务获取 |
|
|
常用导入方式:
# 基础导入
import django_rq
from django_rq import job, get_queue, get_scheduler, get_job
# RQ 原生导入
from rq import Queue, Worker, Retry, Repeat
from rq.job import Job
from rq.registry import FailedJobRegistry, ScheduledJobRegistry
注意:django_rq.get_scheduler() 依赖可选包 rq-scheduler,它和 RQ 2.x 内置的 enqueue_at() / enqueue_in() 调度机制不是同一个系统。只使用延迟/定时入队时,优先使用 RQ 内置 scheduler;确实需要 schedule() / cron() 这类 rq-scheduler API 时,再安装并运行 rqscheduler。
Django Web 请求
|
| enqueue
v
Redis Queue
|
| pop
v
RQ Worker 执行任务
|
v
Redis 保存任务状态和结果
核心组件:
组件 |
说明 |
|---|---|
Redis |
队列、任务元数据、结果存储 |
Queue |
队列,如 default、high、low |
Job |
一个异步任务实例 |
Worker |
消费队列并执行任务的进程 |
Scheduler |
定时或延迟投递任务 |
Django RQ 提供:
Django settings 集成。
manage.py rqworker命令。Django Admin 管理页面。
装饰器和便捷 API。
1.2 安装与配置¶
安装:
# 基本安装
pip install django-rq
# 如果要使用 django_rq.get_scheduler() / manage.py rqscheduler
pip install rq-scheduler
# 如果要暴露 Prometheus 指标
pip install "django-rq[prometheus]"
基础配置 settings.py:
INSTALLED_APPS = [
# ...
"django_rq",
]
# 队列配置
RQ_QUEUES = {
"default": {
"HOST": "localhost",
"PORT": 6379,
"DB": 0,
"DEFAULT_TIMEOUT": 300,
"QUEUE_CLASS": "django_rq.queues.DjangoRQ",
},
"high": {
"HOST": "localhost",
"PORT": 6379,
"DB": 0,
"DEFAULT_TIMEOUT": 300,
"QUEUE_CLASS": "django_rq.queues.DjangoRQ",
},
"low": {
"HOST": "localhost",
"PORT": 6379,
"DB": 0,
"DEFAULT_TIMEOUT": 600,
"QUEUE_CLASS": "django_rq.queues.DjangoRQ",
},
}
QUEUE_CLASS 通常不用显式配置;django-rq 默认使用 django_rq.queues.DjangoRQ。如果自定义队列类,建议继承 django_rq.queues.DjangoRQ,而不是继承普通 rq.Queue 后直接替换。
使用 Redis URL:
RQ_QUEUES = {
"default": {
"URL": "redis://localhost:6379/0",
"DEFAULT_TIMEOUT": 300,
"SSL": False, # 是否使用 SSL
"SSL_CERT_REQS": None, # SSL 证书要求
"REDIS_CLIENT_KWARGS": {
"socket_connect_timeout": 5,
"socket_timeout": 10,
"retry_on_timeout": True,
},
},
"high": {
"URL": "redis://:password@localhost:6379/1", # 带密码
"DEFAULT_TIMEOUT": 300,
},
}
说明:RQ_QUEUES 支持 HOST / PORT / DB / USERNAME / PASSWORD、URL、USE_REDIS_CACHE、Sentinel 配置等。Redis Cluster 不是 django-rq 的普通 URL 配置能力,生产使用前需要单独验证 Redis 客户端和队列语义。
高级配置选项:
# Django RQ 全局配置
RQ = {
# django-rq 4.x 默认 on_db_commit:处于事务内时,事务提交后才真正入队。
# 可选值:on_db_commit、auto、request_finished。
"COMMIT_MODE": "on_db_commit",
"DEFAULT_RESULT_TTL": 500,
"WORKER_CLASS": "rq.Worker",
"JOB_CLASS": "rq.job.Job",
"QUEUE_CLASS": "django_rq.queues.DjangoRQ",
"SERIALIZER": "rq.serializers.JSONSerializer", # 序列化器
"SCHEDULER_CLASS": "django_rq.queues.DjangoScheduler", # 仅 rq-scheduler 可用
}
# 自定义异常处理器是独立设置,不放在 RQ 字典里
RQ_EXCEPTION_HANDLERS = ["app.rq_handlers.handle_exception"]
# 自定义序列化器
RQ["SERIALIZER"] = "app.serializers.CustomSerializer"
# 自定义异常处理器
import logging
logger = logging.getLogger(__name__)
def handle_rq_exception(job, exc_type, exc_value, traceback):
"""自定义异常处理"""
# 记录异常或写入业务告警表
logger.exception("RQ job failed", exc_info=(exc_type, exc_value, traceback))
# 发送告警
send_alert(f"Job {job.id} failed: {exc_value}")
# RQ 约定:返回 False 表示停止继续调用后续异常处理器;
# 返回 True 或 None 表示继续传给下一个处理器。
return False
RQ_EXCEPTION_HANDLERS = [handle_rq_exception]
URL 配置:
# urls.py
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path("admin/", admin.site.urls),
# django-rq 4.x 会自动注册 Django Admin 页面:
# /admin/django_rq/dashboard/
# 可选:也可以挂载 standalone 页面,生产环境必须加权限保护
path("django-rq/", include("django_rq.urls")),
]
环境变量配置:
import os
RQ_QUEUES = {
"default": {
"URL": os.getenv("REDIS_URL", "redis://localhost:6379/0"),
"DEFAULT_TIMEOUT": int(os.getenv("RQ_DEFAULT_TIMEOUT", 300)),
},
}
路由:
urlpatterns = [
# ...
path("django-rq/", include("django_rq.urls")),
]
访问 /django-rq/ 可以查看队列、Worker、任务状态;如果启用了 Admin 集成,也可以访问 /admin/django_rq/dashboard/。暴露 /django-rq/stats.json 或 /django-rq/metrics/ 给监控系统时,建议配置 RQ_API_TOKEN。
1.3 队列、Worker、Job¶
获取队列的多种方式:
import django_rq
# 方式1:获取默认队列
queue = django_rq.get_queue("default")
# 方式2:获取指定队列,带连接参数
queue = django_rq.get_queue(
"high",
connection=django_rq.get_connection("high"),
default_timeout=600
)
# 方式3:按 settings.RQ_QUEUES 获取所有队列
from django.conf import settings
for queue_name in settings.RQ_QUEUES:
queue = django_rq.get_queue(queue_name)
print(f"Queue {queue_name}: {queue.count} jobs")
# 方式4:使用 RQ 原生方式
from redis import Redis
from rq import Queue
redis_conn = Redis(host='localhost', port=6379, db=0)
queue = Queue('default', connection=redis_conn)
创建任务的多种方式:
import django_rq
from datetime import datetime, timedelta
queue = django_rq.get_queue("default")
# 方式1:基本任务创建
job = queue.enqueue("app.tasks.send_email", user_id=1001)
print(f"Job ID: {job.id}")
# 方式2:带参数的任务
job = queue.enqueue(
"app.tasks.process_data",
args=[1, 2, 3],
kwargs={"option": "fast"},
job_timeout=300, # RQ 原生参数;django-rq 也常兼容 timeout 别名
result_ttl=3600,
failure_ttl=7200,
description="Process user data"
)
# 方式3:延迟任务
job = queue.enqueue_in(
timedelta(minutes=10),
"app.tasks.cleanup",
cleanup_type="temp_files"
)
# 方式4:定时任务
job = queue.enqueue_at(
datetime(2026, 4, 28, 12, 0),
"app.tasks.daily_report"
)
# 方式5:使用 create_job 方法(更底层)
from rq.job import Job
job = Job.create(
func="app.tasks.export_data",
args=[1001],
connection=queue.connection,
result_ttl=1800
)
queue.enqueue_job(job)
获取和管理任务:
import django_rq
from rq.job import Job
# 方式1:使用 Django RQ 的 get_job
job = django_rq.get_job(job_id, "default")
if job:
print(f"Status: {job.get_status()}")
print(f"Result: {job.return_value()}")
print(f"Created at: {job.created_at}")
print(f"Enqueued at: {job.enqueued_at}")
print(f"Started at: {job.started_at}")
print(f"Ended at: {job.ended_at}")
# 方式2:使用原生 Job.fetch
connection = django_rq.get_connection("default")
job = Job.fetch(job_id, connection=connection)
# 方式3:批量获取任务
queued_jobs = queue.get_jobs() # 只返回当前队列里等待执行的任务
# 方式4:获取失败的任务
from rq.registry import FailedJobRegistry
failed_registry = FailedJobRegistry(queue=queue)
for job_id in failed_registry.get_job_ids():
failed_job = django_rq.get_job(job_id, "default")
if failed_job:
latest_result = failed_job.latest_result()
error = latest_result.exc_string if latest_result else failed_job.exc_info
print(f"Failed job {failed_job.id}: {error}")
# 方式5:取消任务
if job.get_status() == "queued":
job.cancel()
print(f"Job {job.id} cancelled")
# 方式6:重新入队失败的任务
if job.get_status() == "failed":
queue.failed_job_registry.requeue(job)
print(f"Job {job.id} requeued")
启动 Worker:
# 基本启动
python manage.py rqworker default
# 消费多个队列(优先级顺序)
python manage.py rqworker high default low
# 指定 Worker 名称
python manage.py rqworker default --name worker1
# 只处理当前积压任务后退出
python manage.py rqworker default --burst
# 单进程内限制最多处理的任务数
python manage.py rqworker default --max-jobs 100
# 指定日志级别
python manage.py rqworker default --verbosity 2
# 开启 RQ 内置 scheduler,用于 enqueue_at/enqueue_in、Retry(interval=...)、Repeat
python manage.py rqworker default --with-scheduler
# 多进程 Worker 池
python manage.py rqworker-pool default low --num-workers 4
二、任务使用篇¶
2.1 普通任务¶
app/tasks.py:
def send_welcome_email(user_id: int) -> str:
from django.contrib.auth import get_user_model
User = get_user_model()
user = User.objects.get(id=user_id)
# 这里调用真实邮件服务
print(f"send email to {user.email}")
return "ok"
视图中投递:
import django_rq
from app.tasks import send_welcome_email
def register(request):
# 创建用户后异步发送邮件
queue = django_rq.get_queue("default")
job = queue.enqueue(send_welcome_email, user_id=request.user.id)
return JsonResponse({"job_id": job.id})
2.2 装饰器方式¶
基本装饰器:
import django_rq
@django_rq.job("default", timeout=300)
def generate_report(report_id: int):
# 生成报表
return {"report_id": report_id, "status": "done"}
调用方式:
# 方式1:使用 delay 方法(推荐)
job = generate_report.delay(report_id=1)
# 方式2:enqueue 是 delay 的别名
job = generate_report.enqueue(report_id=1)
# 方式3:直接调用原函数(同步执行)
result = generate_report(report_id=1) # 注意:这会同步执行!
# 方式4:使用 enqueue 方法
queue = django_rq.get_queue("default")
job = queue.enqueue(generate_report, report_id=1)
装饰器高级选项:
import django_rq
from datetime import timedelta
from rq import Retry
@django_rq.job(
"default",
timeout=600,
result_ttl=3600,
failure_ttl=86400,
ttl=300, # 任务在队列中的最大等待时间
depends_on=None,
job_id=None, # 自定义 Job ID
at_front=False, # 是否插入队列前端
meta={"source": "api", "version": "1.0"},
retry=Retry(max=3, interval=[10, 30, 60]),
description="生成用户报表"
)
def generate_user_report(user_id: int, report_type: str = "basic"):
"""生成用户报表任务"""
# 业务逻辑
return {"user_id": user_id, "report_type": report_type, "status": "done"}
# 延迟任务装饰器
@django_rq.job("default", timeout=300)
def send_reminder_email(user_id: int):
# 发送提醒邮件
pass
# 延迟调用:@job 装饰器只提供 delay/enqueue,
# 延迟和定时投递请使用 Queue 的 enqueue_in/enqueue_at。
queue = django_rq.get_queue("default")
job = queue.enqueue_in(timedelta(hours=24), send_reminder_email, user_id=1001)
# 定时调用
from datetime import datetime
job = queue.enqueue_at(datetime(2026, 4, 29, 9, 0), send_reminder_email, user_id=1001)
类方法装饰器:
import django_rq
class ReportService:
@classmethod
@django_rq.job("default")
def generate_report(cls, report_id: int):
# 生成报表
return {"report_id": report_id}
# 调用
job = ReportService.generate_report.delay(report_id=1)
带参数绑定的装饰器:
import django_rq
from functools import partial
# 创建带预设参数的装饰器
def low_priority_job(func):
return django_rq.job("low", timeout=1800)(func)
@low_priority_job
def batch_process_data(batch_id: int):
# 批量处理数据
pass
# 使用 partial 创建特定配置的装饰器
def create_job_decorator(queue_name, **kwargs):
"""创建自定义队列的装饰器"""
def decorator(func):
return django_rq.job(queue_name, **kwargs)(func)
return decorator
email_job = create_job_decorator("high", timeout=30, result_ttl=300)
@email_job
def send_notification_email(user_id: int):
# 发送通知邮件
pass
2.3 延迟任务¶
from datetime import timedelta
import django_rq
queue = django_rq.get_queue("default")
job = queue.enqueue_in(
timedelta(minutes=10),
"app.tasks.close_unpaid_order",
order_id=1001,
)
指定时间执行:
from datetime import datetime, timezone
queue.enqueue_at(
datetime(2026, 4, 28, 12, 0, tzinfo=timezone.utc),
"app.tasks.send_notification",
user_id=1001,
)
enqueue_in() / enqueue_at() 使用 RQ 内置 scheduler,需要至少有一个 Worker 带 scheduler 组件运行。启动命令见前文 1.3 队列、Worker、Job。
2.4 任务结果¶
job = queue.enqueue(generate_report, report_id=1, result_ttl=3600)
print(job.id)
print(job.return_value()) # RQ 1.12+ 推荐;job.result 已逐步废弃
print(job.get_status())
latest_result = job.latest_result()
if latest_result:
print(latest_result.type, latest_result.return_value)
常见状态:
状态 |
说明 |
|---|---|
|
等待执行 |
|
正在执行 |
|
执行成功 |
|
执行失败 |
|
依赖任务未完成 |
|
已计划执行 |
2.5 失败重试¶
基本重试:
from rq import Retry
queue.enqueue(
"app.tasks.call_remote_api",
user_id=1001,
retry=Retry(max=3, interval=[10, 30, 60]),
)
高级重试配置:
from rq import Retry
# 1. 指数退避重试
retry = Retry(
max=5,
interval=[10, 30, 60, 120, 240], # 自定义间隔
)
# 2. 只对临时性异常重试:在任务内区分异常类型,永久性错误直接抛出
def call_remote_api(user_id: int):
try:
return remote_client.call(user_id)
except (ConnectionError, TimeoutError):
raise # 让 RQ 的 Retry 机制处理
except ValueError:
raise # 参数错误属于永久失败,不应盲目重试
# 3. 装饰器中的重试
@django_rq.job("default", retry=Retry(max=3))
def process_payment(payment_id: int):
# 处理支付
pass
# 4. 不同任务使用不同 Retry 配置
email_retry = Retry(max=2, interval=30)
payment_retry = Retry(max=5, interval=[10, 30, 60, 120, 300])
queue.enqueue("app.tasks.send_email", user_id=1001, retry=email_retry)
queue.enqueue("app.tasks.process_payment", payment_id=2001, retry=payment_retry)
# 5. 任务内主动要求重试(RQ 2.1+)
import django_rq
from rq import get_current_job
def process_with_retry(data_id: int):
job = get_current_job()
try:
# 业务逻辑
result = call_external_api(data_id)
return result
except ConnectionError as e:
# 检查重试次数
retry_count = job.meta.get("retry_count", 0)
if retry_count < 3:
job.meta["retry_count"] = retry_count + 1
job.save_meta()
# 返回 Retry 会让 RQ 重新调度本次任务
return Retry(max=1, interval=60)
raise e
重试监控和统计:
import django_rq
queue = django_rq.get_queue("default")
# 获取失败任务
failed_jobs = queue.failed_job_registry.get_job_ids()
# 分析失败原因
for job_id in failed_jobs:
job = django_rq.get_job(job_id, "default")
if job:
print(f"Job {job_id} failed: {job.exc_info}")
# 检查重试次数
retry_count = job.meta.get("retry_count", 0)
print(f"Retry count: {retry_count}")
# 检查失败时间
if job.ended_at:
failed_time = job.ended_at
print(f"Failed at: {failed_time}")
建议:
网络请求、第三方服务调用适合重试
参数错误、业务状态不合法不应盲目重试
重试任务必须保证幂等
设置合理的重试上限,避免无限重试
使用指数退避策略,避免对服务造成压力
记录重试日志,便于问题排查
区分临时性错误和永久性错误,不同错误不同处理
考虑使用死信队列处理多次重试仍失败的任务
2.6 任务依赖¶
job1 = queue.enqueue("app.tasks.extract_data")
job2 = queue.enqueue("app.tasks.transform_data", depends_on=job1)
job3 = queue.enqueue("app.tasks.load_data", depends_on=job2)
适合简单流水线任务;复杂 DAG 建议使用 Airflow、Prefect 等工作流系统。
三、Worker 篇¶
3.1 启动 Worker¶
Worker 启动命令已统一放在前文 1.3 队列、Worker、Job。队列顺序有优先级含义,Worker 会优先消费前面的队列。
3.2 Worker 数量¶
建议:
CPU 密集型任务:Worker 数量接近 CPU 核数。
IO 密集型任务:可适当增加 Worker。
长任务和短任务分不同队列,避免相互阻塞。
高优先级任务使用独立队列。
3.3 Supervisor 管理¶
/etc/supervisor/conf.d/django-rq.conf:
[program:rqworker-default]
directory=/srv/myproject
command=/srv/myproject/venv/bin/python manage.py rqworker default
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/myproject/rqworker-default.log
stopwaitsecs=3600
启动:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl status
3.4 任务超时¶
队列默认超时:
RQ_QUEUES = {
"default": {
"URL": "redis://localhost:6379/0",
"DEFAULT_TIMEOUT": 300,
}
}
单任务超时:
queue.enqueue("app.tasks.long_task", job_timeout=1800)
超时任务会被标记失败。长任务要合理拆分,避免一个任务运行过久。
四、定时任务篇¶
4.1 RQ 内置 scheduler¶
RQ 2.x 自带 scheduler,适合“一次性延迟/定时任务”、Retry(interval=...) 和 Repeat。它不需要安装 rq-scheduler,但需要 Worker 带 --with-scheduler 运行。
from datetime import datetime, timedelta, timezone
from rq import Repeat
import django_rq
queue = django_rq.get_queue("default")
# 10 分钟后执行
job = queue.enqueue_in(
timedelta(minutes=10),
"app.tasks.close_unpaid_order",
order_id=1001,
)
# 指定时间执行;建议使用带时区的 datetime
job = queue.enqueue_at(
datetime(2026, 4, 29, 9, 0, tzinfo=timezone.utc),
"app.tasks.daily_report",
)
# 成功后重复执行(RQ 2.2+)
job = queue.enqueue(
"app.tasks.sync_statistics",
repeat=Repeat(times=5, interval=300),
)
启动命令见前文 1.3 队列、Worker、Job 中的 --with-scheduler 示例。
已计划的任务存放在 ScheduledJobRegistry 中:
from rq.registry import ScheduledJobRegistry
registry = ScheduledJobRegistry(queue=queue)
for job_id in registry.get_job_ids():
scheduled_at = registry.get_scheduled_time(job_id)
print(job_id, scheduled_at)
4.2 RQ CronScheduler¶
django-rq 4.x 支持 RQ 的 CronScheduler,可用于 cron 表达式和固定间隔的周期任务。先创建 cron 配置文件:
# cron_config.py
from rq import cron
from myapp.tasks import send_report, sync_data
cron.register(send_report, queue_name="default", cron="0 9 * * *")
cron.register(sync_data, queue_name="high", interval=30)
启动:
python manage.py rqcron cron_config.py
python manage.py rqcron myapp.cron_jobs --logging-level DEBUG
4.3 可选:rq-scheduler / get_scheduler¶
django_rq.get_scheduler() 是对第三方包 rq-scheduler 的集成。只有需要 scheduler.schedule() / scheduler.cron() 这类 API 时才需要它。
pip install rq-scheduler
启动:
# 基本启动
python manage.py rqscheduler
# 指定队列
python manage.py rqscheduler --queue default
# 指定检查间隔
python manage.py rqscheduler --interval 30
# 写入 pid 文件
python manage.py rqscheduler --pid /tmp/rqscheduler.pid
使用:
from datetime import datetime, timedelta, timezone
import django_rq
scheduler = django_rq.get_scheduler("default", interval=60)
# 一次性定时任务
job = scheduler.enqueue_at(
datetime.now(timezone.utc) + timedelta(hours=1),
"app.tasks.send_reminder",
user_id=1001,
)
# 周期任务
job = scheduler.schedule(
scheduled_time=datetime.now(timezone.utc),
func="app.tasks.sync_statistics",
interval=3600,
repeat=None,
kwargs={"force": False},
timeout=300,
result_ttl=1800,
)
# cron 任务
job = scheduler.cron(
"0 9 * * *",
func="app.tasks.daily_report",
kwargs={"report_type": "daily"},
repeat=None,
timeout=600,
)
for scheduled_job in scheduler.get_jobs():
print(scheduled_job.id, scheduled_job.func_name)
scheduler.cancel(job)
4.4 Cron 表达式示例¶
Cron 表达式示例:
# 每分钟执行一次
"* * * * *"
# 每5分钟执行一次
"*/5 * * * *"
# 每小时的第30分钟执行
"30 * * * *"
# 每天凌晨2点执行
"0 2 * * *"
# 每周一上午9点执行
"0 9 * * 1"
# 每月1号和15号凌晨3点执行
"0 3 1,15 * *"
# 工作日(周一到周五)上午10点执行
"0 10 * * 1-5"
# 每季度第一天凌晨4点执行
"0 4 1 1,4,7,10 *"
rq-scheduler 的可选配置:
# settings.py
RQ = {
"SCHEDULER_CLASS": "django_rq.queues.DjangoScheduler",
"DEFAULT_RESULT_TTL": 86400,
}
RQ_QUEUES = {
"default": {
"URL": "redis://localhost:6379/0",
"DEFAULT_TIMEOUT": 300,
},
}
4.5 调度器监控和管理¶
from rq.registry import ScheduledJobRegistry
import django_rq
queue = django_rq.get_queue("default")
registry = ScheduledJobRegistry(queue=queue)
print(f"Scheduled jobs: {registry.count}")
for job_id in registry.get_job_ids():
print(job_id, registry.get_scheduled_time(job_id))
django-rq 的 dashboard 和 /django-rq/stats.json 会展示 RQ 内置 scheduler 的 scheduled jobs。rq-scheduler 的周期任务需要使用 scheduler.get_jobs() 或其自身数据结构查看。
生产环境建议:
内置 scheduler 可多 Worker 启用:RQ 会为同一队列选出一个 active scheduler。
rq-scheduler 单实例运行:如果使用
rqscheduler,确保每个队列只有一个 scheduler 实例,避免重复调度。监控:监控调度器进程状态和 Redis 连接
日志:记录调度器的操作日志
错误处理:调度器异常时自动重启
备份:定期备份调度器配置
# 使用 Supervisor 管理调度器
[program:rqscheduler]
command=/path/to/venv/bin/python manage.py rqscheduler --queue default --interval 30
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/rq/scheduler.log
五、可靠性与幂等篇¶
5.1 幂等设计¶
任务可能因为 Worker 崩溃、网络抖动、重试而执行多次。
常用方式:
使用业务唯一键。
状态机控制流转。
数据库唯一约束防重复。
外部接口传幂等键。
任务开始前检查是否已完成。
示例:
def pay_success_callback(order_id: int, event_id: str):
if PaymentEvent.objects.filter(event_id=event_id).exists():
return
PaymentEvent.objects.create(event_id=event_id, order_id=order_id)
Order.objects.filter(id=order_id, status="pending").update(status="paid")
5.2 事务提交后投递任务¶
避免数据库事务回滚但任务已经执行:
from django.db import transaction
import django_rq
def create_order(request):
order = Order.objects.create(...)
transaction.on_commit(
lambda: django_rq.get_queue("default").enqueue(
"orders.tasks.close_unpaid_order",
order_id=order.id,
)
)
5.3 失败任务处理¶
建议:
明确哪些异常可重试。
记录业务失败原因。
对失败任务提供后台重试入口。
对外部接口调用设置超时。
任务日志带
job_id和业务 ID。
六、性能优化篇¶
6.1 队列拆分¶
常见队列:
队列 |
任务 |
|---|---|
|
短任务、用户感知任务 |
|
普通异步任务 |
|
报表、批处理、低优先级任务 |
|
长耗时任务 |
Worker 启动命令见前文 1.3 队列、Worker、Job,不同优先级队列建议按业务重要性拆分独立 Worker。
6.2 Redis 优化¶
建议:
RQ 使用独立 Redis DB 或独立实例。
设置合理
result_ttl和failure_ttl。不要在 Redis 内长期保存大量任务结果。
监控 Redis 内存、连接数、慢查询。
开启持久化时关注磁盘 IO。
6.3 任务优化¶
批量处理数据库查询,避免 N+1。
长任务拆分为多个小任务。
外部 HTTP 请求设置超时。
大文件处理使用对象存储,不把大内容塞入任务参数。
CPU 密集型任务可以考虑 Celery、进程池或专门计算服务。
七、运维与监控篇¶
7.1 Django RQ 管理页面¶
基本管理页面:
/django-rq/ 可查看:
队列长度和状态
Worker 状态和数量
成功任务列表
失败任务列表
计划任务列表
任务详情和执行日志
Admin 集成与指标接口:
# urls.py
urlpatterns = [
# 可选 standalone dashboard
path("django-rq/", include("django_rq.urls")),
]
# settings.py
RQ_API_TOKEN = "change-me" # 保护 /django-rq/stats.json 和 /django-rq/metrics/
RQ_SHOW_ADMIN_LINK = True # 是否在 Django Admin 侧边栏显示入口
功能对比:
功能 |
Django Admin 集成 |
Standalone URL |
|---|---|---|
实时队列监控 |
✓ |
✓ |
Worker 状态 |
✓ |
✓ |
Job registry 浏览 |
✓ |
✓ |
JSON 统计 |
✓ |
✓ |
Prometheus 指标 |
安装 |
安装 |
7.2 命令行工具¶
队列管理:
# 查看队列、Worker、失败/完成/延迟等统计
python manage.py rqstats
python manage.py rqstats --interval 1
python manage.py rqstats --json
python manage.py rqstats --yaml
任务管理:
# 暂停/恢复 Worker 拉取新任务
python manage.py rqsuspend
python manage.py rqsuspend --duration 600
python manage.py rqresume
# CronScheduler
python manage.py rqcron cron_config.py
Worker 管理:
rqworker 启动参数已经在前文 1.3 队列、Worker、Job 统一说明,这里不再重复。
7.3 编程接口监控¶
import django_rq
from datetime import datetime, timedelta
# 1. 获取队列统计
queue = django_rq.get_queue("default")
# 队列长度
queue_length = queue.count
print(f"Queue length: {queue_length}")
queued_jobs = queue.get_jobs()
print(f"Queued jobs: {len(queued_jobs)}")
# 按 registry 统计
status_counts = {
"queued": len(queued_jobs),
"started": queue.started_job_registry.count,
"finished": queue.finished_job_registry.count,
"failed": queue.failed_job_registry.count,
"deferred": queue.deferred_job_registry.count,
"scheduled": queue.scheduled_job_registry.count,
}
print(f"Status counts: {status_counts}")
# 2. Worker 监控
from rq import Worker
workers = Worker.all(connection=queue.connection)
for worker in workers:
print(f"Worker: {worker.name}")
print(f" State: {worker.state}")
print(f" Birth: {worker.birth_date}")
print(f" Queues: {worker.queue_names()}")
# 检查是否存活
if worker.is_idle:
print(" Status: Idle")
elif worker.is_suspended:
print(" Status: Suspended")
else:
print(" Status: Busy")
# 3. 任务执行时间监控
finished_jobs = queue.finished_job_registry.get_job_ids()
for job_id in finished_jobs[:10]: # 查看最近10个
job = django_rq.get_job(job_id, "default")
if job and job.ended_at and job.started_at:
duration = (job.ended_at - job.started_at).total_seconds()
print(f"Job {job_id}: {duration:.2f} seconds")
# 4. 失败任务分析
failed_jobs = queue.failed_job_registry.get_job_ids()
print(f"Failed jobs: {len(failed_jobs)}")
for job_id in failed_jobs[:5]:
job = django_rq.get_job(job_id, "default")
if job:
print(f"Failed job {job_id}:")
print(f" Function: {job.func_name}")
print(f" Arguments: {job.args}")
print(f" Error: {job.exc_info}")
print(f" Failed at: {job.ended_at}")
# 5. 内存使用监控
import redis
redis_conn = django_rq.get_connection("default")
info = redis_conn.info()
print(f"Redis memory used: {info['used_memory_human']}")
print(f"Connected clients: {info['connected_clients']}")
print(f"Total commands: {info['total_commands_processed']}")
# 6. 自定义监控指标
class RQMonitor:
def __init__(self):
self.queues = ["default", "high", "low"]
def get_queue_metrics(self):
metrics = {}
for queue_name in self.queues:
queue = django_rq.get_queue(queue_name)
metrics[queue_name] = {
"length": queue.count,
"jobs": len(queue.get_jobs()),
"started": queue.started_job_registry.count,
"failed": queue.failed_job_registry.count,
"finished": queue.finished_job_registry.count,
"scheduled": queue.scheduled_job_registry.count,
}
return metrics
def get_worker_metrics(self):
metrics = {"total": 0, "busy": 0, "idle": 0}
for queue_name in self.queues:
connection = django_rq.get_connection(queue_name)
workers = Worker.all(connection=connection)
metrics["total"] += len(workers)
for worker in workers:
if worker.is_idle:
metrics["idle"] += 1
else:
metrics["busy"] += 1
return metrics
def check_health(self):
"""检查 RQ 系统健康状态"""
health = {"status": "healthy", "issues": []}
# 检查 Redis 连接
try:
redis_conn = django_rq.get_connection("default")
redis_conn.ping()
except Exception as e:
health["status"] = "unhealthy"
health["issues"].append(f"Redis connection failed: {e}")
# 检查队列积压
queue_metrics = self.get_queue_metrics()
for queue_name, metrics in queue_metrics.items():
if metrics["length"] > 1000: # 阈值
health["issues"].append(f"Queue {queue_name} backlog: {metrics['length']}")
if metrics["failed"] > 100: # 失败任务过多
health["issues"].append(f"Queue {queue_name} has {metrics['failed']} failed jobs")
# 检查 Worker 状态
worker_metrics = self.get_worker_metrics()
if worker_metrics["total"] == 0:
health["status"] = "unhealthy"
health["issues"].append("No workers running")
return health
# 使用监控
monitor = RQMonitor()
print("Queue metrics:", monitor.get_queue_metrics())
print("Worker metrics:", monitor.get_worker_metrics())
print("Health check:", monitor.check_health())
7.4 集成外部监控系统¶
Prometheus 监控:
# app/monitoring/prometheus_metrics.py
from prometheus_client import Counter, Gauge, Histogram
import django_rq
# 定义指标
RQ_JOBS_TOTAL = Counter(
"rq_jobs_total",
"Total number of RQ jobs",
["queue", "status"]
)
RQ_QUEUE_LENGTH = Gauge(
"rq_queue_length",
"Current queue length",
["queue"]
)
RQ_JOB_DURATION = Histogram(
"rq_job_duration_seconds",
"RQ job duration in seconds",
["queue", "function"]
)
RQ_FAILED_JOBS = Counter(
"rq_failed_jobs_total",
"Total failed RQ jobs",
["queue", "error_type"]
)
# 监控装饰器
def monitor_rq_job(queue_name="default"):
"""监控 RQ 任务的装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
RQ_JOBS_TOTAL.labels(queue=queue_name, status="success").inc()
return result
except Exception as e:
error_type = type(e).__name__
RQ_JOBS_TOTAL.labels(queue=queue_name, status="failed").inc()
RQ_FAILED_JOBS.labels(queue=queue_name, error_type=error_type).inc()
raise
finally:
duration = time.time() - start_time
RQ_JOB_DURATION.labels(queue=queue_name, function=func.__name__).observe(duration)
return wrapper
return decorator
# 使用示例
@monitor_rq_job("default")
def process_data(data_id: int):
# 处理数据
pass
# 定期收集队列指标
def collect_queue_metrics():
"""收集队列指标到 Prometheus"""
queues = ["default", "high", "low"]
for queue_name in queues:
queue = django_rq.get_queue(queue_name)
RQ_QUEUE_LENGTH.labels(queue=queue_name).set(queue.count)
生产环境必须加权限控制,不要开放公网访问。
7.5 常用 Redis 命令¶
基本监控命令:
# 连接 Redis
redis-cli
# 查看内存使用
INFO memory
# 查看客户端连接
INFO clients
# 查看数据库大小
DBSIZE
# 查看 RQ 相关键(生产环境慎用)
KEYS rq:*
# 安全扫描(推荐)
SCAN 0 MATCH rq:* COUNT 100
RQ 特定命令:
# 查看队列长度
redis-cli LLEN rq:queue:default
# 查看 Worker 注册
redis-cli SMEMBERS rq:workers
# 查看失败任务
redis-cli ZRANGE rq:failed:default 0 -1
# 查看计划任务
redis-cli ZRANGE rq:scheduled:default 0 -1
# 查看任务结果
redis-cli XREVRANGE rq:results:job_id + - COUNT 1
# 查看任务元数据
redis-cli HGETALL rq:job:job_id
性能诊断命令:
# 查看慢查询
redis-cli SLOWLOG GET 10
# 查看命令统计
redis-cli INFO commandstats
# 查看连接统计
redis-cli INFO stats
# 查看键空间统计
redis-cli INFO keyspace
# 监控实时命令
redis-cli MONITOR
维护命令:
# 清空特定队列
redis-cli DEL rq:queue:default
# 删除失败任务
redis-cli DEL rq:failed:default
# 删除所有 RQ 相关键(危险!)
redis-cli --scan --pattern "rq:*" | xargs redis-cli DEL
# 导出 RQ 数据
redis-cli --rdb rq_backup.rdb
# 导入 RQ 数据
redis-cli --pipe < rq_backup.rdb
生产不要对大实例频繁使用 KEYS,可用 SCAN。
7.6 监控指标和阈值建议¶
指标 |
警告阈值 |
严重阈值 |
检查频率 |
|---|---|---|---|
队列长度 |
> 500 |
> 1000 |
1分钟 |
Worker 离线率 |
> 20% |
> 50% |
30秒 |
失败任务率 |
> 5% |
> 10% |
5分钟 |
Redis 内存使用 |
> 70% |
> 85% |
1分钟 |
平均任务耗时 |
> 300秒 |
> 600秒 |
5分钟 |
Scheduler 状态 |
未运行 |
停止 > 5分钟 |
30秒 |
告警规则示例(Prometheus):
自动恢复策略:
告警通知集成:
八、常见业务场景¶
8.1 发送邮件和短信¶
请求线程只负责创建业务数据,发送动作异步执行。
transaction.on_commit(
lambda: queue.enqueue("notifications.tasks.send_sms", user_id=user.id)
)
8.2 订单超时关闭¶
queue.enqueue_in(
timedelta(minutes=30),
"orders.tasks.close_unpaid_order",
order_id=order.id,
)
任务执行时再次检查订单状态,避免误关闭已支付订单。
九、最佳实践¶
9.1 最佳实践¶
任务设计最佳实践:
参数设计:
传递 ID 而不是完整对象
避免传递大对象
使用明确的参数名称
# 好:传递 ID queue.enqueue("app.tasks.process_user", user_id=1001) # 不好:传递完整对象 user = User.objects.get(id=1001) queue.enqueue("app.tasks.process_user", user=user)
错误处理:
明确区分临时错误和永久错误
记录详细的错误信息
提供重试机制
def process_payment(payment_id: int): try: payment = Payment.objects.get(id=payment_id) result = call_payment_gateway(payment) return result except Payment.DoesNotExist: # 永久错误:不重试 logger.error(f"Payment {payment_id} not found") raise except (ConnectionError, TimeoutError) as e: # 临时错误:可以重试 logger.warning(f"Payment gateway error: {e}") raise
幂等性设计:
使用唯一业务键
状态机控制
数据库约束
def send_notification(notification_id: int): # 检查是否已发送 if Notification.objects.filter(id=notification_id, status="sent").exists(): return "already_sent" # 标记为发送中 Notification.objects.filter(id=notification_id).update(status="sending") # 发送逻辑 send_actual_notification(notification_id) # 标记为已发送 Notification.objects.filter(id=notification_id).update(status="sent") return "sent"
配置最佳实践:
环境分离:
# settings/production.py RQ_QUEUES = { "default": { "URL": os.getenv("REDIS_URL"), "DEFAULT_TIMEOUT": 600, "SSL": True, } } # settings/development.py RQ_QUEUES = { "default": { "URL": "redis://localhost:6379/0", "DEFAULT_TIMEOUT": 300, } }
队列分离:
RQ_QUEUES = { "high": { # 高优先级:用户感知任务 "URL": "redis://localhost:6379/0", "DEFAULT_TIMEOUT": 30, }, "default": { # 普通任务 "URL": "redis://localhost:6379/0", "DEFAULT_TIMEOUT": 300, }, "low": { # 低优先级:批处理 "URL": "redis://localhost:6379/0", "DEFAULT_TIMEOUT": 3600, }, "scheduler": { # 调度器专用 "URL": "redis://localhost:6379/1", "DEFAULT_TIMEOUT": 300, }, }
9.2 性能优化¶
Redis 优化:
连接池:
RQ_QUEUES = { "default": { "URL": "redis://localhost:6379/0", "REDIS_CLIENT_KWARGS": { "socket_connect_timeout": 5, "socket_timeout": 10, "retry_on_timeout": True, "max_connections": 50, # 连接池大小 }, } }
内存优化:
# Redis 配置 maxmemory 2gb maxmemory-policy allkeys-lru save 900 1 save 300 10 save 60 10000
持久化优化:
# 使用 AOF 和 RDB 混合 appendonly yes appendfsync everysec aof-use-rdb-preamble yes
任务优化:
批量处理:
# 批量处理而不是单个处理 @django_rq.job("low") def batch_process_items(item_ids: List[int]): items = Item.objects.filter(id__in=item_ids) for item in items: process_single_item(item)
连接复用:
# 在任务中复用数据库连接 def process_with_reuse(data_id: int): from django.db import connection # 复用连接 with connection.cursor() as cursor: cursor.execute("SELECT * FROM data WHERE id = %s", [data_id]) # ...
缓存使用:
def process_with_cache(user_id: int): from django.core.cache import cache cache_key = f"user_data_{user_id}" data = cache.get(cache_key) if not data: data = fetch_user_data(user_id) cache.set(cache_key, data, timeout=300) # 使用缓存的数据 process_data(data)
十、面试要点¶
10.1 Django RQ 和 Celery 的区别?¶
对比 |
Django RQ |
Celery |
|---|---|---|
复杂度 |
简单轻量 |
功能强大但复杂 |
Broker |
Redis |
Redis、RabbitMQ 等 |
定时任务 |
依赖 scheduler |
Celery Beat |
生态 |
较小 |
成熟丰富 |
适合场景 |
中小规模异步任务 |
大规模复杂任务系统 |
10.2 RQ 任务为什么要幂等?¶
任务可能重试、Worker 崩溃后重新执行、外部接口超时但实际成功。如果任务不幂等,可能导致重复扣款、重复发券、重复发消息。
10.3 如何保证事务提交后再执行任务?¶
使用 transaction.on_commit() 投递任务,确保数据库事务成功提交后任务才进入队列。
10.4 Worker 挂了怎么办?¶
使用 Supervisor/systemd/Kubernetes 保活。
监控 Worker 心跳和队列长度。
失败任务可重试。
任务逻辑保持幂等。
十一、参考资源¶
官方文档¶
Django RQ: https://github.com/rq/django-rq
Redis: https://redis.io/docs/
RQ scheduling: https://python-rq.org/docs/scheduling/
RQ CronScheduler: https://python-rq.org/docs/cron/
rq-scheduler(可选扩展): https://github.com/rq/rq-scheduler