Sanic

从基础到高级,涵盖路由、请求响应、中间件、蓝图、ORM、认证、WebSocket、部署等核心知识点。以 Sanic 23.x 为基准。


目录


一、基础篇

1.1 安装与项目结构

# 安装
pip install sanic sanic-ext
pip install tortoise-orm aerich aiomysql aiosqlite
pip install pydantic python-jose passlib redis

# 推荐项目结构
myapp/
├── app/
│   ├── __init__.py
│   ├── main.py             # Sanic 应用入口   ├── config.py           # 配置管理   ├── database.py         # 数据库配置   ├── models/             # Tortoise ORM 模型      ├── __init__.py
│      └── user.py
│   ├── schemas/            # Pydantic 模型      ├── __init__.py
│      └── user.py
│   ├── blueprints/         # 蓝图      ├── __init__.py
│      ├── auth.py
│      └── user.py
│   ├── middleware/         # 中间件      └── auth.py
│   └── utils/              # 工具函数       └── response.py
├── migrations/             # Aerich 迁移文件
├── tests/
├── .env
├── pyproject.toml
└── requirements.txt

最简示例

from sanic import Sanic
from sanic.response import json

app = Sanic('MyApp')

@app.get('/')
async def index(request):
    return json({'msg': 'Hello Sanic!'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=True, auto_reload=True)

1.2 路由与视图

from sanic import Sanic, Blueprint
from sanic.response import json, text, html, redirect, empty
from sanic.views import HTTPMethodView

app = Sanic('MyApp')

# 基本路由
@app.get('/')
async def index(request):
    return json({'msg': 'ok'})

# 路径参数
@app.get('/users/<user_id:int>')
async def get_user(request, user_id: int):
    return json({'user_id': user_id})

# 路径参数类型
# <name>          str(默认)
# <name:int>      整数
# <name:float>    浮点数
# <name:uuid>     UUID
# <name:slug>     slug(字母数字横线)
# <name:alpha>    纯字母
# <name:path>     路径(含/)

# 多方法
@app.route('/login', methods=['GET', 'POST'])
async def login(request):
    if request.method == 'POST':
        return json({'msg': 'do login'})
    return html('<form>...</form>')

# 查询参数
@app.get('/users')
async def list_users(request):
    page    = int(request.args.get('page', 1))
    size    = int(request.args.get('size', 20))
    keyword = request.args.get('keyword', '')
    return json({'page': page, 'size': size, 'keyword': keyword})

# 类视图(CBV)
class UserView(HTTPMethodView):
    async def get(self, request, user_id: int):
        return json({'user_id': user_id})

    async def put(self, request, user_id: int):
        return json({'msg': 'updated'})

    async def delete(self, request, user_id: int):
        return empty(status=204)

app.add_route(UserView.as_view(), '/users/<user_id:int>')

# URL 生成
app.url_for('get_user', user_id=1)           # /users/1
app.url_for('index', _external=True)          # http://localhost:8000/

1.3 请求与响应

from sanic.request import Request
from sanic.response import json, text, html, redirect, file, empty, ResponseStream

# 请求对象
request.method          # GET / POST ...
request.args            # 查询参数(RequestParameters,支持多值)
request.form            # 表单数据
request.json            # JSON body(自动解析)
request.body            # 原始 body(bytes)
request.files           # 上传文件
request.headers         # 请求头
request.cookies         # Cookie
request.ip              # 客户端 IP
request.url             # 完整 URL
request.path            # 路径
request.token           # Bearer Token(自动解析 Authorization 头)
request.ctx             # 请求上下文(可挂载任意数据)
request.app             # 当前 Sanic 应用实例

# 多值参数
request.args.get('tag')         # 取第一个值
request.args.getlist('tag')     # 取所有值 ['a', 'b']

# 响应方式
return json({'code': 0, 'data': {}})
return json({'code': 0}, status=201)
return text('plain text')
return html('<h1>HTML</h1>')
return redirect('/new-path')
return empty(status=204)        # 无内容响应

# 自定义响应头和 Cookie
response = json({'msg': 'ok'})
response.headers['X-Custom'] = 'value'
response.cookies['token'] = 'xxx'
response.cookies['token']['httponly'] = True
response.cookies['token']['max-age'] = 3600
return response

# 删除 Cookie
response.delete_cookie('token')

# 文件响应
return await file('/path/to/file.pdf', filename='report.pdf')

# 流式响应
async def stream_generator(response):
    for i in range(10):
        await response.write(f'chunk {i}\n')

return ResponseStream(stream_generator, content_type='text/plain')

1.4 配置管理

# config.py
import os
from dataclasses import dataclass

@dataclass
class Config:
    DEBUG: bool = False
    SECRET_KEY: str = os.getenv('SECRET_KEY', 'dev-secret')
    DATABASE_URL: str = os.getenv('DATABASE_URL', 'sqlite://db.sqlite3')
    REDIS_URL: str = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
    JWT_EXPIRE_MINUTES: int = int(os.getenv('JWT_EXPIRE_MINUTES', 120))

# main.py
app = Sanic('MyApp')

# 方式1:从对象加载
app.config.update_config(Config)

# 方式2:从 .env 文件加载
app.config.load_env(prefix='APP_', lowercase=True)
# APP_DEBUG=true → app.config.debug = True

# 方式3:直接设置
app.config.SECRET_KEY = 'my-secret'
app.config.update({
    'DATABASE_URL': 'mysql+aiomysql://user:pass@localhost/mydb'
})

# 读取配置
app.config.SECRET_KEY
app.config.get('DATABASE_URL', 'default')

# 视图中访问
@app.get('/')
async def index(request):
    secret = request.app.config.SECRET_KEY
    return json({'debug': request.app.config.DEBUG})

1.5 应用生命周期

from sanic import Sanic
from tortoise import Tortoise
import redis.asyncio as aioredis

app = Sanic('MyApp')

# 启动前(before_server_start)
@app.before_server_start
async def init_db(app, loop):
    await Tortoise.init(config=TORTOISE_ORM)
    await Tortoise.generate_schemas()
    print('数据库已连接')

@app.before_server_start
async def init_redis(app, loop):
    app.ctx.redis = aioredis.from_url(app.config.REDIS_URL, decode_responses=True)
    print('Redis 已连接')

# 启动后(after_server_start)
@app.after_server_start
async def notify_started(app, loop):
    print(f'服务已启动: http://0.0.0.0:{app.config.get("PORT", 8000)}')

# 停止前(before_server_stop)
@app.before_server_stop
async def close_db(app, loop):
    await Tortoise.close_connections()
    print('数据库连接已关闭')

@app.before_server_stop
async def close_redis(app, loop):
    await app.ctx.redis.aclose()
    print('Redis 连接已关闭')

# 停止后(after_server_stop)
@app.after_server_stop
async def cleanup(app, loop):
    print('清理完成')

app.ctx 是 Sanic 提供的应用级上下文对象,可以挂载任意共享资源(数据库连接、Redis 客户端等)。


二、Pydantic 集成篇

Sanic-ext 内置了对 Pydantic 的集成,可以自动验证和注入请求数据。

2.1 请求验证

# pip install sanic-ext pydantic
from sanic import Sanic
from sanic_ext import Extend
from pydantic import BaseModel, Field, EmailStr
from typing import Optional

app = Sanic('MyApp')
Extend(app)                    # 启用 sanic-ext

# 定义 Pydantic Schema
class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=6)
    age: Optional[int] = Field(None, ge=1, le=150)

