Django RQ

Django RQ 是 Django 对 RQ(Redis Queue)的集成封装,用 Redis 作为队列和结果存储,适合轻量异步任务、定时任务、后台处理和中小规模任务队列。以下内容以 django-rq 4.x + RQ 2.x 为主,兼顾 django-rq 3.x 的常见用法;旧版接口在文中会特别说明。


目录

  1. 一、基础篇

  2. 二、任务使用篇

  3. 三、Worker 篇

  4. 四、定时任务篇

  5. 五、可靠性与幂等篇

  6. 六、性能优化篇

  7. 七、运维与监控篇

  8. 八、常见业务场景

  9. 九、最佳实践

  10. 十、面试要点

  11. 十一、参考资源


一、基础篇

1.1 RQ 与 Django RQ

RQ 是一个基于 Redis 的 Python 任务队列。

核心概念对比:

概念

RQ

Django RQ

队列管理

Queue

django_rq.get_queue()

任务创建

queue.enqueue()

django_rq.job 装饰器

延迟/定时任务

queue.enqueue_in() / queue.enqueue_at()

同 RQ 原生 API,Worker 需加 --with-scheduler

Cron/周期任务

rq.cron / rq-scheduler

manage.py rqcron 或可选的 django_rq.get_scheduler()

Worker

Worker

manage.py rqworker

任务获取

Job.fetch()

django_rq.get_job()

常用导入方式:

# 基础导入
import django_rq
from django_rq import job, get_queue, get_scheduler, get_job

# RQ 原生导入
from rq import Queue, Worker, Retry, Repeat
from rq.job import Job
from rq.registry import FailedJobRegistry, ScheduledJobRegistry

注意:django_rq.get_scheduler() 依赖可选包 rq-scheduler,它和 RQ 2.x 内置的 enqueue_at() / enqueue_in() 调度机制不是同一个系统。只使用延迟/定时入队时,优先使用 RQ 内置 scheduler;确实需要 schedule() / cron() 这类 rq-scheduler API 时,再安装并运行 rqscheduler

Django Web 请求
    |
    | enqueue
    v
Redis Queue
    |
    | pop
    v
RQ Worker 执行任务
    |
    v
Redis 保存任务状态和结果

核心组件:

组件

说明

Redis

队列、任务元数据、结果存储

Queue

队列,如 default、high、low

Job

一个异步任务实例

Worker

消费队列并执行任务的进程

Scheduler

定时或延迟投递任务

Django RQ 提供:

  • Django settings 集成。

  • manage.py rqworker 命令。

  • Django Admin 管理页面。

  • 装饰器和便捷 API。


1.2 安装与配置

安装:

# 基本安装
pip install django-rq

# 如果要使用 django_rq.get_scheduler() / manage.py rqscheduler
pip install rq-scheduler

# 如果要暴露 Prometheus 指标
pip install "django-rq[prometheus]"

基础配置 settings.py

INSTALLED_APPS = [
    # ...
    "django_rq",
]

# 队列配置
RQ_QUEUES = {
    "default": {
        "HOST": "localhost",
        "PORT": 6379,
        "DB": 0,
        "DEFAULT_TIMEOUT": 300,
        "QUEUE_CLASS": "django_rq.queues.DjangoRQ",
    },
    "high": {
        "HOST": "localhost",
        "PORT": 6379,
        "DB": 0,
        "DEFAULT_TIMEOUT": 300,
        "QUEUE_CLASS": "django_rq.queues.DjangoRQ",
    },
    "low": {
        "HOST": "localhost",
        "PORT": 6379,
        "DB": 0,
        "DEFAULT_TIMEOUT": 600,
        "QUEUE_CLASS": "django_rq.queues.DjangoRQ",
    },
}

QUEUE_CLASS 通常不用显式配置;django-rq 默认使用 django_rq.queues.DjangoRQ。如果自定义队列类,建议继承 django_rq.queues.DjangoRQ,而不是继承普通 rq.Queue 后直接替换。

使用 Redis URL:

RQ_QUEUES = {
    "default": {
        "URL": "redis://localhost:6379/0",
        "DEFAULT_TIMEOUT": 300,
        "SSL": False,  # 是否使用 SSL
        "SSL_CERT_REQS": None,  # SSL 证书要求
        "REDIS_CLIENT_KWARGS": {
            "socket_connect_timeout": 5,
            "socket_timeout": 10,
            "retry_on_timeout": True,
        },
    },
    "high": {
        "URL": "redis://:password@localhost:6379/1",  # 带密码
        "DEFAULT_TIMEOUT": 300,
    },
}

说明:RQ_QUEUES 支持 HOST / PORT / DB / USERNAME / PASSWORDURLUSE_REDIS_CACHE、Sentinel 配置等。Redis Cluster 不是 django-rq 的普通 URL 配置能力,生产使用前需要单独验证 Redis 客户端和队列语义。

高级配置选项:

# Django RQ 全局配置
RQ = {
    # django-rq 4.x 默认 on_db_commit:处于事务内时,事务提交后才真正入队。
    # 可选值:on_db_commit、auto、request_finished。
    "COMMIT_MODE": "on_db_commit",
    "DEFAULT_RESULT_TTL": 500,
    "WORKER_CLASS": "rq.Worker",
    "JOB_CLASS": "rq.job.Job",
    "QUEUE_CLASS": "django_rq.queues.DjangoRQ",
    "SERIALIZER": "rq.serializers.JSONSerializer",  # 序列化器
    "SCHEDULER_CLASS": "django_rq.queues.DjangoScheduler",  # 仅 rq-scheduler 可用
}

