Sanic¶
从基础到高级,涵盖路由、请求响应、中间件、蓝图、ORM、认证、WebSocket、部署等核心知识点。以 Sanic 23.x 为基准。
目录¶
一、基础篇¶
1.1 安装与项目结构¶
# 安装
pip install sanic sanic-ext
pip install tortoise-orm aerich aiomysql aiosqlite
pip install pydantic python-jose passlib redis
# 推荐项目结构
myapp/
├── app/
│ ├── __init__.py
│ ├── main.py # Sanic 应用入口
│ ├── config.py # 配置管理
│ ├── database.py # 数据库配置
│ ├── models/ # Tortoise ORM 模型
│ │ ├── __init__.py
│ │ └── user.py
│ ├── schemas/ # Pydantic 模型
│ │ ├── __init__.py
│ │ └── user.py
│ ├── blueprints/ # 蓝图
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ └── user.py
│ ├── middleware/ # 中间件
│ │ └── auth.py
│ └── utils/ # 工具函数
│ └── response.py
├── migrations/ # Aerich 迁移文件
├── tests/
├── .env
├── pyproject.toml
└── requirements.txt
最简示例
from sanic import Sanic
from sanic.response import json
app = Sanic('MyApp')
@app.get('/')
async def index(request):
return json({'msg': 'Hello Sanic!'})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000, debug=True, auto_reload=True)
1.2 路由与视图¶
from sanic import Sanic, Blueprint
from sanic.response import json, text, html, redirect, empty
from sanic.views import HTTPMethodView
app = Sanic('MyApp')
# 基本路由
@app.get('/')
async def index(request):
return json({'msg': 'ok'})
# 路径参数
@app.get('/users/<user_id:int>')
async def get_user(request, user_id: int):
return json({'user_id': user_id})
# 路径参数类型
# <name> str(默认)
# <name:int> 整数
# <name:float> 浮点数
# <name:uuid> UUID
# <name:slug> slug(字母数字横线)
# <name:alpha> 纯字母
# <name:path> 路径(含/)
# 多方法
@app.route('/login', methods=['GET', 'POST'])
async def login(request):
if request.method == 'POST':
return json({'msg': 'do login'})
return html('<form>...</form>')
# 查询参数
@app.get('/users')
async def list_users(request):
page = int(request.args.get('page', 1))
size = int(request.args.get('size', 20))
keyword = request.args.get('keyword', '')
return json({'page': page, 'size': size, 'keyword': keyword})
# 类视图(CBV)
class UserView(HTTPMethodView):
async def get(self, request, user_id: int):
return json({'user_id': user_id})
async def put(self, request, user_id: int):
return json({'msg': 'updated'})
async def delete(self, request, user_id: int):
return empty(status=204)
app.add_route(UserView.as_view(), '/users/<user_id:int>')
# URL 生成
app.url_for('get_user', user_id=1) # /users/1
app.url_for('index', _external=True) # http://localhost:8000/
1.3 请求与响应¶
from sanic.request import Request
from sanic.response import json, text, html, redirect, file, empty, ResponseStream
# 请求对象
request.method # GET / POST ...
request.args # 查询参数(RequestParameters,支持多值)
request.form # 表单数据
request.json # JSON body(自动解析)
request.body # 原始 body(bytes)
request.files # 上传文件
request.headers # 请求头
request.cookies # Cookie
request.ip # 客户端 IP
request.url # 完整 URL
request.path # 路径
request.token # Bearer Token(自动解析 Authorization 头)
request.ctx # 请求上下文(可挂载任意数据)
request.app # 当前 Sanic 应用实例
# 多值参数
request.args.get('tag') # 取第一个值
request.args.getlist('tag') # 取所有值 ['a', 'b']
# 响应方式
return json({'code': 0, 'data': {}})
return json({'code': 0}, status=201)
return text('plain text')
return html('<h1>HTML</h1>')
return redirect('/new-path')
return empty(status=204) # 无内容响应
# 自定义响应头和 Cookie
response = json({'msg': 'ok'})
response.headers['X-Custom'] = 'value'
response.cookies['token'] = 'xxx'
response.cookies['token']['httponly'] = True
response.cookies['token']['max-age'] = 3600
return response
# 删除 Cookie
response.delete_cookie('token')
# 文件响应
return await file('/path/to/file.pdf', filename='report.pdf')
# 流式响应
async def stream_generator(response):
for i in range(10):
await response.write(f'chunk {i}\n')
return ResponseStream(stream_generator, content_type='text/plain')
1.4 配置管理¶
# config.py
import os
from dataclasses import dataclass
@dataclass
class Config:
DEBUG: bool = False
SECRET_KEY: str = os.getenv('SECRET_KEY', 'dev-secret')
DATABASE_URL: str = os.getenv('DATABASE_URL', 'sqlite://db.sqlite3')
REDIS_URL: str = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
JWT_EXPIRE_MINUTES: int = int(os.getenv('JWT_EXPIRE_MINUTES', 120))
# main.py
app = Sanic('MyApp')
# Option 1: load from an object
app.config.update_config(Config)
# Option 2: load from environment variables that carry a custom prefix.
# Sanic does not parse .env files itself; export the variables first
# (for example with python-dotenv or the process manager).
app.config.load_environment_vars(prefix='APP_')
# APP_DEBUG=true → app.config.DEBUG == True (prefix stripped, key stays uppercase)
# Option 3: set values directly
app.config.SECRET_KEY = 'my-secret'
app.config.update({
    # Tortoise ORM URL scheme; "mysql+aiomysql://" is SQLAlchemy syntax.
    'DATABASE_URL': 'mysql://user:pass@localhost:3306/mydb'
})
# Reading configuration
app.config.SECRET_KEY
app.config.get('DATABASE_URL', 'default')
# 视图中访问
@app.get('/')
async def index(request):
secret = request.app.config.SECRET_KEY
return json({'debug': request.app.config.DEBUG})
1.5 应用生命周期¶
from sanic import Sanic
from tortoise import Tortoise
import redis.asyncio as aioredis
app = Sanic('MyApp')
# 启动前(before_server_start)
@app.before_server_start
async def init_db(app, loop):
await Tortoise.init(config=TORTOISE_ORM)
await Tortoise.generate_schemas()
print('数据库已连接')
@app.before_server_start
async def init_redis(app, loop):
app.ctx.redis = aioredis.from_url(app.config.REDIS_URL, decode_responses=True)
print('Redis 已连接')
# 启动后(after_server_start)
@app.after_server_start
async def notify_started(app, loop):
print(f'服务已启动: http://0.0.0.0:{app.config.get("PORT", 8000)}')
# 停止前(before_server_stop)
@app.before_server_stop
async def close_db(app, loop):
await Tortoise.close_connections()
print('数据库连接已关闭')
@app.before_server_stop
async def close_redis(app, loop):
await app.ctx.redis.aclose()
print('Redis 连接已关闭')
# 停止后(after_server_stop)
@app.after_server_stop
async def cleanup(app, loop):
print('清理完成')
app.ctx 是 Sanic 提供的应用级上下文对象,可以挂载任意共享资源(数据库连接、Redis 客户端等)。
二、Pydantic 集成篇¶
Sanic-ext 内置了对 Pydantic 的集成,可以自动验证和注入请求数据。
2.1 请求验证¶
# pip install sanic-ext pydantic
from sanic import Sanic
from sanic_ext import Extend
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
app = Sanic('MyApp')
Extend(app) # 启用 sanic-ext
# 定义 Pydantic Schema
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=6)
age: Optional[int] = Field(None, ge=1, le=150)
# Option 1: use sanic-ext's @validate decorator to validate the JSON body and inject it as `body`
from sanic_ext import validate
@app.post('/users')
@validate(json=UserCreate)
async def create_user(request, body: UserCreate):
# body is already a validated UserCreate instance
return json({'username': body.username, 'email': body.email})
# Option 2: parse and validate manually
@app.post('/users')
async def create_user(request):
    """Validate the JSON body against UserCreate and echo the username.

    Returns a 422 response carrying the validation message when the body
    does not satisfy the schema.
    """
    try:
        # model_validate() also rejects a missing or non-object body
        # (request.json is None for an empty payload); UserCreate(**request.json)
        # would raise an unhandled TypeError in that case.
        body = UserCreate.model_validate(request.json)
    except ValueError as e:  # pydantic.ValidationError subclasses ValueError
        return json({'code': 422, 'errors': str(e)}, status=422)
    return json({'username': body.username})