# 方式1:使用 sanic-ext 的 @app.ext.openapi.body 自动注入
from sanic_ext import validate

@app.post('/users')
@validate(json=UserCreate)
async def create_user(request, body: UserCreate):
    # body 已经是经过验证的 UserCreate 实例
    return json({'username': body.username, 'email': body.email})

# 方式2:手动解析和验证
@app.post('/users')
async def create_user(request):
    try:
        body = UserCreate(**request.json)
    except ValueError as e:
        return json({'code': 422, 'errors': str(e)}, status=422)
    return json({'username': body.username})

# 查询参数验证
class QueryParams(BaseModel):
    page: int = Field(1, ge=1)
    size: int = Field(20, ge=1, le=100)
    keyword: Optional[str] = None

@app.get('/users')
@validate(query=QueryParams)
async def list_users(request, query: QueryParams):
    return json({'page': query.page, 'size': query.size})

2.2 响应序列化与 OpenAPI 文档

from sanic_ext import openapi

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    is_active: bool

    model_config = {'from_attributes': True}

@app.get('/users/<user_id:int>')
@openapi.response(200, {'application/json': UserResponse}, description='用户详情')
@openapi.parameter('user_id', int, 'path', description='用户 ID')
async def get_user(request, user_id: int):
    user = await User.get_or_none(id=user_id)
    if not user:
        return json({'code': 404, 'msg': '用户不存在'}, status=404)
    return json(UserResponse.model_validate(user).model_dump())