# 自定义异常处理器是独立设置,不放在 RQ 字典里
RQ_EXCEPTION_HANDLERS = ["app.rq_handlers.handle_exception"]

# 自定义序列化器
RQ["SERIALIZER"] = "app.serializers.CustomSerializer"

# 自定义异常处理器
import logging

logger = logging.getLogger(__name__)

def handle_rq_exception(job, exc_type, exc_value, traceback):
    """自定义异常处理"""
    # 记录异常或写入业务告警表
    logger.exception("RQ job failed", exc_info=(exc_type, exc_value, traceback))

    # 发送告警
    send_alert(f"Job {job.id} failed: {exc_value}")

    # RQ 约定:返回 False 表示停止继续调用后续异常处理器;
    # 返回 True 或 None 表示继续传给下一个处理器。
    return False

RQ_EXCEPTION_HANDLERS = [handle_rq_exception]

URL 配置:

# urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path("admin/", admin.site.urls),

    # django-rq 4.x 会自动注册 Django Admin 页面:
    # /admin/django_rq/dashboard/

    # 可选:也可以挂载 standalone 页面,生产环境必须加权限保护
    path("django-rq/", include("django_rq.urls")),
]

环境变量配置:

import os

RQ_QUEUES = {
    "default": {
        "URL": os.getenv("REDIS_URL", "redis://localhost:6379/0"),
        "DEFAULT_TIMEOUT": int(os.getenv("RQ_DEFAULT_TIMEOUT", 300)),
    },
}

路由:

urlpatterns = [
    # ...
    path("django-rq/", include("django_rq.urls")),
]

访问 /django-rq/ 可以查看队列、Worker、任务状态;如果启用了 Admin 集成,也可以访问 /admin/django_rq/dashboard/。暴露 /django-rq/stats.json/django-rq/metrics/ 给监控系统时,建议配置 RQ_API_TOKEN


1.3 队列、Worker、Job

获取队列的多种方式:

import django_rq

# 方式1:获取默认队列
queue = django_rq.get_queue("default")

# 方式2:获取指定队列,带连接参数
queue = django_rq.get_queue(
    "high",
    connection=django_rq.get_connection("high"),
    default_timeout=600
)

# 方式3:按 settings.RQ_QUEUES 获取所有队列
from django.conf import settings

for queue_name in settings.RQ_QUEUES:
    queue = django_rq.get_queue(queue_name)
    print(f"Queue {queue_name}: {queue.count} jobs")

# 方式4:使用 RQ 原生方式
from redis import Redis
from rq import Queue

redis_conn = Redis(host='localhost', port=6379, db=0)
queue = Queue('default', connection=redis_conn)

创建任务的多种方式:

import django_rq
from datetime import datetime, timedelta

queue = django_rq.get_queue("default")

# 方式1:基本任务创建
job = queue.enqueue("app.tasks.send_email", user_id=1001)
print(f"Job ID: {job.id}")

# 方式2:带参数的任务
job = queue.enqueue(
    "app.tasks.process_data",
    args=[1, 2, 3],
    kwargs={"option": "fast"},
    job_timeout=300,  # RQ 原生参数;django-rq 也常兼容 timeout 别名
    result_ttl=3600,
    failure_ttl=7200,
    description="Process user data"
)

# 方式3:延迟任务
job = queue.enqueue_in(
    timedelta(minutes=10),
    "app.tasks.cleanup",
    cleanup_type="temp_files"
)

# 方式4:定时任务
job = queue.enqueue_at(
    datetime(2026, 4, 28, 12, 0),
    "app.tasks.daily_report"
)

# 方式5:使用 create_job 方法(更底层)
from rq.job import Job

job = Job.create(
    func="app.tasks.export_data",
    args=[1001],
    connection=queue.connection,
    result_ttl=1800
)
queue.enqueue_job(job)

获取和管理任务:

import django_rq
from rq.job import Job

# 方式1:使用 Django RQ 的 get_job
job = django_rq.get_job(job_id, "default")
if job:
    print(f"Status: {job.get_status()}")
    print(f"Result: {job.return_value()}")
    print(f"Created at: {job.created_at}")
    print(f"Enqueued at: {job.enqueued_at}")
    print(f"Started at: {job.started_at}")
    print(f"Ended at: {job.ended_at}")

# 方式2:使用原生 Job.fetch
connection = django_rq.get_connection("default")
job = Job.fetch(job_id, connection=connection)

# 方式3:批量获取任务
queued_jobs = queue.get_jobs()  # 只返回当前队列里等待执行的任务

# 方式4:获取失败的任务
from rq.registry import FailedJobRegistry

failed_registry = FailedJobRegistry(queue=queue)
for job_id in failed_registry.get_job_ids():
    failed_job = django_rq.get_job(job_id, "default")
    if failed_job:
        latest_result = failed_job.latest_result()
        error = latest_result.exc_string if latest_result else failed_job.exc_info
        print(f"Failed job {failed_job.id}: {error}")

# 方式5:取消任务
if job.get_status() == "queued":
    job.cancel()
    print(f"Job {job.id} cancelled")

# 方式6:重新入队失败的任务
if job.get_status() == "failed":
    queue.failed_job_registry.requeue(job)
    print(f"Job {job.id} requeued")

启动 Worker:

# 基本启动
python manage.py rqworker default

# 消费多个队列(优先级顺序)
python manage.py rqworker high default low

# 指定 Worker 名称
python manage.py rqworker default --name worker1

