FastAPI

从基础到高级,涵盖路由、请求响应、依赖注入、ORM、认证、中间件、异步、部署等核心知识点。以 FastAPI 0.110+ + Tortoise ORM 0.20+ 为基准。


目录


一、基础篇

1.1 安装与项目结构

# 安装核心依赖
pip install fastapi uvicorn[standard]
pip install tortoise-orm aerich asyncpg aiomysql aiosqlite
pip install python-jose[cryptography] passlib[bcrypt]
pip install python-dotenv pydantic-settings

# 推荐项目结构
myapp/
├── app/
│   ├── __init__.py
│   ├── main.py             # FastAPI 应用入口   ├── config.py           # 配置管理   ├── database.py         # 数据库配置   ├── models/             # Tortoise ORM 模型      ├── __init__.py
│      └── user.py
│   ├── schemas/            # Pydantic 模型      ├── __init__.py
│      └── user.py
│   ├── api/                # 路由      ├── __init__.py
│      ├── auth.py
│      └── user.py
│   ├── dependencies/       # 依赖注入      └── auth.py
│   └── utils/              # 工具函数
├── migrations/             # Aerich 迁移文件
├── tests/
├── .env
├── pyproject.toml          # Aerich 配置
└── requirements.txt

1.2 路由与视图

from fastapi import FastAPI, Path, Query, Body
from typing import Optional

app = FastAPI(title='My API', version='1.0.0')

# 基本路由
@app.get('/')
async def index():
    return {'msg': 'Hello FastAPI'}

# 路径参数
@app.get('/users/{user_id}')
async def get_user(user_id: int):        # 自动类型转换 + 验证
    return {'user_id': user_id}

# 路径参数验证
@app.get('/users/{user_id}')
async def get_user(
    user_id: int = Path(..., ge=1, description='用户ID,必须大于等于1')
):
    ...

# 查询参数
@app.get('/users')
async def list_users(
    page: int = Query(1, ge=1, description='页码'),
    size: int = Query(20, ge=1, le=100),
    keyword: Optional[str] = Query(None, max_length=50)
):
    return {'page': page, 'size': size, 'keyword': keyword}

# HTTP 方法
@app.post('/users')
@app.put('/users/{user_id}')
@app.patch('/users/{user_id}')
@app.delete('/users/{user_id}')

# 返回状态码
from fastapi import status

@app.post('/users', status_code=status.HTTP_201_CREATED)
async def create_user():
    ...

# URL 生成
from fastapi import Request
request.url_for('get_user', user_id=1)

1.3 请求与响应

from fastapi import Request, Response
from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse, FileResponse

# 请求体(使用 Pydantic 模型)
@app.post('/users')
async def create_user(user: UserCreate):   # 自动解析 JSON body
    ...

# 混合参数
@app.put('/users/{user_id}')
async def update_user(
    user_id: int,                          # 路径参数
    token: str = Header(...),              # 请求头
    user: UserUpdate = Body(...),          # 请求体
    source: str = Query('web')             # 查询参数
):
    ...

# 请求头
from fastapi import Header
async def get_items(user_agent: Optional[str] = Header(None)):
    ...

# Cookie
from fastapi import Cookie
async def get_items(session_id: Optional[str] = Cookie(None)):
    ...

# 文件上传
from fastapi import File, UploadFile

@app.post('/upload')
async def upload(file: UploadFile = File(...)):
    content = await file.read()
    return {'filename': file.filename, 'size': len(content)}

@app.post('/uploads')
async def upload_multi(files: list[UploadFile] = File(...)):
    return [{'name': f.filename} for f in files]

# 响应
return {'key': 'value'}                              # dict 自动转 JSON
return JSONResponse(content={'msg': 'ok'}, status_code=200)
return RedirectResponse(url='/new-path', status_code=302)
return FileResponse('/path/to/file.pdf')

# 自定义响应头
response = JSONResponse(content={'msg': 'ok'})
response.headers['X-Custom'] = 'value'
response.set_cookie('token', 'xxx', httponly=True, max_age=3600)
return response

# 依赖注入方式设置 Cookie
@app.post('/login')
async def login(response: Response):
    response.set_cookie('session', 'value')
    return {'msg': 'ok'}

1.4 配置管理