# 访问自动生成的文档
# Swagger UI: http://localhost:8000/docs
# Redoc:      http://localhost:8000/redoc

三、ORM 篇(Tortoise ORM)

3.1 安装与配置

# database.py
TORTOISE_ORM = {
    'connections': {
        'default': {
            'engine': 'tortoise.backends.mysql',
            'credentials': {
                'host':     'localhost',
                'port':     3306,
                'user':     'root',
                'password': 'password',
                'database': 'mydb',
                'charset':  'utf8mb4',
                'minsize':  5,
                'maxsize':  20,
            }
        }
    },
    'apps': {
        'models': {
            'models': ['app.models', 'aerich.models'],
            'default_connection': 'default',
        }
    },
    'use_tz': False,
    'timezone': 'Asia/Shanghai'
}

# main.py
from tortoise.contrib.sanic import register_tortoise

app = Sanic('MyApp')
register_tortoise(
    app,
    config=TORTOISE_ORM,
    generate_schemas=False    # 生产用 aerich 迁移
)

3.2 模型定义

# app/models/user.py
from tortoise import fields
from tortoise.models import Model

class TimestampMixin:
    created_at = fields.DatetimeField(auto_now_add=True)
    updated_at = fields.DatetimeField(auto_now=True)

class User(TimestampMixin, Model):
    id         = fields.BigIntField(pk=True)
    username   = fields.CharField(max_length=50, unique=True)
    email      = fields.CharField(max_length=120, unique=True, index=True)
    password   = fields.CharField(max_length=256)
    age        = fields.IntField(null=True)
    balance    = fields.DecimalField(max_digits=12, decimal_places=2, default=0)
    is_active  = fields.BooleanField(default=True)
    role       = fields.CharField(max_length=20, default='user')

    class Meta:
        table = 'users'
        ordering = ['-created_at']

    def __str__(self):
        return f'<User {self.username}>'


class Order(TimestampMixin, Model):
    id       = fields.BigIntField(pk=True)
    user     = fields.ForeignKeyField('models.User', related_name='orders', on_delete=fields.CASCADE)
    order_no = fields.CharField(max_length=32, unique=True)
    status   = fields.CharField(max_length=20, default='pending')
    amount   = fields.DecimalField(max_digits=12, decimal_places=2)

    class Meta:
        table = 'orders'
        unique_together = (('user_id', 'order_no'),)
        indexes = (('user_id', 'status'),)

3.3 增删改查

Tortoise ORM 与 FastAPI 教程中的用法完全一致,以下重点列出 Sanic 场景下常用写法。

from tortoise.expressions import Q, F
from tortoise.functions import Count, Sum
from tortoise.transactions import in_transaction

# ---- 增 ----
user = await User.create(username='alice', email='alice@example.com', password='hashed')
user, created = await User.get_or_create(email='alice@example.com', defaults={'username': 'alice'})
await User.bulk_create([User(username='u1', email='u1@a.com', password='h')])

# ---- 查 ----
user   = await User.get(id=1)                    # 不存在抛异常
user   = await User.get_or_none(id=1)            # 不存在返回 None
users  = await User.filter(is_active=True).all()
users  = await User.filter(Q(age__gte=18) | Q(is_vip=True)).all()

# 分页
page, size = 1, 20
users = await User.all().offset((page - 1) * size).limit(size)
total = await User.all().count()

# 关联预加载
orders = await Order.all().select_related('user')          # JOIN
users  = await User.all().prefetch_related('orders')       # 批量子查询

# 聚合
stats = await Order.annotate(cnt=Count('id')).group_by('status').values('status', 'cnt')

# ---- 改 ----
user = await User.get(id=1)
user.username = 'new_name'
await user.save()
await user.save(update_fields=['username'])      # 只更新指定字段

await User.filter(is_active=False).update(deleted=True)
await Order.filter(id=1).update(amount=F('amount') + 10)

# ---- 删 ----
user = await User.get(id=1)
await user.delete()
await User.filter(is_active=False).delete()

# ---- 事务 ----
async with in_transaction():
    user  = await User.create(username='alice', email='a@b.com', password='h')
    order = await Order.create(user=user, order_no='ON001', amount=99.9)

3.4 迁移(Aerich)

pip install aerich
# pyproject.toml
[tool.aerich]
tortoise_orm = "app.database.TORTOISE_ORM"
location = "./migrations"
src_folder = "./."
aerich init -t app.database.TORTOISE_ORM
aerich init-db

aerich migrate --name "add_users_table"
aerich upgrade
aerich downgrade
aerich history

四、中间件篇

4.1 中间件机制

