Password Hashing and Bearer JWT Token Authentication with FastAPI

Official documentation

https://fastapi.tiangolo.com/zh/tutorial/security/oauth2-jwt/

The official documentation looks somewhat complex, so I organized these notes while working through it.

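Installing the libraries

Install PyJWT to generate and verify JWT tokens in Python. The code below uses "import jwt", which PyJWT provides; python-jose would be imported as "jose" instead.

pip install pyjwt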

Passlib is a Python package for password hashing.

pip install passlib[bcrypt]
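
To show what a password-hashing library takes care of, here is a minimal sketch using only the standard library. It is a stand-in under stated assumptions: it uses PBKDF2-HMAC-SHA256 from hashlib rather than bcrypt, and the function names, the "salt$digest" storage format, and the iteration count are all hypothetical. Passlib's CryptContext handles the same concerns (random salt, cost factor, safe comparison) for you.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative cost factor, not a recommendation


def hash_password_sketch(password: str) -> str:
    """Return 'salt_hex$digest_hex' for the given password."""
    salt = os.urandom(16)  # a fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password_sketch(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password_sketch("s3cret")
print(verify_password_sketch("s3cret", stored))  # True
print(verify_password_sketch("wrong", stored))   # False
```

Because the salt is random, hashing the same password twice gives two different strings, yet both verify. This is also why the stored hash can never be turned back into the password.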

Full code

The code is commented throughout. Study it carefully; once the logic is clear, it is quite simple.

from datetime import datetime, timedelta, timezone
from typing import Union

import jwt
from fastapi import Body, Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel, TypeAdapter

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Simulated database that stores user records
# hashed_password holds the bcrypt hash of the password, never the plaintext
fake_users_db = {
    "admin": {
        "username": "admin",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$YKnod6.e6A2yihtRMbrqiu8zME1AhFFv4GjdeazKLv60SQ9tf/GmS",
        "disabled": False,
    }
}

# Models for the token response, the token payload, and user data
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str

class OAuth2PasswordRequestForm(BaseModel):
    username: str
    password: str
    
# Hash and verify passwords with passlib's CryptContext
# (hashing is one-way, so nothing is ever decrypted); bcrypt is the scheme used here
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

# Verify a plaintext password against a stored hash
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

# Hash a password
def get_password_hash(password):
    return pwd_context.hash(password)

# Simulate fetching a user from the database; returns None if the user does not exist
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

# Check the username and password; return the user on success, otherwise False
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

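# Core logic for creating the JWT: add an expiry claim and sign the payload
# (the payload is signed, not encrypted)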
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

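# Resolve the current user: decode and verify the token, read the username
# from the "sub" claim, then look the user up in the database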
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

# Reject users that are disabled
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# Accepts a username and password and returns a token.
# Note: this endpoint expects a JSON object in the request body, not form data.
@app.post("/token")
async def login_for_access_token(
    json_data: dict = Body(...),
) -> Token:
    ta = TypeAdapter(OAuth2PasswordRequestForm)
    form_data = ta.validate_python(json_data)
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")



@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user



@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]
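
The signing done by jwt.encode and the check done by jwt.decode in the code above can be sketched with the standard library. This illustrates the HS256 token layout and is not a replacement for PyJWT: the helper names (sign_hs256, verify_hs256) and the demo key are hypothetical, and only the signature and the exp claim are checked.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url_encode(raw: bytes) -> str:
    """Base64url without padding, as used for JWT segments."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, key: str) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        b64url_encode(json.dumps(header, separators=(",", ":")).encode()),
        b64url_encode(json.dumps(payload, separators=(",", ":")).encode()),
    ]
    signing_input = ".".join(segments).encode("ascii")
    signature = hmac.new(key.encode(), signing_input, hashlib.sha256).digest()
    return ".".join(segments + [b64url_encode(signature)])


def verify_hs256(token: str, key: str) -> dict:
    """Recompute the signature, compare it, then check the expiry claim."""
    header_b64, payload_b64, signature_b64 = token.split(".")
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(key.encode(), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("signature mismatch")
    payload = json.loads(b64url_decode(payload_b64))
    if "exp" in payload and payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload


DEMO_KEY = "demo-key-for-illustration-only"  # hypothetical key, not SECRET_KEY
token = sign_hs256({"sub": "admin", "exp": int(time.time()) + 1800}, DEMO_KEY)
print(verify_hs256(token, DEMO_KEY)["sub"])  # admin
```

Anyone can read the payload, but only a holder of the key can produce a signature that verifies, which is why SECRET_KEY must stay private.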

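Testing

Start the project (assuming the code is saved as main.py)

uvicorn main:app --reload

Send test requests. I used Apipost here.

Send a POST request to http://127.0.0.1:8000/token with the username and password as a JSON body. The returned token is: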

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTcxOTc1NDkwNH0.QVdd0TxfKfH5cav7a9_RCs7pT5khxG6_7LwuigIUmXo
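
A JWT payload is only base64url-encoded, not encrypted, so its claims can be inspected without the secret key. The sketch below decodes the first two segments of the token shown above using the standard library; the helper name decode_segment is hypothetical. It does not verify the signature, so it is for inspection only.

```python
import base64
import json

TOKEN = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
    ".eyJzdWIiOiJhZG1pbiIsImV4cCI6MTcxOTc1NDkwNH0"
    ".QVdd0TxfKfH5cav7a9_RCs7pT5khxG6_7LwuigIUmXo"
)


def decode_segment(segment: str) -> dict:
    """Decode one base64url JWT segment (header or payload) into a dict."""
    padded = segment + "=" * (-len(segment) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


header_b64, payload_b64, _signature = TOKEN.split(".")
print(decode_segment(header_b64))   # {'alg': 'HS256', 'typ': 'JWT'}
print(decode_segment(payload_b64))  # {'sub': 'admin', 'exp': 1719754904}
```

The sub claim is the username set in login_for_access_token, and exp is the expiry time as a Unix timestamp.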

 


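Next, when requesting the http://127.0.0.1:8000/users/me/ route, the token must be sent in the Authorization request header. The header value is the token_type followed by a space and the access_token, that is: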

bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTcxOTc1NDkwNH0.QVdd0TxfKfH5cav7a9_RCs7pT5khxG6_7LwuigIUmXo
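
For readers without a GUI client, the two requests can also be sketched in Python with the standard library. The helper names, the BASE_URL constant, and the placeholder password are assumptions; the article does not state the plaintext password behind the stored hash. The login body is JSON because that is what the /token endpoint in this article expects.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumed local development address


def build_login_request(username: str, password: str) -> urllib.request.Request:
    """POST /token with the credentials as a JSON body."""
    body = json.dumps({"username": username, "password": password}).encode()
    return urllib.request.Request(
        f"{BASE_URL}/token",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_me_request(access_token: str) -> urllib.request.Request:
    """GET /users/me/ with the token in the Authorization header."""
    return urllib.request.Request(
        f"{BASE_URL}/users/me/",
        headers={"Authorization": f"Bearer {access_token}"},
        method="GET",
    )


def fetch_json(request: urllib.request.Request) -> dict:
    """Send the request and parse the JSON response (server must be running)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def main() -> None:
    # "your-password" is a placeholder; replace it with the real password.
    login = fetch_json(build_login_request("admin", "your-password"))
    print(fetch_json(build_me_request(login["access_token"])))
```

Calling main() while the uvicorn server is running performs the login and then fetches the current user with the returned token.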


From now on, any route function that declares the following parameter requires token verification; routes that omit it do not:

current_user: User = Depends(get_current_active_user)
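
Behind that parameter, the dependency chain runs oauth2_scheme first, which reads the Authorization header and rejects the request if no bearer token is present. A rough standard-library approximation of that step might look like the sketch below. It assumes the scheme name is matched case-insensitively, which is why the lowercase "bearer" in the example header is accepted; extract_bearer_token is a hypothetical name, not a FastAPI function.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value, else None."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("Basic dXNlcjpwdw=="))  # None
print(extract_bearer_token(None))                  # None
```

In the real application, a missing or malformed header results in a 401 response before get_current_user ever runs.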

 