# config.py(使用 pydantic-settings,推荐)
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    app_name: str = 'My FastAPI App'
    debug: bool = False
    secret_key: str
    database_url: str
    redis_url: str = 'redis://localhost:6379/0'
    jwt_expire_minutes: int = 120

    class Config:
        env_file = '.env'           # 自动从 .env 文件加载
        env_file_encoding = 'utf-8'

@lru_cache                          # 单例,避免重复读取
def get_settings() -> Settings:
    return Settings()

settings = get_settings()
# .env
SECRET_KEY=your-secret-key
DATABASE_URL=mysql+aiomysql://user:pass@localhost:3306/mydb
DEBUG=false

1.5 自动文档

FastAPI 自动生成交互式 API 文档,无需额外配置:

Swagger UI:http://localhost:8000/docs
ReDoc:     http://localhost:8000/redoc
OpenAPI JSON:http://localhost:8000/openapi.json
# 自定义文档信息
app = FastAPI(
    title='My API',
    description='API 文档描述,支持 Markdown',
    version='1.0.0',
    docs_url='/docs',
    redoc_url='/redoc',
    openapi_tags=[
        {'name': 'users', 'description': '用户管理'},
        {'name': 'auth',  'description': '认证相关'}
    ]
)

# 路由添加文档信息
@app.get(
    '/users/{user_id}',
    tags=['users'],
    summary='获取用户信息',
    description='根据用户ID获取详细信息',
    response_description='用户详情',
    responses={404: {'description': '用户不存在'}}
)
async def get_user(user_id: int):
    ...

# 隐藏某个路由(不出现在文档中)
@app.get('/internal', include_in_schema=False)
async def internal():
    ...

二、Pydantic 篇

2.1 模型定义

from pydantic import BaseModel, Field, EmailStr, field_validator, model_validator
from typing import Optional
from datetime import datetime

class UserBase(BaseModel):
    username: str = Field(..., min_length=3, max_length=50, description='用户名')
    email: EmailStr
    age: Optional[int] = Field(None, ge=1, le=150)

class UserCreate(UserBase):
    password: str = Field(..., min_length=6)
    confirm_password: str

    @model_validator(mode='after')
    def check_passwords_match(self):
        if self.password != self.confirm_password:
            raise ValueError('两次密码不一致')
        return self

class UserUpdate(BaseModel):
    username: Optional[str] = Field(None, min_length=3, max_length=50)
    email: Optional[EmailStr] = None
    age: Optional[int] = None

class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    model_config = {'from_attributes': True}   # 允许从 ORM 对象构造(Pydantic v2)

2.2 字段类型与验证

from pydantic import BaseModel, Field, field_validator
from typing import List, Dict, Literal
from decimal import Decimal
from enum import Enum

class OrderStatus(str, Enum):
    pending  = 'pending'
    paid     = 'paid'
    shipped  = 'shipped'
    done     = 'done'

class OrderSchema(BaseModel):
    items:   List[str]                          # 列表
    meta:    Dict[str, str] = {}               # 字典
    status:  OrderStatus = OrderStatus.pending  # 枚举
    amount:  Decimal = Field(..., gt=0, decimal_places=2)
    tags:    List[str] = Field(default_factory=list)
    source:  Literal['web', 'app', 'api'] = 'web'  # 字面量类型

    # 字段验证器
    @field_validator('items')
    @classmethod
    def items_not_empty(cls, v):
        if not v:
            raise ValueError('订单商品不能为空')
        return v

    # 字段别名(接收 camelCase 但内部用 snake_case)
    order_no: str = Field(..., alias='orderNo')

    model_config = {
        'populate_by_name': True,   # 允许用字段名或别名
        'str_strip_whitespace': True
    }

2.3 请求/响应模型分离

# schemas/user.py

# 创建请求
class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str

# 更新请求(全部可选)
class UserUpdate(BaseModel):
    username: Optional[str] = None
    email: Optional[EmailStr] = None

# 列表响应(不含密码)
class UserBrief(BaseModel):
    id: int
    username: str
    email: str
    model_config = {'from_attributes': True}

# 详情响应
class UserDetail(UserBrief):
    age: Optional[int]
    is_active: bool
    created_at: datetime

# 分页响应(泛型)
from pydantic.generics import GenericModel   # Pydantic v1
from pydantic import BaseModel              # Pydantic v2 直接用 Generic