Sanic 中间件分两类:request(请求前)和 response(响应后)。

# 请求中间件
@app.middleware('request')
async def add_request_id(request):
    import uuid
    request.ctx.request_id = str(uuid.uuid4())
    # 返回 None 继续处理;返回 Response 则直接响应,跳过视图

# 响应中间件
@app.middleware('response')
async def add_response_headers(request, response):
    response.headers['X-Request-ID'] = getattr(request.ctx, 'request_id', '')
    response.headers['X-Server'] = 'Sanic'
    # 必须返回 response 或 None

# 计时中间件
import time

@app.middleware('request')
async def start_timer(request):
    request.ctx.start_time = time.time()

@app.middleware('response')
async def add_process_time(request, response):
    duration = time.time() - getattr(request.ctx, 'start_time', time.time())
    response.headers['X-Process-Time'] = f'{duration:.4f}s'
    return response

4.2 认证中间件

@app.middleware('request')
async def auth_middleware(request):
    # 白名单路径跳过验证
    skip_paths = ['/api/auth/login', '/api/auth/register', '/health']
    if request.path in skip_paths:
        return

    token = request.token                  # 自动解析 Bearer Token
    if not token:
        return json({'code': 401, 'msg': '未登录'}, status=401)

    try:
        payload = verify_token(token)
        user = await User.get_or_none(id=payload['sub'])
        if not user or not user.is_active:
            return json({'code': 401, 'msg': '用户不存在或已禁用'}, status=401)
        request.ctx.user = user
    except Exception:
        return json({'code': 401, 'msg': 'Token 无效或已过期'}, status=401)

4.3 CORS

# 使用 sanic-ext(推荐)
from sanic_ext import Extend

app = Sanic('MyApp')
Extend(app, extensions=['cors'], config={
    'CORS_ORIGINS': 'http://localhost:3000,https://yourdomain.com',
    'CORS_METHODS': ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
    'CORS_ALLOW_HEADERS': ['Content-Type', 'Authorization'],
    'CORS_SUPPORTS_CREDENTIALS': True,
})

# 手动实现 CORS
@app.middleware('request')
async def handle_options(request):
    if request.method == 'OPTIONS':
        response = empty()
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE'
        response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
        return response

@app.middleware('response')
async def add_cors_headers(request, response):
    response.headers['Access-Control-Allow-Origin'] = '*'
    return response

五、蓝图篇

5.1 基本蓝图

# app/blueprints/user.py
from sanic import Blueprint
from sanic.response import json
from sanic_ext import validate
from app.schemas.user import UserCreate, UserUpdate

user_bp = Blueprint('user', url_prefix='/api/users')

@user_bp.get('/')
async def list_users(request):
    page = int(request.args.get('page', 1))
    size = int(request.args.get('size', 20))
    users = await User.all().offset((page-1)*size).limit(size)
    total = await User.all().count()
    return json({
        'code': 0,
        'data': {
            'list':  [u.__dict__ for u in users],
            'total': total,
            'page':  page,
        }
    })

@user_bp.get('/<user_id:int>')
async def get_user(request, user_id: int):
    user = await User.get_or_none(id=user_id)
    if not user:
        return json({'code': 404, 'msg': '用户不存在'}, status=404)
    return json({'code': 0, 'data': user.__dict__})

@user_bp.post('/')
@validate(json=UserCreate)
async def create_user(request, body: UserCreate):
    user = await User.create(**body.model_dump(exclude={'confirm_password'}))
    return json({'code': 0, 'data': {'id': user.id}}, status=201)

@user_bp.put('/<user_id:int>')
@validate(json=UserUpdate)
async def update_user(request, user_id: int, body: UserUpdate):
    user = await User.get_or_none(id=user_id)
    if not user:
        return json({'code': 404, 'msg': '用户不存在'}, status=404)
    await user.update_from_dict(body.model_dump(exclude_none=True))
    await user.save()
    return json({'code': 0, 'msg': 'ok'})

@user_bp.delete('/<user_id:int>')
async def delete_user(request, user_id: int):
    deleted = await User.filter(id=user_id).delete()
    if not deleted:
        return json({'code': 404, 'msg': '用户不存在'}, status=404)
    return json({'code': 0, 'msg': '删除成功'})
# main.py
from app.blueprints.user import user_bp
from app.blueprints.auth import auth_bp
from app.blueprints.order import order_bp

app.blueprint(user_bp)
app.blueprint(auth_bp)
app.blueprint(order_bp)

5.2 蓝图组

from sanic import Blueprint, BlueprintGroup