# 只处理当前积压任务后退出
python manage.py rqworker default --burst

# 单进程内限制最多处理的任务数
python manage.py rqworker default --max-jobs 100

# 指定日志级别
python manage.py rqworker default --verbosity 2

# 开启 RQ 内置 scheduler,用于 enqueue_at/enqueue_in、Retry(interval=...)、Repeat
python manage.py rqworker default --with-scheduler

# 多进程 Worker 池
python manage.py rqworker-pool default low --num-workers 4

二、任务使用篇

2.1 普通任务

app/tasks.py

def send_welcome_email(user_id: int) -> str:
    from django.contrib.auth import get_user_model

    User = get_user_model()
    user = User.objects.get(id=user_id)

    # 这里调用真实邮件服务
    print(f"send email to {user.email}")
    return "ok"

视图中投递:

import django_rq
from app.tasks import send_welcome_email

def register(request):
    # 创建用户后异步发送邮件
    queue = django_rq.get_queue("default")
    job = queue.enqueue(send_welcome_email, user_id=request.user.id)
    return JsonResponse({"job_id": job.id})

2.2 装饰器方式

基本装饰器:

import django_rq

@django_rq.job("default", timeout=300)
def generate_report(report_id: int):
    # 生成报表
    return {"report_id": report_id, "status": "done"}

调用方式:

# 方式1:使用 delay 方法(推荐)
job = generate_report.delay(report_id=1)

# 方式2:enqueue 是 delay 的别名
job = generate_report.enqueue(report_id=1)

# 方式3:直接调用原函数(同步执行)
result = generate_report(report_id=1)  # 注意:这会同步执行!

# 方式4:使用 enqueue 方法
queue = django_rq.get_queue("default")
job = queue.enqueue(generate_report, report_id=1)

装饰器高级选项:

import django_rq
from datetime import timedelta
from rq import Retry

@django_rq.job(
    "default",
    timeout=600,
    result_ttl=3600,
    failure_ttl=86400,
    ttl=300,  # 任务在队列中的最大等待时间
    depends_on=None,
    job_id=None,  # 自定义 Job ID
    at_front=False,  # 是否插入队列前端
    meta={"source": "api", "version": "1.0"},
    retry=Retry(max=3, interval=[10, 30, 60]),
    description="生成用户报表"
)
def generate_user_report(user_id: int, report_type: str = "basic"):
    """生成用户报表任务"""
    # 业务逻辑
    return {"user_id": user_id, "report_type": report_type, "status": "done"}


# 延迟任务装饰器
@django_rq.job("default", timeout=300)
def send_reminder_email(user_id: int):
    # 发送提醒邮件
    pass

# 延迟调用:@job 装饰器只提供 delay/enqueue,
# 延迟和定时投递请使用 Queue 的 enqueue_in/enqueue_at。
queue = django_rq.get_queue("default")
job = queue.enqueue_in(timedelta(hours=24), send_reminder_email, user_id=1001)

# 定时调用
from datetime import datetime
job = queue.enqueue_at(datetime(2026, 4, 29, 9, 0), send_reminder_email, user_id=1001)

类方法装饰器:

import django_rq

class ReportService:
    @classmethod
    @django_rq.job("default")
    def generate_report(cls, report_id: int):
        # 生成报表
        return {"report_id": report_id}

# 调用
job = ReportService.generate_report.delay(report_id=1)

带参数绑定的装饰器:

import django_rq
from functools import partial

# 创建带预设参数的装饰器
def low_priority_job(func):
    return django_rq.job("low", timeout=1800)(func)

@low_priority_job
def batch_process_data(batch_id: int):
    # 批量处理数据
    pass

# 使用 partial 创建特定配置的装饰器
def create_job_decorator(queue_name, **kwargs):
    """创建自定义队列的装饰器"""
    def decorator(func):
        return django_rq.job(queue_name, **kwargs)(func)
    return decorator

email_job = create_job_decorator("high", timeout=30, result_ttl=300)

@email_job
def send_notification_email(user_id: int):
    # 发送通知邮件
    pass

2.3 延迟任务

from datetime import timedelta
import django_rq

queue = django_rq.get_queue("default")

job = queue.enqueue_in(
    timedelta(minutes=10),
    "app.tasks.close_unpaid_order",
    order_id=1001,
)

指定时间执行:

from datetime import datetime, timezone

queue.enqueue_at(
    datetime(2026, 4, 28, 12, 0, tzinfo=timezone.utc),
    "app.tasks.send_notification",
    user_id=1001,
)

enqueue_in() / enqueue_at() 使用 RQ 内置 scheduler,需要至少有一个 Worker 带 scheduler 组件运行。启动命令见前文 1.3 队列、Worker、Job


2.4 任务结果

job = queue.enqueue(generate_report, report_id=1, result_ttl=3600)

print(job.id)
print(job.return_value())  # RQ 1.12+ 推荐;job.result 已逐步废弃
print(job.get_status())

latest_result = job.latest_result()
if latest_result:
    print(latest_result.type, latest_result.return_value)

常见状态:

状态

说明

queued

等待执行

started

正在执行

finished

执行成功

failed

执行失败

deferred

依赖任务未完成

scheduled

已计划执行


2.5 失败重试

基本重试:

from rq import Retry

queue.enqueue(
    "app.tasks.call_remote_api",
    user_id=1001,
    retry=Retry(max=3, interval=[10, 30, 60]),
)

高级重试配置:

from rq import Retry

# 1. 指数退避重试
retry = Retry(
    max=5,
    interval=[10, 30, 60, 120, 240],  # 自定义间隔
)

