官方文档
https://fastapi.tiangolo.com/zh/tutorial/security/oauth2-jwt/
由于官方文档看起来有些复杂,所以我在学习的过程中进行了整理
安装库
安装 PyJWT
,在 Python 中生成和校验 JWT 令牌(下面的代码使用的是 import jwt 和 jwt.exceptions,对应的库是 PyJWT,而不是 python-jose)
pip install pyjwt
Passlib 是处理密码哈希的 Python 包。
pip install passlib[bcrypt]
完整代码
代码注释非常清楚,认真研究一下,逻辑理清楚了其实很简单
from datetime import datetime, timedelta, timezone
from typing import Union
from pydantic import BaseModel, TypeAdapter
import jwt
from fastapi import Body, Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel
# Algorithm name passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the tokens issued by the /token route.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Key used to sign and verify the tokens.
# A string of this shape can be generated with:
#     openssl rand -hex 32
# NOTE(review): a key committed to source is only suitable for a demo;
# load it from configuration before deploying.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# In-memory stand-in for a user table, keyed by username.
# "hashed_password" stores the password hash, never the plaintext.
fake_users_db = {
    "admin": {
        "username": "admin",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$YKnod6.e6A2yihtRMbrqiu8zME1AhFFv4GjdeazKLv60SQ9tf/GmS",
        "disabled": False,
    },
}
# Request / authentication models.

# Body returned by the login route.
class Token(BaseModel):
    # Encoded JWT string.
    access_token: str
    # Token scheme reported to the client; the login route sets it to "bearer".
    token_type: str
# Data extracted from a decoded access token.
class TokenData(BaseModel):
    # Filled from the token's "sub" claim; defaults to None.
    username: Union[str, None] = None
# Public view of a user record (no password hash).
class User(BaseModel):
    # Only required field.
    username: str
    # Optional profile fields; each defaults to None.
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    # Checked by get_current_active_user; a truthy value rejects the request.
    disabled: Union[bool, None] = None
# User record as stored in the database: the User fields plus the password hash.
class UserInDB(User):
    # Hash compared against the submitted password in authenticate_user.
    hashed_password: str
# Shape of the JSON login payload validated by the /token route.
class OAuth2PasswordRequestForm(BaseModel):
    # Key looked up in the user store.
    username: str
    # Plaintext password submitted by the client.
    password: str
# FastAPI application instance that the route decorators below attach to.
app = FastAPI()
# Dependency that supplies the bearer token to get_current_user;
# "token" is the URL of the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# passlib context used to hash passwords and to verify them against a hash,
# configured with the bcrypt scheme.
pwd_context = CryptContext(deprecated="auto", schemes=["bcrypt"])
# Verify a password against its hash.
def verify_password(plain_password, hashed_password):
    """Return the result of checking ``plain_password`` against ``hashed_password``.

    The comparison is delegated to ``pwd_context.verify``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
# Hash a password.
def get_password_hash(password):
    """Return the hash of ``password`` as produced by ``pwd_context.hash``."""
    hashed = pwd_context.hash(password)
    return hashed
# Simulate fetching a user record from the database.
def get_user(db, username: str):
    """Build a ``UserInDB`` from the record stored under ``username`` in ``db``.

    Returns ``None`` when ``db`` has no entry for ``username``.
    """
    if username not in db:
        return None
    return UserInDB(**db[username])
# Check the credentials and return the matching user.
def authenticate_user(fake_db, username: str, password: str):
    """Return the user for valid credentials, otherwise ``False``.

    ``False`` is returned both when ``get_user`` finds no record for
    ``username`` and when ``password`` fails ``verify_password`` against the
    stored hash.
    """
    account = get_user(fake_db, username)
    # The password check only runs when a record was found.
    if account and verify_password(password, account.hashed_password):
        return account
    return False
# Core JWT creation logic: sign the claims into a token.
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    """Return a JWT containing a copy of ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the caller's dict is copied, not modified.
        expires_delta: Lifetime of the token. A falsy value (``None`` or a
            zero ``timedelta``) falls back to 15 minutes.

    Returns:
        The value of ``jwt.encode`` called with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    lifetime = expires_delta or timedelta(minutes=15)
    claims = data.copy()
    # Expiry is computed from the current UTC time.
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
# Resolve the current user: decode the token, read its subject and look the
# user up in the database.
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Return the user named by the ``sub`` claim of ``token``.

    Raises:
        HTTPException: 401 when the token fails ``jwt.decode``, carries no
            ``sub`` claim, or names a user missing from ``fake_users_db``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        raise unauthorized
    subject: str = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(username=subject)
    account = get_user(fake_users_db, username=token_data.username)
    if account is None:
        raise unauthorized
    return account
# Reject users that are flagged as disabled.
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Return ``current_user`` unless its ``disabled`` flag is truthy.

    Raises:
        HTTPException: 400 with detail "Inactive user" for a disabled user.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
# Exchange a username and password for an access token.
# Note: the credentials must be sent as a JSON object in the request body.
@app.post("/token")
async def login_for_access_token(
    json_data: dict = Body(...),
) -> Token:
    # json_data: JSON object from the request body; it must carry the
    #     "username" and "password" fields of OAuth2PasswordRequestForm.
    # Returns a Token whose access_token is a JWT with the username as "sub",
    #     valid for ACCESS_TOKEN_EXPIRE_MINUTES minutes.
    # Raises HTTPException 422 when the body does not match the form model,
    #     and 401 when authenticate_user rejects the credentials.

    # Imported locally so this block does not depend on the file-level imports.
    from pydantic import ValidationError

    try:
        form_data = TypeAdapter(OAuth2PasswordRequestForm).validate_python(json_data)
    except ValidationError as exc:
        # Without this, a malformed body surfaces as a 500. The submitted
        # input is deliberately left out of the response so a password is
        # never echoed back.
        problems = [
            {"loc": list(err["loc"]), "msg": err["msg"], "type": err["type"]}
            for err in exc.errors()
        ]
        raise HTTPException(status_code=422, detail=problems) from exc

    # The plaintext password is intentionally not printed or logged.
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return Token(access_token=access_token, token_type="bearer")
# Return the profile of the authenticated, non-disabled caller.
@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: User = Depends(get_current_active_user),
):
    # The dependency has already validated the token and the disabled flag.
    return current_user
# Return the items owned by the authenticated, non-disabled caller.
@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Depends(get_current_active_user),
):
    # Single hard-coded item, attributed to the caller's username.
    item = {"item_id": "Foo", "owner": current_user.username}
    return [item]
测试
启动项目
uvicorn main:app --reload
请求测试,我这里用的是apipost
以 POST 方式请求 http://127.0.0.1:8000/token,请求体为包含 username 和 password 的 JSON,返回的令牌为
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTcxOTc1NDkwNH0.QVdd0TxfKfH5cav7a9_RCs7pT5khxG6_7LwuigIUmXo
接着我们请求 http://127.0.0.1:8000/users/me/ 路由时,需要在请求头 Authorization 中携带令牌,令牌格式为 token_type + 空格 + access_token,也就是
bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTcxOTc1NDkwNH0.QVdd0TxfKfH5cav7a9_RCs7pT5khxG6_7LwuigIUmXo
以后只要某个路由函数的参数中声明了下面这个依赖,该路由就需要令牌验证;不声明则不需要
current_user: User = Depends(get_current_active_user)
© 版权声明
THE END
暂无评论内容