auth_bp  = Blueprint('auth',  url_prefix='/auth')
user_bp  = Blueprint('user',  url_prefix='/users')
order_bp = Blueprint('order', url_prefix='/orders')

# 蓝图组(统一前缀 + 统一中间件)
api_group = BlueprintGroup(url_prefix='/api', version=1)
# 访问路径变为 /api/v1/users、/api/v1/orders
api_group.blueprint(user_bp)
api_group.blueprint(order_bp)

app.blueprint(auth_bp)         # /api/auth(独立注册)
app.blueprint(api_group)       # /api/v1/users、/api/v1/orders

5.3 蓝图中间件

# 只对该蓝图生效的中间件
@user_bp.middleware('request')
async def check_user_permission(request):
    user = getattr(request.ctx, 'user', None)
    if not user:
        return json({'code': 401, 'msg': '未登录'}, status=401)

@user_bp.middleware('response')
async def log_user_response(request, response):
    print(f'User API: {request.path} -> {response.status}')
    return response

六、认证与权限篇

6.1 JWT 认证

# utils/jwt.py
from datetime import datetime, timedelta
from jose import jwt, JWTError
from passlib.context import CryptContext

SECRET_KEY = 'your-secret-key'
ALGORITHM  = 'HS256'
pwd_context = CryptContext(schemes=['bcrypt'])

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

def create_access_token(user_id: int, role: str = 'user', expire_minutes: int = 120) -> str:
    expire = datetime.utcnow() + timedelta(minutes=expire_minutes)
    return jwt.encode(
        {'sub': str(user_id), 'role': role, 'exp': expire},
        SECRET_KEY,
        algorithm=ALGORITHM
    )

def decode_token(token: str) -> dict:
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# blueprints/auth.py
from sanic import Blueprint
from sanic.response import json
from app.models.user import User
from app.utils.jwt import hash_password, verify_password, create_access_token

auth_bp = Blueprint('auth', url_prefix='/api/auth')

@auth_bp.post('/register')
async def register(request):
    data = request.json
    if await User.filter(email=data['email']).exists():
        return json({'code': 409, 'msg': '邮箱已注册'}, status=409)
    user = await User.create(
        username=data['username'],
        email=data['email'],
        password=hash_password(data['password'])
    )
    return json({'code': 0, 'data': {'id': user.id}}, status=201)

@auth_bp.post('/login')
async def login(request):
    data = request.json
    user = await User.get_or_none(email=data['email'])
    if not user or not verify_password(data['password'], user.password):
        return json({'code': 401, 'msg': '账号或密码错误'}, status=401)
    if not user.is_active:
        return json({'code': 403, 'msg': '账号已被禁用'}, status=403)
    token = create_access_token(user.id, user.role)
    return json({'code': 0, 'data': {'access_token': token, 'token_type': 'bearer'}})

6.2 装饰器权限

# utils/decorators.py
from functools import wraps
from sanic.response import json
from app.utils.jwt import decode_token
from app.models.user import User

def login_required(f):
    """需要登录"""
    @wraps(f)
    async def decorated(request, *args, **kwargs):
        if not hasattr(request.ctx, 'user'):
            return json({'code': 401, 'msg': '请先登录'}, status=401)
        return await f(request, *args, **kwargs)
    return decorated

def require_roles(*roles):
    """需要特定角色"""
    def decorator(f):
        @wraps(f)
        @login_required
        async def decorated(request, *args, **kwargs):
            user = request.ctx.user
            if user.role not in roles:
                return json({'code': 403, 'msg': '权限不足'}, status=403)
            return await f(request, *args, **kwargs)
        return decorated
    return decorator

# 使用
@user_bp.delete('/<user_id:int>')
@require_roles('admin', 'superadmin')
async def delete_user(request, user_id: int):
    await User.filter(id=user_id).delete()
    return json({'code': 0, 'msg': '删除成功'})

七、进阶篇

7.1 信号(Signal)

Sanic 23.x 内置信号系统,类似 Django 信号。

from sanic.signals import Event

# 内置信号
# Event.HTTP_LIFECYCLE_BEGIN
# Event.HTTP_LIFECYCLE_COMPLETE
# Event.SERVER_INIT_BEFORE
# Event.SERVER_INIT_AFTER
# Event.SERVER_SHUTDOWN_BEFORE

# 监听内置信号
@app.signal(Event.HTTP_LIFECYCLE_COMPLETE)
async def on_request_complete(conn_info, **kwargs):
    pass