# 查询参数验证
class QueryParams(BaseModel):
page: int = Field(1, ge=1)
size: int = Field(20, ge=1, le=100)
keyword: Optional[str] = None
@app.get('/users')
@validate(query=QueryParams)
async def list_users(request, query: QueryParams):
return json({'page': query.page, 'size': query.size})
2.2 响应序列化与 OpenAPI 文档¶
from sanic_ext import openapi
class UserResponse(BaseModel):
id: int
username: str
email: str
is_active: bool
model_config = {'from_attributes': True}
@app.get('/users/<user_id:int>')
@openapi.response(200, {'application/json': UserResponse}, description='用户详情')
@openapi.parameter('user_id', int, 'path', description='用户 ID')
async def get_user(request, user_id: int):
user = await User.get_or_none(id=user_id)
if not user:
return json({'code': 404, 'msg': '用户不存在'}, status=404)
return json(UserResponse.model_validate(user).model_dump())
# 访问自动生成的文档
# Swagger UI: http://localhost:8000/docs
# Redoc: http://localhost:8000/redoc
三、ORM 篇(Tortoise ORM)¶
3.1 安装与配置¶
# database.py
TORTOISE_ORM = {
'connections': {
'default': {
'engine': 'tortoise.backends.mysql',
'credentials': {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': 'password',
'database': 'mydb',
'charset': 'utf8mb4',
'minsize': 5,
'maxsize': 20,
}
}
},
'apps': {
'models': {
'models': ['app.models', 'aerich.models'],
'default_connection': 'default',
}
},
'use_tz': False,
'timezone': 'Asia/Shanghai'
}
# main.py
from tortoise.contrib.sanic import register_tortoise
app = Sanic('MyApp')
register_tortoise(
app,
config=TORTOISE_ORM,
generate_schemas=False # 生产用 aerich 迁移
)
3.2 模型定义¶
# app/models/user.py
from tortoise import fields
from tortoise.models import Model
class TimestampMixin:
created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
class User(TimestampMixin, Model):
id = fields.BigIntField(pk=True)
username = fields.CharField(max_length=50, unique=True)
email = fields.CharField(max_length=120, unique=True, index=True)
password = fields.CharField(max_length=256)
age = fields.IntField(null=True)
balance = fields.DecimalField(max_digits=12, decimal_places=2, default=0)
is_active = fields.BooleanField(default=True)
role = fields.CharField(max_length=20, default='user')
class Meta:
table = 'users'
ordering = ['-created_at']
def __str__(self):
return f'<User {self.username}>'
class Order(TimestampMixin, Model):
id = fields.BigIntField(pk=True)
user = fields.ForeignKeyField('models.User', related_name='orders', on_delete=fields.CASCADE)
order_no = fields.CharField(max_length=32, unique=True)
status = fields.CharField(max_length=20, default='pending')
amount = fields.DecimalField(max_digits=12, decimal_places=2)
class Meta:
table = 'orders'
unique_together = (('user_id', 'order_no'),)
indexes = (('user_id', 'status'),)
3.3 增删改查¶
Tortoise ORM 与 FastAPI 教程中的用法完全一致,以下重点列出 Sanic 场景下常用写法。
from tortoise.expressions import Q, F
from tortoise.functions import Count, Sum
from tortoise.transactions import in_transaction
# ---- 增 ----
user = await User.create(username='alice', email='alice@example.com', password='hashed')
user, created = await User.get_or_create(email='alice@example.com', defaults={'username': 'alice'})
await User.bulk_create([User(username='u1', email='u1@a.com', password='h')])
# ---- 查 ----
user = await User.get(id=1) # 不存在抛异常
user = await User.get_or_none(id=1) # 不存在返回 None
users = await User.filter(is_active=True).all()
users = await User.filter(Q(age__gte=18) | Q(is_vip=True)).all()
# 分页
page, size = 1, 20
users = await User.all().offset((page - 1) * size).limit(size)
total = await User.all().count()
# 关联预加载
orders = await Order.all().select_related('user') # JOIN
users = await User.all().prefetch_related('orders') # 批量子查询
# 聚合
stats = await Order.annotate(cnt=Count('id')).group_by('status').values('status', 'cnt')
# ---- 改 ----
user = await User.get(id=1)
user.username = 'new_name'
await user.save()
await user.save(update_fields=['username']) # 只更新指定字段
await User.filter(is_active=False).update(deleted=True)
await Order.filter(id=1).update(amount=F('amount') + 10)
# ---- 删 ----
user = await User.get(id=1)
await user.delete()
await User.filter(is_active=False).delete()
# ---- 事务 ----
async with in_transaction():
user = await User.create(username='alice', email='a@b.com', password='h')
order = await Order.create(user=user, order_no='ON001', amount=99.9)
3.4 迁移(Aerich)¶
pip install aerich
# pyproject.toml
[tool.aerich]
tortoise_orm = "app.database.TORTOISE_ORM"
location = "./migrations"
src_folder = "./."
aerich init -t app.database.TORTOISE_ORM
aerich init-db
aerich migrate --name "add_users_table"
aerich upgrade
aerich downgrade
aerich history
四、中间件篇¶
4.1 中间件机制¶
Sanic 中间件分两类:request(请求前)和 response(响应后)。
# 请求中间件
@app.middleware('request')
async def add_request_id(request):
import uuid
request.ctx.request_id = str(uuid.uuid4())
# 返回 None 继续处理;返回 Response 则直接响应,跳过视图
# 响应中间件
@app.middleware('response')
async def add_response_headers(request, response):
response.headers['X-Request-ID'] = getattr(request.ctx, 'request_id', '')
response.headers['X-Server'] = 'Sanic'
# 必须返回 response 或 None
# 计时中间件
import time
@app.middleware('request')
async def start_timer(request):
request.ctx.start_time = time.time()
@app.middleware('response')
async def add_process_time(request, response):
duration = time.time() - getattr(request.ctx, 'start_time', time.time())
response.headers['X-Process-Time'] = f'{duration:.4f}s'
return response
4.2 认证中间件¶
@app.middleware('request')
async def auth_middleware(request):
    """Authenticate every request whose path is not on the public whitelist.

    On success the loaded user is attached as ``request.ctx.user`` and None is
    returned so the request continues to the handler. Otherwise a 401 JSON
    response is returned, which short-circuits the handler.
    """
    from jose import JWTError
    from app.utils.jwt import decode_token  # the JWT helper module defines decode_token

    # Whitelisted paths skip authentication
    public_paths = {'/api/auth/login', '/api/auth/register', '/health'}
    if request.path in public_paths:
        return None

    token = request.token  # parsed from the Authorization header by Sanic
    if not token:
        return json({'code': 401, 'msg': '未登录'}, status=401)

    # Only token problems map to "invalid token"; database errors raised
    # below are left to propagate instead of being reported as a bad token.
    try:
        payload = decode_token(token)
        user_id = int(payload['sub'])  # 'sub' is stored as a string
    except (JWTError, KeyError, TypeError, ValueError):
        return json({'code': 401, 'msg': 'Token 无效或已过期'}, status=401)

    user = await User.get_or_none(id=user_id)
    if not user or not user.is_active:
        return json({'code': 401, 'msg': '用户不存在或已禁用'}, status=401)
    request.ctx.user = user
    return None