from typing import TypeVar, Generic
T = TypeVar('T')

class PageResult(BaseModel, Generic[T]):
    list: List[T]
    total: int
    page: int
    pages: int

# 使用
@app.get('/users', response_model=PageResult[UserBrief])
async def list_users():
    ...

三、ORM 篇(Tortoise ORM)

3.1 安装与配置

pip install tortoise-orm aerich
pip install asyncpg          # PostgreSQL
pip install aiomysql         # MySQL
pip install aiosqlite        # SQLite
# database.py
TORTOISE_ORM = {
    'connections': {
        'default': {
            'engine': 'tortoise.backends.mysql',
            'credentials': {
                'host':     'localhost',
                'port':     3306,
                'user':     'root',
                'password': 'password',
                'database': 'mydb',
                'charset':  'utf8mb4',
                'minsize':  5,
                'maxsize':  20,
            }
        }
    },
    'apps': {
        'models': {
            'models': ['app.models', 'aerich.models'],  # 模型所在模块
            'default_connection': 'default',
        }
    },
    'use_tz': False,
    'timezone': 'Asia/Shanghai'
}
# main.py(使用 lifespan 初始化)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from tortoise.contrib.fastapi import RegisterTortoise
from app.database import TORTOISE_ORM

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动时初始化数据库
    async with RegisterTortoise(
        app,
        config=TORTOISE_ORM,
        generate_schemas=False,    # 生产环境用 aerich 迁移,不自动建表
        add_exception_handlers=True
    ):
        yield
    # 关闭时自动清理连接

app = FastAPI(lifespan=lifespan)

3.2 模型定义

# app/models/user.py
from tortoise import fields
from tortoise.models import Model
from datetime import datetime

class User(Model):
    id         = fields.BigIntField(pk=True)
    username   = fields.CharField(max_length=50, unique=True)
    email      = fields.CharField(max_length=120, unique=True, index=True)
    password   = fields.CharField(max_length=256)
    age        = fields.IntField(null=True)
    balance    = fields.DecimalField(max_digits=12, decimal_places=2, default=0)
    is_active  = fields.BooleanField(default=True)
    created_at = fields.DatetimeField(auto_now_add=True)   # 创建时自动设置
    updated_at = fields.DatetimeField(auto_now=True)       # 每次更新自动设置

    class Meta:
        table = 'users'
        ordering = ['-created_at']      # 默认排序

    def __str__(self):
        return f'<User {self.username}>'

3.3 字段类型与约束

常用字段类型

字段类型

说明

BigIntField(pk=True)

大整数主键(自增)

UUIDField(pk=True)

UUID 主键

CharField(max_length=n)

VARCHAR

TextField

TEXT

IntField

INT

BigIntField

BIGINT

FloatField

FLOAT

DecimalField(max_digits, decimal_places)

DECIMAL(精确小数)

BooleanField

BOOLEAN

DatetimeField

DATETIME

DateField

DATE

JSONField

JSON

BinaryField

BLOB

字段通用参数

fields.CharField(
    max_length=50,
    null=True,              # 允许 NULL
    blank=True,             # 允许空字符串(逻辑层)
    default='value',        # 默认值(也可以是 callable)
    unique=True,            # 唯一约束
    index=True,             # 创建索引
    description='字段说明'  # 文档注释
)

表级约束与索引

class Order(Model):
    id       = fields.BigIntField(pk=True)
    user_id  = fields.BigIntField(index=True)
    order_no = fields.CharField(max_length=32)
    status   = fields.CharField(max_length=20, default='pending')
    amount   = fields.DecimalField(max_digits=12, decimal_places=2)

    class Meta:
        table = 'orders'
        unique_together = (('user_id', 'order_no'),)   # 联合唯一索引
        indexes = (('user_id', 'status'),)             # 联合普通索引

3.4 关系映射

# 一对多:User → Order
class User(Model):
    id     = fields.BigIntField(pk=True)
    # orders 是反向关系,通过 Order.user 的 related_name 自动创建

class Order(Model):
    id      = fields.BigIntField(pk=True)
    user    = fields.ForeignKeyField('models.User', related_name='orders', on_delete=fields.CASCADE)
    amount  = fields.DecimalField(max_digits=12, decimal_places=2)

