Django¶
基于 Django 4.2 的前后端分离架构后端开发完整教程,涵盖核心概念、最佳实践、性能优化和生产级应用构建。
文档说明¶
Django 版本: 4.2 LTS (Long Term Support)
架构模式: 前后端分离(纯后端 API)
不包含: 模板系统、DRF(Django REST Framework)
重点: 核心概念理解 + 实战最佳实践
目录¶
第一部分:基础核心¶
第二部分:数据层¶
第三部分:核心功能¶
-
本章节内容与第十章认证系统的权限部分重合,详见第十章第 2 节"权限与分组"
第四部分:高级特性¶
第五部分:实战优化¶
一、Django 框架概述¶
1.1 Django 简介¶
Django 是一个高级的 Python Web 框架,遵循"快速开发"和"不重复造轮子(DRY)"的设计哲学。
核心特点:
电池齐全(Batteries Included): 内置 ORM、Admin、认证、表单等
安全性: 默认防护 CSRF、XSS、SQL 注入等
可扩展性: 支持中间件、信号、自定义管理器等扩展点
MTV 架构: Model-Template-View(前后端分离时 Template 层由前端替代)
ORM 强大: 类 Python 风格的数据库操作
1.2 Django 4.2 LTS 新特性¶
Django 4.2 是长期支持版本(至 2026 年 4 月),主要新特性:
# 1. 新增 Psycopg 3 支持(PostgreSQL 驱动)
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql', # 自动使用 psycopg 3
'NAME': 'mydb',
}
}
# 2. 异步 ORM 改进
async def get_users():
users = [user async for user in User.objects.filter(is_active=True)]
return users
# 3. 改进的约束验证
from django.db import models
from django.db.models import CheckConstraint, Q, UniqueConstraint
class Product(models.Model):
name = models.CharField(max_length=100)
price = models.DecimalField(max_digits=10, decimal_places=2)
class Meta:
constraints = [
CheckConstraint(
check=Q(price__gte=0),
name='price_positive',
violation_error_message='价格必须为正数' # 4.2 新增自定义错误信息
),
]
# 4. 改进的自定义命令
from django.core.management.base import BaseCommand
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--force', action='store_true') # 改进的参数处理
1.3 快速开始¶
# 1. 安装 Django 4.2
pip install django==4.2
# 2. 创建项目
django-admin startproject myproject
cd myproject
# 3. 创建应用
python manage.py startapp users
# 4. 运行开发服务器
python manage.py runserver
# 访问 http://127.0.0.1:8000/
# 5. 常用命令
python manage.py makemigrations # 生成迁移文件
python manage.py migrate # 执行迁移
python manage.py createsuperuser # 创建超级用户
python manage.py shell # 进入 Django Shell
python manage.py dbshell # 进入数据库 Shell
项目结构说明:
myproject/
├── manage.py # 命令行工具
└── myproject/ # 项目配置目录
├── __init__.py
├── settings.py # 配置文件
├── urls.py # 主路由
├── asgi.py # ASGI 入口(异步)
└── wsgi.py # WSGI 入口(同步)
二、项目架构与配置¶
2.1 标准项目结构设计¶
生产级项目结构
myproject/
├── config/ # 项目配置根目录
│ ├── __init__.py
│ ├── settings/ # 分环境配置
│ │ ├── __init__.py
│ │ ├── base.py # 基础配置
│ │ ├── development.py # 开发环境
│ │ ├── staging.py # 预发环境
│ │ └── production.py # 生产环境
│ ├── urls.py # 主路由
│ ├── asgi.py # ASGI 入口(异步)
│ └── wsgi.py # WSGI 入口(同步)
│
├── apps/ # 业务应用目录
│ ├── __init__.py
│ ├── core/ # 核心模块(抽象基类、工具)
│ │ ├── __init__.py
│ │ ├── models.py # 抽象基础模型
│ │ ├── managers.py # 自定义 Manager
│ │ ├── mixins.py # Mixin 类
│ │ ├── exceptions.py # 自定义异常
│ │ ├── validators.py # 通用验证器
│ │ ├── utils.py # 工具函数
│ │ └── pagination.py # 分页工具
│ │
│ ├── users/ # 用户模块
│ │ ├── __init__.py
│ │ ├── models.py # 用户模型
│ │ ├── views.py # 视图
│ │ ├── urls.py # 路由
│ │ ├── forms.py # 表单
│ │ ├── services.py # 业务逻辑层
│ │ ├── serializers.py # 数据序列化
│ │ ├── tasks.py # 异步任务
│ │ ├── admin.py # Admin 配置
│ │ ├── apps.py # 应用配置
│ │ └── tests/ # 测试
│ │ ├── __init__.py
│ │ ├── test_models.py
│ │ ├── test_views.py
│ │ └── test_services.py
│ │
│ └── orders/ # 订单模块
│ └── ...
│
├── middleware/ # 自定义中间件
│ ├── __init__.py
│ ├── request_id.py # 请求 ID
│ ├── logging_middleware.py # 日志记录
│ ├── exception_handler.py # 异常处理
│ └── response_formatter.py # 响应格式化
│
├── utils/ # 全局工具函数
│ ├── __init__.py
│ ├── crypto.py # 加密工具
│ ├── datetime_utils.py # 时间处理
│ └── validators.py # 验证工具
│
├── tests/ # 集成测试
│ ├── __init__.py
│ ├── conftest.py # pytest 配置
│ ├── fixtures/ # 测试数据
│ └── integration/ # 集成测试
│
├── static/ # 静态文件源目录
│ ├── css/
│ ├── js/
│ └── images/
│
├── media/ # 用户上传文件
├── staticfiles/ # collectstatic 输出目录
├── logs/ # 日志目录
│
├── scripts/ # 管理脚本
│ ├── init_db.py # 初始化数据库
│ ├── seed_data.py # 填充测试数据
│ └── backup.py # 备份脚本
│
├── docs/ # 项目文档
│ ├── api.md # API 文档
│ ├── architecture.md # 架构文档
│ └── deployment.md # 部署文档
│
├── deploy/ # 部署配置
│ ├── docker/
│ │ ├── Dockerfile
│ │ ├── docker-compose.yml
│ │ └── nginx.conf
│ └── scripts/
│ └── deploy.sh
│
├── requirements/ # 依赖管理
│ ├── base.txt # 基础依赖
│ ├── development.txt # 开发依赖
│ ├── production.txt # 生产依赖
│ └── test.txt # 测试依赖
│
├── .env.example # 环境变量示例
├── .gitignore # Git 忽略文件
├── manage.py # Django 管理脚本
├── pytest.ini # pytest 配置
├── setup.cfg # 项目配置
└── README.md # 项目说明
目录划分原则:
apps/: 按业务领域划分,每个应用独立
config/: 项目配置,多环境分离
middleware/: 横切关注点的统一处理
utils/: 全局工具函数
tests/: 集成测试独立目录
scripts/: 管理脚本与自动化工具
创建项目
# 1. 安装 Django 4.2
pip install django==4.2
# 2. 创建项目(使用 config 作为配置目录名)
django-admin startproject config .
# 3. 创建 apps 目录并创建应用
mkdir apps
python manage.py startapp users apps/users
# 4. 安装常用依赖
pip install psycopg2-binary # PostgreSQL 驱动
pip install mysqlclient # MySQL 驱动(推荐)
pip install django-redis # Redis 缓存
pip install django-rq # 任务队列
pip install rq-scheduler # RQ 定时任务(可选)
pip install django-environ # 环境变量管理
pip install pillow # 图片处理
pip install python-dotenv # .env 文件支持
pip install django-cors-headers # CORS 支持
pip install gunicorn # 生产服务器
# 5. 开发工具(可选)
pip install django-debug-toolbar # 开发调试工具栏
pip install django-extensions # 扩展命令
pip install ipython # 增强 Shell
pip install pytest-django # 测试框架
三、路由系统(URLconf)¶
3.1 URLconf 工作原理¶
Django 通过 URLconf(URL Configuration)实现 URL 到视图的映射。
请求处理流程:
1. 用户请求 → 2. MIDDLEWARE_CLASSES(请求阶段)→ 3. URLconf 路由匹配
↓
4. View 处理 → 5. MIDDLEWARE_CLASSES(响应阶段)→ 6. 返回响应
config/urls.py - 主路由配置
from django.contrib import admin
from django.urls import path, include
from django.conf import settings
from django.conf.urls.static import static
urlpatterns = [
path('admin/', admin.site.urls),
path('api/users/', include('apps.users.urls')),
path('api/orders/', include('apps.orders.urls')),
path('api/auth/', include('apps.authentication.urls')),
]
# 开发环境静态文件服务(生产环境由 Nginx 处理)
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
urlpatterns += static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
3.2 路径模式匹配¶
基本路径定义
URLconf 路由匹配, 是一种顺序匹配,匹配到第一个的就成功
# apps/users/urls.py
from django.urls import path, re_path
from . import views
app_name = 'users' # 命名空间
urlpatterns = [
# 基础路径
path('', views.UserListView.as_view(), name='list'),
# 路径参数(使用路径转换器)
path('<int:pk>/', views.UserDetailView.as_view(), name='detail'),
path('<str:username>/', views.UserProfileView.as_view(), name='profile'),
path('<uuid:uid>/', views.UserByUUIDView.as_view(), name='by_uuid'),
path('<slug:slug>/', views.UserBySlugView.as_view(), name='by_slug'),
path('<path:filepath>/', views.FileView.as_view(), name='file'),
# 多级路径
path('<int:pk>/orders/', views.UserOrdersView.as_view(), name='orders'),
path('<int:pk>/profile/', views.UserProfileDetailView.as_view(), name='profile_detail'),
# 正则表达式路径(Django 4.2 仍支持)
re_path(r'^archive/(?P<year>[0-9]{4})/$', views.YearArchiveView.as_view(), name='year_archive'),
]
3.3 路径转换器(Path Converters)¶
内置路径转换器
转换器 |
匹配规则 |
示例 |
|---|---|---|
|
匹配除 |
|
|
匹配零或任意正整数 |
|
|
匹配字母、数字、连字符、下划线 |
|
|
匹配 UUID 格式 |
|
|
匹配任意非空字符串,包括 |
|
自定义路径转换器
# apps/core/converters.py
class YearMonthConverter:
regex = r'[0-9]{4}-[0-9]{2}'
def to_python(self, value):
year, month = value.split('-')
return {'year': int(year), 'month': int(month)}
def to_url(self, value):
return f"{value['year']:04d}-{value['month']:02d}"
class PhoneNumberConverter:
regex = r'1[3-9]\d{9}' # 中国手机号
def to_python(self, value):
return value
def to_url(self, value):
return value
# config/urls.py
from django.urls import register_converter
from apps.core import converters
register_converter(converters.YearMonthConverter, 'ym')
register_converter(converters.PhoneNumberConverter, 'phone')
# apps/users/urls.py
urlpatterns = [
path('stats/<ym:period>/', views.MonthlyStatsView.as_view(), name='monthly_stats'),
path('verify/<phone:number>/', views.VerifyPhoneView.as_view(), name='verify_phone'),
]
3.4 路由命名与反向解析¶
路由命名
# apps/users/urls.py
from django.urls import path
from . import views
app_name = 'users' # 命名空间
urlpatterns = [
path('', views.UserListView.as_view(), name='list'),
path('<int:pk>/', views.UserDetailView.as_view(), name='detail'),
path('<int:pk>/edit/', views.UserEditView.as_view(), name='edit'),
]
反向解析(在代码中生成 URL)
from django.urls import reverse
from django.shortcuts import redirect
# 基础用法
url = reverse('users:list') # /api/users/
url = reverse('users:detail', kwargs={'pk': 1}) # /api/users/1/
url = reverse('users:detail', args=[1]) # 等同于上面
# 在视图中重定向
def some_view(request):
return redirect('users:detail', pk=1)
# 在模型中定义 get_absolute_url
class User(models.Model):
username = models.CharField(max_length=50)
def get_absolute_url(self):
return reverse('users:detail', kwargs={'pk': self.pk})
# 使用示例
user = User.objects.get(id=1)
url = user.get_absolute_url() # /api/users/1/
3.5 路由包含与模块化¶
模块化路由组织
# config/urls.py
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urls),
path('api/v1/', include([
path('users/', include('apps.users.urls')),
path('orders/', include('apps.orders.urls')),
path('products/', include('apps.products.urls')),
])),
]
# 或者使用命名空间
urlpatterns = [
path('api/v1/', include(('apps.api_v1.urls', 'api_v1'), namespace='v1')),
path('api/v2/', include(('apps.api_v2.urls', 'api_v2'), namespace='v2')),
]
# 反向解析时指定版本
url = reverse('v1:users:list') # /api/v1/users/
url = reverse('v2:users:list') # /api/v2/users/
四、视图层(Views)¶
4.1 视图的本质与职责¶
视图是什么?
视图是一个可调用对象(函数或类),接收 HTTP 请求,返回 HTTP 响应。
# 最简视图
from django.http import HttpResponse
def hello_view(request):
return HttpResponse("Hello, Django!")
视图的职责(单一职责原则):
接收请求参数
验证参数
调用业务逻辑(Service 层)
返回响应
避免在视图中:
复杂的业务逻辑(应放在 Service 层)
直接的数据库操作(应通过 Model 或 Service)
大量的数据处理(应放在 Serializer 或 Service)
4.2 函数视图(FBV - Function-Based Views)¶
基础函数视图
# apps/users/views.py
from django.http import JsonResponse, HttpResponse
from django.views.decorators.http import require_http_methods, require_GET, require_POST
from django.shortcuts import get_object_or_404
import json
from .models import User
@require_GET
def user_list(request):
"""用户列表"""
users = User.objects.filter(is_active=True).values('id', 'username', 'email')
return JsonResponse({
'code': 0,
'data': list(users),
'msg': 'success'
})
@require_http_methods(['GET', 'POST'])
def user_list_create(request):
"""用户列表与创建"""
if request.method == 'GET':
users = User.objects.all().values('id', 'username')
return JsonResponse({'code': 0, 'data': list(users)})
elif request.method == 'POST':
data = json.loads(request.body)
user = User.objects.create(
username=data['username'],
email=data['email']
)
return JsonResponse({
'code': 0,
'data': {'id': user.id, 'username': user.username}
}, status=201)
def user_detail(request, pk):
"""用户详情"""
user = get_object_or_404(User, pk=pk)
if request.method == 'GET':
return JsonResponse({
'code': 0,
'data': {
'id': user.id,
'username': user.username,
'email': user.email,
}
})
elif request.method == 'PUT':
data = json.loads(request.body)
user.username = data.get('username', user.username)
user.email = data.get('email', user.email)
user.save()
return JsonResponse({'code': 0, 'msg': 'updated'})
elif request.method == 'DELETE':
user.delete()
return JsonResponse({'code': 0, 'msg': 'deleted'}, status=204)
FBV 优点:
简单直观,易于理解
适合逻辑简单的视图
灵活性高
FBV 缺点:
代码复用困难
多个 HTTP 方法处理在一个函数中,函数过长
难以组合和扩展
4.3 类视图(CBV - Class-Based Views)¶
基础类视图
from django.views import View
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
import json
from .models import User
class UserListView(View):
"""用户列表"""
def get(self, request):
users = User.objects.filter(is_active=True).values('id', 'username', 'email')
return JsonResponse({'code': 0, 'data': list(users)})
def post(self, request):
data = json.loads(request.body)
user = User.objects.create(
username=data['username'],
email=data['email']
)
return JsonResponse({
'code': 0,
'data': {'id': user.id, 'username': user.username}
}, status=201)
class UserDetailView(View):
"""用户详情"""
def get_object(self, pk):
return get_object_or_404(User, pk=pk)
def get(self, request, pk):
user = self.get_object(pk)
return JsonResponse({
'code': 0,
'data': {
'id': user.id,
'username': user.username,
'email': user.email,
}
})
def put(self, request, pk):
user = self.get_object(pk)
data = json.loads(request.body)
user.username = data.get('username', user.username)
user.save()
return JsonResponse({'code': 0, 'msg': 'updated'})
def delete(self, request, pk):
user = self.get_object(pk)
user.delete()
return HttpResponse(status=204)
CBV 优点:
代码组织清晰(按 HTTP 方法分离)
易于复用(继承和 Mixin)
符合面向对象思想
4.4 通用视图与 Mixin 模式¶
Mixin 模式实现代码复用
# apps/core/mixins.py
from django.http import JsonResponse
import json
class JSONResponseMixin:
"""JSON 响应 Mixin"""
def render_json_response(self, data, status=200):
return JsonResponse(data, status=status)
def render_success(self, data=None, msg='success'):
return self.render_json_response({
'code': 0,
'data': data,
'msg': msg
})
def render_error(self, msg='error', code=1, status=400):
return self.render_json_response({
'code': code,
'msg': msg,
}, status=status)
class ParseJSONMixin:
"""解析 JSON 请求体 Mixin"""
def get_json_data(self):
try:
return json.loads(self.request.body)
except json.JSONDecodeError:
return {}
class PaginationMixin:
"""分页 Mixin"""
page_size = 20
def paginate_queryset(self, queryset):
from django.core.paginator import Paginator
page = self.request.GET.get('page', 1)
paginator = Paginator(queryset, self.page_size)
return paginator.get_page(page)
# apps/users/views.py
from django.views import View
from apps.core.mixins import JSONResponseMixin, ParseJSONMixin, PaginationMixin
from .models import User
class UserListView(JSONResponseMixin, ParseJSONMixin, PaginationMixin, View):
"""用户列表(使用 Mixin)"""
def get(self, request):
users = User.objects.filter(is_active=True)
page = self.paginate_queryset(users)
data = [
{'id': u.id, 'username': u.username, 'email': u.email}
for u in page.object_list
]
return self.render_success({
'list': data,
'total': page.paginator.count,
'page': page.number,
'pages': page.paginator.num_pages,
})
def post(self, request):
data = self.get_json_data()
user = User.objects.create(
username=data['username'],
email=data['email']
)
return self.render_success(
{'id': user.id, 'username': user.username},
msg='created'
)
4.5 请求对象(HttpRequest)¶
def example_view(request):
# ===== 请求方法 =====
request.method # 'GET', 'POST', 'PUT', 'DELETE', etc.
# ===== 请求参数 =====
request.GET # QueryDict: URL 查询参数
request.POST # QueryDict: POST 表单数据
request.body # bytes: 原始请求体
request.FILES # MultiValueDict: 上传的文件
# GET 参数示例:/api/users/?page=1&size=20
page = request.GET.get('page', 1) # 单个值,带默认值
page = request.GET['page'] # 单个值,不存在会抛 KeyError
tags = request.GET.getlist('tag') # 多个值:?tag=a&tag=b
# POST 数据解析
import json
data = json.loads(request.body) # JSON 请求体
# ===== 请求头 =====
request.headers # dict-like: HTTP 头
request.headers.get('Authorization')
request.headers.get('Content-Type')
request.headers.get('User-Agent')
# ===== 请求元数据 =====
request.META # dict: WSGI 环境变量
request.META.get('REMOTE_ADDR') # 客户端 IP
request.META.get('HTTP_USER_AGENT') # User-Agent
request.META.get('HTTP_REFERER') # 来源页面
request.META.get('REQUEST_METHOD') # 请求方法
# ===== 用户与认证 =====
request.user # 当前用户对象(需要认证中间件)
request.user.is_authenticated # 是否已认证
request.user.is_staff # 是否是管理员
# ===== Session =====
request.session # dict-like: Session 数据
request.session['key'] = 'value'
request.session.get('key')
# ===== Cookie =====
request.COOKIES # dict: Cookie 数据
request.COOKIES.get('sessionid')
# ===== 其他 =====
request.path # '/api/users/1/'
request.path_info # 同 path
request.get_full_path() # '/api/users/1/?page=1'
request.get_host() # 'example.com'
request.is_secure() # 是否 HTTPS
request.is_ajax() # 是否 AJAX 请求(检查 X-Requested-With)
return JsonResponse({'status': 'ok'})
4.6 响应对象(HttpResponse)¶
from django.http import HttpResponse, JsonResponse, FileResponse, StreamingHttpResponse
from django.shortcuts import redirect, render
def response_examples(request):
# ===== HttpResponse(基础响应)=====
response = HttpResponse('Plain text response')
response = HttpResponse('<h1>HTML response</h1>', content_type='text/html')
response = HttpResponse(b'Binary data', content_type='application/octet-stream')
# ===== JsonResponse(JSON 响应)=====
data = {'code': 0, 'msg': 'success', 'data': {'id': 1, 'name': 'Alice'}}
response = JsonResponse(data)
# 非 dict 数据需要 safe=False
response = JsonResponse([1, 2, 3], safe=False)
# ===== 重定向 =====
response = redirect('/new-path/')
response = redirect('users:detail', pk=1) # 命名 URL
# ===== 文件下载 =====
response = FileResponse(
open('file.pdf', 'rb'),
as_attachment=True,
filename='download.pdf'
)
# ===== 流式响应(大文件)=====
def file_iterator(file_path, chunk_size=8192):
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
response = StreamingHttpResponse(
file_iterator('large_file.zip'),
content_type='application/zip'
)
response['Content-Disposition'] = 'attachment; filename="file.zip"'
# ===== 自定义响应 =====
response = JsonResponse({'msg': 'ok'})
response.status_code = 201
response['X-Custom-Header'] = 'value'
response['Access-Control-Allow-Origin'] = '*'
response.set_cookie('token', 'xxx', max_age=3600, httponly=True, secure=True)
response.delete_cookie('old_token')
return response
4.7 视图装饰器¶
from django.views.decorators.http import require_http_methods, require_GET, require_POST
from django.views.decorators.csrf import csrf_exempt, csrf_protect
from django.views.decorators.cache import cache_page, never_cache
from django.contrib.auth.decorators import login_required, permission_required, user_passes_test
from django.utils.decorators import method_decorator
# ===== HTTP 方法限制 =====
@require_GET
def get_only_view(request):
pass
@require_POST
def post_only_view(request):
pass
@require_http_methods(['GET', 'POST'])
def get_post_view(request):
pass
# ===== CSRF 保护 =====
@csrf_exempt # 禁用 CSRF 保护(API 端点)
def api_view(request):
pass
# ===== 缓存 =====
@cache_page(60 * 15) # 缓存 15 分钟
def cached_view(request):
pass
@never_cache # 永不缓存
def never_cached_view(request):
pass
# ===== 认证与权限 =====
@login_required(login_url='/login/')
def protected_view(request):
pass
@permission_required('users.delete_user', raise_exception=True)
def delete_user_view(request):
pass
def is_vip(user):
return user.is_authenticated and user.is_vip
@user_passes_test(is_vip, login_url='/vip/upgrade/')
def vip_only_view(request):
pass
# ===== 类视图装饰器 =====
@method_decorator(login_required, name='dispatch') # 所有方法都需要登录
class ProtectedView(View):
pass
class MixedProtection(View):
@method_decorator(login_required)
def get(self, request): # 只有 GET 需要登录
pass
def post(self, request): # POST 不需要登录
pass
# ===== 自定义装饰器 =====
from functools import wraps
from django.http import JsonResponse
def api_login_required(view_func):
"""API 登录验证装饰器"""
@wraps(view_func)
def wrapper(request, *args, **kwargs):
if not request.user.is_authenticated:
return JsonResponse({
'code': 401,
'msg': 'Authentication required'
}, status=401)
return view_func(request, *args, **kwargs)
return wrapper
def rate_limit(max_requests=100, window=60):
"""API 限流装饰器"""
def decorator(view_func):
@wraps(view_func)
def wrapper(request, *args, **kwargs):
# 限流逻辑实现
return view_func(request, *args, **kwargs)
return wrapper
return decorator
@api_login_required
@rate_limit(max_requests=100, window=60)
def protected_api(request):
pass
4.8 请求生命周期¶
1. 用户请求到达 Web 服务器(Nginx/Apache)
↓
2. WSGI/ASGI 服务器(Gunicorn/Uvicorn)接收请求
↓
3. Django 请求处理开始
├─ 3.1 创建 HttpRequest 对象
├─ 3.2 中间件 process_request(从上到下)
├─ 3.3 URLconf 路由匹配
├─ 3.4 中间件 process_view
├─ 3.5 视图函数/类方法执行
│ ├─ 参数验证
│ ├─ 业务逻辑处理
│ ├─ 数据库操作(ORM)
│ └─ 生成响应
├─ 3.6 中间件 process_template_response(如果返回 TemplateResponse)
├─ 3.7 中间件 process_response(从下到上)
└─ 3.8 创建 HttpResponse 对象
↓
4. WSGI/ASGI 服务器发送响应
↓
5. Web 服务器返回给客户端
五、ORM 模型层(Models)¶
5.1 ORM 设计理念¶
什么是 ORM?
ORM(Object-Relational Mapping,对象关系映射)是一种编程技术,用于在面向对象编程语言中实现对象与关系型数据库之间的数据转换。
Django ORM 的核心设计理念:
DRY 原则:定义一次模型,多处使用
Pythonic:使用 Python 代码操作数据库,无需手写 SQL
数据库无关:支持多种数据库(PostgreSQL、MySQL、SQLite、Oracle)
延迟加载:QuerySet 是惰性的,直到真正需要数据时才查询
ORM 与原生 SQL 对比:
# 原生 SQL
SELECT * FROM users WHERE age > 18 AND status = 'active' ORDER BY created_at DESC;
# Django ORM
User.objects.filter(age__gt=18, status='active').order_by('-created_at')
5.2 模型定义与字段类型¶
基础模型定义
# apps/users/models.py
from django.db import models
class User(models.Model):
"""用户模型"""
# 主键(如果不定义,Django 会自动创建 id 字段)
id = models.BigAutoField(primary_key=True)
# 字符串字段
username = models.CharField(max_length=50, unique=True, verbose_name='用户名')
nickname = models.CharField(max_length=50, blank=True, default='', verbose_name='昵称')
email = models.EmailField(max_length=100, unique=True, verbose_name='邮箱')
phone = models.CharField(max_length=20, null=True, blank=True, verbose_name='手机号')
# 文本字段
bio = models.TextField(max_length=500, blank=True, verbose_name='个人简介')
# 数值字段
age = models.PositiveSmallIntegerField(null=True, blank=True, verbose_name='年龄')
balance = models.DecimalField(max_digits=10, decimal_places=2, default=0.00, verbose_name='余额')
score = models.FloatField(default=0.0, verbose_name='积分')
# 布尔字段
is_active = models.BooleanField(default=True, verbose_name='是否激活')
is_vip = models.BooleanField(default=False, verbose_name='是否VIP')
# 时间字段
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
birth_date = models.DateField(null=True, blank=True, verbose_name='生日')
# 文件字段
avatar = models.ImageField(upload_to='avatars/%Y/%m/', null=True, blank=True, verbose_name='头像')
resume = models.FileField(upload_to='resumes/', null=True, blank=True, verbose_name='简历')
# JSON 字段(Django 3.1+)
settings = models.JSONField(default=dict, blank=True, verbose_name='设置')
# 枚举字段
class Gender(models.TextChoices):
MALE = 'M', '男'
FEMALE = 'F', '女'
OTHER = 'O', '其他'
gender = models.CharField(max_length=1, choices=Gender.choices, default=Gender.MALE, verbose_name='性别')
class Meta:
db_table = 'users'
verbose_name = '用户'
verbose_name_plural = '用户'
ordering = ['-created_at']
indexes = [
models.Index(fields=['username'], name='idx_username'),
models.Index(fields=['email'], name='idx_email'),
]
def __str__(self):
return self.username
常用字段类型
字段类型 |
说明 |
对应 SQL |
常用参数 |
|---|---|---|---|
|
自增整数 |
SERIAL/INT |
|
|
大自增整数 |
BIGSERIAL/BIGINT |
|
|
定长字符串 |
VARCHAR |
|
|
长文本 |
TEXT |
- |
|
整数 |
INTEGER |
- |
|
大整数 |
BIGINT |
- |
|
正整数 |
INTEGER UNSIGNED |
- |
|
小整数 |
SMALLINT |
- |
|
浮点数 |
DOUBLE |
- |
|
精确小数 |
DECIMAL |
|
|
布尔值 |
BOOLEAN |
- |
|
可空布尔值 |
BOOLEAN |
- |
|
日期 |
DATE |
|
|
日期时间 |
TIMESTAMP |
|
|
时间 |
TIME |
- |
|
邮箱 |
VARCHAR(254) |
- |
|
URL |
VARCHAR(200) |
- |
|
短标签 |
VARCHAR(50) |
|
|
UUID |
UUID |
- |
|
IPv4 |
VARCHAR(15) |
- |
|
IPv4/IPv6 |
VARCHAR(39) |
|
|
文件 |
VARCHAR(100) |
|
|
图片 |
VARCHAR(100) |
|
|
JSON 数据 |
JSON |
- |
|
二进制数据 |
BLOB |
- |
字段通用参数
models.CharField(
verbose_name='字段名', # 显示名称
help_text='帮助文本', # 表单帮助文本
default='默认值', # 默认值
null=True, # 允许 NULL(数据库层面)
blank=True, # 允许空值(表单验证层面)
unique=True, # 唯一约束
db_index=True, # 创建索引
db_column='custom_column', # 自定义数据库列名
primary_key=True, # 设为主键
editable=False, # 不可编辑(不在表单中显示)
error_messages={ # 自定义错误消息
'required': '此字段必填',
'max_length': '超出最大长度',
},
validators=[validate_custom], # 自定义验证器
)
5.3 模型元数据(Meta)¶
from django.core.validators import MaxValueValidator, MinValueValidator
class Product(models.Model):
name = models.CharField(max_length=100)
price = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(1), MaxValueValidator(100)])
# validators 是应用层的验证比如django表单,django admin 验证,而constraints是数据库层的验证,这要注意区别
class Meta:
# 数据库表名(默认 app_label_model_name)
db_table = 'products'
# 显示名称
verbose_name = '商品'
verbose_name_plural = '商品列表'
# 默认排序
ordering = ['-created_at', 'name']
# 联合唯一约束
unique_together = [['name', 'category']]
# 联合索引
index_together = [['category', 'status']]
# 索引(Django 3.2+ 推荐)
indexes = [
models.Index(fields=['name'], name='idx_product_name'),
models.Index(fields=['price', 'status'], name='idx_price_status'),
]
# 约束(Django 4.2+)
constraints = [
models.CheckConstraint(
check=models.Q(price__gte=0),
name='price_positive',
violation_error_message='价格必须为正数'
),
models.UniqueConstraint(
fields=['sku'],
name='unique_sku',
violation_error_message='SKU 已存在'
),
]
# 抽象基类
# abstract = True
# 代理模型
# proxy = True
# 指定数据库
# using = 'other_db'
# 管理器
# base_manager_name = 'objects'
# default_manager_name = 'objects'
5.4 模型方法与属性¶
from django.db import models
from django.urls import reverse
class Article(models.Model):
title = models.CharField(max_length=200)
content = models.TextField()
author = models.ForeignKey('users.User', on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ['-created_at']
# ===== 魔法方法 =====
def __str__(self):
return self.title
def __repr__(self):
return f'<Article {self.id}: {self.title}>'
# ===== 自定义方法 =====
def get_absolute_url(self):
"""获取对象详情页 URL"""
return reverse('articles:detail', kwargs={'pk': self.pk})
def get_summary(self, length=100):
"""获取文章摘要"""
if len(self.content) <= length:
return self.content
return self.content[:length] + '...'
def publish(self):
"""发布文章"""
self.status = 'published'
self.save(update_fields=['status'])
# ===== 属性装饰器 =====
@property
def word_count(self):
"""计算字数"""
return len(self.content)
@property
def reading_time(self):
"""预估阅读时间(分钟)"""
return max(1, self.word_count // 300)
@cached_property
def comment_count(self):
"""评论数(缓存)"""
return self.comments.count()
5.5 抽象模型与模型继承¶
抽象基类(Abstract Base Class)
# apps/core/models.py
from django.db import models
class BaseModel(models.Model):
"""抽象基础模型"""
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
is_deleted = models.BooleanField(default=False, verbose_name='是否删除')
class Meta:
abstract = True # 抽象基类,不会创建数据库表
ordering = ['-created_at']
class SoftDeleteManager(models.Manager):
"""软删除管理器"""
def get_queryset(self):
return super().get_queryset().filter(is_deleted=False)
class SoftDeleteModel(BaseModel):
"""软删除模型"""
objects = SoftDeleteManager()
all_objects = models.Manager() # 包含已删除的记录
class Meta:
abstract = True
def delete(self, using=None, keep_parents=False):
"""软删除"""
self.is_deleted = True
self.save(update_fields=['is_deleted'])
def hard_delete(self, using=None, keep_parents=False):
"""硬删除"""
super().delete(using, keep_parents)
def restore(self):
"""恢复"""
self.is_deleted = False
self.save(update_fields=['is_deleted'])
# 使用
class Product(SoftDeleteModel):
name = models.CharField(max_length=100)
price = models.DecimalField(max_digits=10, decimal_places=2)
class Meta:
verbose_name = '商品'
verbose_name_plural = '商品'
多表继承(Multi-table Inheritance)
# 父模型
class Place(models.Model):
name = models.CharField(max_length=50)
address = models.CharField(max_length=80)
# 子模型(会创建独立的表,包含指向 Place 的 OneToOne 外键)
class Restaurant(Place):
serves_hot_dogs = models.BooleanField(default=False)
serves_pizza = models.BooleanField(default=False)
# 数据库结构
# place 表: id, name, address
# restaurant 表: place_ptr_id, serves_hot_dogs, serves_pizza
# 访问
restaurant = Restaurant.objects.get(id=1)
print(restaurant.name) # 可以直接访问父模型字段
print(restaurant.place_ptr_id) # 指向父模型的外键
代理模型(Proxy Model)
添加 实例方法 或 类方法
改变 默认 Manager/QuerySet
改变 Admin 界面或排序规则
改变 Meta 配置(比如 ordering)
class Person(models.Model):
first_name = models.CharField(max_length=30)
last_name = models.CharField(max_length=30)
class OrderedPerson(Person):
"""代理模型:改变默认排序,不创建新表"""
class Meta:
proxy = True
ordering = ['last_name', 'first_name']
@property
def full_name(self):
return f'{self.first_name} {self.last_name}'
# 使用
Person.objects.create(first_name='John', last_name='Doe')
print(OrderedPerson.objects.all()) # 按 last_name 排序
5.6 模型管理器(Manager)¶
自定义 Manager
用来改变默认查询集
from django.db import models
from django.db.models import Q
class ProductManager(models.Manager):
"""商品管理器"""
def active(self):
"""获取活跃商品"""
return self.filter(status='active', stock__gt=0)
def in_stock(self):
"""获取有库存的商品"""
return self.filter(stock__gt=0)
def on_sale(self):
"""获取促销商品"""
return self.filter(discount_price__isnull=False)
def search(self, keyword):
"""搜索商品"""
return self.filter(
Q(name__icontains=keyword) |
Q(description__icontains=keyword)
)
def get_by_natural_key(self, slug):
"""通过自然键获取"""
return self.get(slug=slug)
class ProductQuerySet(models.QuerySet):
"""自定义 QuerySet"""
def published(self):
return self.filter(status='published')
def by_category(self, category_id):
return self.filter(category_id=category_id)
def price_range(self, min_price, max_price):
return self.filter(price__gte=min_price, price__lte=max_price)
class Product(models.Model):
name = models.CharField(max_length=100)
status = models.CharField(max_length=20)
stock = models.PositiveIntegerField(default=0)
price = models.DecimalField(max_digits=10, decimal_places=2)
# 使用自定义 Manager
objects = ProductManager()
# 或者使用自定义 QuerySet
# objects = ProductQuerySet.as_manager()
# 使用
active_products = Product.objects.active()
search_results = Product.objects.search('手机')
多个 Manager
class Article(models.Model):
title = models.CharField(max_length=200)
status = models.CharField(max_length=20, default='draft')
# 默认管理器(第一个)
objects = models.Manager()
# 自定义管理器
published = PublishedManager()
drafts = DraftManager()
class PublishedManager(models.Manager):
def get_queryset(self):
return super().get_queryset().filter(status='published')
class DraftManager(models.Manager):
def get_queryset(self):
return super().get_queryset().filter(status='draft')
# 使用
Article.objects.all() # 所有文章
Article.published.all() # 已发布文章
Article.drafts.all() # 草稿文章
5.7 QuerySet 与惰性加载¶
QuerySet 的惰性特性
# QuerySet 是惰性的,以下语句不会查询数据库
users = User.objects.filter(age__gt=18) # 不查询
users = users.order_by('-created_at') # 不查询
users = users.select_related('profile') # 不查询
# 以下情况会触发查询
list(users) # 转换为列表
users[0] # 索引访问
for user in users: # 迭代
user in users # 成员检查
len(users) # 获取长度
bool(users) # 布尔判断
users.count() # 计数
users.exists() # 是否存在
缓存机制
# 第一次迭代会缓存结果
users = User.objects.all()
for user in users: # 查询数据库
print(user.name)
for user in users: # 使用缓存,不查询
print(user.email)
# 切片不会缓存
# 切片返回的是一个新的 QuerySet(或单个对象),它是"限制过的",没有经过完整求值,使用 list() 强制查询并缓存
users = User.objects.all()
print(users[0]) # 查询
print(users[0]) # 再次查询
# 使用 list() 强制缓存
users = list(User.objects.all())
六、数据库关系¶
字段类型 |
关系 |
反向查询返回 |
自动创建中间表 |
|---|---|---|---|
|
一对多 |
QuerySet(多个对象) |
否 |
|
一对一 |
单个对象 |
否 |
|
多对多 |
QuerySet(多个对象) |
是 |
6.1 一对多(ForeignKey)¶
基础用法
from django.db import models
class Category(models.Model):
"""商品分类"""
name = models.CharField(max_length=50, verbose_name='分类名称')
parent = models.ForeignKey(
'self', # 自关联
on_delete=models.CASCADE,
null=True,
blank=True,
related_name='children',
verbose_name='父分类'
)
class Meta:
verbose_name = '分类'
verbose_name_plural = '分类'
def __str__(self):
return self.name
class Product(models.Model):
"""商品"""
name = models.CharField(max_length=100, verbose_name='商品名称')
category = models.ForeignKey(
Category,
on_delete=models.CASCADE,
related_name='products', # 反向关系名称
related_query_name='product', # 反向查询名称
verbose_name='分类'
)
price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='价格')
class Meta:
verbose_name = '商品'
verbose_name_plural = '商品'
class Order(models.Model):
"""订单"""
user = models.ForeignKey(
'users.User',
on_delete=models.CASCADE,
related_name='orders',
verbose_name='用户'
)
total_amount = models.DecimalField(max_digits=10, decimal_places=2, verbose_name='总金额')
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
class Meta:
verbose_name = '订单'
verbose_name_plural = '订单'
on_delete 参数详解
# 1. CASCADE - 级联删除(最常用)
# 删除父对象时,自动删除子对象
product = models.ForeignKey(Category, on_delete=models.CASCADE)
# 2. PROTECT - 保护删除
# 删除父对象时,如果有子对象引用,抛出 ProtectedError
product = models.ForeignKey(Category, on_delete=models.PROTECT)
# 3. SET_NULL - 置空
# 删除父对象时,将外键字段设为 NULL(需要 null=True)
product = models.ForeignKey(Category, on_delete=models.SET_NULL, null=True)
# 4. SET_DEFAULT - 设为默认值
# 删除父对象时,将外键字段设为默认值
product = models.ForeignKey(Category, on_delete=models.SET_DEFAULT, default=1)
# 5. SET() - 设为指定值或调用函数
product = models.ForeignKey(Category, on_delete=models.SET(get_default_category))
# 6. DO_NOTHING - 不操作
# 删除父对象时,不对外键做任何操作(可能导致数据库完整性错误)
product = models.ForeignKey(Category, on_delete=models.DO_NOTHING)
# 7. RESTRICT - 限制删除(Django 3.1+)
# 类似 PROTECT,但在某些情况下允许删除
product = models.ForeignKey(Category, on_delete=models.RESTRICT)
关系查询
# 正向查询(从子到父)
product = Product.objects.get(id=1)
print(product.category.name) # 获取商品所属分类
# 反向查询(从父到子)
category = Category.objects.get(id=1)
products = category.products.all() # 获取分类下所有商品
# 使用 related_name
orders = user.orders.filter(status='pending') # 获取用户的待处理订单
# 跨关系查询(双下划线)
# 查询某个分类下的所有商品
products = Product.objects.filter(category__name='电子产品')
# 查询某个用户的订单
orders = Order.objects.filter(user__username='john')
# 多级跨关系查询
# 查询订单中商品属于某个分类的订单
orders = Order.objects.filter(items__product__category__name='电子产品')
6.2 多对多(ManyToManyField)¶
基础用法
class Tag(models.Model):
"""标签"""
name = models.CharField(max_length=30, unique=True, verbose_name='标签名')
class Meta:
verbose_name = '标签'
verbose_name_plural = '标签'
class Article(models.Model):
"""文章"""
title = models.CharField(max_length=200, verbose_name='标题')
content = models.TextField(verbose_name='内容')
tags = models.ManyToManyField(
Tag,
related_name='articles',
blank=True,
verbose_name='标签'
)
class Meta:
verbose_name = '文章'
verbose_name_plural = '文章'
# 使用示例
python_tag = Tag.objects.create(name='Python')
django_tag = Tag.objects.create(name='Django')
article = Article.objects.create(title='Django ORM 详解', content='...')
article.tags.add(python_tag, django_tag) # 添加标签
article.tags.remove(python_tag) # 移除标签
article.tags.clear() # 清空所有标签
article.tags.set([python_tag, django_tag]) # 设置标签(会替换原有)
# 查询
articles = Article.objects.filter(tags__name='Python') # 有 Python 标签的文章
tags = python_tag.articles.all() # Python 标签的所有文章
自定义中间模型
当需要在多对多关系中存储额外字段时,使用 through 参数:
class Student(models.Model):
"""学生"""
name = models.CharField(max_length=50)
class Meta:
verbose_name = '学生'
verbose_name_plural = '学生'
class Course(models.Model):
"""课程"""
name = models.CharField(max_length=100)
students = models.ManyToManyField(
Student,
through='Enrollment', # 指定中间模型
through_fields=('course', 'student'), # 指定字段顺序
related_name='courses'
)
class Meta:
verbose_name = '课程'
verbose_name_plural = '课程'
class Enrollment(models.Model):
"""选课记录(中间模型)"""
student = models.ForeignKey(Student, on_delete=models.CASCADE)
course = models.ForeignKey(Course, on_delete=models.CASCADE)
enrolled_at = models.DateTimeField(auto_now_add=True)
grade = models.DecimalField(max_digits=4, decimal_places=2, null=True, blank=True)
semester = models.CharField(max_length=20)
class Meta:
db_table = 'enrollments'
unique_together = [['student', 'course', 'semester']]
verbose_name = '选课记录'
verbose_name_plural = '选课记录'
# 使用
student = Student.objects.create(name='张三')
course = Course.objects.create(name='Python 编程')
# 通过中间模型创建关系
enrollment = Enrollment.objects.create(
student=student,
course=course,
semester='2024春季'
)
# 查询
enrollments = Enrollment.objects.filter(student=student, grade__gte=60)
courses = student.courses.all()
students = course.students.all()
对称关系
class Person(models.Model):
"""人"""
name = models.CharField(max_length=50)
# 对称关系:如果 A 是 B 的朋友,B 也是 A 的朋友
friends = models.ManyToManyField(
'self',
symmetrical=True, # 对称关系(默认)
blank=True
)
# 非对称关系:关注关系
following = models.ManyToManyField(
'self',
symmetrical=False, # 非对称关系
related_name='followers',
blank=True
)
# 使用
alice = Person.objects.create(name='Alice')
bob = Person.objects.create(name='Bob')
alice.friends.add(bob) # Alice 和 Bob 互为朋友
alice.following.add(bob) # Alice 关注 Bob,但 Bob 不一定关注 Alice
6.3 一对一(OneToOneField)¶
基础用法
from django.contrib.auth.models import User
class UserProfile(models.Model):
"""用户资料"""
user = models.OneToOneField(
User,
on_delete=models.CASCADE,
related_name='profile',
primary_key=True # 使用 user_id 作为主键
)
bio = models.TextField(max_length=500, blank=True, verbose_name='简介')
birth_date = models.DateField(null=True, blank=True, verbose_name='生日')
location = models.CharField(max_length=100, blank=True, verbose_name='所在地')
website = models.URLField(blank=True, verbose_name='个人网站')
class Meta:
verbose_name = '用户资料'
verbose_name_plural = '用户资料'
class Place(models.Model):
"""地点"""
name = models.CharField(max_length=50)
address = models.CharField(max_length=80)
class Restaurant(models.Model):
"""餐厅(继承自 Place)"""
place = models.OneToOneField(
Place,
on_delete=models.CASCADE,
parent_link=True # 标识为父链接
)
serves_hot_dogs = models.BooleanField(default=False)
serves_pizza = models.BooleanField(default=False)
关系查询
# 正向查询
profile = UserProfile.objects.get(user_id=1)
print(profile.user.username)
# 反向查询
user = User.objects.get(id=1)
print(user.profile.bio) # 获取用户资料
# 跨关系查询
users = User.objects.filter(profile__birth_date__year=1990)
profiles = UserProfile.objects.filter(user__username='john')
# 创建时自动创建关联对象
from django.db.models.signals import post_save
from django.dispatch import receiver
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
if created:
UserProfile.objects.create(user=instance)
@receiver(post_save, sender=User)
def save_user_profile(sender, instance, **kwargs):
instance.profile.save()
6.4 关系查询与反向关系¶
反向关系查询
# related_name 的使用
class Comment(models.Model):
article = models.ForeignKey(
Article,
on_delete=models.CASCADE,
related_name='comments' # 默认是 comment_set
)
content = models.TextField()
# 查询
article = Article.objects.get(id=1)
comments = article.comments.all() # 使用 related_name
comments = article.comment_set.all() # 如果不设置 related_name
# related_query_name 的使用
class Product(models.Model):
category = models.ForeignKey(
Category,
on_delete=models.CASCADE,
related_name='products',
related_query_name='product' # 用于过滤查询
)
# 使用 related_query_name 进行过滤
categories = Category.objects.filter(product__price__gt=100)
6.5 自关联与递归结构¶
树形结构(递归外键)
class Category(models.Model):
"""分类(树形结构)"""
name = models.CharField(max_length=50)
parent = models.ForeignKey(
'self',
on_delete=models.CASCADE,
null=True,
blank=True,
related_name='children'
)
class Meta:
verbose_name = '分类'
verbose_name_plural = '分类'
def __str__(self):
return self.name
@property
def level(self):
"""获取层级"""
level = 0
parent = self.parent
while parent:
level += 1
parent = parent.parent
return level
def get_ancestors(self):
"""获取所有祖先"""
ancestors = []
parent = self.parent
while parent:
ancestors.append(parent)
parent = parent.parent
return ancestors[::-1] # 反转,从根节点开始
def get_descendants(self):
"""获取所有后代"""
descendants = []
children = list(self.children.all())
while children:
child = children.pop(0)
descendants.append(child)
children.extend(child.children.all())
return descendants
# 使用
electronics = Category.objects.create(name='电子产品')
phones = Category.objects.create(name='手机', parent=electronics)
iPhones = Category.objects.create(name='iPhone', parent=phones)
print(iPhones.level) # 2
print([c.name for c in iPhones.get_ancestors()]) # ['电子产品', '手机']
使用 django-mptt 优化树形结构
pip install django-mptt
from mptt.models import MPTTModel, TreeForeignKey
class Category(MPTTModel):
"""MPTT 分类"""
name = models.CharField(max_length=50)
parent = TreeForeignKey(
'self',
on_delete=models.CASCADE,
null=True,
blank=True,
related_name='children'
)
class MPTTMeta:
order_insertion_by = ['name']
class Meta:
verbose_name = '分类'
verbose_name_plural = '分类'
# MPTT 提供的方法
category.get_ancestors() # 获取祖先
category.get_descendants() # 获取后代
category.get_children() # 获取子节点
category.get_siblings() # 获取兄弟节点
category.is_root_node() # 是否是根节点
category.is_leaf_node() # 是否是叶子节点
# 整棵树查询(单次查询获取所有节点)
tree = Category.objects.all()
七、数据查询¶
7.1 基础查询操作(CRUD)¶
创建(Create)
# 方式1:创建并保存
user = User(username='john', email='john@example.com')
user.save()
# 方式2:使用 create()(推荐)
user = User.objects.create(username='john', email='john@example.com')
# 方式3:使用 get_or_create()(避免重复创建)
user, created = User.objects.get_or_create(
username='john',
defaults={'email': 'john@example.com'}
)
# 方式4:使用 update_or_create()
user, created = User.objects.update_or_create(
username='john',
defaults={'email': 'new@example.com'}
)
# 方式5:批量创建
users = [
User(username='user1', email='user1@example.com'),
User(username='user2', email='user2@example.com'),
User(username='user3', email='user3@example.com'),
]
User.objects.bulk_create(users)
查询(Read)
# 获取所有记录
all_users = User.objects.all()
# 获取单条记录
user = User.objects.get(id=1) # 不存在时抛出 DoesNotExist
user = User.objects.get(username='john') # 多条匹配时抛出 MultipleObjectsReturned
# 获取第一条记录(可能返回 None)
first_user = User.objects.first()
# 获取最后一条记录
last_user = User.objects.last()
# 获取指定字段值
user_id = User.objects.get(id=1).id
# 使用 values() 获取字典
user_dict = User.objects.values('id', 'username').get(id=1)
# {'id': 1, 'username': 'john'}
# 使用 values_list() 获取元组
user_tuple = User.objects.values_list('id', 'username').get(id=1)
# (1, 'john')
# 扁平化列表
usernames = User.objects.values_list('username', flat=True)
# ['john', 'jane', 'bob']
更新(Update)
# 方式1:获取后修改保存
user = User.objects.get(id=1)
user.email = 'new@example.com'
user.save()
# 方式2:使用 update()(推荐,批量更新)
User.objects.filter(status='inactive').update(status='active')
# 方式3:更新单个对象
User.objects.filter(id=1).update(email='new@example.com')
# 方式4:使用 F() 表达式
from django.db.models import F
Product.objects.update(price=F('price') * 1.1) # 涨价 10%
# 方式5:批量更新
users = User.objects.filter(status='pending')
for user in users:
user.status = 'active'
User.objects.bulk_update(users, ['status'])
删除(Delete)
# 方式1:获取后删除
user = User.objects.get(id=1)
user.delete()
# 方式2:使用 delete() 批量删除
User.objects.filter(status='inactive').delete()
# 方式3:清空表(慎用)
User.objects.all().delete()
# 级联删除
# 如果 User 有外键关联 Order,删除 User 会自动删除关联的 Order(取决于 on_delete)
7.2 查询集(QuerySet)API¶
过滤查询
# filter() - 包含匹配
users = User.objects.filter(age__gt=18)
# exclude() - 排除匹配
users = User.objects.exclude(status='banned')
# 链式调用
users = User.objects.filter(age__gt=18).exclude(status='inactive')
# 获取数量
count = User.objects.filter(status='active').count()
# 检查是否存在
exists = User.objects.filter(username='john').exists()
# 去重
users = User.objects.values('city').distinct()
排序
# 升序
users = User.objects.order_by('created_at')
# 降序
users = User.objects.order_by('-created_at')
# 多字段排序
users = User.objects.order_by('status', '-created_at')
# 随机排序
users = User.objects.order_by('?')
# 移除排序
users = User.objects.order_by() # 使用模型默认排序
切片与分页
# 切片(LIMIT/OFFSET)
first_10 = User.objects.all()[:10] # 前10条
next_10 = User.objects.all()[10:20] # 第11-20条
# 注意:切片后不能继续过滤
# users = User.objects.all()[:10].filter(...) # 错误!
# 分页
from django.core.paginator import Paginator
users = User.objects.all()
paginator = Paginator(users, 10) # 每页10条
page_1 = paginator.page(1) # 第一页
page_2 = paginator.page(2) # 第二页
# 页面对象属性
page_1.object_list # 当前页数据
page_1.has_next() # 是否有下一页
page_1.has_previous() # 是否有上一页
page_1.next_page_number() # 下一页页码
page_1.previous_page_number() # 上一页页码
page_1.paginator.count # 总记录数
page_1.paginator.num_pages # 总页数
字段选择
# 只查询指定字段(defer 相反)
users = User.objects.only('id', 'username') # 只查询 id 和 username
# 排除指定字段
users = User.objects.defer('large_field') # 不查询 large_field
# 使用 values() 返回字典
users = User.objects.values('id', 'username')
# 使用 values_list() 返回元组
users = User.objects.values_list('id', 'username')
# <QuerySet [(1, '小明'), (2, '小张'), (3, '小红')]>
# values_list扁平化结果
usernames = User.objects.values_list('username', flat=True) # 单字段时可以使用 flat=True 来返回一个简单的列表
# values_list命名元组
usernames = User.objects.values_list('id', 'name', named=True)
# <QuerySet [Row(id=1, title='小红'), ...]>
7.3 字段查找(Field Lookups)¶
比较查找
# 精确匹配
User.objects.filter(username__exact='john')
User.objects.filter(username='john') # exact 是默认的
# 不区分大小写匹配
User.objects.filter(username__iexact='John')
# 包含
User.objects.filter(username__contains='jo')
# 不区分大小写包含
User.objects.filter(username__icontains='Jo')
# 以...开头
User.objects.filter(username__startswith='jo')
User.objects.filter(username__istartswith='Jo')
# 以...结尾
User.objects.filter(username__endswith='hn')
User.objects.filter(username__iendswith='HN')
# 在列表中
User.objects.filter(id__in=[1, 2, 3])
# 大于/大于等于
User.objects.filter(age__gt=18)
User.objects.filter(age__gte=18)
# 小于/小于等于
User.objects.filter(age__lt=60)
User.objects.filter(age__lte=60)
# 范围
User.objects.filter(age__range=(18, 60))
# 为空
User.objects.filter(email__isnull=True)
User.objects.filter(email__isnull=False)
日期时间查找
# 日期
User.objects.filter(created_at__date='2024-01-01')
# 年份
User.objects.filter(created_at__year=2024)
User.objects.filter(created_at__year__gt=2023)
# 月份
User.objects.filter(created_at__month=1)
# 日
User.objects.filter(created_at__day=1)
# 周几(1=周日,7=周六)
User.objects.filter(created_at__week_day=1)
# 周数
User.objects.filter(created_at__week=1)
# 季度
User.objects.filter(created_at__quarter=1)
# 时间
User.objects.filter(created_at__time='12:00:00')
# 小时
User.objects.filter(created_at__hour=12)
# 分钟
User.objects.filter(created_at__minute=30)
# 秒
User.objects.filter(created_at__second=0)
# 范围
from datetime import datetime
start = datetime(2024, 1, 1)
end = datetime(2024, 12, 31)
User.objects.filter(created_at__range=(start, end))
JSON 字段查找(Django 3.1+)
# 精确匹配
User.objects.filter(settings__theme='dark')
# 包含键
User.objects.filter(settings__has_key='notifications')
# 嵌套查找
User.objects.filter(settings__preferences__language='zh')
# 数组查找
User.objects.filter(settings__tags__contains=['vip'])
# 数组包含任意
User.objects.filter(settings__tags__overlap=['vip', 'premium'])
7.4 Q 对象复杂查询¶
from django.db.models import Q
# 简单 OR 查询
User.objects.filter(Q(username='john') | Q(email='john@example.com'))
# AND 查询(默认)
User.objects.filter(Q(age__gt=18) & Q(status='active'))
# 等同于
User.objects.filter(age__gt=18, status='active')
# NOT 查询
User.objects.filter(~Q(status='inactive'))
# 复杂组合
User.objects.filter(
(Q(username__startswith='j') | Q(email__contains='john')) &
Q(is_active=True) &
~Q(status='banned')
)
# 动态构建 Q 对象
query = Q()
if username:
query |= Q(username__icontains=username)
if email:
query |= Q(email__icontains=email)
if phone:
query |= Q(phone__contains=phone)
users = User.objects.filter(query)
7.5 F 表达式字段引用¶
from django.db.models import F
# 在查询中引用字段
# 查询评论数大于点赞数的文章
Article.objects.filter(comment_count__gt=F('like_count'))
# 更新时引用字段
# 所有商品价格增加 10%
Product.objects.update(price=F('price') * 1.1)
# 库存减少
Product.objects.filter(id=1).update(stock=F('stock') - 1)
# 跨关系引用
# 查询订单金额大于用户余额的订单
Order.objects.filter(amount__gt=F('user__balance'))
# 使用算术运算
Product.objects.filter(price__gt=F('cost') * 2) # 利润率大于 100%
# 使用日期运算
from datetime import timedelta
from django.db.models.functions import Now
# 查询创建时间超过30天的订单
Order.objects.filter(created_at__lt=Now() - timedelta(days=30))
7.6 聚合与注解¶
聚合(Aggregate)
# aggregate — 对整个 QuerySet 汇总,返回一个字典结果,不需要model的情况
# 强调的是聚合,是整体的反映情况
from django.db.models import Count, Sum, Avg, Max, Min, StdDev, Variance
# 全局聚合
User.objects.aggregate(Count('id')) # {'id__count': 100}
User.objects.aggregate(total=Count('id')) # {'total': 100}
# 多表:统计某个用户所有订单的总金额
User.objects.filter(id=1).aggregate(total=Sum('orders__amount'))
# 多个聚合
stats = User.objects.aggregate(
total=Count('id'),
avg_age=Avg('age'),
max_age=Max('age'),
min_age=Min('age')
)
# {'total': 100, 'avg_age': 30.5, 'max_age': 80, 'min_age': 18}
# 条件聚合
from django.db.models import Q
User.objects.aggregate(
active_count=Count('id', filter=Q(is_active=True)),
inactive_count=Count('id', filter=Q(is_active=False))
)
注解(Annotate)
# annotate — 对每一行追加字段,返回QuerySet,返回的是model,顺便带上其他字段
# annotate 的 GROUP BY 对象 = 你查的是哪个 Model
# annotate 强调的是各自怎样,是每一条数据的情况,常用与多表操作
from django.db.models import Count, Sum, F, OuterRef, Subquery
users = User.objects.annotate(order_count=Count('orders'))
for user in users:
print(user.name, user.order_count)
# SELECT users.*, COUNT(orders.id) AS order_count FROM users LEFT JOIN orders ON orders.user_id = users.id GROUP BY users.id
# 按 status 分组,统计每种状态的订单数, 返回结果变成QuerySet of dict
Order.objects.values('status').annotate(count=Count('id'))
# SELECT status, COUNT(id) AS count FROM orders GROUP BY status -- 不再是 id,而是 status
# return <QuerySet [{'status': 'paid', 'count': 10}, {'status': 'pending', 'count': 5}] ]>
# 条件注解
from django.db.models import Case, When, Value, IntegerField
users = User.objects.annotate(
status_order=Case(
When(status='vip', then=Value(1)),
When(status='active', then=Value(2)),
When(status='inactive', then=Value(3)),
default=Value(4),
output_field=IntegerField()
)
).order_by('status_order')
# 子查询注解
from django.db.models import OuterRef, Subquery
latest_order = Order.objects.filter(
user=OuterRef('pk')
).order_by('-created_at')
users = User.objects.annotate(
latest_order_date=Subquery(
latest_order.values('created_at')[:1]
)
)
7.7 原生 SQL 查询¶
原始查询(Raw Query)
# 基础用法
users = User.objects.raw('SELECT * FROM users WHERE age > %s', [18])
# 指定模型
users = User.objects.raw('''
SELECT u.*, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.age > %s
GROUP BY u.id
''', [18])
# 访问额外字段
for user in users:
print(user.username, user.order_count)
# 使用字典游标
from django.db import connection
with connection.cursor() as cursor:
cursor.execute('SELECT * FROM users WHERE age > %s', [18])
rows = cursor.fetchall()
for row in rows:
print(row)
执行原始 SQL
from django.db import connection
# 查询
with connection.cursor() as cursor:
cursor.execute('SELECT COUNT(*) FROM users')
count = cursor.fetchone()[0]
# 插入/更新/删除
with connection.cursor() as cursor:
cursor.execute('''
UPDATE users SET status = %s WHERE last_login < %s
''', ['inactive', '2024-01-01'])
affected_rows = cursor.rowcount
# 使用命名参数
with connection.cursor() as cursor:
cursor.execute('''
SELECT * FROM users WHERE username = %(username)s
''', {'username': 'john'})
7.8 查询性能优化¶
select_related(一对一、多对一)
# 不使用优化:N+1 问题
orders = Order.objects.all()
for order in orders:
print(order.user.username) # 每次都会查询用户表
# 使用 select_related
orders = Order.objects.select_related('user')
for order in orders:
print(order.user.username) # 使用缓存,不查询
# 多级关联
orders = Order.objects.select_related('user', 'user__profile', 'product__category')
prefetch_related(多对多、一对多)
# 不使用优化:N+1 问题
users = User.objects.all()
for user in users:
for order in user.orders.all(): # 每次都会查询订单表
print(order.id)
# 使用 prefetch_related
users = User.objects.prefetch_related('orders')
for user in users:
for order in user.orders.all(): # 使用缓存,不查询
print(order.id)
# 多级预取
users = User.objects.prefetch_related(
'orders',
'orders__items',
'orders__items__product'
)
# 带过滤的预取
from django.db.models import Prefetch
users = User.objects.prefetch_related(
Prefetch(
'orders',
queryset=Order.objects.filter(status='completed'),
to_attr='completed_orders'
)
)
Prefetch 优化查询
from django.db.models import Prefetch
# 优化反向关系查询(避免 N+1 问题)
# 不使用优化:每次访问 article.comments 都会查询数据库
articles = Article.objects.all()
for article in articles:
for comment in article.comments.all(): # N+1 问题
print(comment.content)
# 使用 prefetch_related 优化
articles = Article.objects.prefetch_related('comments')
for article in articles:
for comment in article.comments.all(): # 使用缓存,不查询
print(comment.content)
# 复杂 Prefetch
articles = Article.objects.prefetch_related(
Prefetch(
'comments',
queryset=Comment.objects.filter(is_approved=True),
to_attr='approved_comments' # 存储到自定义属性
)
)
for article in articles:
for comment in article.approved_comments: # 使用自定义属性
print(comment.content)
其他优化技巧
# 1. 只查询需要的字段
users = User.objects.only('id', 'username')
# 2. 使用 iterator() 处理大量数据
for user in User.objects.all().iterator():
process_user(user)
# 3. 使用 count() 而不是 len()
count = User.objects.count() # 使用 SELECT COUNT(*)
count = len(User.objects.all()) # 加载所有数据
# 4. 使用 exists() 而不是 bool()
exists = User.objects.filter(username='john').exists()
exists = bool(User.objects.filter(username='john')) # 低效
# 5. 批量操作
User.objects.bulk_create(users, batch_size=100) # 分批创建
User.objects.bulk_update(users, ['status'])
# 6. 使用数据库索引(模型定义中)
class User(models.Model):
username = models.CharField(max_length=50, db_index=True)
class Meta:
indexes = [
models.Index(fields=['username', 'email']),
]
八、数据库迁移¶
8.1 迁移系统原理¶
什么是迁移?
迁移是 Django 用于管理数据库模式(Schema)变更的系统。它允许你通过 Python 代码来描述数据库结构的变化,而不是直接编写 SQL。
迁移的核心组件:
迁移文件:位于
app/migrations/目录下的 Python 文件迁移记录表:
django_migrations表记录已应用的迁移迁移执行器:
migrate命令负责应用或回滚迁移
迁移文件结构:
# 0001_initial.py
from django.db import migrations, models
class Migration(migrations.Migration):
# 依赖的迁移
dependencies = [
('users', '0001_initial'),
]
# 迁移操作
operations = [
migrations.CreateModel(
name='User',
fields=[
('id', models.BigAutoField(primary_key=True)),
('username', models.CharField(max_length=50)),
],
),
]
迁移的工作流程:
1. 修改模型(models.py)
↓
2. 生成迁移文件(makemigrations)
↓
3. 应用迁移(migrate)
↓
4. 数据库结构更新
8.2 创建和应用迁移¶
生成迁移文件
# 为所有应用生成迁移
python manage.py makemigrations
# 为指定应用生成迁移
python manage.py makemigrations users
# 命名迁移
python manage.py makemigrations users --name add_user_profile
# 检查迁移(不生成文件)
python manage.py makemigrations --check
# 显示迁移的 SQL(不执行)
python manage.py sqlmigrate users 0001
# 显示迁移的依赖图
python manage.py showmigrations
# 显示迁移的依赖树
python manage.py showmigrations --plan
应用迁移
# 应用所有迁移
python manage.py migrate
# 应用到指定版本
python manage.py migrate users 0001
# 假应用(标记为已应用但不执行 SQL)
python manage.py migrate --fake
# 假应用到指定版本
python manage.py migrate users 0001 --fake
# 应用迁移但不执行(仅显示 SQL)
python manage.py migrate --plan
8.3 迁移文件管理¶
迁移操作类型
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [('users', '0001_initial')]
operations = [
# 创建模型
migrations.CreateModel(
name='Profile',
fields=[
('id', models.BigAutoField(primary_key=True)),
('bio', models.TextField(blank=True)),
],
),
# 删除模型
migrations.DeleteModel(
name='OldModel',
),
# 添加字段
migrations.AddField(
model_name='user',
name='phone',
field=models.CharField(max_length=20, null=True),
),
# 删除字段
migrations.RemoveField(
model_name='user',
name='old_field',
),
# 修改字段
migrations.AlterField(
model_name='user',
name='email',
field=models.EmailField(max_length=100, unique=True),
),
# 重命名字段
migrations.RenameField(
model_name='user',
old_name='username',
new_name='name',
),
# 添加索引
migrations.AddIndex(
model_name='user',
index=models.Index(fields=['email'], name='idx_user_email'),
),
# 删除索引
migrations.RemoveIndex(
model_name='user',
name='idx_user_email',
),
# 添加约束
migrations.AddConstraint(
model_name='product',
constraint=models.CheckConstraint(
check=models.Q(price__gte=0),
name='price_positive',
),
),
# 运行自定义 SQL
migrations.RunSQL(
sql='CREATE INDEX idx_custom ON users (username);',
reverse_sql='DROP INDEX idx_custom;',
),
# 运行 Python 代码
migrations.RunPython(forward_func, reverse_func),
]
模型重命名
# 重命名模型
class Migration(migrations.Migration):
dependencies = [('users', '0001_initial')]
operations = [
migrations.RenameModel(
old_name='UserProfile',
new_name='Profile',
),
]
8.4 数据迁移¶
什么是数据迁移?
数据迁移是在模式迁移之外,用于修改或迁移数据的特殊迁移。
创建数据迁移
# 创建空的数据迁移
python manage.py makemigrations users --empty --name populate_categories
数据迁移示例
# users/migrations/0002_populate_categories.py
from django.db import migrations
def populate_categories(apps, schema_editor):
"""填充初始分类数据"""
Category = apps.get_model('products', 'Category')
categories = [
{'name': '电子产品', 'slug': 'electronics'},
{'name': '服装', 'slug': 'clothing'},
{'name': '食品', 'slug': 'food'},
]
for cat_data in categories:
Category.objects.get_or_create(
slug=cat_data['slug'],
defaults={'name': cat_data['name']}
)
def reverse_populate(apps, schema_editor):
"""回滚操作"""
Category = apps.get_model('products', 'Category')
Category.objects.filter(slug__in=['electronics', 'clothing', 'food']).delete()
class Migration(migrations.Migration):
dependencies = [
('products', '0001_initial'),
]
operations = [
migrations.RunPython(populate_categories, reverse_populate),
]
数据迁移最佳实践
def migrate_data(apps, schema_editor):
"""数据迁移最佳实践"""
# 1. 使用 apps.get_model() 获取模型,而不是直接导入
User = apps.get_model('users', 'User')
Profile = apps.get_model('users', 'Profile')
# 2. 使用 schema_editor 进行数据库操作
db_alias = schema_editor.connection.alias
# 3. 批量处理大数据集
batch_size = 1000
users = User.objects.using(db_alias).all()
profiles = []
for user in users.iterator():
profiles.append(Profile(user=user, bio=''))
if len(profiles) >= batch_size:
Profile.objects.using(db_alias).bulk_create(profiles)
profiles = []
if profiles:
Profile.objects.using(db_alias).bulk_create(profiles)
# 4. 使用 atomic 确保原子性
# 迁移默认在事务中运行,无需额外处理
8.5 迁移回滚与合并¶
回滚迁移
# 回滚到指定版本
python manage.py migrate users 0001
# 回滚所有迁移(删除表)
python manage.py migrate users zero
# 回滚并重新应用
python manage.py migrate users 0001
python manage.py migrate
回滚注意事项:
# 1. 数据丢失风险
# 回滚删除字段的迁移会导致该字段数据丢失
# 2. 外键约束
# 回滚包含外键的迁移可能需要先回滚依赖的迁移
# 3. 数据迁移回滚
# 确保 reverse_func 正确实现
合并迁移
当多个开发者同时创建迁移时,可能会产生分支:
0001_initial.py
├── 0002_add_field_a.py (开发者A)
└── 0002_add_field_b.py (开发者B)
解决方案:
# 方法1:手动编辑依赖
# 修改 0002_add_field_b.py 的 dependencies
dependencies = [
('users', '0002_add_field_a'), # 改为依赖另一个分支
]
# 方法2:使用 makemigrations --merge
python manage.py makemigrations --merge
# 这会创建一个新的合并迁移
# 0003_merge_0002_add_field_a_0002_add_field_b.py
迁移冲突解决
# 合并迁移示例
class Migration(migrations.Migration):
dependencies = [
('users', '0002_add_field_a'),
('users', '0002_add_field_b'),
]
operations = [
# 合并后的操作
]
迁移压缩
# 压缩迁移(合并多个迁移为一个)
python manage.py squashmigrations users 0001 0005
# 压缩并忽略可优化
python manage.py squashmigrations users 0001 0005 --no-optimize
迁移最佳实践
# 1. 不要修改已应用的迁移文件
# 错误做法:修改 0001_initial.py 后重新应用
# 2. 创建新的迁移来修正
# 正确做法:创建 0002_fix_something.py
# 3. 在模型中使用 help_text 和 verbose_name
class User(models.Model):
username = models.CharField(
max_length=50,
help_text='用户的登录名',
verbose_name='用户名'
)
# 4. 使用 makemigrations --check 在 CI 中检查
# 确保所有模型变更都已生成迁移
# 5. 测试迁移
# 在测试环境中先运行迁移,确认无误后再应用到生产环境
# 6. 备份数据库
# 在生产环境应用迁移前,务必备份数据库
九、事务与并发¶
9.1 事务的 ACID 特性¶
什么是事务?
事务是一组数据库操作的逻辑单元,这些操作要么全部成功执行,要么全部不执行。
ACID 特性:
特性 |
说明 |
示例 |
|---|---|---|
Atomicity(原子性) |
事务是不可分割的最小单元,要么全成功,要么全失败 |
转账:扣款和入账必须同时成功或失败 |
Consistency(一致性) |
事务执行前后,数据库必须处于一致状态 |
转账前后,双方账户总额不变 |
Isolation(隔离性) |
并发事务之间相互隔离,互不干扰 |
两个用户同时修改同一数据时不会相互影响 |
Durability(持久性) |
事务一旦提交,数据永久保存 |
提交后即使系统崩溃,数据也不会丢失 |
9.2 Django 事务管理¶
自动提交模式
Django 默认开启自动提交(autocommit),每个 SQL 语句都是一个独立的事务。
# 默认行为:自动提交
user = User.objects.create(username='john') # 自动提交
user.email = 'john@example.com'
user.save() # 自动提交
禁用自动提交
# settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'mydb',
'OPTIONS': {
'autocommit': False, # 禁用自动提交
}
}
}
9.3 事务装饰器与上下文管理器¶
@transaction.atomic 装饰器
from django.db import transaction
@transaction.atomic
def transfer_money(from_user_id, to_user_id, amount):
"""转账操作"""
from_user = User.objects.select_for_update().get(id=from_user_id)
to_user = User.objects.select_for_update().get(id=to_user_id)
if from_user.balance < amount:
raise ValueError('余额不足')
from_user.balance -= amount
to_user.balance += amount
from_user.save()
to_user.save()
# 记录交易日志
Transaction.objects.create(
from_user=from_user,
to_user=to_user,
amount=amount
)
# 如果这里抛出异常,所有操作都会回滚
with transaction.atomic() 上下文管理器
from django.db import transaction
def create_order(user, items):
"""创建订单"""
with transaction.atomic():
# 创建订单
order = Order.objects.create(
user=user,
status='pending',
total_amount=sum(item['price'] * item['quantity'] for item in items)
)
# 创建订单项
for item in items:
product = Product.objects.select_for_update().get(id=item['product_id'])
if product.stock < item['quantity']:
raise ValueError(f'商品 {product.name} 库存不足')
product.stock -= item['quantity']
product.save()
OrderItem.objects.create(
order=order,
product=product,
quantity=item['quantity'],
price=item['price']
)
# 扣减用户余额
if user.balance < order.total_amount:
raise ValueError('余额不足')
user.balance -= order.total_amount
user.save()
# 事务外操作:发送通知(不影响事务)
send_order_notification.delay(order.id)
return order
嵌套事务
from django.db import transaction
@transaction.atomic
def outer_function():
"""外层事务"""
User.objects.create(username='outer')
try:
with transaction.atomic(): # 保存点(savepoint)
"""内层事务"""
User.objects.create(username='inner')
raise ValueError('内层出错')
except ValueError:
pass # 内层回滚,外层继续
# outer 用户仍然存在
User.objects.create(username='outer2')
# 如果这里出错,outer 和 outer2 都会回滚
事务保存点(Savepoint)
from django.db import transaction
def complex_operation():
with transaction.atomic():
# 保存点1
sid1 = transaction.savepoint()
User.objects.create(username='user1')
# 保存点2
sid2 = transaction.savepoint()
User.objects.create(username='user2')
# 回滚到保存点2
transaction.savepoint_rollback(sid2)
# user2 被回滚,user1 仍然存在
# 提交保存点1
transaction.savepoint_commit(sid1)
9.4 行锁与乐观锁¶
select_for_update() 行锁
from django.db import transaction
@transaction.atomic
def deduct_stock(product_id, quantity):
"""扣减库存(悲观锁)"""
# 获取行级排他锁(X锁)
product = Product.objects.select_for_update().get(id=product_id)
if product.stock < quantity:
raise ValueError('库存不足')
product.stock -= quantity
product.save()
return product
# select_for_update 选项
Product.objects.select_for_update(nowait=True) # 不等待,立即报错
Product.objects.select_for_update(skip_locked=True) # 跳过已锁定的行
Product.objects.select_for_update(of=('self',)) # 指定锁定的表
乐观锁实现
from django.db import models, transaction
from django.db.models import F
class Product(models.Model):
name = models.CharField(max_length=100)
stock = models.PositiveIntegerField(default=0)
version = models.PositiveIntegerField(default=0) # 版本号
class Meta:
verbose_name = '商品'
@transaction.atomic
def deduct_stock_optimistic(product_id, quantity):
"""扣减库存(乐观锁)"""
from django.db.models import F
# 获取当前版本
product = Product.objects.get(id=product_id)
current_version = product.version
# 更新时检查版本
updated = Product.objects.filter(
id=product_id,
version=current_version,
stock__gte=quantity
).update(
stock=F('stock') - quantity,
version=F('version') + 1
)
if updated == 0:
raise ValueError('库存不足或并发冲突')
return True
# 带重试的乐观锁
import time
def deduct_stock_with_retry(product_id, quantity, max_retries=3):
"""带重试的乐观锁"""
for attempt in range(max_retries):
try:
return deduct_stock_optimistic(product_id, quantity)
except ValueError as e:
if attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1)) # 指数退避
continue
raise
9.5 事务隔离级别¶
隔离级别对比
隔离级别 |
脏读 |
不可重复读 |
幻读 |
说明 |
|---|---|---|---|---|
READ UNCOMMITTED |
✓ |
✓ |
✓ |
最低级别,基本不使用 |
READ COMMITTED |
✗ |
✓ |
✓ |
大多数数据库默认级别 |
REPEATABLE READ |
✗ |
✗ |
✓ |
MySQL 默认级别 |
SERIALIZABLE |
✗ |
✗ |
✗ |
最高级别,性能最差 |
设置隔离级别
# settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'mydb',
'OPTIONS': {
'isolation_level': 'read committed', # 设置隔离级别
}
}
}
# PostgreSQL
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'mydb',
'OPTIONS': {
'isolation_level': 'READ COMMITTED',
}
}
}
事务隔离问题示例
# 脏读(Dirty Read)
# 事务A读取了事务B未提交的数据
# 事务B回滚后,事务A读取的数据就是脏数据
# 不可重复读(Non-repeatable Read)
# 事务A多次读取同一数据,期间事务B修改并提交了该数据
# 事务A两次读取结果不一致
# 幻读(Phantom Read)
# 事务A查询某个范围的数据
# 事务B插入了一条符合该范围的新数据并提交
# 事务A再次查询,发现多了一条数据(幻行)
9.6 事务后回调¶
on_commit 回调
from django.db import transaction
def send_welcome_email(user_id):
"""发送欢迎邮件"""
user = User.objects.get(id=user_id)
# 发送邮件逻辑...
def register_user(data):
"""注册用户"""
with transaction.atomic():
user = User.objects.create(
username=data['username'],
email=data['email']
)
# 事务提交后才执行
transaction.on_commit(lambda: send_welcome_email(user.id))
# 如果这里抛出异常,回调不会执行
return user
实际应用场景
from django.db import transaction
import django_rq
def create_order_with_tasks(user, items):
"""创建订单并触发后续任务"""
with transaction.atomic():
order = Order.objects.create(
user=user,
status='pending',
total_amount=calculate_total(items)
)
# 事务内操作
for item in items:
OrderItem.objects.create(order=order, **item)
# 事务提交后执行的任务
transaction.on_commit(lambda: django_rq.enqueue(
send_order_confirmation_email, order.id
))
transaction.on_commit(lambda: django_rq.enqueue(
notify_warehouse, order.id
))
transaction.on_commit(lambda: django_rq.enqueue(
update_inventory, order.id
))
return order
事务最佳实践
# 1. 保持事务简短
@transaction.atomic
def good_practice():
# 只将数据库操作放在事务内
order = create_order()
process_payment(order)
# 事务结束
# 事务外操作
send_email(order.id) # 异步发送
cache.clear() # 清理缓存
# 2. 避免在事务中做耗时操作
@transaction.atomic
def bad_practice():
order = create_order()
# 错误:在事务中调用外部 API
result = call_external_api() # 可能导致事务长时间持有锁
update_order(order, result)
# 3. 正确处理异常
@transaction.atomic
def handle_exception():
try:
risky_operation()
except SomeException:
# 异常会触发回滚
# 如果需要部分提交,使用保存点
with transaction.atomic():
fallback_operation()
# 4. 使用 select_for_update 防止竞态条件
@transaction.atomic
def prevent_race_condition():
# 先查询并锁定
account = Account.objects.select_for_update().get(id=1)
# 基于最新数据进行操作
account.balance += 100
account.save()
# 5. 批量操作优化
@transaction.atomic
def bulk_operations():
# 批量创建
User.objects.bulk_create(users, batch_size=100, ignore_conflicts=True) # ignore_conflicts 忽略冲突
# 批量更新
User.objects.bulk_update(users, ['status'])
十、认证系统¶
10.1 自定义用户模型¶
# apps/users/models.py
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
phone = models.CharField(max_length=20, unique=True, null=True, blank=True)
avatar = models.ImageField(upload_to='avatars/', blank=True)
# config/settings/base.py(项目开始就设置,后期无法修改)
AUTH_USER_MODEL = 'users.User'
# 引用用户模型(不要硬编码)
from django.conf import settings
from django.contrib.auth import get_user_model
User = get_user_model()
# 或在模型外键中
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
10.2 权限与分组¶
# 检查权限
user.has_perm('users.change_user') # app_label.action_model
user.has_perms(['users.add_user', 'users.change_user'])
user.has_module_perms('users')
# 视图权限控制
@permission_required('users.delete_user', raise_exception=True)
def delete_user(request, pk):
...
# 自定义权限
class Order(models.Model):
class Meta:
permissions = [
('can_refund', '可以退款'),
('can_export', '可以导出订单'),
]
# 分配权限
from django.contrib.auth.models import Permission, Group
# 创建分组
admin_group = Group.objects.create(name='管理员')
perm = Permission.objects.get(codename='can_refund')
admin_group.permissions.add(perm)
# 用户加入分组
user.groups.add(admin_group)
user.user_permissions.add(perm)
10.3 用户密码处理¶
10.3.1 加密方式¶
PBKDF2(Password-Based Key Derivation Function 2)是一种密钥派生方式:在密码和盐(salt)上反复应用一个伪随机函数(PRF),把「短密码」变成固定长度的派生串。常见实现里 PRF 选 HMAC-SHA256 或 HMAC-SHA1,于是常说 PBKDF2-HMAC-SHA256 / PBKDF2-HMAC-SHA1。迭代次数很大,目的是拖慢离线暴力破解和彩虹表。
Django 默认写的是哪一种?
类:
django.contrib.auth.hashers.PBKDF2PasswordHasher内部标识:
algorithm = "pbkdf2_sha256",即用 PBKDF2 + HMAC-SHA256(底层走 Pythonhashlib.pbkdf2_hmac,摘要算法为 SHA-256)。兼容老数据时,列表里还会保留
PBKDF2SHA1PasswordHasher(pbkdf2_sha1),只用于校验旧密码,必要时在登录成功后按PASSWORD_HASHERS顺序重哈希升级到默认算法。
存进 user.password 的字符串长什么样?
Django 把「算法、迭代次数、盐、派生出的摘要」编码成一行,用 $ 分段,例如(字段为示意):
pbkdf2_sha256$<迭代次数>$<盐 Base64>$<摘要 Base64>
校验时:读出这一行 → 用同一算法、同一迭代、同一盐,对用户输入的明文再算一遍 PBKDF2 → 与存库的摘要做常量时间比较(防时序侧信道)。
迭代次数:写在密文里,每条记录自带;Django 会在新版本里提高默认迭代次数(PBKDF2PasswordHasher.iterations),新设的密码用新默认值,旧密码仍按库里保存的迭代次数验证,登录成功后可升级为当前默认。
PASSWORD_HASHERS 在干什么?
# 第一项:新密码一律用它生成(默认即 PBKDF2 + SHA256)
PASSWORD_HASHERS = [
'django.contrib.auth.hashers.PBKDF2PasswordHasher',
'django.contrib.auth.hashers.PBKDF2SHA1PasswordHasher', # 仅示例:验旧密、可升级
# 还可接 Argon2、BCrypt 等,同样靠「识别密文前缀」选对 hasher
]
业务里怎么用(不必自己调 PBKDF2):
# 写密码:始终走 Django 封装,会自动用 PASSWORD_HASHERS[0](默认 pbkdf2_sha256)
user.set_password('原始密码')
user.save()
# 校验
user.check_password('用户输入')
# 需要「只生成一段密文字符串」时(脚本、测试数据)
from django.contrib.auth.hashers import make_password, identify_hasher
encoded = make_password('原始密码')
# identify_hasher(encoded) 可反查对应 Hasher 类
和「密码强度校验」的关系: AUTH_PASSWORD_VALIDATORS 管的是明文是否太弱;PBKDF2 管的是存库形态与暴力破解成本。两件事独立,注册/改密时通常会同时生效。
注意: 不要 user.password = '明文';应 set_password 或 User.objects.create_user(..., password=...)。
10.3.2 默认密码验证¶
Django 通过 settings.py 中的 AUTH_PASSWORD_VALIDATORS 配置密码验证器,默认包含 4 个验证器:
# settings.py
AUTH_PASSWORD_VALIDATORS = [
{
# 不能与用户属性过于相似(用户名、邮箱、姓名)
# max_similarity 默认 0.7,越小越严格
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
# 最小长度,默认 8 位
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
# 不能是常见密码(内置 20000 个黑名单,如 password123、qwerty)
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
# 不能全是数字(如 12345678 不通过)
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
10.3.3 DIY密码验证¶
# yourapp/validators.py
import re
from django.core.exceptions import ValidationError
from django.utils.translation import gettext as _
class PasswordComplexityValidator:
"""
支持配置:
- min_length: 最小长度
- min_categories: 至少满足几种字符类型(大写/小写/数字/特殊字符)
"""
def __init__(self, min_length=12, min_categories=3):
self.min_length = min_length
self.min_categories = min_categories
def validate(self, password, user=None):
errors = []
if len(password) < self.min_length:
errors.append(
_('密码长度至少 %(min)d 位') % {'min': self.min_length}
)
categories = [
bool(re.search(r'[A-Z]', password)), # 大写字母
bool(re.search(r'[a-z]', password)), # 小写字母
bool(re.search(r'\d', password)), # 数字
bool(re.search(r'[!@#$%^&*()_+\-=\[\]{}|<>?,.:]', password)), # 特殊字符
]
if sum(categories) < self.min_categories:
errors.append(
_('密码须包含大写字母、小写字母、数字、特殊字符中的任意 %(min)d 种')
% {'min': self.min_categories}
)
if errors:
raise ValidationError(errors)
def get_help_text(self):
return _(
'密码长度至少 %(min_length)d 位,且须包含大写字母、小写字母、数字、特殊字符中的任意 %(min_categories)d 种'
) % {'min_length': self.min_length, 'min_categories': self.min_categories}
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'yourapp.validators.PasswordComplexityValidator',
'OPTIONS': {
'min_length': 12, # 最少 12 位
'min_categories': 3, # 至少满足 3 种字符类型
}
},
]
至此就实现了django的默认密码配置规则的修改,但是Django默认只适用于传统的前后端不分离的项目,前后端分离的配置,需要搭配DRF来实现。
10.4 接口认证¶
前后端分离、移动端、第三方调用等场景下,HTTP API 通常不用浏览器 Cookie + Session 来维持登录态,而采用无状态或可横向扩展的凭证,常见做法如下。
1. Token(DRF 内置)
安装并启用 rest_framework.authtoken,用户登录成功后签发长期有效的静态 Token,客户端每次请求放在 Header 里。
# settings.py
INSTALLED_APPS += ['rest_framework.authtoken']
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
],
}
# 为用户创建 Token(例如在登录视图里)
from rest_framework.authtoken.models import Token
token, _ = Token.objects.get_or_create(user=user)
# 客户端请求头:Authorization: Token <key>
适合服务简单、可接受服务端存一条 Token 记录的场景;要吊销需删库记录或换 key。
2. JWT(推荐与 SPA / 移动端配合)
服务端用私钥签发带过期时间的 JWT,客户端存 access(短效)+ 可选 refresh(长效换票);服务端校验签名与 exp,一般不在 Session 里记登录态。
常用包:djangorestframework-simplejwt。请求头形式:Authorization: Bearer <access_token>。
# settings.py(概念示例)
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_simplejwt.authentication.JWTAuthentication',
),
}
# urls:挂载 TokenObtainPairView、TokenRefreshView 等
3. 自定义 API Key / HMAC
面向内部服务或 Webhook,可用固定 Key、或请求签名(时间戳 + HMAC);实现上多为自定义 Authentication 类,校验 Header 或 Query 中的凭证。
4. 与 Session 的取舍
Session 认证:依赖 Cookie、需处理 CSRF,同源页面型后台很方便;跨域 SPA 要额外配置 CORS、Cookie 属性,复杂度常高于 Token/JWT。
纯 API:在
REST_FRAMEWORK中可只启用TokenAuthentication/JWTAuthentication,不要把SessionAuthentication设为默认,避免误用 Cookie 态;若仍同站混用管理后台,可按视图粒度拆分authentication_classes。
安全要点(简要): 生产环境用 HTTPS;Token/JWT 勿写进 URL;access 短过期、refresh 妥善保管;敏感操作可叠加二次校验或权限对象级控制(见 10.2)。
十一、权限系统¶
本章节内容与第十章认证系统的权限部分重合,详见第十章第 2 节"权限与分组"。
将包含:权限模型设计、Django 权限框架、用户权限与组权限、自定义权限、对象级权限、权限装饰器、RBAC 权限模型实现等内容。
十二、中间件¶
12.1 基本介绍¶
django 请求先到中间件再到视图
而 django 的中间件默认是对所有的视图生效的,所以中间件要相对个别url做白名单,需要自定义中间件,也可以使用装饰器的形式直接在视图函数上使用中间件装饰器,比如csrf_exempt
请求阶段:从上到下
响应阶段:从下到上
12.1 内置中间件¶
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware', # 安全相关
'django.contrib.sessions.middleware.SessionMiddleware', # Session
'django.middleware.common.CommonMiddleware', # URL 规范化
'django.middleware.csrf.CsrfViewMiddleware', # CSRF 保护
'django.contrib.auth.middleware.AuthenticationMiddleware', # 用户认证
'django.contrib.messages.middleware.MessageMiddleware', # 消息框架
'django.middleware.clickjacking.XFrameOptionsMiddleware', # 点击劫持防护
'corsheaders.middleware.CorsMiddleware', # CORS(django-cors-headers)
]
12.2 自定义中间件¶
12.2.1 通用中间件¶
# apps/common/middleware.py
import time
import logging
logger = logging.getLogger(__name__)
class RequestLogMiddleware:
def __init__(self, get_response):
self.get_response = get_response # 初始化时执行一次
# 不需要走中间件的路径
self.exempt_urls = ['/api/health/', '/webhook/'] # 其实也可以定义前缀
def __call__(self, request):
# 如果需要url的白名单
if request.path in self.exempt_urls:
return self.get_response(request) # 直接跳过逻辑
# 请求前
start = time.time()
response = self.get_response(request)
# 响应后
duration = time.time() - start
logger.info(
f'{request.method} {request.path} '
f'{response.status_code} {duration:.3f}s'
)
response['X-Response-Time'] = f'{duration:.3f}s'
return response
def process_view(self, request, view_func, view_args, view_kwargs):
"""视图调用前(可选)"""
pass
def process_exception(self, request, exception):
"""视图抛出异常时(可选)"""
logger.error(f'Exception: {exception}', exc_info=True)
return None # 返回 None 继续传播,返回 Response 则拦截
def process_template_response(self, request, response):
"""返回 TemplateResponse 时(可选)"""
return response
# 注册
MIDDLEWARE = [
...
'apps.common.middleware.RequestLogMiddleware',
]
12.2.2 装饰器中间件¶
# 自定义装饰器中间件
def middleware_exempt(view_func):
view_func.middleware_exempt = True
return view_func
# 中间件中检查标记
class MyMiddleware:
def __call__(self, request):
response = self.get_response(request)
return response
def process_view(self, request, view_func, view_args, view_kwargs):
if getattr(view_func, 'middleware_exempt', False):
return None # 跳过,交给视图处理
# 执行中间件逻辑
...
十三、信号机制¶
13.1 关于信号¶
基于“发布-订阅(观察者模式)”的解耦机制,用于在某些事件发生时自动通知并执行额外逻辑。 信号核心价值是在不修改原有代码的情况下扩展行为,实现模块间解耦。
13.2 内置信号案例¶
# signals.py
# 模型生命周期扩展(最常见)
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete, m2m_changed
# 用户登录
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
# 请求相关
from django.core.signals import request_started, request_finished
from django.dispatch import receiver
# 用户注册后自动创建 Profile
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
if created:
Profile.objects.create(user=instance)
# 登录记录
@receiver(user_logged_in)
def log_user_login(sender, request, user, **kwargs):
logger.info(f'User {user.username} logged in from {request.META.get("REMOTE_ADDR")}')
# apps.py
class UsersConfig(AppConfig):
name = 'apps.users'
def ready(self):
import apps.users.signals # 导入信号模块触发注册
13.3 自定义信号案例¶
from django.dispatch import Signal, receiver
# 定义自定义信号
order_paid = Signal()
# 发送信号
def pay_order(order_id):
order = Order.objects.get(id=order_id)
order.status = 'paid'
order.save()
order_paid.send(sender=Order, order=order) # 发送信号
# 订阅信号
@receiver(order_paid, sender=Order)
def on_order_paid(sender, order, **kwargs):
send_invoice_email(order)
update_inventory(order)
十四、缓存系统¶
14.1 缓存配置¶
# pip install django-redis
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'PASSWORD': 'yourpassword',
'SOCKET_CONNECT_TIMEOUT': 5,
'SOCKET_TIMEOUT': 5,
},
'KEY_PREFIX': 'myapp',
'TIMEOUT': 300, # 默认超时 5 分钟
}
}
# Session 也用 Redis
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'
14.2 视图缓存¶
from django.views.decorators.cache import cache_page, never_cache
from django.utils.decorators import method_decorator
# FBV 缓存(60秒)
@cache_page(60)
def hot_articles(request):
...
# CBV 缓存
@method_decorator(cache_page(60), name='dispatch')
class HotArticlesView(View):
...
# 不缓存
@never_cache
def user_profile(request):
...
# 模板片段缓存
{% load cache %}
{% cache 300 sidebar request.user.id %}
... 侧边栏内容 ...
{% endcache %}
14.3 低级缓存 API¶
from django.core.cache import cache
# 基本操作
cache.set('key', value, timeout=300)
cache.get('key', default=None)
cache.delete('key')
cache.get_or_set('key', default_value, timeout=300)
# 批量操作
cache.set_many({'k1': v1, 'k2': v2}, timeout=300)
cache.get_many(['k1', 'k2'])
cache.delete_many(['k1', 'k2'])
# 计数器
cache.set('counter', 0)
cache.incr('counter')
cache.decr('counter')
# 通配删除(django-redis 支持)
cache.delete_pattern('user:*')
# 缓存穿透防护(缓存空值)
def get_user(user_id):
key = f'user:{user_id}'
user = cache.get(key)
if user is None:
user = User.objects.filter(id=user_id).first()
cache.set(key, user or '', timeout=300) # 空值也缓存
return user if user != '' else None
十五、异步支持¶
15.1 异步视图¶
# Django 3.1+ 支持异步视图
import asyncio
from django.http import JsonResponse
async def async_view(request):
await asyncio.sleep(1) # 非阻塞等待
return JsonResponse({'msg': 'async response'})
# 异步 CBV
from django.views import View
class AsyncView(View):
async def get(self, request):
data = await fetch_external_data()
return JsonResponse({'data': data})
十六、Django-RQ 任务队列¶
什么是 Django-RQ?
Django-RQ 是一个简单的 Django 任务队列,基于 Python RQ(Redis Queue)。相比 Celery,它更轻量级、更易于使用,适合中小型项目。
Django-RQ vs Celery
特性 |
Django-RQ |
Celery |
|---|---|---|
复杂度 |
简单 |
复杂 |
学习曲线 |
平缓 |
陡峭 |
功能 |
基础功能完善 |
功能丰富 |
性能 |
中等 |
高 |
适用场景 |
中小型项目 |
大型分布式系统 |
安装与配置
# 安装 django-rq
pip install django-rq
# 可选:安装 rq-scheduler 支持定时任务
pip install rq-scheduler
# config/settings/base.py
INSTALLED_APPS = [
...
'django_rq',
]
# Django-RQ 配置
RQ_QUEUES = {
'default': {
'HOST': 'localhost',
'PORT': 6379,
'DB': 0,
'PASSWORD': '',
'DEFAULT_TIMEOUT': 360, # 默认超时时间(秒)
},
'high': { # 高优先级队列
'HOST': 'localhost',
'PORT': 6379,
'DB': 0,
'DEFAULT_TIMEOUT': 360,
},
'low': { # 低优先级队列
'HOST': 'localhost',
'PORT': 6379,
'DB': 0,
'DEFAULT_TIMEOUT': 600,
},
}
# 可选:RQ 显示配置
RQ_SHOW_ADMIN_LINK = True # Admin 中显示 RQ 链接
# config/urls.py
from django.urls import path, include
urlpatterns = [
...
path('django-rq/', include('django_rq.urls')), # RQ 监控面板
]
定义任务
# apps/users/tasks.py
import django_rq
from django.core.mail import send_mail
from .models import User
def send_welcome_email(user_id):
"""发送欢迎邮件(普通函数)"""
try:
user = User.objects.get(id=user_id)
send_mail(
'欢迎注册',
f'欢迎您,{user.username}!',
'noreply@example.com',
[user.email],
fail_silently=False,
)
return f'Email sent to {user.email}'
except User.DoesNotExist:
return f'User {user_id} not found'
except Exception as e:
return f'Error: {str(e)}'
def generate_report(report_type):
"""生成报告"""
import time
time.sleep(5) # 模拟耗时操作
return f'Report {report_type} generated'
def process_large_file(file_path):
"""处理大文件"""
# 耗时操作
pass
任务调用
# apps/users/views.py
import django_rq
from django.http import JsonResponse
from django.db import transaction
from .tasks import send_welcome_email, generate_report
def register(request):
"""用户注册"""
# 创建用户
user = User.objects.create_user(
username=request.POST['username'],
email=request.POST['email'],
password=request.POST['password']
)
# 方式1:立即入队(默认队列)
django_rq.enqueue(send_welcome_email, user.id)
# 方式2:指定队列
django_rq.enqueue(send_welcome_email, user.id, queue='high')
# 方式3:延迟执行(10分钟后)
from datetime import timedelta
django_rq.enqueue(
send_welcome_email,
user.id,
queue='default',
timeout=300, # 任务超时时间
result_ttl=3600, # 结果保留时间
at_front=False, # 是否插入队列前端
)
# 方式4:事务提交后执行
transaction.on_commit(lambda: django_rq.enqueue(send_welcome_email, user.id))
return JsonResponse({'code': 0, 'msg': 'ok'})
def generate_monthly_report_view(request):
"""生成月度报告"""
# 异步执行
job = django_rq.enqueue(generate_report, 'monthly', queue='low')
# 获取任务ID
job_id = job.id
return JsonResponse({
'code': 0,
'job_id': job_id,
'msg': '报告生成任务已提交'
})
def check_job_status(request, job_id):
"""检查任务状态"""
from django_rq import get_queue
from rq.job import Job
from redis import Redis
redis_conn = Redis(host='localhost', port=6379, db=0)
try:
job = Job.fetch(job_id, connection=redis_conn)
return JsonResponse({
'code': 0,
'data': {
'status': job.get_status(), # queued/started/finished/failed
'result': job.result if job.is_finished else None,
'error': str(job.exc_info) if job.is_failed else None,
}
})
except Exception as e:
return JsonResponse({'code': 1, 'msg': str(e)})
定时任务(使用 rq-scheduler)
# 安装
pip install rq-scheduler
# config/settings/base.py
INSTALLED_APPS = [
...
'django_rq',
]
RQ_QUEUES = {
'default': {
...
},
}
# RQ Scheduler 配置
RQ_SCHEDULER = {
'HOST': 'localhost',
'PORT': 6379,
'DB': 0,
}
# apps/reports/tasks.py
import django_rq
from datetime import datetime, timedelta
def schedule_daily_report():
"""调度每日报告"""
scheduler = django_rq.get_scheduler('default')
# 方式1:在特定时间执行(一次性)
scheduler.enqueue_at(
datetime(2024, 12, 31, 23, 59), # 执行时间
generate_report, # 任务函数
'daily' # 参数
)
# 方式2:延迟执行
scheduler.enqueue_in(
timedelta(hours=24), # 24小时后
generate_report,
'daily'
)
# 方式3:周期性执行
scheduler.schedule(
scheduled_time=datetime.utcnow(),
func=generate_report,
args=['daily'],
interval=86400, # 每天执行(秒)
repeat=None, # 无限重复
)
# 方式4:Cron 表达式
from rq_scheduler import Scheduler
from redis import Redis
redis_conn = Redis()
scheduler = Scheduler(connection=redis_conn)
scheduler.cron(
"0 0 * * *", # 每天0点
func=generate_report,
args=['daily'],
queue_name='default'
)
启动 Worker
# 启动 Worker(处理任务)
python manage.py rqworker default
# 启动多个队列的 Worker
python manage.py rqworker default high low
# 启动多个 Worker(并发处理)
python manage.py rqworker default &
python manage.py rqworker default &
# 启动 Scheduler(定时任务)
rqscheduler --host localhost --port 6379 --db 0
监控与管理
# 在 Django shell 中查看队列状态
python manage.py shell
from django_rq import get_queue, get_connection
# 获取队列
queue = get_queue('default')
# 队列统计
print(f'任务数量: {len(queue)}')
print(f'等待任务: {queue.count}')
# 获取所有任务
jobs = queue.jobs
for job in jobs:
print(f'Job ID: {job.id}, Status: {job.get_status()}')
# 清空队列
queue.empty()
# 获取失败的任务
from rq.registry import FailedJobRegistry
from redis import Redis
redis_conn = Redis()
registry = FailedJobRegistry(queue=queue)
for job_id in registry.get_job_ids():
print(job_id)
访问监控面板
访问 http://localhost:8000/django-rq/ 查看:
队列状态
任务列表
Worker 状态
失败任务
最佳实践
任务设计原则
任务函数应该是纯函数,避免副作用
任务应该是幂等的(可重复执行)
避免在任务中使用全局变量
队列划分
high: 重要且紧急的任务(发送验证码)default: 普通任务(发送通知邮件)low: 不紧急的任务(生成报告、数据分析)
错误处理
def robust_task(data): try: # 任务逻辑 process_data(data) except Exception as e: # 记录错误 logger.error(f'Task failed: {str(e)}') # 可以重新入队或发送告警 raise
事务与任务
with transaction.atomic(): order = Order.objects.create(...) # 确保事务提交后再执行任务 transaction.on_commit( lambda: django_rq.enqueue(process_order, order.id) )
十七、文件上传与存储¶
17.1 文件上传处理¶
基础文件上传
# views.py
from django.http import JsonResponse
from django.views import View
from django.core.files.storage import default_storage
from django.core.files.base import ContentFile
class FileUploadView(View):
def post(self, request):
# 获取上传的文件
uploaded_file = request.FILES.get('file')
if not uploaded_file:
return JsonResponse({'code': 400, 'msg': '未找到文件'})
# 文件信息
file_name = uploaded_file.name
file_size = uploaded_file.size
file_type = uploaded_file.content_type
# 保存文件
file_path = default_storage.save(
f'uploads/{file_name}',
ContentFile(uploaded_file.read())
)
return JsonResponse({
'code': 0,
'data': {
'file_name': file_name,
'file_path': file_path,
'file_size': file_size,
'file_url': default_storage.url(file_path)
}
})
多文件上传
class MultiFileUploadView(View):
def post(self, request):
files = request.FILES.getlist('files')
uploaded_files = []
for file in files:
file_path = default_storage.save(
f'uploads/{file.name}',
ContentFile(file.read())
)
uploaded_files.append({
'name': file.name,
'path': file_path,
'url': default_storage.url(file_path)
})
return JsonResponse({
'code': 0,
'data': uploaded_files
})
文件验证
import os
from django.core.exceptions import ValidationError
class FileValidator:
"""文件验证器"""
# 允许的文件扩展名
ALLOWED_EXTENSIONS = {
'image': ['.jpg', '.jpeg', '.png', '.gif', '.webp'],
'document': ['.pdf', '.doc', '.docx', '.txt'],
'video': ['.mp4', '.avi', '.mov'],
}
# 最大文件大小 (MB)
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
@classmethod
def validate(cls, file, file_type=None):
# 验证文件大小
if file.size > cls.MAX_FILE_SIZE:
raise ValidationError(f'文件大小不能超过 {cls.MAX_FILE_SIZE // 1024 // 1024}MB')
# 验证文件扩展名
ext = os.path.splitext(file.name)[1].lower()
if file_type:
allowed = cls.ALLOWED_EXTENSIONS.get(file_type, [])
if ext not in allowed:
raise ValidationError(f'不支持的文件类型,允许: {", ".join(allowed)}')
else:
all_allowed = sum(cls.ALLOWED_EXTENSIONS.values(), [])
if ext not in all_allowed:
raise ValidationError('不支持的文件类型')
return True
# 使用验证器
class ValidatedUploadView(View):
def post(self, request):
file = request.FILES.get('file')
file_type = request.POST.get('type', 'image')
try:
FileValidator.validate(file, file_type)
except ValidationError as e:
return JsonResponse({'code': 400, 'msg': str(e)})
# 保存文件...
17.2 自定义存储后端¶
自定义文件命名
import os
import uuid
from datetime import datetime
def generate_filename(instance, filename):
"""生成唯一文件名"""
# 获取文件扩展名
ext = os.path.splitext(filename)[1]
# 生成唯一文件名
unique_name = f"{uuid.uuid4().hex}{ext}"
# 按日期组织
today = datetime.now()
return f"uploads/{today.strftime('%Y/%m/%d')}/{unique_name}"
# 模型中使用
class Attachment(models.Model):
file = models.FileField(upload_to=generate_filename)
original_name = models.CharField(max_length=255)
uploaded_at = models.DateTimeField(auto_now_add=True)
自定义存储类
from django.core.files.storage import Storage
from django.core.files.base import File
from django.conf import settings
class CustomStorage(Storage):
"""自定义存储后端"""
def __init__(self, base_url=None):
self.base_url = base_url or settings.MEDIA_URL
self.base_path = settings.MEDIA_ROOT
def _open(self, name, mode='rb'):
"""打开文件"""
path = os.path.join(self.base_path, name)
return File(open(path, mode))
def _save(self, name, content):
"""保存文件"""
# 自定义保存逻辑
path = os.path.join(self.base_path, name)
# 确保目录存在
os.makedirs(os.path.dirname(path), exist_ok=True)
# 写入文件
with open(path, 'wb') as f:
for chunk in content.chunks():
f.write(chunk)
return name
def delete(self, name):
"""删除文件"""
path = os.path.join(self.base_path, name)
if os.path.exists(path):
os.remove(path)
def exists(self, name):
"""检查文件是否存在"""
path = os.path.join(self.base_path, name)
return os.path.exists(path)
def url(self, name):
"""获取文件 URL"""
return f"{self.base_url}{name}"
def size(self, name):
"""获取文件大小"""
path = os.path.join(self.base_path, name)
return os.path.getsize(path)
# 使用自定义存储
class Document(models.Model):
file = models.FileField(
upload_to='documents/',
storage=CustomStorage()
)
17.3 对象存储集成(S3/OSS)¶
阿里云 OSS 集成
pip install django-oss-storage
# settings.py
DEFAULT_FILE_STORAGE = 'django_oss_storage.backends.OssMediaStorage'
OSS_ACCESS_KEY_ID = 'your-access-key-id'
OSS_ACCESS_KEY_SECRET = 'your-access-key-secret'
OSS_BUCKET_NAME = 'your-bucket-name'
OSS_ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com'
# 可选配置
OSS_EXPIRE_TIME = 3600 # URL 过期时间
AWS S3 集成
pip install django-storages[boto3]
# settings.py
DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
AWS_ACCESS_KEY_ID = 'your-access-key-id'
AWS_SECRET_ACCESS_KEY = 'your-secret-access-key'
AWS_STORAGE_BUCKET_NAME = 'your-bucket-name'
AWS_S3_REGION_NAME = 'ap-northeast-1'
AWS_S3_FILE_OVERWRITE = False # 不覆盖同名文件
AWS_DEFAULT_ACL = 'private' # 默认私有
AWS_QUERYSTRING_AUTH = True # 生成签名 URL
# 自定义域名
AWS_S3_CUSTOM_DOMAIN = 'cdn.example.com'
腾讯云 COS 集成
pip install django-qcloud-storage
# settings.py
DEFAULT_FILE_STORAGE = 'django_qcloud_storage.QcloudStorage'
QCLOUD_SECRET_ID = 'your-secret-id'
QCLOUD_SECRET_KEY = 'your-secret-key'
QCLOUD_BUCKET = 'your-bucket-name'
QCLOUD_REGION = 'ap-guangzhou'
17.4 图片处理¶
Pillow 基础处理
from PIL import Image, ImageOps
from io import BytesIO
import os
def process_image(image_file, max_size=(800, 800), quality=85):
"""处理图片"""
img = Image.open(image_file)
# 转换模式
if img.mode in ('RGBA', 'LA', 'P'):
img = img.convert('RGB')
# 调整大小
img.thumbnail(max_size, Image.LANCZOS)
# 保存
output = BytesIO()
img.save(output, format='JPEG', quality=quality, optimize=True)
output.seek(0)
return output
def create_thumbnail(image_file, size=(200, 200)):
"""创建缩略图"""
img = Image.open(image_file)
img = ImageOps.fit(img, size, Image.LANCZOS)
output = BytesIO()
img.save(output, format='JPEG', quality=85)
output.seek(0)
return output
图片上传视图
class ImageUploadView(View):
def post(self, request):
file = request.FILES.get('image')
if not file:
return JsonResponse({'code': 400, 'msg': '未找到图片'})
# 验证文件类型
if not file.content_type.startswith('image/'):
return JsonResponse({'code': 400, 'msg': '只能上传图片文件'})
# 处理原图
processed = process_image(file, max_size=(1920, 1080))
# 生成文件名
ext = os.path.splitext(file.name)[1]
filename = f"{uuid.uuid4().hex}{ext}"
# 保存原图
file_path = default_storage.save(
f'images/{filename}',
ContentFile(processed.read())
)
# 生成缩略图
file.seek(0)
thumbnail = create_thumbnail(file, size=(300, 300))
thumb_path = default_storage.save(
f'images/thumbnails/{filename}',
ContentFile(thumbnail.read())
)
return JsonResponse({
'code': 0,
'data': {
'original': default_storage.url(file_path),
'thumbnail': default_storage.url(thumb_path)
}
})
图片模型
class ImageModel(models.Model):
"""图片模型"""
title = models.CharField(max_length=100, blank=True)
image = models.ImageField(upload_to='images/%Y/%m/')
thumbnail = models.ImageField(upload_to='images/thumbnails/%Y/%m/', blank=True)
width = models.PositiveIntegerField(blank=True, null=True)
height = models.PositiveIntegerField(blank=True, null=True)
file_size = models.PositiveIntegerField(blank=True, null=True)
uploaded_at = models.DateTimeField(auto_now_add=True)
class Meta:
verbose_name = '图片'
verbose_name_plural = '图片'
def save(self, *args, **kwargs):
# 获取图片尺寸
if self.image:
img = Image.open(self.image)
self.width, self.height = img.size
self.file_size = self.image.size
super().save(*args, **kwargs)
# 自动生成缩略图
if not self.thumbnail:
self.generate_thumbnail()
def generate_thumbnail(self):
"""生成缩略图"""
if not self.image:
return
img = Image.open(self.image)
img = ImageOps.fit(img, (300, 300), Image.LANCZOS)
thumb_io = BytesIO()
img.save(thumb_io, format='JPEG', quality=85)
thumb_io.seek(0)
thumb_name = f'thumb_{os.path.basename(self.image.name)}'
self.thumbnail.save(
thumb_name,
ContentFile(thumb_io.read()),
save=False
)
self.save()
17.5 大文件分片上传¶
分片上传实现
import hashlib
from django.core.cache import cache
class ChunkedUploadView(View):
"""分片上传视图"""
def post(self, request):
"""上传分片"""
chunk = request.FILES.get('chunk')
chunk_index = int(request.POST.get('chunk_index'))
total_chunks = int(request.POST.get('total_chunks'))
file_hash = request.POST.get('file_hash')
file_name = request.POST.get('file_name')
# 保存分片
chunk_key = f"upload:{file_hash}:{chunk_index}"
cache.set(chunk_key, chunk.read(), timeout=3600)
# 检查是否所有分片都已上传
uploaded_chunks = cache.get(f"upload:{file_hash}:count", 0) + 1
cache.set(f"upload:{file_hash}:count", uploaded_chunks, timeout=3600)
if uploaded_chunks >= total_chunks:
# 合并分片
return self.merge_chunks(file_hash, file_name, total_chunks)
return JsonResponse({
'code': 0,
'data': {
'chunk_index': chunk_index,
'uploaded': uploaded_chunks,
'total': total_chunks
}
})
def merge_chunks(self, file_hash, file_name, total_chunks):
"""合并分片"""
# 创建临时目录
temp_dir = f"/tmp/uploads/{file_hash}"
os.makedirs(temp_dir, exist_ok=True)
# 合并文件
output_path = os.path.join(temp_dir, file_name)
with open(output_path, 'wb') as output:
for i in range(total_chunks):
chunk_key = f"upload:{file_hash}:{i}"
chunk_data = cache.get(chunk_key)
if chunk_data:
output.write(chunk_data)
cache.delete(chunk_key)
# 移动到最终位置
final_path = default_storage.save(
f'uploads/{file_name}',
open(output_path, 'rb')
)
# 清理
cache.delete(f"upload:{file_hash}:count")
os.remove(output_path)
os.rmdir(temp_dir)
return JsonResponse({
'code': 0,
'data': {
'file_name': file_name,
'file_url': default_storage.url(final_path),
'status': 'completed'
}
})
前端分片上传示例
// 前端分片上传
async function uploadLargeFile(file) {
const chunkSize = 1024 * 1024; // 1MB 每片
const totalChunks = Math.ceil(file.size / chunkSize);
const fileHash = await calculateFileHash(file);
for (let i = 0; i < totalChunks; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, file.size);
const chunk = file.slice(start, end);
const formData = new FormData();
formData.append("chunk", chunk);
formData.append("chunk_index", i);
formData.append("total_chunks", totalChunks);
formData.append("file_hash", fileHash);
formData.append("file_name", file.name);
await fetch("/api/upload/chunk/", {
method: "POST",
body: formData,
});
}
}
十八、性能优化¶
18.1 数据库查询优化¶
N+1 问题及解决方案
# 问题:N+1 查询
users = User.objects.all()
for user in users:
print(user.profile.bio) # 每次循环都查询 profile 表
# 解决:使用 select_related(一对一、多对一)
users = User.objects.select_related('profile')
for user in users:
print(user.profile.bio) # 使用缓存数据
# 解决:使用 prefetch_related(多对多、一对多)
users = User.objects.prefetch_related('orders')
for user in users:
for order in user.orders.all():
print(order.id)
# 复杂预取
from django.db.models import Prefetch
users = User.objects.prefetch_related(
Prefetch(
'orders',
queryset=Order.objects.filter(status='paid'),
to_attr='paid_orders'
)
)
QuerySet 优化技巧
# 1. 只查询需要的字段
users = User.objects.only('id', 'username') # 仅查询指定字段
users = User.objects.defer('large_field') # 排除大字段
# 2. 使用 values() 减少内存占用
users = User.objects.values('id', 'username') # 返回字典而非模型实例
# 3. 使用 iterator() 处理大量数据
for user in User.objects.all().iterator():
process_user(user) # 逐行处理,不缓存
# 4. 使用 exists() 而不是 bool()
if User.objects.filter(username='john').exists(): # 高效
pass
# 5. 使用 count() 而不是 len()
count = User.objects.count() # 使用 SELECT COUNT(*)
# 6. 使用 first() 而不是 [0]
user = User.objects.filter(active=True).first() # 更优雅
# 7. 避免在循环中查询
# 错误
for user_id in user_ids:
user = User.objects.get(id=user_id) # N次查询
# 正确
users = User.objects.filter(id__in=user_ids) # 1次查询
user_map = {u.id: u for u in users}
for user_id in user_ids:
user = user_map.get(user_id)
18.3 索引优化¶
模型索引配置
class Order(models.Model):
order_no = models.CharField(max_length=50)
user = models.ForeignKey(User, on_delete=models.CASCADE)
status = models.CharField(max_length=20)
created_at = models.DateTimeField(auto_now_add=True)
amount = models.DecimalField(max_digits=10, decimal_places=2)
class Meta:
indexes = [
# 单列索引
models.Index(fields=['order_no'], name='idx_order_no'),
# 复合索引
models.Index(fields=['user', 'status'], name='idx_user_status'),
models.Index(fields=['status', 'created_at'], name='idx_status_created'),
# 部分索引(PostgreSQL)
models.Index(
fields=['user'],
name='idx_active_orders',
condition=models.Q(status='active')
),
]
索引最佳实践
# 1. 为频繁查询的字段添加索引
username = models.CharField(max_length=50, db_index=True)
# 2. 为外键自动创建索引(Django 默认)
user = models.ForeignKey(User, on_delete=models.CASCADE) # 自动索引
# 3. 联合索引字段顺序:等值查询字段在前,范围查询字段在后
# 好:WHERE status='active' AND created_at > '2024-01-01'
models.Index(fields=['status', 'created_at'])
# 4. 避免过多索引(影响写入性能)
# 5. 定期分析查询日志,优化索引
18.4 批量操作¶
批量创建
# 低效:逐条创建
for i in range(1000):
User.objects.create(username=f'user{i}')
# 高效:批量创建
users = [User(username=f'user{i}') for i in range(1000)]
User.objects.bulk_create(users, batch_size=100)
# 批量创建并忽略冲突(PostgreSQL)
User.objects.bulk_create(users, ignore_conflicts=True)
批量更新
# 低效:逐条更新
for user in users:
user.status = 'active'
user.save()
# 高效:批量更新
User.objects.filter(id__in=[u.id for u in users]).update(status='active')
# 批量更新不同值
from django.db.models import Case, When
User.objects.filter(id__in=user_ids).update(
status=Case(
When(id__in=vip_ids, then=Value('vip')),
default=Value('active')
)
)
# bulk_update(Django 2.2+)
for user in users:
user.status = 'active'
User.objects.bulk_update(users, ['status'], batch_size=100)
批量删除
# 批量删除
User.objects.filter(status='inactive').delete()
# 软删除(使用 update)
User.objects.filter(status='inactive').update(is_deleted=True)
18.5 数据库连接池¶
使用 PgBouncer(PostgreSQL)
; pgbouncer.ini
[databases]
mydb = host=localhost port=5432 dbname=mydb
[pgbouncer]
listen_port = 6432
listen_addr = 127.0.0.1
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
; 连接池配置
pool_mode = transaction
max_client_conn = 10000
default_pool_size = 25
max_db_connections = 100
Django 连接配置
# settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'mydb',
'USER': 'user',
'PASSWORD': 'password',
'HOST': '127.0.0.1',
'PORT': '6432', # 连接到 PgBouncer
'CONN_MAX_AGE': 600, # 连接持久化(秒)
'OPTIONS': {
'connect_timeout': 10,
}
}
}
连接持久化(Persistent Connections)
# settings.py
# 连接最大存活时间(秒)
CONN_MAX_AGE = 600 # 10分钟
# 禁用持久化(开发环境)
CONN_MAX_AGE = 0
# 无限持久化(不推荐)
CONN_MAX_AGE = None
18.6 读写分离¶
多数据库配置
# settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'mydb',
'HOST': 'master.db.com',
'PORT': 3306,
},
'replica': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'mydb',
'HOST': 'replica.db.com',
'PORT': 3306,
}
}
# 数据库路由
DATABASE_ROUTERS = ['myapp.routers.DatabaseRouter']
数据库路由实现
# myapp/routers.py
class DatabaseRouter:
"""数据库路由"""
def db_for_read(self, model, **hints):
"""读操作路由到从库"""
return 'replica'
def db_for_write(self, model, **hints):
"""写操作路由到主库"""
return 'default'
def allow_relation(self, obj1, obj2, **hints):
"""允许跨数据库关系"""
return True
def allow_migrate(self, db, app_label, model_name=None, **hints):
"""迁移只在主库执行"""
return db == 'default'
手动选择数据库
# 使用特定数据库查询
users = User.objects.using('replica').all()
# 使用特定数据库保存
user.save(using='default')
18.7 慢查询分析¶
Django 查询日志
# settings.py
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
},
},
'loggers': {
'django.db.backends': {
'handlers': ['console'],
'level': 'DEBUG', # 记录所有 SQL
'propagate': False,
},
},
}
查询分析工具
# 使用 Django Debug Toolbar
# pip install django-debug-toolbar
# 使用 django-silk
# pip install django-silk
# settings.py
INSTALLED_APPS += ['silk']
MIDDLEWARE += ['silk.middleware.SilkyMiddleware']
# urls.py
urlpatterns += [path('silk/', include('silk.urls', namespace='silk'))]
分析查询性能
from django.db import connection, reset_queries
# 重置查询日志
reset_queries()
# 执行查询
users = User.objects.select_related('profile').all()
list(users)
# 分析查询
for query in connection.queries:
print(f"Time: {query['time']}")
print(f"SQL: {query['sql']}")
print('---')
print(f"Total queries: {len(connection.queries)}")
EXPLAIN 分析
# 使用 explain()
print(User.objects.filter(age__gt=18).explain())
# 详细分析(PostgreSQL)
print(User.objects.filter(age__gt=18).explain(analyze=True))
18.8 缓存优化¶
数据库查询缓存
from django.core.cache import cache
def get_user_with_cache(user_id):
"""带缓存的用户查询"""
cache_key = f'user:{user_id}'
user = cache.get(cache_key)
if user is None:
try:
user = User.objects.get(id=user_id)
cache.set(cache_key, user, timeout=300)
except User.DoesNotExist:
cache.set(cache_key, None, timeout=60)
return user
模板片段缓存
{% load cache %} {% cache 300 sidebar user.id %}
<!-- 缓存内容 -->
<div class="sidebar">{{ user.profile.bio }}</div>
{% endcache %}
视图缓存
from django.views.decorators.cache import cache_page
@cache_page(60 * 15) # 缓存15分钟
def user_list(request):
users = User.objects.all()
return JsonResponse({'users': list(users)})
18.9 其他优化技巧¶
应用层面优化
# 1. 使用延迟加载
@property
def expensive_property(self):
if not hasattr(self, '_expensive_result'):
self._expensive_result = self.calculate_expensive()
return self._expensive_result
# 2. 使用生成器处理大数据
def process_large_dataset():
for item in LargeModel.objects.all().iterator():
yield process_item(item)
# 3. 异步任务处理耗时操作
# 使用 Django-RQ 或 Celery
django_rq.enqueue(send_email, user_id)
# 4. 数据库连接优化
# 使用连接池
# 调整 CONN_MAX_AGE
代码层面优化
# 1. 避免重复查询
# 错误
for order in orders:
user = User.objects.get(id=order.user_id) # N次查询
# 正确
user_ids = [o.user_id for o in orders]
users = User.objects.filter(id__in=user_ids)
user_map = {u.id: u for u in users}
# 2. 使用 values_list 获取特定字段
user_ids = User.objects.values_list('id', flat=True)
# 3. 使用 annotate 避免额外查询
from django.db.models import Count
users = User.objects.annotate(order_count=Count('orders'))
# 4. 批量操作替代循环
# 使用 bulk_create, bulk_update, update, delete
十九、安全实践¶
19.1 CSRF 防护¶
CSRF 原理
CSRF(跨站请求伪造)攻击利用用户已认证的会话,诱导用户在不知情的情况下执行非预期操作。
Django 的 CSRF 防护
# settings.py
MIDDLEWARE = [
'django.middleware.csrf.CsrfViewMiddleware', # 默认启用
...
]
# 前后端分离场景(API)
# 通常禁用 CSRF,使用 Token 认证
from django.views.decorators.csrf import csrf_exempt
@csrf_exempt
def api_view(request):
pass
# 或在 urls.py 中禁用
from django.views.decorators.csrf import csrf_exempt
urlpatterns = [
path('api/', csrf_exempt(views.api_view)),
]
传统表单场景的 CSRF 保护
<!-- 模板中使用 CSRF Token -->
<form method="post">
{% csrf_token %}
<input type="text" name="username" />
<button type="submit">提交</button>
</form>
# AJAX 请求携带 CSRF Token
# 从 Cookie 中获取 csrftoken
function getCookie(name) {
let cookieValue = null;
if (document.cookie && document.cookie !== '') {
const cookies = document.cookie.split(';');
for (let i = 0; i < cookies.length; i++) {
const cookie = cookies[i].trim();
if (cookie.substring(0, name.length + 1) === (name + '=')) {
cookieValue = decodeURIComponent(cookie.substring(name.length + 1));
break;
}
}
}
return cookieValue;
}
fetch('/api/endpoint/', {
method: 'POST',
headers: {
'X-CSRFToken': getCookie('csrftoken'),
},
body: JSON.stringify(data)
});
19.2 XSS 防护¶
XSS 攻击类型
存储型 XSS:恶意脚本存储在服务器
反射型 XSS:恶意脚本通过 URL 传递
DOM 型 XSS:通过修改页面 DOM 结构攻击
Django 的 XSS 防护
# 1. 模板自动转义
# 默认情况下,Django 模板会自动转义 HTML
{{ user_input }} # 自动转义 <script> 等标签
# 2. 标记安全内容(谨慎使用)
from django.utils.safestring import mark_safe
# 错误:直接标记用户输入为安全
user_content = mark_safe(request.POST['content']) # 危险!
# 正确:只标记可信内容
safe_html = mark_safe('<strong>加粗文本</strong>')
# 3. 使用 bleach 清理 HTML
# pip install bleach
import bleach
allowed_tags = ['p', 'br', 'strong', 'em', 'a']
allowed_attrs = {'a': ['href', 'title']}
cleaned_html = bleach.clean(
user_input,
tags=allowed_tags,
attributes=allowed_attrs,
strip=True
)
内容安全策略(CSP)
# 安装 django-csp
# pip install django-csp
# settings.py
INSTALLED_APPS += ['csp']
MIDDLEWARE += ['csp.middleware.CSPMiddleware']
# CSP 配置
CSP_DEFAULT_SRC = ("'self'",)
CSP_SCRIPT_SRC = ("'self'", "'unsafe-inline'", 'cdn.example.com')
CSP_STYLE_SRC = ("'self'", "'unsafe-inline'")
CSP_IMG_SRC = ("'self'", 'data:', 'cdn.example.com')
CSP_FONT_SRC = ("'self'", 'fonts.gstatic.com')
CSP_CONNECT_SRC = ("'self'", 'api.example.com')
# 报告违规
CSP_REPORT_URI = '/csp-report/'
CSP_REPORT_ONLY = False # 设置为 True 仅报告不阻止
19.3 SQL 注入防护¶
Django ORM 的防护
# Django ORM 自动防止 SQL 注入
users = User.objects.filter(username=user_input) # 安全
# 使用参数化查询
from django.db import connection
with connection.cursor() as cursor:
# 安全:使用参数化查询
cursor.execute(
"SELECT * FROM users WHERE username = %s",
[user_input]
)
# 危险:字符串拼接
cursor.execute(
f"SELECT * FROM users WHERE username = '{user_input}'"
) # 不要这样做!
Raw SQL 安全实践
# 安全的 Raw Query
users = User.objects.raw(
'SELECT * FROM users WHERE age > %s',
[min_age]
)
# 使用字典参数
with connection.cursor() as cursor:
cursor.execute(
"SELECT * FROM users WHERE username = %(username)s",
{'username': user_input}
)
19.4 敏感数据加密¶
密码安全
# Django 自动处理密码哈希
from django.contrib.auth.hashers import make_password, check_password
# 创建用户时自动哈希密码
user = User.objects.create_user(
username='john',
password='user_password' # 自动使用 PBKDF2 哈希
)
# 验证密码
if check_password(input_password, user.password):
pass
# 配置密码哈希(settings.py)
PASSWORD_HASHERS = [
'django.contrib.auth.hashers.PBKDF2PasswordHasher', # 默认
'django.contrib.auth.hashers.Argon2PasswordHasher', # 推荐
'django.contrib.auth.hashers.BCryptSHA256PasswordHasher',
]
# Argon2 需要安装:pip install django[argon2]
字段级加密
# 使用 django-cryptography
# pip install django-cryptography
from django_cryptography.fields import encrypt
class UserProfile(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
# 自动加密存储
id_card = encrypt(models.CharField(max_length=18))
phone = encrypt(models.CharField(max_length=20))
address = encrypt(models.TextField())
# 普通字段
bio = models.TextField()
环境变量管理敏感信息
# 使用 django-environ
# pip install django-environ
import environ
env = environ.Env(
DEBUG=(bool, False)
)
# 读取 .env 文件
environ.Env.read_env()
# settings.py
SECRET_KEY = env('SECRET_KEY')
DATABASES = {
'default': env.db(), # DATABASE_URL
}
EMAIL_HOST_PASSWORD = env('EMAIL_HOST_PASSWORD')
AWS_SECRET_ACCESS_KEY = env('AWS_SECRET_ACCESS_KEY')
19.5 API 限流¶
Django Ratelimit
pip install django-ratelimit
from ratelimit.decorators import ratelimit
from ratelimit.core import get_usage
# 基于函数的限流
@ratelimit(key='ip', rate='10/m', method='POST')
def login_view(request):
"""每分钟最多10次登录尝试"""
pass
# 基于类的限流
from ratelimit.mixins import RatelimitMixin
class APIView(RatelimitMixin, View):
ratelimit_key = 'user'
ratelimit_rate = '100/h'
ratelimit_method = 'GET'
def get(self, request):
pass
# 自定义限流键
def get_client_ip(request):
"""获取真实 IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return x_forwarded_for.split(',')[0]
return request.META.get('REMOTE_ADDR')
@ratelimit(key=get_client_ip, rate='5/m')
def sensitive_api(request):
pass
自定义限流中间件
from django.core.cache import cache
from django.http import JsonResponse
class RateLimitMiddleware:
"""API 限流中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 获取客户端标识
client_id = self.get_client_id(request)
# 检查限流
if self.is_rate_limited(client_id):
return JsonResponse(
{'code': 429, 'msg': '请求过于频繁,请稍后再试'},
status=429
)
return self.get_response(request)
def get_client_id(self, request):
"""获取客户端标识"""
if request.user.is_authenticated:
return f"user:{request.user.id}"
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return f"ip:{x_forwarded_for.split(',')[0]}"
return f"ip:{request.META.get('REMOTE_ADDR')}"
def is_rate_limited(self, client_id):
"""检查是否超过限流阈值"""
key = f"ratelimit:{client_id}"
# 使用滑动窗口算法
current = cache.get(key, 0)
if current >= 100: # 每小时100次
return True
cache.set(key, current + 1, timeout=3600)
return False
19.6 参数签名¶
API 签名验证
import hashlib
import hmac
import time
from django.conf import settings
def generate_signature(params, secret_key):
"""生成签名"""
# 按参数名排序
sorted_params = sorted(params.items())
# 构建签名字符串
sign_string = '&'.join([f"{k}={v}" for k, v in sorted_params])
sign_string += f"&key={secret_key}"
# 生成 MD5 签名
return hashlib.md5(sign_string.encode()).hexdigest().upper()
def verify_request(request):
"""验证请求签名"""
# 获取参数
params = request.GET if request.method == 'GET' else request.POST
# 获取签名
signature = params.get('sign')
timestamp = params.get('timestamp')
# 验证时间戳(防止重放攻击)
if not timestamp or abs(time.time() - int(timestamp)) > 300:
return False, '请求已过期'
# 移除签名参数
check_params = {k: v for k, v in params.items() if k != 'sign'}
# 验证签名
expected_sign = generate_signature(check_params, settings.API_SECRET_KEY)
if signature != expected_sign:
return False, '签名错误'
return True, None
# 使用装饰器
from functools import wraps
def require_signature(view_func):
@wraps(view_func)
def wrapper(request, *args, **kwargs):
valid, error = verify_request(request)
if not valid:
return JsonResponse({'code': 403, 'msg': error}, status=403)
return view_func(request, *args, **kwargs)
return wrapper
@require_signature
def api_endpoint(request):
pass
HMAC 签名(更安全)
import hmac
import hashlib
import base64
def generate_hmac_signature(data, secret_key):
"""生成 HMAC-SHA256 签名"""
message = '&'.join([f"{k}={v}" for k, v in sorted(data.items())])
signature = hmac.new(
secret_key.encode(),
message.encode(),
hashlib.sha256
).digest()
return base64.b64encode(signature).decode()
def verify_hmac_signature(data, signature, secret_key):
"""验证 HMAC 签名"""
expected = generate_hmac_signature(data, secret_key)
return hmac.compare_digest(expected, signature) # 防时序攻击
19.7 安全配置清单¶
生产环境安全配置
# settings/production.py
# 1. 关闭调试模式
DEBUG = False
# 2. 允许的主机
ALLOWED_HOSTS = ['yourdomain.com', 'www.yourdomain.com']
# 3. 安全中间件
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.middleware.http.ConditionalGetMiddleware',
...
]
# 4. HTTPS 配置
SECURE_SSL_REDIRECT = True
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
# 5. HSTS(HTTP Strict Transport Security)
SECURE_HSTS_SECONDS = 31536000 # 1年
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
# 6. Cookie 安全
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
CSRF_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Strict'
# 7. 内容安全
SECURE_CONTENT_TYPE_NOSNIFF = True
SECURE_BROWSER_XSS_FILTER = True
X_FRAME_OPTIONS = 'DENY' # 防止点击劫持
# 8. 密码验证
AUTH_PASSWORD_VALIDATORS = [
{'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator'},
{'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', 'OPTIONS': {'min_length': 8}},
{'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator'},
{'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator'},
]
安全检查命令
# Django 安全检查
python manage.py check --deploy
# 检查依赖漏洞
pip install safety
safety check
# 检查代码安全问题
pip install bandit
bandit -r .
安全响应头
# 使用 django-csp 和自定义中间件
class SecurityHeadersMiddleware:
"""安全响应头中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# 安全响应头
response['X-Content-Type-Options'] = 'nosniff'
response['X-Frame-Options'] = 'DENY'
response['X-XSS-Protection'] = '1; mode=block'
response['Referrer-Policy'] = 'strict-origin-when-cross-origin'
response['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()'
return response
安全开发最佳实践
# 1. 输入验证
from django.core.validators import validate_email
from django.core.exceptions import ValidationError
def safe_view(request):
email = request.POST.get('email')
try:
validate_email(email)
except ValidationError:
return JsonResponse({'code': 400, 'msg': '邮箱格式错误'})
# 2. 输出编码
from django.utils.html import escape
user_input = request.GET.get('q')
safe_output = escape(user_input) # HTML 转义
# 3. 文件上传安全
import os
from django.core.exceptions import ValidationError
def validate_file_extension(file):
ext = os.path.splitext(file.name)[1].lower()
allowed = ['.jpg', '.png', '.pdf']
if ext not in allowed:
raise ValidationError('不支持的文件类型')
# 4. 日志脱敏
import logging
import re
class SensitiveDataFilter(logging.Filter):
"""敏感数据过滤器"""
def filter(self, record):
# 脱敏手机号
record.msg = re.sub(r'\d{3}\d{4}\d{4}', r'\g<0>****', str(record.msg))
# 脱敏身份证号
record.msg = re.sub(r'(\d{6})\d{8}(\d{4})', r'\g<0>********\g<0>', str(record.msg))
return True
LOGGING = {
'filters': {
'sensitive_data': {
'()': SensitiveDataFilter,
}
},
'handlers': {
'console': {
'filters': ['sensitive_data'],
}
}
}
二十、Admin 后台¶
20.1 基本注册¶
from django.contrib import admin
from .models import User, Order
admin.site.register(User)
# 或使用装饰器
@admin.register(Order)
class OrderAdmin(admin.ModelAdmin):
pass
20.2 自定义 Admin¶
@admin.register(Order)
class OrderAdmin(admin.ModelAdmin):
list_display = ['id', 'order_no', 'user', 'status', 'amount', 'created_at']
list_filter = ['status', 'created_at']
search_fields = ['order_no', 'user__username']
ordering = ['-created_at']
readonly_fields = ['order_no', 'created_at']
list_per_page = 20
date_hierarchy = 'created_at'
fieldsets = [
('基本信息', {'fields': ['order_no', 'user', 'status']}),
('金额', {'fields': ['amount']}),
('时间', {'fields': ['created_at'], 'classes': ['collapse']}),
]
# 自定义列
def user_email(self, obj):
return obj.user.email
user_email.short_description = '用户邮箱'
list_display = ['id', 'order_no', 'user_email', 'status', 'amount']
# 批量操作
actions = ['mark_as_paid']
@admin.action(description='标记为已支付')
def mark_as_paid(self, request, queryset):
count = queryset.update(status='paid')
self.message_user(request, f'已标记 {count} 个订单为已支付')
# 控制权限
def has_delete_permission(self, request, obj=None):
return request.user.is_superuser
二十一、测试¶
21.1 单元测试¶
# apps/users/tests.py
from django.test import TestCase
from django.contrib.auth import get_user_model
User = get_user_model()
class UserModelTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(
username='testuser',
email='test@example.com',
password='testpass123'
)
def test_user_created(self):
self.assertEqual(self.user.username, 'testuser')
self.assertTrue(self.user.is_active)
def test_password_hashed(self):
self.assertFalse(self.user.password == 'testpass123')
self.assertTrue(self.user.check_password('testpass123'))
21.2 接口测试¶
from django.test import TestCase
from rest_framework.test import APITestCase, APIClient
from rest_framework import status
class UserAPITest(APITestCase):
def setUp(self):
self.user = User.objects.create_user(
username='admin', email='admin@test.com', password='admin123'
)
self.client = APIClient()
def test_register(self):
response = self.client.post('/api/users/', {
'username': 'newuser',
'email': 'new@test.com',
'password': 'pass123',
'confirm_password': 'pass123'
}, format='json')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(response.data['data']['username'], 'newuser')
def test_login(self):
response = self.client.post('/api/auth/login/', {
'username': 'admin@test.com',
'password': 'admin123'
}, format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('access', response.data)
def test_authenticated_request(self):
self.client.force_authenticate(user=self.user) # 跳过认证
response = self.client.get('/api/users/me/')
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_unauthorized(self):
response = self.client.get('/api/users/')
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# 运行测试
python manage.py test
python manage.py test apps.users
python manage.py test apps.users.tests.UserAPITest
python manage.py test --verbosity=2
python manage.py test --keepdb # 保留测试数据库(加速)
二十二、部署与运维¶
22.1 Gunicorn¶
# pip install gunicorn
gunicorn config.wsgi:application \
--workers 4 \
--worker-class gthread \
--threads 2 \
--bind 0.0.0.0:8000 \
--timeout 120 \
--max-requests 1000 \
--max-requests-jitter 100 \
--access-logfile /var/log/app/access.log \
--error-logfile /var/log/app/error.log
# 收集静态文件
python manage.py collectstatic --noinput
22.2 Nginx 配置¶
upstream django_app {
server 127.0.0.1:8000;
}
server {
listen 80;
server_name yourdomain.com;
location /static/ {
alias /path/to/staticfiles/;
expires 30d;
add_header Cache-Control "public, immutable";
}
location /media/ {
alias /path/to/media/;
}
location / {
proxy_pass http://django_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 120;
client_max_body_size 20M;
}
}
22.3 Docker 部署¶
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements/production.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN python manage.py collectstatic --noinput
EXPOSE 8000
CMD ["gunicorn", "config.wsgi:application", "--workers", "4", "--bind", "0.0.0.0:8000"]
# docker-compose.yml
version: "3.8"
services:
web:
build: .
environment:
- DJANGO_SETTINGS_MODULE=config.settings.production
- DATABASE_URL=mysql://user:pass@db:3306/mydb
- REDIS_URL=redis://redis:6379/0
- SECRET_KEY=${SECRET_KEY}
depends_on:
- db
- redis
restart: unless-stopped
celery:
build: .
command: celery -A config.celery worker --loglevel=info
environment:
- DJANGO_SETTINGS_MODULE=config.settings.production
- DATABASE_URL=mysql://user:pass@db:3306/mydb
depends_on:
- db
- redis
celery-beat:
build: .
command: celery -A config.celery beat --loglevel=info
depends_on:
- redis
db:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: mydb
MYSQL_USER: user
MYSQL_PASSWORD: pass
volumes:
- mysql_data:/var/lib/mysql
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/conf.d/default.conf
- static_files:/app/staticfiles
depends_on:
- web
volumes:
mysql_data:
redis_data:
static_files:
22.4 生产环境检查清单¶
# Django 内置安全检查
python manage.py check --deploy
# 常见生产配置
DEBUG = False
ALLOWED_HOSTS = ['yourdomain.com']
SECURE_SSL_REDIRECT = True # 强制 HTTPS
SECURE_HSTS_SECONDS = 31536000 # HSTS
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
X_FRAME_OPTIONS = 'DENY'
Django vs Flask vs FastAPI 对比¶
Django |
Flask |
FastAPI |
|
|---|---|---|---|
定位 |
全栈框架(自带 ORM/Admin/Auth) |
微框架(轻量灵活) |
高性能 API 框架 |
ORM |
内置 Django ORM |
无(配合 SQLAlchemy) |
无(配合 Tortoise/SQLModel) |
Admin |
内置强大 Admin |
无 |
无 |
异步支持 |
Django 3.1+ 有限支持 |
Flask 2.0+ 有限支持 |
原生 async/await |
自动文档 |
无(DRF 有 BrowsableAPI) |
无 |
内置 Swagger/ReDoc |
学习曲线 |
较高 |
低 |
中等 |
适合场景 |
传统 Web / 后台系统 |
小型服务 / 原型 |
高性能 API / 微服务 |
常用扩展汇总¶
库 |
功能 |
|---|---|
django-redis |
Redis 缓存 |
django-rq |
任务队列(基于 Redis Queue) |
rq-scheduler |
RQ 定时任务 |
django-cors-headers |
CORS 跨域 |
django-environ |
环境变量管理 |
django-debug-toolbar |
开发调试工具栏 |
django-extensions |
扩展命令 |
Pillow |
图片处理(ImageField 依赖) |
django-storages |
对象存储(S3/OSS) |
psycopg2-binary |
PostgreSQL 驱动 |
mysqlclient |
MySQL 驱动 |
gunicorn |
WSGI 服务器 |
sentry-sdk |
错误追踪 |
参考资源¶
Django 官方文档:https://docs.djangoproject.com/
Django-RQ 文档:https://github.com/rq/django-rq
RQ 文档:https://python-rq.org/
django-redis 文档:https://github.com/jazzband/django-redis