4.3 CORS¶
# 使用 sanic-ext(推荐)
from sanic_ext import Extend
app = Sanic('MyApp')
Extend(app, extensions=['cors'], config={
'CORS_ORIGINS': 'http://localhost:3000,https://yourdomain.com',
'CORS_METHODS': ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
'CORS_ALLOW_HEADERS': ['Content-Type', 'Authorization'],
'CORS_SUPPORTS_CREDENTIALS': True,
})
# 手动实现 CORS
@app.middleware('request')
async def handle_options(request):
if request.method == 'OPTIONS':
response = empty()
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
return response
@app.middleware('response')
async def add_cors_headers(request, response):
response.headers['Access-Control-Allow-Origin'] = '*'
return response
五、蓝图篇¶
5.1 基本蓝图¶
# app/blueprints/user.py
from sanic import Blueprint
from sanic.response import json
from sanic_ext import validate
from app.schemas.user import UserCreate, UserUpdate
user_bp = Blueprint('user', url_prefix='/api/users')
def _serialize_user(user):
    """Return a JSON-safe, public view of a User row.

    Explicit field selection replaces ``user.__dict__``, which exposes every
    instance attribute (including the password hash) and values such as
    datetime that the JSON encoder cannot be relied on to handle.
    """
    return {
        'id': user.id,
        'username': user.username,
        'email': user.email,
        'age': user.age,
        'is_active': user.is_active,
        'role': user.role,
        'created_at': user.created_at.isoformat() if user.created_at else None,
    }