# 使用
user = await User.get(id=1)
orders = await user.orders.all()             # 通过 related_name 反向访问
order = await Order.get(id=1)
await order.fetch_related('user')            # 手动加载关联
print(order.user.username)


# 多对多:User ↔ Role
class Role(Model):
    id   = fields.BigIntField(pk=True)
    name = fields.CharField(max_length=50)

class User(Model):
    id    = fields.BigIntField(pk=True)
    roles = fields.ManyToManyField('models.Role', related_name='users', through='user_roles')

# 使用
user = await User.get(id=1)
await user.roles.add(role)                   # 添加关联
await user.roles.remove(role)               # 删除关联
roles = await user.roles.all()              # 获取所有关联
await user.fetch_related('roles')           # 预加载


# 一对一:User → Profile
class Profile(Model):
    id   = fields.BigIntField(pk=True)
    user = fields.OneToOneField('models.User', related_name='profile', on_delete=fields.CASCADE)
    bio  = fields.TextField(null=True)

# 使用
user = await User.get(id=1)
await user.fetch_related('profile')
print(user.profile.bio)

3.5 增删改查

from tortoise.expressions import Q, F
from tortoise.functions import Count, Sum, Avg

# ---- 增 ----
# 方式1:create(直接入库)
user = await User.create(username='alice', email='alice@example.com', password='hashed')

# 方式2:先构造再保存
user = User(username='bob', email='bob@example.com', password='hashed')
await user.save()

# 批量创建
await User.bulk_create([
    User(username='user1', email='u1@example.com', password='h'),
    User(username='user2', email='u2@example.com', password='h'),
])


# ---- 查 ----
# 主键查询
user = await User.get(id=1)              # 不存在抛 DoesNotExist
user = await User.get_or_none(id=1)     # 不存在返回 None
user = await User.get_or_create(username='alice', defaults={'email': 'a@b.com'})

# 过滤查询
users = await User.filter(is_active=True).all()
users = await User.filter(age__gte=18).all()
user  = await User.filter(username='alice').first()

# Q 对象(复杂查询)
users = await User.filter(
    Q(age__gte=18) & Q(is_active=True)          # AND
).all()
users = await User.filter(
    Q(username='alice') | Q(email='alice@a.com')  # OR
).all()
users = await User.filter(~Q(is_active=True)).all()  # NOT

# 字段查询操作符
# __exact      精确匹配(默认)
# __iexact     不区分大小写匹配
# __contains   包含
# __icontains  不区分大小写包含
# __startswith / __endswith
# __gt / __gte / __lt / __lte
# __in         IN 列表
# __isnull     IS NULL
# __range      范围 [a, b]
User.filter(username__icontains='alice')
User.filter(id__in=[1, 2, 3])
User.filter(age__range=(18, 60))
User.filter(deleted_at__isnull=True)

# 排序
users = await User.all().order_by('-created_at', 'username')

# 分页
users = await User.all().offset((page - 1) * size).limit(size)
total = await User.all().count()

# 只查部分字段
users = await User.all().values('id', 'username', 'email')
users = await User.all().values_list('id', 'username', flat=False)

# 关联查询(预加载,避免 N+1)
users = await User.all().prefetch_related('orders', 'roles')
orders = await Order.all().select_related('user')   # JOIN 方式

# 聚合
from tortoise.functions import Count, Sum
count = await User.all().count()
total = await Order.filter(user_id=1).aggregate(total=Sum('amount'))
stats = await Order.annotate(cnt=Count('id')).group_by('status').values('status', 'cnt')


# ---- 改 ----
# 方式1:获取后修改
user = await User.get(id=1)
user.username = 'new_name'
await user.save()

# 方式2:批量更新(高效,不加载对象)
await User.filter(is_active=False).update(deleted=True)

# 方式3:update_or_create
user, created = await User.update_or_create(
    username='alice',
    defaults={'email': 'new@example.com'}
)

# F 表达式(字段引用,避免竞争条件)
await Order.filter(id=1).update(amount=F('amount') + 10)


# ---- 删 ----
user = await User.get(id=1)
await user.delete()

# 批量删除
await User.filter(created_at__lt=cutoff_date).delete()


# ---- 事务 ----
from tortoise.transactions import in_transaction, atomic

# 方式1:async with
async with in_transaction():
    user = await User.create(username='alice', email='a@b.com', password='h')
    await Order.create(user=user, amount=99.9)