# 2. 只对临时性异常重试:在任务内区分异常类型,永久性错误直接抛出
def call_remote_api(user_id: int):
    try:
        return remote_client.call(user_id)
    except (ConnectionError, TimeoutError):
        raise  # 让 RQ 的 Retry 机制处理
    except ValueError:
        raise  # 参数错误属于永久失败,不应盲目重试

# 3. 装饰器中的重试
@django_rq.job("default", retry=Retry(max=3))
def process_payment(payment_id: int):
    # 处理支付
    pass

# 4. 不同任务使用不同 Retry 配置
email_retry = Retry(max=2, interval=30)
payment_retry = Retry(max=5, interval=[10, 30, 60, 120, 300])

queue.enqueue("app.tasks.send_email", user_id=1001, retry=email_retry)
queue.enqueue("app.tasks.process_payment", payment_id=2001, retry=payment_retry)

# 5. 任务内主动要求重试(RQ 2.1+)
import django_rq
from rq import get_current_job

def process_with_retry(data_id: int):
    job = get_current_job()

    try:
        # 业务逻辑
        result = call_external_api(data_id)
        return result
    except ConnectionError as e:
        # 检查重试次数
        retry_count = job.meta.get("retry_count", 0)
        if retry_count < 3:
            job.meta["retry_count"] = retry_count + 1
            job.save_meta()

            # 返回 Retry 会让 RQ 重新调度本次任务
            return Retry(max=1, interval=60)

        raise e

重试监控和统计:

import django_rq

queue = django_rq.get_queue("default")

# 获取失败任务
failed_jobs = queue.failed_job_registry.get_job_ids()

# 分析失败原因
for job_id in failed_jobs:
    job = django_rq.get_job(job_id, "default")
    if job:
        print(f"Job {job_id} failed: {job.exc_info}")

        # 检查重试次数
        retry_count = job.meta.get("retry_count", 0)
        print(f"Retry count: {retry_count}")

        # 检查失败时间
        if job.ended_at:
            failed_time = job.ended_at
            print(f"Failed at: {failed_time}")

建议:

  1. 网络请求、第三方服务调用适合重试

  2. 参数错误、业务状态不合法不应盲目重试

  3. 重试任务必须保证幂等

  4. 设置合理的重试上限,避免无限重试

  5. 使用指数退避策略,避免对服务造成压力

  6. 记录重试日志,便于问题排查

  7. 区分临时性错误和永久性错误,不同错误不同处理

  8. 考虑使用死信队列处理多次重试仍失败的任务


2.6 任务依赖

job1 = queue.enqueue("app.tasks.extract_data")
job2 = queue.enqueue("app.tasks.transform_data", depends_on=job1)
job3 = queue.enqueue("app.tasks.load_data", depends_on=job2)

适合简单流水线任务;复杂 DAG 建议使用 Airflow、Prefect 等工作流系统。


三、Worker 篇

3.1 启动 Worker

Worker 启动命令已统一放在前文 1.3 队列、Worker、Job。队列顺序有优先级含义,Worker 会优先消费前面的队列。


3.2 Worker 数量

建议:

  • CPU 密集型任务:Worker 数量接近 CPU 核数。

  • IO 密集型任务:可适当增加 Worker。

  • 长任务和短任务分不同队列,避免相互阻塞。

  • 高优先级任务使用独立队列。


3.3 Supervisor 管理

/etc/supervisor/conf.d/django-rq.conf

[program:rqworker-default]
directory=/srv/myproject
command=/srv/myproject/venv/bin/python manage.py rqworker default
user=www-data
numprocs=2
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/myproject/rqworker-default.log
stopwaitsecs=3600

启动:

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl status

3.4 任务超时

队列默认超时:

RQ_QUEUES = {
    "default": {
        "URL": "redis://localhost:6379/0",
        "DEFAULT_TIMEOUT": 300,
    }
}

单任务超时:

queue.enqueue("app.tasks.long_task", job_timeout=1800)

超时任务会被标记失败。长任务要合理拆分,避免一个任务运行过久。


四、定时任务篇

4.1 RQ 内置 scheduler

RQ 2.x 自带 scheduler,适合“一次性延迟/定时任务”、Retry(interval=...)Repeat。它不需要安装 rq-scheduler,但需要 Worker 带 --with-scheduler 运行。

from datetime import datetime, timedelta, timezone
from rq import Repeat
import django_rq

queue = django_rq.get_queue("default")

# 10 分钟后执行
job = queue.enqueue_in(
    timedelta(minutes=10),
    "app.tasks.close_unpaid_order",
    order_id=1001,
)

# 指定时间执行;建议使用带时区的 datetime
job = queue.enqueue_at(
    datetime(2026, 4, 29, 9, 0, tzinfo=timezone.utc),
    "app.tasks.daily_report",
)

# 成功后重复执行(RQ 2.2+)
job = queue.enqueue(
    "app.tasks.sync_statistics",
    repeat=Repeat(times=5, interval=300),
)

启动命令见前文 1.3 队列、Worker、Job 中的 --with-scheduler 示例。

已计划的任务存放在 ScheduledJobRegistry 中:

from rq.registry import ScheduledJobRegistry

registry = ScheduledJobRegistry(queue=queue)
for job_id in registry.get_job_ids():
    scheduled_at = registry.get_scheduled_time(job_id)
    print(job_id, scheduled_at)

4.2 RQ CronScheduler

django-rq 4.x 支持 RQ 的 CronScheduler,可用于 cron 表达式和固定间隔的周期任务。先创建 cron 配置文件:

# cron_config.py
from rq import cron
from myapp.tasks import send_report, sync_data

cron.register(send_report, queue_name="default", cron="0 9 * * *")
cron.register(sync_data, queue_name="high", interval=30)

启动:

python manage.py rqcron cron_config.py
python manage.py rqcron myapp.cron_jobs --logging-level DEBUG

4.3 可选:rq-scheduler / get_scheduler

django_rq.get_scheduler() 是对第三方包 rq-scheduler 的集成。只有需要 scheduler.schedule() / scheduler.cron() 这类 API 时才需要它。

pip install rq-scheduler

启动:

# 基本启动
python manage.py rqscheduler

# 指定队列
python manage.py rqscheduler --queue default

# 指定检查间隔
python manage.py rqscheduler --interval 30

# 写入 pid 文件
python manage.py rqscheduler --pid /tmp/rqscheduler.pid

使用:

from datetime import datetime, timedelta, timezone
import django_rq

scheduler = django_rq.get_scheduler("default", interval=60)

# 一次性定时任务
job = scheduler.enqueue_at(
    datetime.now(timezone.utc) + timedelta(hours=1),
    "app.tasks.send_reminder",
    user_id=1001,
)

# 周期任务
job = scheduler.schedule(
    scheduled_time=datetime.now(timezone.utc),
    func="app.tasks.sync_statistics",
    interval=3600,
    repeat=None,
    kwargs={"force": False},
    timeout=300,
    result_ttl=1800,
)

# cron 任务
job = scheduler.cron(
    "0 9 * * *",
    func="app.tasks.daily_report",
    kwargs={"report_type": "daily"},
    repeat=None,
    timeout=600,
)

for scheduled_job in scheduler.get_jobs():
    print(scheduled_job.id, scheduled_job.func_name)

scheduler.cancel(job)

4.4 Cron 表达式示例

Cron 表达式示例:

# 每分钟执行一次
"* * * * *"

# 每5分钟执行一次
"*/5 * * * *"

# 每小时的第30分钟执行
"30 * * * *"

# 每天凌晨2点执行
"0 2 * * *"

# 每周一上午9点执行
"0 9 * * 1"

# 每月1号和15号凌晨3点执行
"0 3 1,15 * *"

# 工作日(周一到周五)上午10点执行
"0 10 * * 1-5"

# 每季度第一天凌晨4点执行
"0 4 1 1,4,7,10 *"

rq-scheduler 的可选配置:

# settings.py
RQ = {
    "SCHEDULER_CLASS": "django_rq.queues.DjangoScheduler",
    "DEFAULT_RESULT_TTL": 86400,
}

RQ_QUEUES = {
    "default": {
        "URL": "redis://localhost:6379/0",
        "DEFAULT_TIMEOUT": 300,
    },
}

4.5 调度器监控和管理

from rq.registry import ScheduledJobRegistry
import django_rq

queue = django_rq.get_queue("default")
registry = ScheduledJobRegistry(queue=queue)

print(f"Scheduled jobs: {registry.count}")
for job_id in registry.get_job_ids():
    print(job_id, registry.get_scheduled_time(job_id))

django-rq 的 dashboard 和 /django-rq/stats.json 会展示 RQ 内置 scheduler 的 scheduled jobs。rq-scheduler 的周期任务需要使用 scheduler.get_jobs() 或其自身数据结构查看。

生产环境建议:

  1. 内置 scheduler 可多 Worker 启用:RQ 会为同一队列选出一个 active scheduler。

  2. rq-scheduler 单实例运行:如果使用 rqscheduler,确保每个队列只有一个 scheduler 实例,避免重复调度。

  3. 监控:监控调度器进程状态和 Redis 连接

  4. 日志:记录调度器的操作日志

  5. 错误处理:调度器异常时自动重启

  6. 备份:定期备份调度器配置

# 使用 Supervisor 管理调度器
[program:rqscheduler]
command=/path/to/venv/bin/python manage.py rqscheduler --queue default --interval 30
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/rq/scheduler.log

五、可靠性与幂等篇

5.1 幂等设计

任务可能因为 Worker 崩溃、网络抖动、重试而执行多次。

常用方式:

  • 使用业务唯一键。

  • 状态机控制流转。

  • 数据库唯一约束防重复。

  • 外部接口传幂等键。

  • 任务开始前检查是否已完成。

示例:

def pay_success_callback(order_id: int, event_id: str):
    if PaymentEvent.objects.filter(event_id=event_id).exists():
        return

    PaymentEvent.objects.create(event_id=event_id, order_id=order_id)
    Order.objects.filter(id=order_id, status="pending").update(status="paid")

5.2 事务提交后投递任务

避免数据库事务回滚但任务已经执行:

from django.db import transaction
import django_rq

def create_order(request):
    order = Order.objects.create(...)

    transaction.on_commit(
        lambda: django_rq.get_queue("default").enqueue(
            "orders.tasks.close_unpaid_order",
            order_id=order.id,
        )
    )

5.3 失败任务处理

建议:

  • 明确哪些异常可重试。

  • 记录业务失败原因。

  • 对失败任务提供后台重试入口。

  • 对外部接口调用设置超时。

  • 任务日志带 job_id 和业务 ID。


六、性能优化篇

6.1 队列拆分

常见队列:

队列

任务

high

短任务、用户感知任务

default

普通异步任务

low

报表、批处理、低优先级任务

long

长耗时任务