@user_bp.get('/')
async def list_users(request):
    """Return one page of users together with the total row count."""
    page = int(request.args.get('page', 1))
    size = int(request.args.get('size', 20))
    users = await User.all().offset((page - 1) * size).limit(size)
    total = await User.all().count()
    return json({
        'code': 0,
        'data': {
            'list': [_serialize_user(u) for u in users],
            'total': total,
            'page': page,
        }
    })


@user_bp.get('/<user_id:int>')
async def get_user(request, user_id: int):
    """Return a single user, or a 404 payload when the id is unknown."""
    user = await User.get_or_none(id=user_id)
    if not user:
        return json({'code': 404, 'msg': '用户不存在'}, status=404)
    return json({'code': 0, 'data': _serialize_user(user)})


@user_bp.post('/')
@validate(json=UserCreate)
async def create_user(request, body: UserCreate):
    """Create a user from the validated body and return the new id."""
    from app.utils.jwt import hash_password

    fields = body.model_dump(exclude={'confirm_password'})
    # Never persist the plain-text password; the register endpoint hashes it too.
    # NOTE(review): assumes UserCreate carries a `password` field — confirm
    # against app/schemas/user.py.
    fields['password'] = hash_password(fields['password'])
    user = await User.create(**fields)
    return json({'code': 0, 'data': {'id': user.id}}, status=201)
@user_bp.put('/<user_id:int>')
@validate(json=UserUpdate)
async def update_user(request, user_id: int, body: UserUpdate):
user = await User.get_or_none(id=user_id)
if not user:
return json({'code': 404, 'msg': '用户不存在'}, status=404)
await user.update_from_dict(body.model_dump(exclude_none=True))
await user.save()
return json({'code': 0, 'msg': 'ok'})
@user_bp.delete('/<user_id:int>')
async def delete_user(request, user_id: int):
deleted = await User.filter(id=user_id).delete()
if not deleted:
return json({'code': 404, 'msg': '用户不存在'}, status=404)
return json({'code': 0, 'msg': '删除成功'})
# main.py
from app.blueprints.user import user_bp
from app.blueprints.auth import auth_bp
from app.blueprints.order import order_bp
app.blueprint(user_bp)
app.blueprint(auth_bp)
app.blueprint(order_bp)
5.2 蓝图组¶
from sanic import Blueprint

# auth is registered on its own, so it carries the full prefix itself
auth_bp = Blueprint('auth', url_prefix='/api/auth')
user_bp = Blueprint('user', url_prefix='/users')
order_bp = Blueprint('order', url_prefix='/orders')

# Blueprint group: shared version (and, if needed, shared middleware).
# Blueprint.group() is the factory for groups; a BlueprintGroup has no
# .blueprint() method. Sanic places the version segment in front of
# url_prefix, so version_prefix='/api/v' is what produces /api/v1/...
api_group = Blueprint.group(user_bp, order_bp, version=1, version_prefix='/api/v')