# 方式2:装饰器
@atomic()
async def create_order_with_user():
    user = await User.create(...)
    order = await Order.create(user=user, ...)
    return order

3.6 迁移(Aerich)

pip install aerich
# pyproject.toml
[tool.aerich]
tortoise_orm = "app.database.TORTOISE_ORM"
location = "./migrations"
src_folder = "./."
# 初始化(只执行一次)
aerich init -t app.database.TORTOISE_ORM
aerich init-db

# 生成迁移文件
aerich migrate --name "add_users_table"

# 执行迁移
aerich upgrade

# 回滚
aerich downgrade

# 查看迁移历史
aerich history

# 查看当前版本
aerich heads

四、依赖注入篇

4.1 基本依赖

from fastapi import Depends

# 函数依赖
async def common_params(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100)
) -> dict:
    return {'offset': (page - 1) * size, 'limit': size}

@app.get('/users')
async def list_users(params: dict = Depends(common_params)):
    offset, limit = params['offset'], params['limit']
    ...

# 类依赖(有状态)
class Pagination:
    def __init__(self, page: int = 1, size: int = 20):
        self.offset = (page - 1) * size
        self.limit = size

@app.get('/orders')
async def list_orders(pagination: Pagination = Depends(Pagination)):
    ...

4.2 依赖链

# 依赖可以依赖其他依赖
async def get_token(authorization: str = Header(...)) -> str:
    if not authorization.startswith('Bearer '):
        raise HTTPException(status_code=401)
    return authorization[7:]

async def get_current_user(token: str = Depends(get_token)) -> User:
    user_id = verify_token(token)
    user = await User.get_or_none(id=user_id)
    if not user:
        raise HTTPException(status_code=401)
    return user

async def get_active_user(user: User = Depends(get_current_user)) -> User:
    if not user.is_active:
        raise HTTPException(status_code=403, detail='用户已被禁用')
    return user

@app.get('/profile')
async def profile(user: User = Depends(get_active_user)):
    return user

4.3 分页依赖

# dependencies/pagination.py
from dataclasses import dataclass
from fastapi import Query

@dataclass
class PageParams:
    page: int = Query(1, ge=1, description='页码')
    size: int = Query(20, ge=1, le=100, description='每页数量')

    @property
    def offset(self):
        return (self.page - 1) * self.size

    @property
    def limit(self):
        return self.size

# 使用
@app.get('/users')
async def list_users(pagination: PageParams = Depends()):
    users = await User.all().offset(pagination.offset).limit(pagination.limit)
    total = await User.all().count()
    return {
        'list': users,
        'total': total,
        'page': pagination.page,
        'pages': (total + pagination.size - 1) // pagination.size
    }

五、认证与权限篇

5.1 OAuth2 + JWT

# pip install python-jose[cryptography] passlib[bcrypt]
from datetime import datetime, timedelta
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

