7.2 OAuth2 密码流程(带 Bearer Token)
FastAPI OAuth2密码流程教程:使用Bearer Token进行用户认证
本教程详细讲解如何在FastAPI中实现OAuth2密码流程(Resource Owner Password Credentials Grant),使用Bearer Token进行认证。内容涵盖安装依赖、代码示例、安全注意事项,适合FastAPI初学者快速上手API认证。
推荐工具
FastAPI OAuth2密码流程教程(带Bearer Token)
介绍
OAuth2是一种授权框架,允许第三方应用获取用户资源的有限访问权限,而无需暴露用户密码。密码流程(Resource Owner Password Credentials Grant)是OAuth2的一种流程,允许用户使用用户名和密码直接获取访问令牌,适用于受信任的客户端,如移动应用或桌面应用。Bearer Token是标准令牌类型,在HTTP头中以Authorization: Bearer <token>形式传递,用于认证API请求。
本教程将指导您在FastAPI中实现OAuth2密码流程,使用Bearer Token进行安全认证。内容设计简单易懂,适合FastAPI新手。
前提条件
- 基本Python知识
- 安装了Python 3.7+和pip
- 熟悉FastAPI基础知识(如安装和运行)
安装依赖
首先,打开终端或命令行,安装必要的包:
pip install fastapi uvicorn python-multipart passlib[bcrypt] python-jose[cryptography]
fastapi: FastAPI框架。uvicorn: ASGI服务器,用于运行FastAPI应用。python-multipart: 处理表单数据。passlib[bcrypt]: 用于密码哈希。python-jose[cryptography]: 用于JWT(JSON Web Token)生成和验证。
创建FastAPI应用
创建一个新文件,如 main.py,并导入必要的模块。
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from typing import Optional
app = FastAPI(title="OAuth2密码流程教程")
# 用于演示的用户数据库(内存中)
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # 密码是"secret"的哈希值
"disabled": False,
}
}
# 密码哈希上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2方案,用于Bearer Token认证
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT设置
SECRET_KEY = "your_secret_key" # 在实际应用中,请使用强密钥并存储在环境变量中
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 数据模型
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
# 辅助函数
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""
处理用户登录,返回Bearer Token。
这是OAuth2密码流程的核心部分。
"""
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""
从Bearer Token中提取并验证当前用户。
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
"""
受保护的路由,需要Bearer Token认证。
"""
return current_user
@app.get("/")
async def root():
return {"message": "欢迎使用FastAPI OAuth2教程!访问 /docs 查看API文档。"}
运行应用
保存文件后,运行以下命令启动服务器:
uvicorn main:app --reload
访问 http://127.0.0.1:8000/docs 查看交互式API文档。
测试认证流程
- 在API文档中,找到
/token端点,使用POST方法。 - 在表单中输入用户名
johndoe和密码secret(示例中哈希的密码)。 - 点击执行,您将获得一个Bearer Token,例如:
{ "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "token_type": "bearer" } - 复制access_token,然后访问
/users/me端点,在Authorize部分添加Token(格式:Bearer )。 - 点击执行,如果Token有效,将返回用户信息。
解释代码
- 依赖注入:使用
Depends()注入依赖,如认证表单和Token验证。 - JWT令牌:生成带有效期的JWT作为Bearer Token,用于后续请求。
- 安全:使用bcrypt哈希密码,OAuth2方案确保安全认证。
- 错误处理:HTTP异常用于返回适当的错误状态和消息。
安全注意事项
- 使用HTTPS:在生产环境中,确保使用HTTPS保护数据传输。
- 密钥管理:将SECRET_KEY存储在环境变量或安全配置中,避免硬编码。
- 密码强度:要求用户使用强密码,并定期更新。
- 令牌过期:设置较短的Token过期时间,并考虑使用刷新令牌机制。
- 数据库:本教程使用内存数据库作为示例,实际应用应使用真实数据库(如PostgreSQL)。
总结
通过本教程,您学会了在FastAPI中实现OAuth2密码流程,使用Bearer Token进行用户认证。这有助于保护您的API,确保只有授权用户才能访问受保护资源。下一步,您可以扩展此示例,添加更多功能如用户注册、角色管理或集成其他OAuth2流程。
如需更多帮助,请参考FastAPI官方文档或加入相关社区。
开发工具推荐