app.blueprint(auth_bp)    # /api/auth/... (registered independently)
app.blueprint(api_group)  # /api/v1/users, /api/v1/orders
5.3 蓝图中间件¶
# 只对该蓝图生效的中间件
@user_bp.middleware('request')
async def check_user_permission(request):
user = getattr(request.ctx, 'user', None)
if not user:
return json({'code': 401, 'msg': '未登录'}, status=401)
@user_bp.middleware('response')
async def log_user_response(request, response):
print(f'User API: {request.path} -> {response.status}')
return response
六、认证与权限篇¶
6.1 JWT 认证¶
# utils/jwt.py
from datetime import datetime, timedelta
from jose import jwt, JWTError
from passlib.context import CryptContext
SECRET_KEY = 'your-secret-key'
ALGORITHM = 'HS256'
pwd_context = CryptContext(schemes=['bcrypt'])
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(user_id: int, role: str = 'user', expire_minutes: int = 120) -> str:
    """Create a signed JWT for *user_id*.

    The token carries ``sub`` (the user id as a string), ``role`` and ``exp``
    (now + *expire_minutes*), signed with SECRET_KEY using ALGORITHM.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    # Timezone-aware "now"; datetime.utcnow() returns a naive value and is
    # deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + timedelta(minutes=expire_minutes)
    claims = {'sub': str(user_id), 'role': role, 'exp': expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> dict:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# blueprints/auth.py
from sanic import Blueprint
from sanic.response import json
from app.models.user import User
from app.utils.jwt import hash_password, verify_password, create_access_token
auth_bp = Blueprint('auth', url_prefix='/api/auth')
@auth_bp.post('/register')
async def register(request):
data = request.json
if await User.filter(email=data['email']).exists():
return json({'code': 409, 'msg': '邮箱已注册'}, status=409)
user = await User.create(
username=data['username'],
email=data['email'],
password=hash_password(data['password'])
)
return json({'code': 0, 'data': {'id': user.id}}, status=201)
@auth_bp.post('/login')
async def login(request):
data = request.json
user = await User.get_or_none(email=data['email'])
if not user or not verify_password(data['password'], user.password):
return json({'code': 401, 'msg': '账号或密码错误'}, status=401)
if not user.is_active:
return json({'code': 403, 'msg': '账号已被禁用'}, status=403)
token = create_access_token(user.id, user.role)
return json({'code': 0, 'data': {'access_token': token, 'token_type': 'bearer'}})
6.2 装饰器权限¶
# utils/decorators.py
from functools import wraps
from sanic.response import json
from app.utils.jwt import decode_token
from app.models.user import User
def login_required(f):
"""需要登录"""
@wraps(f)
async def decorated(request, *args, **kwargs):
if not hasattr(request.ctx, 'user'):
return json({'code': 401, 'msg': '请先登录'}, status=401)
return await f(request, *args, **kwargs)
return decorated
def require_roles(*roles):
"""需要特定角色"""
def decorator(f):
@wraps(f)
@login_required
async def decorated(request, *args, **kwargs):
user = request.ctx.user
if user.role not in roles:
return json({'code': 403, 'msg': '权限不足'}, status=403)
return await f(request, *args, **kwargs)
return decorated
return decorator
# 使用
@user_bp.delete('/<user_id:int>')
@require_roles('admin', 'superadmin')
async def delete_user(request, user_id: int):
await User.filter(id=user_id).delete()
return json({'code': 0, 'msg': '删除成功'})
七、进阶篇¶
7.1 信号(Signal)¶
Sanic 23.x 内置信号系统,类似 Django 信号。
from sanic.signals import Event
# 内置信号
# Event.HTTP_LIFECYCLE_BEGIN
# Event.HTTP_LIFECYCLE_COMPLETE
# Event.SERVER_INIT_BEFORE
# Event.SERVER_INIT_AFTER
# Event.SERVER_SHUTDOWN_BEFORE
# 监听内置信号
@app.signal(Event.HTTP_LIFECYCLE_COMPLETE)
async def on_request_complete(conn_info, **kwargs):
pass
# Custom signal. Sanic event names must have exactly three dot-separated
# parts (namespace.reference.action); a two-part name such as
# 'user.registered' is rejected.
@app.signal('user.registration.created')
async def on_user_registered(user_id: int, **kwargs):
    user = await User.get(id=user_id)
    # Send the welcome e-mail, create default data, etc.
    print(f'新用户注册: {user.username}')


# Dispatch the custom signal; the entries of `context` reach the handler as
# keyword arguments.
async def register_user(data):
    user = await User.create(**data)
    await app.dispatch('user.registration.created', context={'user_id': user.id})
    return user


# Dispatching from a blueprint handler
@auth_bp.post('/register')
async def register(request):
    user = await User.create(...)
    await request.app.dispatch('user.registration.created', context={'user_id': user.id})
    return json({'code': 0, 'data': {'id': user.id}}, status=201)
7.2 后台任务¶
# 简单后台任务(同进程,非阻塞)
async def send_email(to: str, subject: str):
await asyncio.sleep(1) # 模拟耗时
print(f'发送邮件到 {to}: {subject}')
@app.post('/register')
async def register(request):
user = await User.create(...)
# 不等待,后台执行
app.add_task(send_email(user.email, '欢迎注册'))
return json({'code': 0, 'msg': '注册成功'}, status=201)
# 带名称的任务(可取消)
app.add_task(periodic_cleanup(), name='cleanup')
await app.cancel_task('cleanup')
# 定时后台任务
@app.before_server_start
async def start_periodic_tasks(app, loop):
app.add_task(heartbeat_task())
async def heartbeat_task():
while True:
await asyncio.sleep(60)
print('心跳检测...')
7.3 WebSocket¶
from sanic import Websocket
# 基本 WebSocket
@app.websocket('/ws')
async def ws_handler(request, ws: Websocket):
while True:
data = await ws.recv()
if data is None:
break
await ws.send(f'收到: {data}')
# Chat room with per-room connection tracking
from collections import defaultdict

from sanic.log import logger

# room_id -> set of websockets currently joined to that room
connected = defaultdict(set)


@app.websocket('/chat/<room_id>')
async def chat(request, ws: Websocket, room_id: str):
    """Relay every incoming message to the other members of the same room."""
    members = connected[room_id]
    members.add(ws)
    try:
        async for message in ws:
            # Iterate over a snapshot: the set can change while send() is awaited.
            for peer in list(members):
                if peer is ws:
                    continue
                try:
                    await peer.send(f'Room {room_id}: {message}')
                except Exception:
                    # A broken peer must not end the sender's loop; drop it.
                    members.discard(peer)
    except Exception:
        # Log instead of silently swallowing unexpected failures.
        logger.exception('chat handler failed in room %s', room_id)
    finally:
        members.discard(ws)
        if not members:
            # Do not keep empty rooms around.
            connected.pop(room_id, None)
# WebSocket blueprint
# Standard-library JSON under an alias: in these examples the bare name
# `json` refers to sanic.response.json, which has no dumps().
import json as json_lib

ws_bp = Blueprint('ws', url_prefix='/ws')


@ws_bp.websocket('/notify')
async def notify(request, ws: Websocket):
    """Echo each received message back, tagged with the current user's name."""
    user = request.ctx.user
    while True:
        msg = await ws.recv()
        await ws.send(json_lib.dumps({'user': user.username, 'msg': msg}))