# 自定义信号
@app.signal('user.registered')
async def on_user_registered(user_id: int, **kwargs):
    user = await User.get(id=user_id)
    # 发送欢迎邮件、创建默认数据等
    print(f'新用户注册: {user.username}')

# 发送自定义信号
async def register_user(data):
    user = await User.create(**data)
    await app.dispatch('user.registered', context={'user_id': user.id})
    return user

# 在蓝图中发送
@auth_bp.post('/register')
async def register(request):
    user = await User.create(...)
    await request.app.dispatch('user.registered', context={'user_id': user.id})
    return json({'code': 0, 'data': {'id': user.id}}, status=201)

7.2 后台任务

# 简单后台任务(同进程,非阻塞)
async def send_email(to: str, subject: str):
    await asyncio.sleep(1)  # 模拟耗时
    print(f'发送邮件到 {to}: {subject}')

@app.post('/register')
async def register(request):
    user = await User.create(...)
    # 不等待,后台执行
    app.add_task(send_email(user.email, '欢迎注册'))
    return json({'code': 0, 'msg': '注册成功'}, status=201)

# 带名称的任务(可取消)
app.add_task(periodic_cleanup(), name='cleanup')
await app.cancel_task('cleanup')

# 定时后台任务
@app.before_server_start
async def start_periodic_tasks(app, loop):
    app.add_task(heartbeat_task())

async def heartbeat_task():
    while True:
        await asyncio.sleep(60)
        print('心跳检测...')

7.3 WebSocket

from sanic import Websocket

# 基本 WebSocket
@app.websocket('/ws')
async def ws_handler(request, ws: Websocket):
    while True:
        data = await ws.recv()
        if data is None:
            break
        await ws.send(f'收到: {data}')

# 聊天室(带连接管理)
connected = set()

@app.websocket('/chat/<room_id>')
async def chat(request, ws: Websocket, room_id: str):
    connected.add(ws)
    try:
        async for message in ws:
            # 广播给同房间所有客户端
            for conn in list(connected):
                if conn != ws:
                    await conn.send(f'Room {room_id}: {message}')
    except Exception:
        pass
    finally:
        connected.discard(ws)

# WebSocket 蓝图
ws_bp = Blueprint('ws', url_prefix='/ws')

@ws_bp.websocket('/notify')
async def notify(request, ws: Websocket):
    user = request.ctx.user
    while True:
        msg = await ws.recv()
        await ws.send(json.dumps({'user': user.username, 'msg': msg}))

7.4 流式响应

from sanic.response import ResponseStream

# 流式传输大文件或实时数据
@app.get('/stream')
async def stream_data(request):
    async def generate(response):
        for i in range(100):
            await response.write(f'data: chunk {i}\n\n')
            await asyncio.sleep(0.1)

    return ResponseStream(generate, content_type='text/event-stream')

# Server-Sent Events (SSE)
@app.get('/events')
async def sse(request):
    async def event_stream(response):
        for i in range(10):
            await response.write(f'data: {{"index": {i}}}\n\n')
            await asyncio.sleep(1)

    return ResponseStream(
        event_stream,
        content_type='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'
        }
    )

7.5 文件上传

@app.post('/upload')
async def upload_file(request):
    if not request.files:
        return json({'code': 400, 'msg': '没有上传文件'}, status=400)

    file = request.files.get('file')
    if not file:
        return json({'code': 400, 'msg': '缺少 file 字段'}, status=400)

    # 保存文件
    import os, uuid
    ext = os.path.splitext(file.name)[1]
    filename = f'{uuid.uuid4()}{ext}'
    save_path = f'./uploads/{filename}'

    with open(save_path, 'wb') as f:
        f.write(file.body)

    return json({'code': 0, 'data': {'filename': filename, 'size': len(file.body)}})

# 多文件上传
@app.post('/uploads')
async def upload_multi(request):
    files = request.files.getlist('files')
    saved = []
    for file in files:
        filename = f'{uuid.uuid4()}{os.path.splitext(file.name)[1]}'
        with open(f'./uploads/{filename}', 'wb') as f:
            f.write(file.body)
        saved.append(filename)
    return json({'code': 0, 'data': saved})

7.6 异常处理

from sanic.exceptions import SanicException, NotFound, Unauthorized, Forbidden
from sanic import errorhandler

# 内置异常
# NotFound (404)
# Unauthorized (401)
# Forbidden (403)
# BadRequest (400)
# ServerError (500)
# MethodNotAllowed (405)
# InvalidUsage (400)

# 全局异常处理
@app.exception(NotFound)
async def not_found_handler(request, exception):
    return json({'code': 404, 'msg': '资源不存在'}, status=404)