Worker 启动命令见前文 1.3 队列、Worker、Job,不同优先级队列建议按业务重要性拆分独立 Worker。


6.2 Redis 优化

建议:

  • RQ 使用独立 Redis DB 或独立实例。

  • 设置合理 result_ttlfailure_ttl

  • 不要在 Redis 内长期保存大量任务结果。

  • 监控 Redis 内存、连接数、慢查询。

  • 开启持久化时关注磁盘 IO。


6.3 任务优化

  • 批量处理数据库查询,避免 N+1。

  • 长任务拆分为多个小任务。

  • 外部 HTTP 请求设置超时。

  • 大文件处理使用对象存储,不把大内容塞入任务参数。

  • CPU 密集型任务可以考虑 Celery、进程池或专门计算服务。


七、运维与监控篇

7.1 Django RQ 管理页面

基本管理页面:

/django-rq/ 可查看:

  • 队列长度和状态

  • Worker 状态和数量

  • 成功任务列表

  • 失败任务列表

  • 计划任务列表

  • 任务详情和执行日志

Admin 集成与指标接口:

# urls.py
urlpatterns = [
    # 可选 standalone dashboard
    path("django-rq/", include("django_rq.urls")),
]

# settings.py
RQ_API_TOKEN = "change-me"  # 保护 /django-rq/stats.json 和 /django-rq/metrics/
RQ_SHOW_ADMIN_LINK = True   # 是否在 Django Admin 侧边栏显示入口

功能对比:

功能

Django Admin 集成

Standalone URL

实时队列监控

Worker 状态

Job registry 浏览

JSON 统计

Prometheus 指标

安装 django-rq[prometheus] 后可用

安装 django-rq[prometheus] 后可用

7.2 命令行工具

队列管理:

# 查看队列、Worker、失败/完成/延迟等统计
python manage.py rqstats
python manage.py rqstats --interval 1
python manage.py rqstats --json
python manage.py rqstats --yaml

任务管理:

# 暂停/恢复 Worker 拉取新任务
python manage.py rqsuspend
python manage.py rqsuspend --duration 600
python manage.py rqresume

# CronScheduler
python manage.py rqcron cron_config.py

Worker 管理:

rqworker 启动参数已经在前文 1.3 队列、Worker、Job 统一说明,这里不再重复。

7.3 编程接口监控

import django_rq
from datetime import datetime, timedelta

# 1. 获取队列统计
queue = django_rq.get_queue("default")

# 队列长度
queue_length = queue.count
print(f"Queue length: {queue_length}")

queued_jobs = queue.get_jobs()
print(f"Queued jobs: {len(queued_jobs)}")

# 按 registry 统计
status_counts = {
    "queued": len(queued_jobs),
    "started": queue.started_job_registry.count,
    "finished": queue.finished_job_registry.count,
    "failed": queue.failed_job_registry.count,
    "deferred": queue.deferred_job_registry.count,
    "scheduled": queue.scheduled_job_registry.count,
}
print(f"Status counts: {status_counts}")

# 2. Worker 监控
from rq import Worker

workers = Worker.all(connection=queue.connection)
for worker in workers:
    print(f"Worker: {worker.name}")
    print(f"  State: {worker.state}")
    print(f"  Birth: {worker.birth_date}")
    print(f"  Queues: {worker.queue_names()}")

    # 检查是否存活
    if worker.is_idle:
        print("  Status: Idle")
    elif worker.is_suspended:
        print("  Status: Suspended")
    else:
        print("  Status: Busy")

# 3. 任务执行时间监控
finished_jobs = queue.finished_job_registry.get_job_ids()
for job_id in finished_jobs[:10]:  # 查看最近10个
    job = django_rq.get_job(job_id, "default")
    if job and job.ended_at and job.started_at:
        duration = (job.ended_at - job.started_at).total_seconds()
        print(f"Job {job_id}: {duration:.2f} seconds")

# 4. 失败任务分析
failed_jobs = queue.failed_job_registry.get_job_ids()
print(f"Failed jobs: {len(failed_jobs)}")

for job_id in failed_jobs[:5]:
    job = django_rq.get_job(job_id, "default")
    if job:
        print(f"Failed job {job_id}:")
        print(f"  Function: {job.func_name}")
        print(f"  Arguments: {job.args}")
        print(f"  Error: {job.exc_info}")
        print(f"  Failed at: {job.ended_at}")

# 5. 内存使用监控
import redis

redis_conn = django_rq.get_connection("default")
info = redis_conn.info()

print(f"Redis memory used: {info['used_memory_human']}")
print(f"Connected clients: {info['connected_clients']}")
print(f"Total commands: {info['total_commands_processed']}")