7.4 流式响应¶
from sanic.response import ResponseStream
# 流式传输大文件或实时数据
@app.get('/stream')
async def stream_data(request):
async def generate(response):
for i in range(100):
await response.write(f'data: chunk {i}\n\n')
await asyncio.sleep(0.1)
return ResponseStream(generate, content_type='text/event-stream')
# Server-Sent Events (SSE)
@app.get('/events')
async def sse(request):
async def event_stream(response):
for i in range(10):
await response.write(f'data: {{"index": {i}}}\n\n')
await asyncio.sleep(1)
return ResponseStream(
event_stream,
content_type='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'
}
)
7.5 文件上传¶
# Module-level imports: both handlers need them. Importing inside
# upload_file only would leave upload_multi with undefined names.
import os
import uuid

UPLOAD_DIR = './uploads'


def _save_upload(file):
    """Write one uploaded file under a random name and return that name."""
    ext = os.path.splitext(file.name)[1]
    filename = f'{uuid.uuid4()}{ext}'
    # Make sure the target directory exists before writing.
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    with open(os.path.join(UPLOAD_DIR, filename), 'wb') as f:
        f.write(file.body)
    return filename


@app.post('/upload')
async def upload_file(request):
    """Store the single file sent in the `file` form field."""
    if not request.files:
        return json({'code': 400, 'msg': '没有上传文件'}, status=400)
    file = request.files.get('file')
    if not file:
        return json({'code': 400, 'msg': '缺少 file 字段'}, status=400)
    filename = _save_upload(file)
    return json({'code': 0, 'data': {'filename': filename, 'size': len(file.body)}})


# Multi-file upload
@app.post('/uploads')
async def upload_multi(request):
    """Store every file sent in the `files` form field and list the saved names."""
    # Pass an explicit default so a missing field yields an empty list
    # rather than None.
    files = request.files.getlist('files', []) if request.files else []
    if not files:
        return json({'code': 400, 'msg': '没有上传文件'}, status=400)
    saved = [_save_upload(file) for file in files]
    return json({'code': 0, 'data': saved})
7.6 异常处理¶
from sanic.exceptions import SanicException, NotFound, Unauthorized, Forbidden
# sanic exports no top-level `errorhandler`; handlers are registered with
# the @app.exception(...) decorator shown in this section.
# 内置异常
# NotFound (404)
# Unauthorized (401)
# Forbidden (403)
# BadRequest (400)
# ServerError (500)
# MethodNotAllowed (405)
# InvalidUsage (400)
# 全局异常处理
@app.exception(NotFound)
async def not_found_handler(request, exception):
return json({'code': 404, 'msg': '资源不存在'}, status=404)
@app.exception(Unauthorized)
async def unauthorized_handler(request, exception):
return json({'code': 401, 'msg': '未授权'}, status=401)
@app.exception(Exception)
async def global_exception_handler(request, exception):
    """Catch-all handler: mirror Sanic errors, hide everything else behind a 500."""
    # Sanic exposes its logger in sanic.log; the app object has no `logger`
    # attribute.
    from sanic.log import logger

    if isinstance(exception, SanicException):
        return json(
            {'code': exception.status_code, 'msg': str(exception)},
            status=exception.status_code,
        )
    # Unknown exception: log it with its traceback, keep details from the client.
    logger.error('Unhandled exception: %s', exception, exc_info=exception)
    return json({'code': 500, 'msg': '服务器内部错误'}, status=500)
