7.3 JWT(JSON Web Token)认证实现
FastAPI中JWT认证实现详细教程 - 从入门到精通
本教程详细介绍了如何在FastAPI中实现JWT(JSON Web Token)认证,包括安装依赖、生成和验证token的步骤,适合新人学习,涵盖代码示例和安全实践。
推荐工具
FastAPI中JWT认证实现教程
简介
JWT(JSON Web Token)是一种用于在网络应用之间安全传输信息的开放标准。在FastAPI中,JWT常用于用户认证和授权,因为它简洁、可自包含且易于实现。本教程将指导您在FastAPI中实现JWT认证,适合初学者快速上手。
前置条件
在开始之前,确保您已安装以下内容:
- Python 3.7 或更高版本
- FastAPI 框架(通过 pip 安装)
- 对FastAPI基础有一定了解(如路由、依赖注入)
安装所需包
运行以下命令安装必要的包:
pip install fastapi uvicorn python-jose[cryptography] passlib bcrypt
fastapi和uvicorn用于运行FastAPI应用。python-jose用于生成和验证JWT token。passlib和bcrypt用于密码哈希(可选,但推荐用于安全)。
实现JWT认证
步骤1:定义用户模型和设置
首先,创建Pydantic模型来处理用户数据,并定义JWT相关配置。
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional
# 用户登录模型
class UserLogin(BaseModel):
username: str
password: str
# 用户模型(示例)
class User(BaseModel):
username: str
password_hash: str # 存储哈希后的密码
# JWT配置
SECRET_KEY = "your-secret-key-change-this-in-production" # 在生产环境中使用安全密钥
ALGORITHM = "HS256" # 签名算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # token过期时间
步骤2:生成JWT Token
创建一个函数来生成token,基于用户信息。
from jose import JWTError, jwt
from datetime import datetime, timedelta
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
步骤3:验证JWT Token
编写一个函数来验证并解码token。
def verify_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub") # 假设token中包含sub字段存储用户名
if username is None:
return None
return username
except JWTError:
return None
步骤4:创建认证依赖
使用FastAPI的依赖注入来保护路由,验证token。
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
username = verify_token(token)
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效或过期的token",
headers={"WWW-Authenticate": "Bearer"},
)
return username
保护路由
现在,您可以在需要认证的路由中使用Depends(get_current_user)来保护它们。
from fastapi import FastAPI, Depends
app = FastAPI()
# 登录端点,生成token
@app.post("/login")
async def login(user_login: UserLogin):
# 这里应查询数据库验证用户,示例中简化处理
if user_login.username == "admin" and user_login.password == "password":
access_token = create_access_token(data={"sub": user_login.username})
return {"access_token": access_token, "token_type": "bearer"}
else:
raise HTTPException(status_code=400, detail="用户名或密码错误")
# 受保护的路由
@app.get("/protected")
async def protected_route(current_user: str = Depends(get_current_user)):
return {"message": f"欢迎, {current_user}!您已成功认证。"}
完整示例代码
将所有代码整合到一个文件中,例如 main.py,并运行应用。
# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
# 配置
SECRET_KEY = "your-secret-key-change-this-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
app = FastAPI()
security = HTTPBearer()
class UserLogin(BaseModel):
username: str
password: str
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
return username
except JWTError:
return None
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
username = verify_token(token)
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效或过期的token",
headers={"WWW-Authenticate": "Bearer"},
)
return username
@app.post("/login")
async def login(user_login: UserLogin):
# 简化验证:实际应用中应与数据库比对
if user_login.username == "admin" and user_login.password == "password":
access_token = create_access_token(data={"sub": user_login.username})
return {"access_token": access_token, "token_type": "bearer"}
else:
raise HTTPException(status_code=400, detail="用户名或密码错误")
@app.get("/protected")
async def protected_route(current_user: str = Depends(get_current_user)):
return {"message": f"欢迎, {current_user}!您已成功认证。"}
运行应用:
uvicorn main:app --reload
安全和最佳实践
- 密钥管理:在生产环境中,使用环境变量或秘密管理工具存储
SECRET_KEY,避免硬编码。 - 密码哈希:推荐使用
passlib或bcrypt对密码进行哈希存储,而不是明文。 - token过期:设置合理的过期时间,如30分钟,并使用刷新token机制处理长时间会话。
- HTTPS:在生产环境中使用HTTPS来加密传输,防止token被截获。
- 避免敏感信息:不要在JWT payload中存储敏感数据,因为它可被解码。
总结
通过本教程,您学会了如何在FastAPI中实现JWT认证,从安装依赖到保护路由的完整流程。JWT是构建安全API的有效工具,结合FastAPI的依赖注入,可以轻松管理用户认证。继续实践并探索更多高级特性,如角色权限管理。
如果您有任何问题,欢迎在社区中提问。
开发工具推荐