@app.exception(Unauthorized)
async def unauthorized_handler(request, exception):
    return json({'code': 401, 'msg': '未授权'}, status=401)

@app.exception(Exception)
async def global_exception_handler(request, exception):
    if isinstance(exception, SanicException):
        return json({'code': exception.status_code, 'msg': str(exception)}, status=exception.status_code)
    # 未知异常
    app.logger.exception(f'Unhandled exception: {exception}')
    return json({'code': 500, 'msg': '服务器内部错误'}, status=500)

# 自定义业务异常
class AppException(SanicException):
    def __init__(self, message: str, status_code: int = 400):
        super().__init__(message, status_code=status_code)

@app.exception(AppException)
async def app_exception_handler(request, exception):
    return json({'code': exception.status_code, 'msg': str(exception)}, status=exception.status_code)

# 抛出
raise AppException('用户名已存在', status_code=409)
raise NotFound('用户不存在')
raise Unauthorized('请先登录')

7.7 依赖注入(sanic-ext)

from sanic_ext import inject

# 注册可注入的服务
class EmailService:
    async def send(self, to: str, subject: str, body: str):
        print(f'发送邮件到 {to}')

# 在 lifespan 中注册
@app.before_server_start
async def setup_services(app, loop):
    app.ext.dependency(EmailService())

# 视图中注入
@app.post('/register')
@inject
async def register(request, email_service: EmailService):
    # email_service 自动注入
    await email_service.send(request.json['email'], '欢迎', '...')
    return json({'code': 0})

八、缓存与任务篇

8.1 Redis 缓存

import redis.asyncio as aioredis
import json as json_lib
from functools import wraps

# lifespan 初始化
@app.before_server_start
async def init_redis(app, loop):
    app.ctx.redis = aioredis.from_url(app.config.REDIS_URL, decode_responses=True)

@app.before_server_stop
async def close_redis(app, loop):
    await app.ctx.redis.aclose()

# 封装缓存工具
class RedisCache:
    def __init__(self, redis):
        self.redis = redis

    async def get(self, key: str):
        val = await self.redis.get(key)
        return json_lib.loads(val) if val else None

    async def set(self, key: str, value, ttl: int = 300):
        await self.redis.setex(key, ttl, json_lib.dumps(value))

    async def delete(self, key: str):
        await self.redis.delete(key)

    async def delete_pattern(self, pattern: str):
        keys = await self.redis.keys(pattern)
        if keys:
            await self.redis.delete(*keys)

# 缓存装饰器
def cache(key_func, ttl: int = 300):
    def decorator(f):
        @wraps(f)
        async def wrapper(request, *args, **kwargs):
            redis = RedisCache(request.app.ctx.redis)
            cache_key = key_func(request, *args, **kwargs)
            cached = await redis.get(cache_key)
            if cached is not None:
                return json(cached)
            result = await f(request, *args, **kwargs)
            # 缓存响应内容
            await redis.set(cache_key, result.body.decode(), ttl)
            return result
        return wrapper
    return decorator

# 使用
@app.get('/hot-articles')
@cache(key_func=lambda req: 'hot_articles', ttl=60)
async def hot_articles(request):
    articles = await Article.all().order_by('-views').limit(10).values()
    return json({'code': 0, 'data': articles})

8.2 Celery 异步任务

# tasks.py
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '')  # Sanic 无需设置

from celery import Celery

celery_app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)
celery_app.conf.update(
    task_serializer='json',
    result_serializer='json',
    timezone='Asia/Shanghai'
)

@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, to: str, subject: str, body: str):
    try:
        email_service.send(to, subject, body)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

@celery_app.task
def generate_report(user_id: int):
    pass

# Sanic 视图中调用(同步调用,不用 await)
@auth_bp.post('/register')
async def register(request):
    user = await User.create(...)
    send_email_task.delay(user.email, '欢迎注册', '...')
    return json({'code': 0, 'msg': '注册成功'}, status=201)

# 定时任务
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
    'daily-cleanup': {
        'task': 'tasks.generate_report',
        'schedule': crontab(hour=2, minute=0),
        'args': (1,)
    }
}

# 启动
# celery -A tasks.celery_app worker --loglevel=info
# celery -A tasks.celery_app beat --loglevel=info

九、测试篇

9.1 接口测试

# pip install pytest pytest-asyncio sanic-testing

# tests/conftest.py
import pytest
from sanic import Sanic
from app.main import create_app