# 自定义业务异常
class AppException(SanicException):
def __init__(self, message: str, status_code: int = 400):
super().__init__(message, status_code=status_code)
@app.exception(AppException)
async def app_exception_handler(request, exception):
return json({'code': exception.status_code, 'msg': str(exception)}, status=exception.status_code)
# 抛出
raise AppException('用户名已存在', status_code=409)
raise NotFound('用户不存在')
raise Unauthorized('请先登录')
7.7 依赖注入(sanic-ext)¶
from sanic_ext import inject
# 注册可注入的服务
class EmailService:
async def send(self, to: str, subject: str, body: str):
print(f'发送邮件到 {to}')
# 在 lifespan 中注册
@app.before_server_start
async def setup_services(app, loop):
app.ext.dependency(EmailService())
# 视图中注入
@app.post('/register')
@inject
async def register(request, email_service: EmailService):
# email_service 自动注入
await email_service.send(request.json['email'], '欢迎', '...')
return json({'code': 0})
八、缓存与任务篇¶
8.1 Redis 缓存¶
import redis.asyncio as aioredis
import json as json_lib
from functools import wraps
# lifespan 初始化
@app.before_server_start
async def init_redis(app, loop):
app.ctx.redis = aioredis.from_url(app.config.REDIS_URL, decode_responses=True)
@app.before_server_stop
async def close_redis(app, loop):
await app.ctx.redis.aclose()
# 封装缓存工具
class RedisCache:
    """Thin JSON-serialising wrapper around an async Redis client."""

    def __init__(self, redis):
        # `redis` is the async client; every value is stored as JSON text.
        self.redis = redis

    async def get(self, key: str):
        """Return the decoded value stored at *key*, or None when absent or empty."""
        raw = await self.redis.get(key)
        return json_lib.loads(raw) if raw else None

    async def set(self, key: str, value, ttl: int = 300):
        """Store *value* as JSON under *key* with a TTL of *ttl* seconds."""
        await self.redis.setex(key, ttl, json_lib.dumps(value))

    async def delete(self, key: str):
        """Delete a single key."""
        await self.redis.delete(key)

    async def delete_pattern(self, pattern: str):
        """Delete every key matching the glob-style *pattern*."""
        # SCAN walks the keyspace incrementally; KEYS does it in one call
        # that blocks Redis for the duration.
        keys = [key async for key in self.redis.scan_iter(match=pattern)]
        if keys:
            await self.redis.delete(*keys)
# Cache decorator
def cache(key_func, ttl: int = 300):
    """Cache a handler's JSON response in Redis.

    key_func receives the same arguments as the handler and returns the cache
    key; ttl is the lifetime of the entry in seconds.
    """
    def decorator(f):
        @wraps(f)
        async def wrapper(request, *args, **kwargs):
            store = RedisCache(request.app.ctx.redis)
            cache_key = key_func(request, *args, **kwargs)
            cached = await store.get(cache_key)
            if cached is not None:
                return json(cached)
            result = await f(request, *args, **kwargs)
            # Store the decoded payload, not the body text: RedisCache.set()
            # JSON-encodes its value, so caching the raw body string would
            # double-encode it and a cache hit would return a JSON string
            # instead of the original object. Only successful responses are
            # cached, because a hit is always replayed with status 200.
            if result.status == 200:
                await store.set(cache_key, json_lib.loads(result.body), ttl)
            return result
        return wrapper
    return decorator
# 使用
@app.get('/hot-articles')
@cache(key_func=lambda req: 'hot_articles', ttl=60)
async def hot_articles(request):
articles = await Article.all().order_by('-views').limit(10).values()
return json({'code': 0, 'data': articles})
8.2 Celery 异步任务¶
# tasks.py
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '') # Sanic 无需设置
from celery import Celery
celery_app = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
celery_app.conf.update(
task_serializer='json',
result_serializer='json',
timezone='Asia/Shanghai'
)
@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, to: str, subject: str, body: str):
try:
email_service.send(to, subject, body)
except Exception as exc:
raise self.retry(exc=exc, countdown=60)
@celery_app.task
def generate_report(user_id: int):
pass
# Sanic 视图中调用(同步调用,不用 await)
@auth_bp.post('/register')
async def register(request):
user = await User.create(...)
send_email_task.delay(user.email, '欢迎注册', '...')
return json({'code': 0, 'msg': '注册成功'}, status=201)
# 定时任务
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
'daily-cleanup': {
'task': 'tasks.generate_report',
'schedule': crontab(hour=2, minute=0),
'args': (1,)
}
}
# 启动
# celery -A tasks.celery_app worker --loglevel=info
# celery -A tasks.celery_app beat --loglevel=info
九、测试篇¶
9.1 接口测试¶
# pip install pytest pytest-asyncio sanic-testing
# tests/conftest.py
import pytest
from sanic import Sanic
from app.main import create_app
@pytest.fixture
def app():
sanic_app = create_app(testing=True)
return sanic_app
@pytest.fixture
def client(app):
return app.test_client
# tests/test_user.py
import pytest

# app.test_client is the synchronous client and cannot be awaited; async
# tests use app.asgi_client instead.


async def _auth_headers(app):
    """Register and log in a throw-away user; return its Bearer header."""
    credentials = {'email': 'u@a.com', 'password': '123456'}
    await app.asgi_client.post(
        '/api/auth/register', json={'username': 'u', **credentials}
    )
    _, response = await app.asgi_client.post('/api/auth/login', json=credentials)
    token = response.json['data']['access_token']
    return {'Authorization': f'Bearer {token}'}


@pytest.mark.asyncio
async def test_create_user(app):
    _, response = await app.asgi_client.post(
        '/api/users/',
        json={
            'username': 'testuser',
            'email': 'test@example.com',
            'password': '123456'
        },
        headers=await _auth_headers(app),
    )
    assert response.status == 201
    # The create endpoint responds with the new id only.
    assert 'id' in response.json['data']


