FastAPI¶
从基础到高级,涵盖路由、请求响应、依赖注入、ORM、认证、中间件、异步、部署等核心知识点。以 FastAPI 0.110+ + Tortoise ORM 0.20+ 为基准。
目录¶
一、基础篇¶
1.1 安装与项目结构¶
# 安装核心依赖
pip install fastapi uvicorn[standard]
pip install tortoise-orm aerich asyncpg aiomysql aiosqlite
pip install python-jose[cryptography] passlib[bcrypt]
pip install python-dotenv pydantic-settings
# 推荐项目结构
myapp/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 应用入口
│ ├── config.py # 配置管理
│ ├── database.py # 数据库配置
│ ├── models/ # Tortoise ORM 模型
│ │ ├── __init__.py
│ │ └── user.py
│ ├── schemas/ # Pydantic 模型
│ │ ├── __init__.py
│ │ └── user.py
│ ├── api/ # 路由
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ └── user.py
│ ├── dependencies/ # 依赖注入
│ │ └── auth.py
│ └── utils/ # 工具函数
├── migrations/ # Aerich 迁移文件
├── tests/
├── .env
├── pyproject.toml # Aerich 配置
└── requirements.txt
1.2 路由与视图¶
from fastapi import FastAPI, Path, Query, Body
from typing import Optional
app = FastAPI(title='My API', version='1.0.0')
# 基本路由
@app.get('/')
async def index():
return {'msg': 'Hello FastAPI'}
# 路径参数
@app.get('/users/{user_id}')
async def get_user(user_id: int): # 自动类型转换 + 验证
return {'user_id': user_id}
# 路径参数验证
@app.get('/users/{user_id}')
async def get_user(
user_id: int = Path(..., ge=1, description='用户ID,必须大于等于1')
):
...
# 查询参数
@app.get('/users')
async def list_users(
page: int = Query(1, ge=1, description='页码'),
size: int = Query(20, ge=1, le=100),
keyword: Optional[str] = Query(None, max_length=50)
):
return {'page': page, 'size': size, 'keyword': keyword}
# HTTP 方法
@app.post('/users')
@app.put('/users/{user_id}')
@app.patch('/users/{user_id}')
@app.delete('/users/{user_id}')
# 返回状态码
from fastapi import status
@app.post('/users', status_code=status.HTTP_201_CREATED)
async def create_user():
...
# URL 生成
from fastapi import Request
request.url_for('get_user', user_id=1)
1.3 请求与响应¶
from fastapi import Request, Response
from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse, FileResponse
# 请求体(使用 Pydantic 模型)
@app.post('/users')
async def create_user(user: UserCreate): # 自动解析 JSON body
...
# 混合参数
@app.put('/users/{user_id}')
async def update_user(
user_id: int, # 路径参数
token: str = Header(...), # 请求头
user: UserUpdate = Body(...), # 请求体
source: str = Query('web') # 查询参数
):
...
# 请求头
from fastapi import Header
async def get_items(user_agent: Optional[str] = Header(None)):
...
# Cookie
from fastapi import Cookie
async def get_items(session_id: Optional[str] = Cookie(None)):
...
# 文件上传
from fastapi import File, UploadFile
@app.post('/upload')
async def upload(file: UploadFile = File(...)):
content = await file.read()
return {'filename': file.filename, 'size': len(content)}
@app.post('/uploads')
async def upload_multi(files: list[UploadFile] = File(...)):
return [{'name': f.filename} for f in files]
# 响应
return {'key': 'value'} # dict 自动转 JSON
return JSONResponse(content={'msg': 'ok'}, status_code=200)
return RedirectResponse(url='/new-path', status_code=302)
return FileResponse('/path/to/file.pdf')
# 自定义响应头
response = JSONResponse(content={'msg': 'ok'})
response.headers['X-Custom'] = 'value'
response.set_cookie('token', 'xxx', httponly=True, max_age=3600)
return response
# 依赖注入方式设置 Cookie
@app.post('/login')
async def login(response: Response):
response.set_cookie('session', 'value')
return {'msg': 'ok'}
1.4 配置管理¶
# config.py(使用 pydantic-settings,推荐)
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
    """Application settings populated from environment variables and ``.env``.

    ``secret_key`` and ``database_url`` declare no default, so they must be
    supplied by the environment or the ``.env`` file.
    """

    app_name: str = 'My FastAPI App'
    debug: bool = False
    secret_key: str
    database_url: str
    redis_url: str = 'redis://localhost:6379/0'
    jwt_expire_minutes: int = 120

    # Pydantic v2 / pydantic-settings 2.x configuration. The nested
    # ``class Config`` form is the deprecated v1 spelling.
    model_config = {
        'env_file': '.env',  # load values from the .env file
        'env_file_encoding': 'utf-8',
    }
@lru_cache # 单例,避免重复读取
def get_settings() -> Settings:
return Settings()
settings = get_settings()
# .env
SECRET_KEY=your-secret-key
DATABASE_URL=mysql://user:pass@localhost:3306/mydb
DEBUG=false
1.5 自动文档¶
FastAPI 自动生成交互式 API 文档,无需额外配置:
Swagger UI:http://localhost:8000/docs
ReDoc: http://localhost:8000/redoc
OpenAPI JSON:http://localhost:8000/openapi.json
# 自定义文档信息
app = FastAPI(
title='My API',
description='API 文档描述,支持 Markdown',
version='1.0.0',
docs_url='/docs',
redoc_url='/redoc',
openapi_tags=[
{'name': 'users', 'description': '用户管理'},
{'name': 'auth', 'description': '认证相关'}
]
)
# 路由添加文档信息
@app.get(
'/users/{user_id}',
tags=['users'],
summary='获取用户信息',
description='根据用户ID获取详细信息',
response_description='用户详情',
responses={404: {'description': '用户不存在'}}
)
async def get_user(user_id: int):
...
# 隐藏某个路由(不出现在文档中)
@app.get('/internal', include_in_schema=False)
async def internal():
...
二、Pydantic 篇¶
2.1 模型定义¶
from pydantic import BaseModel, Field, EmailStr, field_validator, model_validator
from typing import Optional
from datetime import datetime
class UserBase(BaseModel):
username: str = Field(..., min_length=3, max_length=50, description='用户名')
email: EmailStr
age: Optional[int] = Field(None, ge=1, le=150)
class UserCreate(UserBase):
password: str = Field(..., min_length=6)
confirm_password: str
@model_validator(mode='after')
def check_passwords_match(self):
if self.password != self.confirm_password:
raise ValueError('两次密码不一致')
return self
class UserUpdate(BaseModel):
username: Optional[str] = Field(None, min_length=3, max_length=50)
email: Optional[EmailStr] = None
age: Optional[int] = None
class UserResponse(UserBase):
id: int
is_active: bool
created_at: datetime
model_config = {'from_attributes': True} # 允许从 ORM 对象构造(Pydantic v2)
2.2 字段类型与验证¶
from pydantic import BaseModel, Field, field_validator
from typing import List, Dict, Literal
from decimal import Decimal
from enum import Enum
class OrderStatus(str, Enum):
pending = 'pending'
paid = 'paid'
shipped = 'shipped'
done = 'done'
class OrderSchema(BaseModel):
items: List[str] # 列表
meta: Dict[str, str] = {} # 字典
status: OrderStatus = OrderStatus.pending # 枚举
amount: Decimal = Field(..., gt=0, decimal_places=2)
tags: List[str] = Field(default_factory=list)
source: Literal['web', 'app', 'api'] = 'web' # 字面量类型
# 字段验证器
@field_validator('items')
@classmethod
def items_not_empty(cls, v):
if not v:
raise ValueError('订单商品不能为空')
return v
# 字段别名(接收 camelCase 但内部用 snake_case)
order_no: str = Field(..., alias='orderNo')
model_config = {
'populate_by_name': True, # 允许用字段名或别名
'str_strip_whitespace': True
}
2.3 请求/响应模型分离¶
# schemas/user.py
# 创建请求
class UserCreate(BaseModel):
username: str
email: EmailStr
password: str
# 更新请求(全部可选)
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
# 列表响应(不含密码)
class UserBrief(BaseModel):
id: int
username: str
email: str
model_config = {'from_attributes': True}
# 详情响应
class UserDetail(UserBrief):
age: Optional[int]
is_active: bool
created_at: datetime
# 分页响应(泛型)
# Pydantic v1 only: from pydantic.generics import GenericModel
# In Pydantic v2, inherit from BaseModel and Generic[T] directly.
from typing import Generic, List, TypeVar

from pydantic import BaseModel

T = TypeVar('T')


class PageResult(BaseModel, Generic[T]):
    """Generic paginated response: one page of items plus paging counters."""

    list: List[T]  # items on the current page
    total: int
    page: int
    pages: int
# 使用
@app.get('/users', response_model=PageResult[UserBrief])
async def list_users():
...
三、ORM 篇(Tortoise ORM)¶
3.1 安装与配置¶
pip install tortoise-orm aerich
pip install asyncpg # PostgreSQL
pip install aiomysql # MySQL
pip install aiosqlite # SQLite
# database.py
TORTOISE_ORM = {
'connections': {
'default': {
'engine': 'tortoise.backends.mysql',
'credentials': {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': 'password',
'database': 'mydb',
'charset': 'utf8mb4',
'minsize': 5,
'maxsize': 20,
}
}
},
'apps': {
'models': {
'models': ['app.models', 'aerich.models'], # 模型所在模块
'default_connection': 'default',
}
},
'use_tz': False,
'timezone': 'Asia/Shanghai'
}
# main.py(使用 lifespan 初始化)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from tortoise.contrib.fastapi import RegisterTortoise
from app.database import TORTOISE_ORM
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时初始化数据库
async with RegisterTortoise(
app,
config=TORTOISE_ORM,
generate_schemas=False, # 生产环境用 aerich 迁移,不自动建表
add_exception_handlers=True
):
yield
# 关闭时自动清理连接
app = FastAPI(lifespan=lifespan)
3.2 模型定义¶
# app/models/user.py
from tortoise import fields
from tortoise.models import Model
from datetime import datetime
class User(Model):
id = fields.BigIntField(pk=True)
username = fields.CharField(max_length=50, unique=True)
email = fields.CharField(max_length=120, unique=True, index=True)
password = fields.CharField(max_length=256)
age = fields.IntField(null=True)
balance = fields.DecimalField(max_digits=12, decimal_places=2, default=0)
is_active = fields.BooleanField(default=True)
created_at = fields.DatetimeField(auto_now_add=True) # 创建时自动设置
updated_at = fields.DatetimeField(auto_now=True) # 每次更新自动设置
class Meta:
table = 'users'
ordering = ['-created_at'] # 默认排序
def __str__(self):
return f'<User {self.username}>'
3.3 字段类型与约束¶
常用字段类型
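字段类型 |
说明 |
|---|---|
`fields.BigIntField(pk=True)` |
大整数主键(自增) |
`fields.UUIDField(pk=True)` |
UUID 主键 |
`fields.CharField(max_length=N)` |
VARCHAR |
`fields.TextField()` |
TEXT |
`fields.IntField()` |
INT |
`fields.BigIntField()` |
BIGINT |
`fields.FloatField()` |
FLOAT |
`fields.DecimalField(max_digits, decimal_places)` |
DECIMAL(精确小数) |
`fields.BooleanField()` |
BOOLEAN |
`fields.DatetimeField()` |
DATETIME |
`fields.DateField()` |
DATE |
`fields.JSONField()` |
JSON |
`fields.BinaryField()` |
BLOB |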
字段通用参数
# NOTE: Tortoise fields have no Django-style ``blank`` option (an unknown
# keyword is silently ignored); use ``validators=[...]`` for logic-level checks.
fields.CharField(
    max_length=50,
    null=True,               # allow NULL
    default='value',         # default value (may also be a callable)
    unique=True,             # unique constraint
    index=True,              # create an index
    description='字段说明',  # documentation comment for the column
)
表级约束与索引
class Order(Model):
id = fields.BigIntField(pk=True)
user_id = fields.BigIntField(index=True)
order_no = fields.CharField(max_length=32)
status = fields.CharField(max_length=20, default='pending')
amount = fields.DecimalField(max_digits=12, decimal_places=2)
class Meta:
table = 'orders'
unique_together = (('user_id', 'order_no'),) # 联合唯一索引
indexes = (('user_id', 'status'),) # 联合普通索引
3.4 关系映射¶
# 一对多:User → Order
class User(Model):
id = fields.BigIntField(pk=True)
# orders 是反向关系,通过 Order.user 的 related_name 自动创建
class Order(Model):
id = fields.BigIntField(pk=True)
user = fields.ForeignKeyField('models.User', related_name='orders', on_delete=fields.CASCADE)
amount = fields.DecimalField(max_digits=12, decimal_places=2)
# 使用
user = await User.get(id=1)
orders = await user.orders.all() # 通过 related_name 反向访问
order = await Order.get(id=1)
await order.fetch_related('user') # 手动加载关联
print(order.user.username)
# 多对多:User ↔ Role
class Role(Model):
id = fields.BigIntField(pk=True)
name = fields.CharField(max_length=50)
class User(Model):
id = fields.BigIntField(pk=True)
roles = fields.ManyToManyField('models.Role', related_name='users', through='user_roles')
# 使用
user = await User.get(id=1)
await user.roles.add(role) # 添加关联
await user.roles.remove(role) # 删除关联
roles = await user.roles.all() # 获取所有关联
await user.fetch_related('roles') # 预加载
# 一对一:User → Profile
class Profile(Model):
id = fields.BigIntField(pk=True)
user = fields.OneToOneField('models.User', related_name='profile', on_delete=fields.CASCADE)
bio = fields.TextField(null=True)
# 使用
user = await User.get(id=1)
await user.fetch_related('profile')
print(user.profile.bio)
3.5 增删改查¶
from tortoise.expressions import Q, F
from tortoise.functions import Count, Sum, Avg
# ---- 增 ----
# 方式1:create(直接入库)
user = await User.create(username='alice', email='alice@example.com', password='hashed')
# 方式2:先构造再保存
user = User(username='bob', email='bob@example.com', password='hashed')
await user.save()
# 批量创建
await User.bulk_create([
User(username='user1', email='u1@example.com', password='h'),
User(username='user2', email='u2@example.com', password='h'),
])
# ---- 查 ----
# 主键查询
user = await User.get(id=1) # 不存在抛 DoesNotExist
user = await User.get_or_none(id=1) # 不存在返回 None
user = await User.get_or_create(username='alice', defaults={'email': 'a@b.com'})
# 过滤查询
users = await User.filter(is_active=True).all()
users = await User.filter(age__gte=18).all()
user = await User.filter(username='alice').first()
# Q 对象(复杂查询)
users = await User.filter(
Q(age__gte=18) & Q(is_active=True) # AND
).all()
users = await User.filter(
Q(username='alice') | Q(email='alice@a.com') # OR
).all()
users = await User.filter(~Q(is_active=True)).all() # NOT
# 字段查询操作符
# __exact 精确匹配(默认)
# __iexact 不区分大小写匹配
# __contains 包含
# __icontains 不区分大小写包含
# __startswith / __endswith
# __gt / __gte / __lt / __lte
# __in IN 列表
# __isnull IS NULL
# __range 范围 [a, b]
User.filter(username__icontains='alice')
User.filter(id__in=[1, 2, 3])
User.filter(age__range=(18, 60))
User.filter(deleted_at__isnull=True)
# 排序
users = await User.all().order_by('-created_at', 'username')
# 分页
users = await User.all().offset((page - 1) * size).limit(size)
total = await User.all().count()
# 只查部分字段
users = await User.all().values('id', 'username', 'email')
users = await User.all().values_list('id', 'username', flat=False)
# 关联查询(预加载,避免 N+1)
users = await User.all().prefetch_related('orders', 'roles')
orders = await Order.all().select_related('user') # JOIN 方式
# 聚合
from tortoise.functions import Count, Sum
count = await User.all().count()
total = await Order.filter(user_id=1).aggregate(total=Sum('amount'))
stats = await Order.annotate(cnt=Count('id')).group_by('status').values('status', 'cnt')
# ---- 改 ----
# 方式1:获取后修改
user = await User.get(id=1)
user.username = 'new_name'
await user.save()
# 方式2:批量更新(高效,不加载对象)
await User.filter(is_active=False).update(deleted=True)
# 方式3:update_or_create
user, created = await User.update_or_create(
username='alice',
defaults={'email': 'new@example.com'}
)
# F 表达式(字段引用,避免竞争条件)
await Order.filter(id=1).update(amount=F('amount') + 10)
# ---- 删 ----
user = await User.get(id=1)
await user.delete()
# 批量删除
await User.filter(created_at__lt=cutoff_date).delete()
# ---- 事务 ----
from tortoise.transactions import in_transaction, atomic
# 方式1:async with
async with in_transaction():
user = await User.create(username='alice', email='a@b.com', password='h')
await Order.create(user=user, amount=99.9)
# 方式2:装饰器
@atomic()
async def create_order_with_user():
user = await User.create(...)
order = await Order.create(user=user, ...)
return order
3.6 迁移(Aerich)¶
pip install aerich
# pyproject.toml
[tool.aerich]
tortoise_orm = "app.database.TORTOISE_ORM"
location = "./migrations"
src_folder = "./."
# 初始化(只执行一次)
aerich init -t app.database.TORTOISE_ORM
aerich init-db
# 生成迁移文件
aerich migrate --name "add_users_table"
# 执行迁移
aerich upgrade
# 回滚
aerich downgrade
# 查看迁移历史
aerich history
# 查看尚未应用的迁移(待升级的 heads)
aerich heads
四、依赖注入篇¶
4.1 基本依赖¶
from fastapi import Depends
# 函数依赖
async def common_params(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100)
) -> dict:
return {'offset': (page - 1) * size, 'limit': size}
@app.get('/users')
async def list_users(params: dict = Depends(common_params)):
offset, limit = params['offset'], params['limit']
...
# 类依赖(有状态)
class Pagination:
    """Class-based dependency that converts ``page``/``size`` into offset/limit.

    Attributes:
        offset: number of rows to skip, ``(page - 1) * size``.
        limit: maximum number of rows to return, equal to ``size``.
    """

    def __init__(self, page: int = 1, size: int = 20):
        rows_to_skip = size * (page - 1)
        self.limit = size
        self.offset = rows_to_skip
@app.get('/orders')
async def list_orders(pagination: Pagination = Depends(Pagination)):
...
4.2 依赖链¶
# 依赖可以依赖其他依赖
async def get_token(authorization: str = Header(...)) -> str:
if not authorization.startswith('Bearer '):
raise HTTPException(status_code=401)
return authorization[7:]
async def get_current_user(token: str = Depends(get_token)) -> User:
user_id = verify_token(token)
user = await User.get_or_none(id=user_id)
if not user:
raise HTTPException(status_code=401)
return user
async def get_active_user(user: User = Depends(get_current_user)) -> User:
if not user.is_active:
raise HTTPException(status_code=403, detail='用户已被禁用')
return user
@app.get('/profile')
async def profile(user: User = Depends(get_active_user)):
return user
4.3 分页依赖¶
# dependencies/pagination.py
from dataclasses import dataclass
from fastapi import Query
@dataclass
class PageParams:
page: int = Query(1, ge=1, description='页码')
size: int = Query(20, ge=1, le=100, description='每页数量')
@property
def offset(self):
return (self.page - 1) * self.size
@property
def limit(self):
return self.size
# 使用
@app.get('/users')
async def list_users(pagination: PageParams = Depends()):
users = await User.all().offset(pagination.offset).limit(pagination.limit)
total = await User.all().count()
return {
'list': users,
'total': total,
'page': pagination.page,
'pages': (total + pagination.size - 1) // pagination.size
}
五、认证与权限篇¶
5.1 OAuth2 + JWT¶
# pip install python-jose[cryptography] passlib[bcrypt]
from datetime import datetime, timedelta
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
SECRET_KEY = settings.secret_key
ALGORITHM = 'HS256'
pwd_context = CryptContext(schemes=['bcrypt'])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/api/auth/login')
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
    """Return a signed JWT holding ``data`` plus an ``exp`` claim.

    Args:
        data: claims to embed; copied, so the caller's dict is not mutated.
        expires_delta: token lifetime; falls back to 120 minutes when omitted.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Local import: the surrounding snippet only imports datetime and timedelta.
    from datetime import timezone

    to_encode = data.copy()
    # Use an aware UTC timestamp; datetime.utcnow() is naive and deprecated
    # since Python 3.12.
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=120))
    to_encode.update({'exp': expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# 登录接口
@router.post('/login')
async def login(form: OAuth2PasswordRequestForm = Depends()):
user = await User.get_or_none(email=form.username)
if not user or not verify_password(form.password, user.password):
raise HTTPException(status_code=401, detail='账号或密码错误')
access_token = create_access_token({'sub': str(user.id), 'role': user.role})
return {'access_token': access_token, 'token_type': 'bearer'}
5.2 依赖注入认证¶
# dependencies/auth.py
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/api/auth/login')
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
status_code=401,
detail='无效的认证凭据',
headers={'WWW-Authenticate': 'Bearer'}
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get('sub')
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = await User.get_or_none(id=int(user_id))
if not user:
raise credentials_exception
return user
async def get_active_user(user: User = Depends(get_current_user)) -> User:
if not user.is_active:
raise HTTPException(status_code=403, detail='账号已被禁用')
return user
# 使用
@router.get('/profile')
async def profile(user: User = Depends(get_active_user)):
return UserDetail.model_validate(user)
5.3 权限控制¶
# 基于角色的权限依赖(依赖工厂,返回 Depends 对象,作为参数默认值使用)
from functools import wraps
def require_roles(*roles: str):
async def dependency(user: User = Depends(get_active_user)):
if user.role not in roles:
raise HTTPException(status_code=403, detail='权限不足')
return user
return Depends(dependency)
# 使用
@router.get('/admin/users')
async def admin_list(user: User = require_roles('admin', 'superadmin')):
...
# 更灵活的权限类
class PermissionChecker:
def __init__(self, required_permission: str):
self.required = required_permission
async def __call__(self, user: User = Depends(get_active_user)):
if self.required not in user.permissions:
raise HTTPException(status_code=403, detail='权限不足')
return user
@router.delete('/users/{user_id}')
async def delete_user(
user_id: int,
_: User = Depends(PermissionChecker('user:delete'))
):
...
5.4 API Key 认证¶
from fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name='X-API-Key', auto_error=False)
async def verify_api_key(api_key: str = Depends(api_key_header)):
    """Reject the request unless the ``X-API-Key`` header matches the configured key.

    Returns:
        The presented API key when it is valid.

    Raises:
        HTTPException: 403 when the header is missing or does not match.
    """
    import secrets

    # The header scheme is built with auto_error=False, so a missing header
    # arrives here as None instead of being rejected earlier.
    if api_key is None:
        raise HTTPException(status_code=403, detail='无效的 API Key')
    # Constant-time comparison avoids leaking key contents through timing;
    # compare bytes because compare_digest rejects non-ASCII str values.
    expected = settings.api_key.encode('utf-8')
    if not secrets.compare_digest(api_key.encode('utf-8'), expected):
        raise HTTPException(status_code=403, detail='无效的 API Key')
    return api_key
@router.get('/webhook', dependencies=[Depends(verify_api_key)])
async def webhook():
...
六、进阶篇¶
6.1 路由分组(APIRouter)¶
# app/api/user.py
from fastapi import APIRouter
router = APIRouter(
prefix='/api/users',
tags=['users'],
dependencies=[Depends(get_active_user)] # 整个路由组都需要认证
)
@router.get('/')
async def list_users():
...
@router.get('/{user_id}')
async def get_user(user_id: int):
...
# main.py
from app.api import auth, user, order
app.include_router(auth.router)
app.include_router(user.router)
app.include_router(order.router)
6.2 应用工厂模式¶
# app/main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager
from tortoise.contrib.fastapi import RegisterTortoise
def create_app() -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI):
async with RegisterTortoise(app, config=TORTOISE_ORM, generate_schemas=False):
yield
app = FastAPI(
title='My API',
lifespan=lifespan
)
# 注册路由
from app.api import auth, user, order
app.include_router(auth.router)
app.include_router(user.router)
app.include_router(order.router)
# 注册中间件
register_middlewares(app)
return app
app = create_app()
6.3 中间件¶
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
import time
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=['http://localhost:3000', 'https://yourdomain.com'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*']
)
# GZIP 压缩
app.add_middleware(GZipMiddleware, minimum_size=1000)
# 自定义中间件
class RequestLogMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start = time.time()
response = await call_next(request)
duration = time.time() - start
response.headers['X-Process-Time'] = f'{duration:.4f}'
return response
app.add_middleware(RequestLogMiddleware)
# 限流中间件(使用 slowapi)
# pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded  # needed by add_exception_handler below
from slowapi.util import get_remote_address

# Requests are bucketed by whatever get_remote_address returns for the request.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get('/api/data')
@limiter.limit('100/minute')
async def data(request: Request):
...
6.4 事件钩子(lifespan)¶
# FastAPI 0.93+ 推荐用 lifespan 代替 on_event
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# === 启动时 ===
print('应用启动')
await init_redis()
await init_db()
yield # 应用运行中
# === 关闭时 ===
print('应用关闭')
await close_redis()
app = FastAPI(lifespan=lifespan)
6.5 后台任务¶
from fastapi import BackgroundTasks
# 轻量后台任务(同进程,适合快速操作)
def send_email(email: str, message: str):
# 发送邮件...
pass
@app.post('/register')
async def register(user_data: UserCreate, background_tasks: BackgroundTasks):
    """Create a user, then queue a welcome e-mail as a background task."""
    # confirm_password exists only for validation, and the raw password must
    # never be persisted: store the hash that verify_password() checks at login.
    payload = user_data.model_dump(exclude={'password', 'confirm_password'})
    user = await User.create(**payload, password=hash_password(user_data.password))
    background_tasks.add_task(send_email, user.email, '欢迎注册!')
    return {'msg': '注册成功'}
# 耗时任务用 Celery(见第八章)
6.6 WebSocket¶
from fastapi import WebSocket, WebSocketDisconnect
from typing import List
class ConnectionManager:
    """Keeps the list of accepted WebSocket connections and broadcasts to them."""

    def __init__(self):
        self.active: List[WebSocket] = []

    async def connect(self, ws: WebSocket):
        """Accept the handshake and register the connection."""
        await ws.accept()
        self.active.append(ws)

    def disconnect(self, ws: WebSocket):
        """Unregister ``ws``; a no-op when it has already been removed."""
        if ws in self.active:
            self.active.remove(ws)

    async def broadcast(self, message: str):
        """Send ``message`` to every registered connection."""
        # Iterate over a snapshot: disconnect() can run while a send is
        # awaited, and mutating a list during iteration skips elements.
        for ws in list(self.active):
            try:
                await ws.send_text(message)
            except (WebSocketDisconnect, RuntimeError):
                # NOTE(review): presumably these are what a send on a closed
                # socket raises; confirm against the Starlette version in use.
                # Drop the dead peer so the remaining clients still get the message.
                self.disconnect(ws)
manager = ConnectionManager()
@app.websocket('/ws/{room_id}')
async def websocket_endpoint(ws: WebSocket, room_id: str):
await manager.connect(ws)
try:
while True:
data = await ws.receive_text()
await manager.broadcast(f'Room {room_id}: {data}')
except WebSocketDisconnect:
manager.disconnect(ws)
6.7 异常处理¶
from fastapi import Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
# 全局异常处理
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
return JSONResponse({'code': 404, 'msg': '资源不存在'}, status_code=404)
@app.exception_handler(500)
async def server_error_handler(request: Request, exc):
return JSONResponse({'code': 500, 'msg': '服务器内部错误'}, status_code=500)
# 请求验证错误(Pydantic 验证失败)
@app.exception_handler(RequestValidationError)
async def validation_error_handler(request: Request, exc: RequestValidationError):
errors = [{'field': e['loc'][-1], 'msg': e['msg']} for e in exc.errors()]
return JSONResponse({'code': 422, 'msg': '参数验证失败', 'errors': errors}, status_code=422)
# 自定义业务异常
class AppException(Exception):
    """Business-rule error carrying a human-readable message and a numeric code.

    Args:
        message: text exposed as ``exc.message`` and as ``str(exc)``.
        code: numeric code stored on ``exc.code``; defaults to 400.
    """

    def __init__(self, message: str, code: int = 400):
        # Pass the message to Exception so str(exc), logs and tracebacks show it.
        super().__init__(message)
        self.message = message
        self.code = code
@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
return JSONResponse({'code': exc.code, 'msg': exc.message}, status_code=exc.code)
# 抛出
raise AppException('用户名已存在', code=409)
raise HTTPException(status_code=404, detail='用户不存在')
七、异步篇¶
7.1 async/await 基础¶
import asyncio
# 同步函数(不能 await,不要在里面做 IO 操作)
def sync_func():
return 'sync'
# 异步函数(可以 await,能做异步 IO)
async def async_func():
await asyncio.sleep(1)
return 'async'
# 使用异步库(Tortoise ORM、httpx 等)时用 async def;只能调用阻塞库时改用普通 def,FastAPI 会把它放到线程池执行
@app.get('/data')
async def get_data():
result = await fetch_from_db()
return result
# 同步阻塞操作(如 CPU 密集计算)放到线程池
import asyncio
from fastapi.concurrency import run_in_threadpool
@app.get('/heavy')
async def heavy_compute():
result = await run_in_threadpool(sync_heavy_function, arg1, arg2)
return result
7.2 并发任务¶
import asyncio
# 并行执行多个协程
@app.get('/dashboard')
async def dashboard(user: User = Depends(get_active_user)):
# 同时查询,不按顺序等待
orders, profile, stats = await asyncio.gather(
Order.filter(user_id=user.id).count(),
Profile.get_or_none(user_id=user.id),
get_user_stats(user.id)
)
return {'orders': orders, 'profile': profile, 'stats': stats}
# 带超时
try:
result = await asyncio.wait_for(some_coroutine(), timeout=5.0)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail='请求超时')
7.3 异步 HTTP 请求¶
# pip install httpx
import httpx
# 作为应用启动时创建,关闭时销毁
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.http_client = httpx.AsyncClient(timeout=10.0)
yield
await app.state.http_client.aclose()
# 在视图中使用
@app.get('/external')
async def call_external(request: Request):
client = request.app.state.http_client
response = await client.get('https://api.example.com/data')
return response.json()
八、缓存与任务篇¶
8.1 Redis 缓存¶
# pip install redis[asyncio]
import redis.asyncio as aioredis
import json
# 初始化(lifespan 中)
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.redis = aioredis.from_url(settings.redis_url, decode_responses=True)
yield
await app.state.redis.aclose()
# 缓存装饰器
from functools import wraps
def cache(key_prefix: str, ttl: int = 300):
    """Build a decorator that caches an async endpoint's JSON result in Redis.

    The decorated endpoint must declare a ``request`` parameter; the Redis
    client is read from ``request.app.state.redis``.

    Args:
        key_prefix: namespace prepended to every cache key.
        ttl: expiry in seconds passed to ``setex``.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            redis = request.app.state.redis
            # FastAPI passes endpoint parameters by keyword, so the key must
            # include kwargs; otherwise every call would share one cache entry.
            # Sorting the names keeps the key independent of argument order.
            key_parts = [str(a) for a in args]
            key_parts += [f'{name}={kwargs[name]}' for name in sorted(kwargs)]
            cache_key = f'{key_prefix}:{":".join(key_parts)}'
            cached = await redis.get(cache_key)
            # Compare against None so a cached empty string still counts as a hit.
            if cached is not None:
                return json.loads(cached)
            result = await func(request, *args, **kwargs)
            await redis.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator
# 手动缓存
@app.get('/hot-articles')
async def hot_articles(request: Request):
redis = request.app.state.redis
cached = await redis.get('hot_articles')
if cached:
return json.loads(cached)
articles = await Article.all().order_by('-views').limit(10).values()
await redis.setex('hot_articles', 60, json.dumps(articles))
return articles
8.2 Celery 异步任务¶
# celery_app.py
from celery import Celery
celery = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
celery.conf.update(
task_serializer='json',
result_serializer='json',
timezone='Asia/Shanghai'
)
# tasks.py
@celery.task(bind=True, max_retries=3)
def send_email_task(self, to: str, subject: str, body: str):
try:
email_service.send(to, subject, body)
except Exception as exc:
raise self.retry(exc=exc, countdown=60)
# FastAPI 视图中调用(同步调用 Celery,非 await)
@app.post('/register')
async def register(user_data: UserCreate):
    """Create a user, then enqueue the welcome e-mail on the Celery worker."""
    # confirm_password exists only for validation, and the raw password must
    # never be persisted: store the hash that verify_password() checks at login.
    payload = user_data.model_dump(exclude={'password', 'confirm_password'})
    user = await User.create(**payload, password=hash_password(user_data.password))
    # .delay() only enqueues the task and returns at once, so it is not awaited.
    send_email_task.delay(user.email, '欢迎注册', '...')
    return {'msg': 'ok'}
# 启动
# celery -A tasks worker --loglevel=info -c 4
九、测试篇¶
9.1 接口测试¶
# pip install pytest pytest-asyncio httpx
# tests/conftest.py
import pytest
from httpx import AsyncClient, ASGITransport
from tortoise.contrib.testing import initializer, finalizer
from app.main import app
@pytest.fixture(scope='session')
def anyio_backend():
return 'asyncio'
@pytest.fixture(autouse=True)
def init_db():
initializer(['app.models'])
yield
finalizer()
@pytest.fixture
async def client():
async with AsyncClient(
transport=ASGITransport(app=app),
base_url='http://test'
) as client:
yield client
# tests/test_user.py
import pytest
@pytest.mark.anyio
async def test_create_user(client):
response = await client.post('/api/users/', json={
'username': 'testuser',
'email': 'test@example.com',
'password': '123456',
'confirm_password': '123456'
})
assert response.status_code == 201
data = response.json()
assert data['data']['username'] == 'testuser'
@pytest.mark.anyio
async def test_login(client):
await client.post('/api/users/', json={
'username': 'u', 'email': 'u@a.com',
'password': '123456', 'confirm_password': '123456'
})
response = await client.post('/api/auth/login', data={
'username': 'u@a.com', 'password': '123456'
})
assert response.status_code == 200
assert 'access_token' in response.json()
@pytest.mark.anyio
async def test_get_user_not_found(client):
response = await client.get('/api/users/99999')
assert response.status_code == 404
9.2 Mock¶
from unittest.mock import patch, AsyncMock
@pytest.mark.anyio
async def test_with_mock(client):
with patch('app.services.email.send', new_callable=AsyncMock) as mock_send:
mock_send.return_value = True
response = await client.post('/api/register', json={...})
assert mock_send.called
十、部署篇¶
10.1 Uvicorn / Gunicorn¶
# 开发环境(热重载)
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# 生产环境(Uvicorn 多 worker)
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
# 生产环境(Gunicorn + Uvicorn worker,推荐)
gunicorn app.main:app \
--worker-class uvicorn.workers.UvicornWorker \
--workers 4 \
--bind 0.0.0.0:8000 \
--timeout 120 \
--max-requests 1000 \
--max-requests-jitter 100 \
--access-logfile /var/log/app/access.log \
--error-logfile /var/log/app/error.log
10.2 Nginx 配置¶
upstream fastapi_app {
server 127.0.0.1:8000;
}
server {
listen 80;
server_name yourdomain.com;
location /static/ {
alias /path/to/static/;
expires 30d;
}
location / {
proxy_pass http://fastapi_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 120;
client_max_body_size 20M;
}
# WebSocket 支持
location /ws/ {
proxy_pass http://fastapi_app;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
}
}
10.3 Docker 部署¶
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
# An instruction that spans several lines needs a trailing backslash on each line.
CMD ["gunicorn", "app.main:app", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--workers", "4", \
     "--bind", "0.0.0.0:8000"]
# docker-compose.yml
version: "3.8"
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=mysql://user:pass@db:3306/mydb
- REDIS_URL=redis://redis:6379/0
- SECRET_KEY=${SECRET_KEY}
depends_on:
- db
- redis
restart: unless-stopped
db:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: mydb
MYSQL_USER: user
MYSQL_PASSWORD: pass
volumes:
- mysql_data:/var/lib/mysql
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/conf.d/default.conf
depends_on:
- api
volumes:
mysql_data:
redis_data:
FastAPI vs Flask 对比¶
对比项 |
FastAPI |
Flask |
|---|---|---|
性能 |
极高(异步) |
中等(同步) |
类型检查 |
Pydantic 原生支持 |
需额外库 |
自动文档 |
内置 Swagger/ReDoc |
需插件 |
异步支持 |
原生 async/await |
Flask 2.0+ 有限支持 |
学习曲线 |
中等 |
低 |
生态成熟度 |
较新但增长快 |
成熟 |
ORM |
推荐 Tortoise / SQLModel |
Flask-SQLAlchemy |
适合场景 |
高性能 API、微服务 |
传统 Web、快速原型 |
常用扩展汇总¶
库 |
功能 |
|---|---|
tortoise-orm |
异步 ORM |
aerich |
Tortoise 数据库迁移 |
pydantic-settings |
配置管理 |
python-jose |
JWT |
passlib |
密码哈希 |
httpx |
异步 HTTP 客户端 |
redis[asyncio] |
异步 Redis |
celery |
异步任务队列 |
slowapi |
接口限流 |
CORSMiddleware(fastapi.middleware.cors) |
跨域(FastAPI 内置中间件,无需额外安装) |
python-multipart |
表单/文件上传 |
uvicorn |
ASGI 服务器 |
gunicorn |
进程管理器 |
参考资源¶
FastAPI 官方文档:https://fastapi.tiangolo.com/
Tortoise ORM 文档:https://tortoise.github.io/
Aerich 文档:https://github.com/tortoise/aerich
Pydantic 文档:https://docs.pydantic.dev/
Uvicorn 文档:https://www.uvicorn.org/