# 6. 自定义监控指标
class RQMonitor:
    def __init__(self):
        self.queues = ["default", "high", "low"]

    def get_queue_metrics(self):
        metrics = {}
        for queue_name in self.queues:
            queue = django_rq.get_queue(queue_name)
            metrics[queue_name] = {
                "length": queue.count,
                "jobs": len(queue.get_jobs()),
                "started": queue.started_job_registry.count,
                "failed": queue.failed_job_registry.count,
                "finished": queue.finished_job_registry.count,
                "scheduled": queue.scheduled_job_registry.count,
            }
        return metrics

    def get_worker_metrics(self):
        metrics = {"total": 0, "busy": 0, "idle": 0}
        for queue_name in self.queues:
            connection = django_rq.get_connection(queue_name)
            workers = Worker.all(connection=connection)
            metrics["total"] += len(workers)
            for worker in workers:
                if worker.is_idle:
                    metrics["idle"] += 1
                else:
                    metrics["busy"] += 1
        return metrics

    def check_health(self):
        """检查 RQ 系统健康状态"""
        health = {"status": "healthy", "issues": []}

        # 检查 Redis 连接
        try:
            redis_conn = django_rq.get_connection("default")
            redis_conn.ping()
        except Exception as e:
            health["status"] = "unhealthy"
            health["issues"].append(f"Redis connection failed: {e}")

        # 检查队列积压
        queue_metrics = self.get_queue_metrics()
        for queue_name, metrics in queue_metrics.items():
            if metrics["length"] > 1000:  # 阈值
                health["issues"].append(f"Queue {queue_name} backlog: {metrics['length']}")

            if metrics["failed"] > 100:  # 失败任务过多
                health["issues"].append(f"Queue {queue_name} has {metrics['failed']} failed jobs")

        # 检查 Worker 状态
        worker_metrics = self.get_worker_metrics()
        if worker_metrics["total"] == 0:
            health["status"] = "unhealthy"
            health["issues"].append("No workers running")

        return health

# 使用监控
monitor = RQMonitor()
print("Queue metrics:", monitor.get_queue_metrics())
print("Worker metrics:", monitor.get_worker_metrics())
print("Health check:", monitor.check_health())

7.4 集成外部监控系统

Prometheus 监控:

# app/monitoring/prometheus_metrics.py
from prometheus_client import Counter, Gauge, Histogram
import django_rq

# 定义指标
RQ_JOBS_TOTAL = Counter(
    "rq_jobs_total",
    "Total number of RQ jobs",
    ["queue", "status"]
)

RQ_QUEUE_LENGTH = Gauge(
    "rq_queue_length",
    "Current queue length",
    ["queue"]
)

RQ_JOB_DURATION = Histogram(
    "rq_job_duration_seconds",
    "RQ job duration in seconds",
    ["queue", "function"]
)

RQ_FAILED_JOBS = Counter(
    "rq_failed_jobs_total",
    "Total failed RQ jobs",
    ["queue", "error_type"]
)