@pytest.mark.asyncio
async def test_login(app):
    # Register first
    await app.asgi_client.post('/api/auth/register', json={
        'username': 'u', 'email': 'u@a.com', 'password': '123456'
    })
    # Then log in
    _, response = await app.asgi_client.post('/api/auth/login', json={
        'email': 'u@a.com', 'password': '123456'
    })
    assert response.status == 200
    assert 'access_token' in response.json['data']


@pytest.mark.asyncio
async def test_not_found(app):
    # Authenticated request: an anonymous one is answered with 401 by the
    # auth middleware (see test_unauthorized) before the 404 can occur.
    _, response = await app.asgi_client.get(
        '/api/users/99999', headers=await _auth_headers(app)
    )
    assert response.status == 404


@pytest.mark.asyncio
async def test_unauthorized(app):
    _, response = await app.asgi_client.get('/api/users/')
    assert response.status == 401
9.2 Mock¶
from unittest.mock import patch, AsyncMock
@pytest.mark.asyncio
async def test_with_mock(app):
    """Registration should call the (patched) e-mail sender and answer 201."""
    with patch('app.utils.email.send', new_callable=AsyncMock) as mock_send:
        mock_send.return_value = True
        # asgi_client is the awaitable client; app.test_client is synchronous.
        _, response = await app.asgi_client.post('/api/auth/register', json={
            'username': 'u', 'email': 'u@a.com', 'password': '123456'
        })
        assert mock_send.called
        assert response.status == 201
十、部署篇¶
10.1 内置服务器 / Uvicorn¶
# 开发环境(内置服务器,支持热重载)
python -m app.main
# 或
sanic app.main:app --dev --host 0.0.0.0 --port 8000
# 生产环境(内置多 worker)
sanic app.main:app --host 0.0.0.0 --port 8000 --workers 4
# 生产环境(Uvicorn,适合 ASGI 部署)
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
# 生产环境(Gunicorn + Uvicorn worker)
gunicorn app.main:app \
--worker-class uvicorn.workers.UvicornWorker \
--workers 4 \
--bind 0.0.0.0:8000 \
--timeout 120 \
--access-logfile /var/log/app/access.log \
--error-logfile /var/log/app/error.log
10.2 Nginx 配置¶
upstream sanic_app {
server 127.0.0.1:8000;
}
server {
listen 80;
server_name yourdomain.com;
location /static/ {
alias /path/to/static/;
expires 30d;
}
location /uploads/ {
alias /path/to/uploads/;
}
location / {
proxy_pass http://sanic_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 120;
client_max_body_size 20M;
}
# WebSocket 支持
location /ws/ {
proxy_pass http://sanic_app;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_read_timeout 86400;
}
}
10.3 Docker 部署¶
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["sanic", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
version: "3.8"
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - SECRET_KEY=${SECRET_KEY}
      # Tortoise ORM URL scheme ("mysql+aiomysql://" is SQLAlchemy syntax)
      - DATABASE_URL=mysql://user:pass@db:3306/mydb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    restart: unless-stopped
  celery:
    build: .
    command: celery -A app.tasks.celery_app worker --loglevel=info
    environment:
      - DATABASE_URL=mysql://user:pass@db:3306/mydb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
  db:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: mydb
      MYSQL_USER: user
      MYSQL_PASSWORD: pass
    volumes:
      - mysql_data:/var/lib/mysql
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - api
volumes:
  mysql_data:
  redis_data:
Sanic vs FastAPI vs Flask 对比¶
| | Sanic | FastAPI | Flask |
|---|---|---|---|
| 性能 | 极高(原生异步) | 极高(原生异步) | 中等(同步) |
| 异步支持 | 原生 async/await | 原生 async/await | 有限支持 |
| 自动文档 | sanic-ext 支持 | 内置 Swagger/ReDoc | 需插件 |
| 类型验证 | Pydantic(sanic-ext) | Pydantic 原生 | 需插件 |
| WebSocket | 内置支持 | 内置支持 | 需插件 |
| ORM | Tortoise / SQLAlchemy | Tortoise / SQLModel | SQLAlchemy |
| 生态 | 中等 | 快速增长 | 成熟 |
| 学习曲线 | 低~中 | 中等 | 低 |
| 适合场景 | 高性能 API、实时应用 | 高性能 API、微服务 | 小型服务 |
常用扩展汇总¶
| 库 | 功能 |
|---|---|
| sanic-ext | 官方扩展(CORS/DI/验证/文档) |
| tortoise-orm | 异步 ORM |
| aerich | Tortoise 数据库迁移 |
| pydantic | 数据验证 |
| python-jose | JWT |
| passlib | 密码哈希 |
| redis[asyncio] | 异步 Redis |
| celery | 异步任务队列 |
| httpx | 异步 HTTP 客户端 |
| sanic-testing | 官方测试工具 |
参考资源¶
Sanic 官方文档:https://sanic.dev/
Sanic-ext 文档:https://sanic.dev/en/plugins/sanic-ext/
Tortoise ORM 文档:https://tortoise.github.io/
Aerich 文档:https://github.com/tortoise/aerich