@pytest.fixture
def app():
    sanic_app = create_app(testing=True)
    return sanic_app

@pytest.fixture
def client(app):
    return app.test_client
# tests/test_user.py
import pytest

@pytest.mark.asyncio
async def test_create_user(app):
    _, response = await app.test_client.post(
        '/api/users/',
        json={
            'username': 'testuser',
            'email': 'test@example.com',
            'password': '123456'
        }
    )
    assert response.status == 201
    data = response.json
    assert data['data']['username'] == 'testuser'

@pytest.mark.asyncio
async def test_login(app):
    # 先注册
    await app.test_client.post('/api/auth/register', json={
        'username': 'u', 'email': 'u@a.com', 'password': '123456'
    })
    # 登录
    _, response = await app.test_client.post('/api/auth/login', json={
        'email': 'u@a.com', 'password': '123456'
    })
    assert response.status == 200
    assert 'access_token' in response.json['data']

@pytest.mark.asyncio
async def test_not_found(app):
    _, response = await app.test_client.get('/api/users/99999')
    assert response.status == 404

@pytest.mark.asyncio
async def test_unauthorized(app):
    _, response = await app.test_client.get('/api/users/')
    assert response.status == 401

9.2 Mock

from unittest.mock import patch, AsyncMock

@pytest.mark.asyncio
async def test_with_mock(app):
    with patch('app.utils.email.send', new_callable=AsyncMock) as mock_send:
        mock_send.return_value = True
        _, response = await app.test_client.post('/api/auth/register', json={
            'username': 'u', 'email': 'u@a.com', 'password': '123456'
        })
        assert mock_send.called
        assert response.status == 201

十、部署篇

10.1 内置服务器 / Uvicorn

# 开发环境(内置服务器,支持热重载)
python -m app.main
# 或
sanic app.main:app --dev --host 0.0.0.0 --port 8000

# 生产环境(内置多 worker)
sanic app.main:app --host 0.0.0.0 --port 8000 --workers 4

# 生产环境(Uvicorn,适合 ASGI 部署)
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4

# 生产环境(Gunicorn + Uvicorn worker)
gunicorn app.main:app \
  --worker-class uvicorn.workers.UvicornWorker \
  --workers 4 \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --access-logfile /var/log/app/access.log \
  --error-logfile /var/log/app/error.log

10.2 Nginx 配置

upstream sanic_app {
    server 127.0.0.1:8000;
}

server {
    listen 80;
    server_name yourdomain.com;

    location /static/ {
        alias /path/to/static/;
        expires 30d;
    }

    location /uploads/ {
        alias /path/to/uploads/;
    }

    location / {
        proxy_pass http://sanic_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_read_timeout 120;
        client_max_body_size 20M;
    }

    # WebSocket 支持
    location /ws/ {
        proxy_pass http://sanic_app;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_read_timeout 86400;
    }
}

10.3 Docker 部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["sanic", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
version: "3.8"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - SECRET_KEY=${SECRET_KEY}
      - DATABASE_URL=mysql+aiomysql://user:pass@db:3306/mydb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    restart: unless-stopped

  celery:
    build: .
    command: celery -A app.tasks.celery_app worker --loglevel=info
    environment:
      - DATABASE_URL=mysql+aiomysql://user:pass@db:3306/mydb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis

  db:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: mydb
      MYSQL_USER: user
      MYSQL_PASSWORD: pass
    volumes:
      - mysql_data:/var/lib/mysql

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - api

volumes:
  mysql_data:
  redis_data:

Sanic vs FastAPI vs Flask 对比

Sanic

FastAPI

Flask

性能

极高(原生异步)

极高(原生异步)

中等(同步)

异步支持

原生 async/await

原生 async/await

有限支持

自动文档

sanic-ext 支持

内置 Swagger/ReDoc

需插件

类型验证

Pydantic(sanic-ext)

Pydantic 原生

需插件

WebSocket

内置支持

内置支持

需插件

ORM

Tortoise / SQLAlchemy

Tortoise / SQLModel

SQLAlchemy

生态

中等

快速增长

成熟

学习曲线

低~中

中等

适合场景

高性能 API、实时应用

高性能 API、微服务

小型服务


常用扩展汇总

功能

sanic-ext

官方扩展(CORS/DI/验证/文档)

tortoise-orm

异步 ORM

aerich

Tortoise 数据库迁移

pydantic

数据验证

python-jose

JWT

passlib

密码哈希

redis[asyncio]

异步 Redis

celery

异步任务队列

httpx

异步 HTTP 客户端

sanic-testing

官方测试工具


参考资源