# 监控装饰器
def monitor_rq_job(queue_name="default"):
    """监控 RQ 任务的装饰器"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                RQ_JOBS_TOTAL.labels(queue=queue_name, status="success").inc()
                return result
            except Exception as e:
                error_type = type(e).__name__
                RQ_JOBS_TOTAL.labels(queue=queue_name, status="failed").inc()
                RQ_FAILED_JOBS.labels(queue=queue_name, error_type=error_type).inc()
                raise
            finally:
                duration = time.time() - start_time
                RQ_JOB_DURATION.labels(queue=queue_name, function=func.__name__).observe(duration)
        return wrapper
    return decorator

# 使用示例
@monitor_rq_job("default")
def process_data(data_id: int):
    # 处理数据
    pass

# 定期收集队列指标
def collect_queue_metrics():
    """收集队列指标到 Prometheus"""
    queues = ["default", "high", "low"]
    for queue_name in queues:
        queue = django_rq.get_queue(queue_name)
        RQ_QUEUE_LENGTH.labels(queue=queue_name).set(queue.count)

生产环境必须加权限控制,不要开放公网访问。


7.5 常用 Redis 命令

基本监控命令:

# 连接 Redis
redis-cli

# 查看内存使用
INFO memory

# 查看客户端连接
INFO clients

# 查看数据库大小
DBSIZE

# 查看 RQ 相关键(生产环境慎用)
KEYS rq:*

# 安全扫描(推荐)
SCAN 0 MATCH rq:* COUNT 100

RQ 特定命令:

# 查看队列长度
redis-cli LLEN rq:queue:default

# 查看 Worker 注册
redis-cli SMEMBERS rq:workers

# 查看失败任务
redis-cli ZRANGE rq:failed:default 0 -1

# 查看计划任务
redis-cli ZRANGE rq:scheduled:default 0 -1

# 查看任务结果
redis-cli XREVRANGE rq:results:job_id + - COUNT 1

# 查看任务元数据
redis-cli HGETALL rq:job:job_id

性能诊断命令:

# 查看慢查询
redis-cli SLOWLOG GET 10

# 查看命令统计
redis-cli INFO commandstats

# 查看连接统计
redis-cli INFO stats

# 查看键空间统计
redis-cli INFO keyspace

# 监控实时命令
redis-cli MONITOR

维护命令:

# 清空特定队列
redis-cli DEL rq:queue:default

# 删除失败任务
redis-cli DEL rq:failed:default

# 删除所有 RQ 相关键(危险!)
redis-cli --scan --pattern "rq:*" | xargs redis-cli DEL

# 导出 RQ 数据
redis-cli --rdb rq_backup.rdb

# 导入 RQ 数据
redis-cli --pipe < rq_backup.rdb

生产不要对大实例频繁使用 KEYS,可用 SCAN


7.6 监控指标和阈值建议

指标

警告阈值

严重阈值

检查频率

队列长度

> 500

> 1000

1分钟

Worker 离线率

> 20%

> 50%

30秒

失败任务率

> 5%

> 10%

5分钟

Redis 内存使用

> 70%

> 85%

1分钟

平均任务耗时

> 300秒

> 600秒

5分钟

Scheduler 状态

未运行

停止 > 5分钟

30秒

告警规则示例(Prometheus):

自动恢复策略:

告警通知集成:


八、常见业务场景

8.1 发送邮件和短信

请求线程只负责创建业务数据,发送动作异步执行。

transaction.on_commit(
    lambda: queue.enqueue("notifications.tasks.send_sms", user_id=user.id)
)

8.2 订单超时关闭

queue.enqueue_in(
    timedelta(minutes=30),
    "orders.tasks.close_unpaid_order",
    order_id=order.id,
)

任务执行时再次检查订单状态,避免误关闭已支付订单。


九、最佳实践

9.1 最佳实践

任务设计最佳实践:

  1. 参数设计

    • 传递 ID 而不是完整对象

    • 避免传递大对象

    • 使用明确的参数名称

    # 好:传递 ID
    queue.enqueue("app.tasks.process_user", user_id=1001)
    
    # 不好:传递完整对象
    user = User.objects.get(id=1001)
    queue.enqueue("app.tasks.process_user", user=user)
    
  2. 错误处理

    • 明确区分临时错误和永久错误

    • 记录详细的错误信息

    • 提供重试机制

    def process_payment(payment_id: int):
        try:
            payment = Payment.objects.get(id=payment_id)
            result = call_payment_gateway(payment)
            return result
        except Payment.DoesNotExist:
            # 永久错误:不重试
            logger.error(f"Payment {payment_id} not found")
            raise
        except (ConnectionError, TimeoutError) as e:
            # 临时错误:可以重试
            logger.warning(f"Payment gateway error: {e}")
            raise
    
  3. 幂等性设计

    • 使用唯一业务键

    • 状态机控制

    • 数据库约束

    def send_notification(notification_id: int):
        # 检查是否已发送
        if Notification.objects.filter(id=notification_id, status="sent").exists():
            return "already_sent"
    
        # 标记为发送中
        Notification.objects.filter(id=notification_id).update(status="sending")
    
        # 发送逻辑
        send_actual_notification(notification_id)
    
        # 标记为已发送
        Notification.objects.filter(id=notification_id).update(status="sent")
        return "sent"
    

配置最佳实践:

  1. 环境分离

    # settings/production.py
    RQ_QUEUES = {
        "default": {
            "URL": os.getenv("REDIS_URL"),
            "DEFAULT_TIMEOUT": 600,
            "SSL": True,
        }
    }
    
    # settings/development.py
    RQ_QUEUES = {
        "default": {
            "URL": "redis://localhost:6379/0",
            "DEFAULT_TIMEOUT": 300,
        }
    }
    
  2. 队列分离

    RQ_QUEUES = {
        "high": {  # 高优先级:用户感知任务
            "URL": "redis://localhost:6379/0",
            "DEFAULT_TIMEOUT": 30,
        },
        "default": {  # 普通任务
            "URL": "redis://localhost:6379/0",
            "DEFAULT_TIMEOUT": 300,
        },
        "low": {  # 低优先级:批处理
            "URL": "redis://localhost:6379/0",
            "DEFAULT_TIMEOUT": 3600,
        },
        "scheduler": {  # 调度器专用
            "URL": "redis://localhost:6379/1",
            "DEFAULT_TIMEOUT": 300,
        },
    }
    

9.2 性能优化

Redis 优化:

  1. 连接池

    RQ_QUEUES = {
        "default": {
            "URL": "redis://localhost:6379/0",
            "REDIS_CLIENT_KWARGS": {
                "socket_connect_timeout": 5,
                "socket_timeout": 10,
                "retry_on_timeout": True,
                "max_connections": 50,  # 连接池大小
            },
        }
    }
    
  2. 内存优化

    # Redis 配置
    maxmemory 2gb
    maxmemory-policy allkeys-lru
    save 900 1
    save 300 10
    save 60 10000
    
  3. 持久化优化

    # 使用 AOF 和 RDB 混合
    appendonly yes
    appendfsync everysec
    aof-use-rdb-preamble yes
    

任务优化:

  1. 批量处理

    # 批量处理而不是单个处理
    @django_rq.job("low")
    def batch_process_items(item_ids: List[int]):
        items = Item.objects.filter(id__in=item_ids)
        for item in items:
            process_single_item(item)
    
  2. 连接复用

    # 在任务中复用数据库连接
    def process_with_reuse(data_id: int):
        from django.db import connection
    
        # 复用连接
        with connection.cursor() as cursor:
            cursor.execute("SELECT * FROM data WHERE id = %s", [data_id])
            # ...
    
  3. 缓存使用

    def process_with_cache(user_id: int):
        from django.core.cache import cache
    
        cache_key = f"user_data_{user_id}"
        data = cache.get(cache_key)
    
        if not data:
            data = fetch_user_data(user_id)
            cache.set(cache_key, data, timeout=300)
    
        # 使用缓存的数据
        process_data(data)
    

十、面试要点

10.1 Django RQ 和 Celery 的区别?

对比

Django RQ

Celery

复杂度

简单轻量

功能强大但复杂

Broker

Redis

Redis、RabbitMQ 等

定时任务

依赖 scheduler

Celery Beat

生态

较小

成熟丰富

适合场景

中小规模异步任务

大规模复杂任务系统

10.2 RQ 任务为什么要幂等?

任务可能重试、Worker 崩溃后重新执行、外部接口超时但实际成功。如果任务不幂等,可能导致重复扣款、重复发券、重复发消息。

10.3 如何保证事务提交后再执行任务?

使用 transaction.on_commit() 投递任务,确保数据库事务成功提交后任务才进入队列。

10.4 Worker 挂了怎么办?

  • 使用 Supervisor/systemd/Kubernetes 保活。

  • 监控 Worker 心跳和队列长度。

  • 失败任务可重试。

  • 任务逻辑保持幂等。


十一、参考资源

官方文档