SECRET_KEY = settings.secret_key
ALGORITHM  = 'HS256'
pwd_context = CryptContext(schemes=['bcrypt'])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/api/auth/login')

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=120))
    to_encode.update({'exp': expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

# 登录接口
@router.post('/login')
async def login(form: OAuth2PasswordRequestForm = Depends()):
    user = await User.get_or_none(email=form.username)
    if not user or not verify_password(form.password, user.password):
        raise HTTPException(status_code=401, detail='账号或密码错误')

    access_token = create_access_token({'sub': str(user.id), 'role': user.role})
    return {'access_token': access_token, 'token_type': 'bearer'}

5.2 依赖注入认证

# dependencies/auth.py
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/api/auth/login')

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    credentials_exception = HTTPException(
        status_code=401,
        detail='无效的认证凭据',
        headers={'WWW-Authenticate': 'Bearer'}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get('sub')
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = await User.get_or_none(id=int(user_id))
    if not user:
        raise credentials_exception
    return user

async def get_active_user(user: User = Depends(get_current_user)) -> User:
    if not user.is_active:
        raise HTTPException(status_code=403, detail='账号已被禁用')
    return user

# 使用
@router.get('/profile')
async def profile(user: User = Depends(get_active_user)):
    return UserDetail.model_validate(user)

5.3 权限控制

# 基于角色的权限装饰器
from functools import wraps

def require_roles(*roles: str):
    async def dependency(user: User = Depends(get_active_user)):
        if user.role not in roles:
            raise HTTPException(status_code=403, detail='权限不足')
        return user
    return Depends(dependency)

# 使用
@router.get('/admin/users')
async def admin_list(user: User = require_roles('admin', 'superadmin')):
    ...

# 更灵活的权限类
class PermissionChecker:
    def __init__(self, required_permission: str):
        self.required = required_permission

    async def __call__(self, user: User = Depends(get_active_user)):
        if self.required not in user.permissions:
            raise HTTPException(status_code=403, detail='权限不足')
        return user

@router.delete('/users/{user_id}')
async def delete_user(
    user_id: int,
    _: User = Depends(PermissionChecker('user:delete'))
):
    ...

5.4 API Key 认证

from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name='X-API-Key', auto_error=False)

async def verify_api_key(api_key: str = Depends(api_key_header)):
    if api_key != settings.api_key:
        raise HTTPException(status_code=403, detail='无效的 API Key')
    return api_key

@router.get('/webhook', dependencies=[Depends(verify_api_key)])
async def webhook():
    ...

六、进阶篇

6.1 路由分组(APIRouter)

# app/api/user.py
from fastapi import APIRouter

router = APIRouter(
    prefix='/api/users',
    tags=['users'],
    dependencies=[Depends(get_active_user)]    # 整个路由组都需要认证
)

@router.get('/')
async def list_users():
    ...

@router.get('/{user_id}')
async def get_user(user_id: int):
    ...
# main.py
from app.api import auth, user, order

app.include_router(auth.router)
app.include_router(user.router)
app.include_router(order.router)

6.2 应用工厂模式

# app/main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from tortoise.contrib.fastapi import RegisterTortoise

def create_app() -> FastAPI:
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        async with RegisterTortoise(app, config=TORTOISE_ORM, generate_schemas=False):
            yield

    app = FastAPI(
        title='My API',
        lifespan=lifespan
    )

    # 注册路由
    from app.api import auth, user, order
    app.include_router(auth.router)
    app.include_router(user.router)
    app.include_router(order.router)

    # 注册中间件
    register_middlewares(app)

    return app

app = create_app()

6.3 中间件

from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
import time

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=['http://localhost:3000', 'https://yourdomain.com'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*']
)

# GZIP 压缩
app.add_middleware(GZipMiddleware, minimum_size=1000)

# 自定义中间件
class RequestLogMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start = time.time()
        response = await call_next(request)
        duration = time.time() - start
        response.headers['X-Process-Time'] = f'{duration:.4f}'
        return response

app.add_middleware(RequestLogMiddleware)

# 限流中间件(使用 slowapi)
# pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get('/api/data')
@limiter.limit('100/minute')
async def data(request: Request):
    ...

6.4 事件钩子(lifespan)

# FastAPI 0.93+ 推荐用 lifespan 代替 on_event
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # === 启动时 ===
    print('应用启动')
    await init_redis()
    await init_db()

    yield    # 应用运行中

    # === 关闭时 ===
    print('应用关闭')
    await close_redis()

app = FastAPI(lifespan=lifespan)

6.5 后台任务

from fastapi import BackgroundTasks

# 轻量后台任务(同进程,适合快速操作)
def send_email(email: str, message: str):
    # 发送邮件...
    pass

@app.post('/register')
async def register(user_data: UserCreate, background_tasks: BackgroundTasks):
    user = await User.create(**user_data.model_dump())
    background_tasks.add_task(send_email, user.email, '欢迎注册!')
    return {'msg': '注册成功'}

# 耗时任务用 Celery(见第八章)

6.6 WebSocket

from fastapi import WebSocket, WebSocketDisconnect
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active: List[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.active.append(ws)

    def disconnect(self, ws: WebSocket):
        self.active.remove(ws)

    async def broadcast(self, message: str):
        for ws in self.active:
            await ws.send_text(message)

manager = ConnectionManager()

@app.websocket('/ws/{room_id}')
async def websocket_endpoint(ws: WebSocket, room_id: str):
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_text()
            await manager.broadcast(f'Room {room_id}: {data}')
    except WebSocketDisconnect:
        manager.disconnect(ws)

6.7 异常处理

from fastapi import Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError

# 全局异常处理
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    return JSONResponse({'code': 404, 'msg': '资源不存在'}, status_code=404)

@app.exception_handler(500)
async def server_error_handler(request: Request, exc):
    return JSONResponse({'code': 500, 'msg': '服务器内部错误'}, status_code=500)

# 请求验证错误(Pydantic 验证失败)
@app.exception_handler(RequestValidationError)
async def validation_error_handler(request: Request, exc: RequestValidationError):
    errors = [{'field': e['loc'][-1], 'msg': e['msg']} for e in exc.errors()]
    return JSONResponse({'code': 422, 'msg': '参数验证失败', 'errors': errors}, status_code=422)

# 自定义业务异常
class AppException(Exception):
    def __init__(self, message: str, code: int = 400):
        self.message = message
        self.code = code

@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
    return JSONResponse({'code': exc.code, 'msg': exc.message}, status_code=exc.code)

# 抛出
raise AppException('用户名已存在', code=409)
raise HTTPException(status_code=404, detail='用户不存在')

七、异步篇

7.1 async/await 基础

import asyncio

# 同步函数(不能 await,不要在里面做 IO 操作)
def sync_func():
    return 'sync'

# 异步函数(可以 await,能做异步 IO)
async def async_func():
    await asyncio.sleep(1)
    return 'async'

# FastAPI 视图推荐全部用 async def
@app.get('/data')
async def get_data():
    result = await fetch_from_db()
    return result

# 同步阻塞操作(如 CPU 密集计算)放到线程池
import asyncio
from fastapi.concurrency import run_in_threadpool

@app.get('/heavy')
async def heavy_compute():
    result = await run_in_threadpool(sync_heavy_function, arg1, arg2)
    return result

7.2 并发任务

import asyncio

# 并行执行多个协程
@app.get('/dashboard')
async def dashboard(user: User = Depends(get_active_user)):
    # 同时查询,不按顺序等待
    orders, profile, stats = await asyncio.gather(
        Order.filter(user_id=user.id).count(),
        Profile.get_or_none(user_id=user.id),
        get_user_stats(user.id)
    )
    return {'orders': orders, 'profile': profile, 'stats': stats}

# 带超时
try:
    result = await asyncio.wait_for(some_coroutine(), timeout=5.0)
except asyncio.TimeoutError:
    raise HTTPException(status_code=504, detail='请求超时')

7.3 异步 HTTP 请求

# pip install httpx
import httpx

# 作为应用启动时创建,关闭时销毁
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.http_client = httpx.AsyncClient(timeout=10.0)
    yield
    await app.state.http_client.aclose()

# 在视图中使用
@app.get('/external')
async def call_external(request: Request):
    client = request.app.state.http_client
    response = await client.get('https://api.example.com/data')
    return response.json()

八、缓存与任务篇

8.1 Redis 缓存

# pip install redis[asyncio]
import redis.asyncio as aioredis
import json

# 初始化(lifespan 中)
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.redis = aioredis.from_url(settings.redis_url, decode_responses=True)
    yield
    await app.state.redis.aclose()

# 缓存装饰器
from functools import wraps

def cache(key_prefix: str, ttl: int = 300):
    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            redis = request.app.state.redis
            cache_key = f'{key_prefix}:{":".join(str(a) for a in args)}'
            cached = await redis.get(cache_key)
            if cached:
                return json.loads(cached)
            result = await func(request, *args, **kwargs)
            await redis.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

# 手动缓存
@app.get('/hot-articles')
async def hot_articles(request: Request):
    redis = request.app.state.redis
    cached = await redis.get('hot_articles')
    if cached:
        return json.loads(cached)
    articles = await Article.all().order_by('-views').limit(10).values()
    await redis.setex('hot_articles', 60, json.dumps(articles))
    return articles

8.2 Celery 异步任务

# celery_app.py
from celery import Celery

celery = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)
celery.conf.update(
    task_serializer='json',
    result_serializer='json',
    timezone='Asia/Shanghai'
)

# tasks.py
@celery.task(bind=True, max_retries=3)
def send_email_task(self, to: str, subject: str, body: str):
    try:
        email_service.send(to, subject, body)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

# FastAPI 视图中调用(同步调用 Celery,非 await)
@app.post('/register')
async def register(user_data: UserCreate):
    user = await User.create(**user_data.model_dump())
    send_email_task.delay(user.email, '欢迎注册', '...')
    return {'msg': 'ok'}

# 启动
# celery -A tasks worker --loglevel=info -c 4

九、测试篇

9.1 接口测试

# pip install pytest pytest-asyncio httpx
# tests/conftest.py
import pytest
from httpx import AsyncClient, ASGITransport
from tortoise.contrib.testing import initializer, finalizer
from app.main import app

@pytest.fixture(scope='session')
def anyio_backend():
    return 'asyncio'

@pytest.fixture(autouse=True)
def init_db():
    initializer(['app.models'])
    yield
    finalizer()

@pytest.fixture
async def client():
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url='http://test'
    ) as client:
        yield client
# tests/test_user.py
import pytest

@pytest.mark.anyio
async def test_create_user(client):
    response = await client.post('/api/users/', json={
        'username': 'testuser',
        'email': 'test@example.com',
        'password': '123456',
        'confirm_password': '123456'
    })
    assert response.status_code == 201
    data = response.json()
    assert data['data']['username'] == 'testuser'

@pytest.mark.anyio
async def test_login(client):
    await client.post('/api/users/', json={
        'username': 'u', 'email': 'u@a.com',
        'password': '123456', 'confirm_password': '123456'
    })
    response = await client.post('/api/auth/login', data={
        'username': 'u@a.com', 'password': '123456'
    })
    assert response.status_code == 200
    assert 'access_token' in response.json()

@pytest.mark.anyio
async def test_get_user_not_found(client):
    response = await client.get('/api/users/99999')
    assert response.status_code == 404

9.2 Mock

from unittest.mock import patch, AsyncMock

@pytest.mark.anyio
async def test_with_mock(client):
    with patch('app.services.email.send', new_callable=AsyncMock) as mock_send:
        mock_send.return_value = True
        response = await client.post('/api/register', json={...})
        assert mock_send.called

十、部署篇

10.1 Uvicorn / Gunicorn

# 开发环境(热重载)
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

# 生产环境(Uvicorn 多 worker)
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4

# 生产环境(Gunicorn + Uvicorn worker,推荐)
gunicorn app.main:app \
  --worker-class uvicorn.workers.UvicornWorker \
  --workers 4 \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --max-requests 1000 \
  --max-requests-jitter 100 \
  --access-logfile /var/log/app/access.log \
  --error-logfile /var/log/app/error.log

10.2 Nginx 配置

upstream fastapi_app {
    server 127.0.0.1:8000;
}

server {
    listen 80;
    server_name yourdomain.com;

    location /static/ {
        alias /path/to/static/;
        expires 30d;
    }

    location / {
        proxy_pass http://fastapi_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_read_timeout 120;
        client_max_body_size 20M;
    }

    # WebSocket 支持
    location /ws/ {
        proxy_pass http://fastapi_app;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
    }
}

10.3 Docker 部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["gunicorn", "app.main:app",
     "--worker-class", "uvicorn.workers.UvicornWorker",
     "--workers", "4",
     "--bind", "0.0.0.0:8000"]
# docker-compose.yml
version: "3.8"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=mysql+aiomysql://user:pass@db:3306/mydb
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
    depends_on:
      - db
      - redis
    restart: unless-stopped

  db:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: mydb
      MYSQL_USER: user
      MYSQL_PASSWORD: pass
    volumes:
      - mysql_data:/var/lib/mysql

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - api

volumes:
  mysql_data:
  redis_data:

FastAPI vs Flask 对比

FastAPI

Flask

性能

极高(异步)

中等(同步)

类型检查

Pydantic 原生支持

需额外库

自动文档

内置 Swagger/ReDoc

需插件

异步支持

原生 async/await

Flask 2.0+ 有限支持

学习曲线

中等

生态成熟度

较新但增长快

成熟

ORM

推荐 Tortoise / SQLModel

Flask-SQLAlchemy

适合场景

高性能 API、微服务

传统 Web、快速原型


常用扩展汇总

功能

tortoise-orm

异步 ORM

aerich

Tortoise 数据库迁移

pydantic-settings

配置管理

python-jose

JWT

passlib

密码哈希

httpx

异步 HTTP 客户端

redis[asyncio]

异步 Redis

celery

异步任务队列

slowapi

接口限流

fastapi-cors

跨域(内置 middleware)

python-multipart

表单/文件上传

uvicorn

ASGI 服务器

gunicorn

进程管理器


参考资源