AI掘金头条新闻系统 (Toutiao News)-用户注册-生成Token
1.crud/users.py
# Generate a token
async def create_token(db: AsyncSession, user_id: int):
    """Issue a fresh login token for ``user_id`` and persist it.

    A new UUID4 string is generated together with an expiry seven days
    after the current ``datetime.now()`` value. If a ``UserToken`` row
    already exists for the user, its token and expiry are overwritten;
    otherwise a new row is added. The session is committed in both cases.

    Args:
        db: Async SQLAlchemy session used for the lookup and the commit.
        user_id: Identifier of the user the token belongs to.

    Returns:
        The newly generated token string.
    """
    new_token = str(uuid.uuid4())
    expiry = datetime.now() + timedelta(days=7)

    # Look up the token row currently stored for this user, if any.
    lookup = await db.execute(
        select(UserToken).where(UserToken.user_id == user_id)
    )
    existing = lookup.scalar_one_or_none()

    if not existing:
        # No row yet: insert one.
        db.add(UserToken(user_id=user_id, token=new_token, expires_at=expiry))
    else:
        # Row present: rotate the token in place.
        existing.token = new_token
        existing.expires_at = expiry

    await db.commit()
    return new_token


# 2.routers/users.py
# 生成 Token
token = await users.create_token(db, user.id)

完整代码
from fastapi import APIRouter
from fastapi.params import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from common.result import Result
from config.db_confing import get_db
from crud import users
from schemas.users import UserRequest

router = APIRouter(prefix="/api/user", tags=["users"])


# User registration
# The path needs its leading slash: the router prefix and the path are joined
# as-is, so "register" would be served at "/api/userregister" rather than
# "/api/user/register".
@router.post("/register")
async def register(user_data: UserRequest, db: AsyncSession = Depends(get_db)):
    """Register a new user and return a login token with basic profile data.

    Args:
        user_data: Registration payload; ``username`` is read here and the
            whole object is passed on to ``users.create_user``.
        db: Async database session injected through ``get_db``.

    Returns:
        ``Result.error("用户已存在", 400)`` when the username is already taken;
        otherwise a dict with ``code`` 200, a success message, the new token
        and the user's ``id``, ``username``, ``bio`` and ``avatar``.
    """
    # Reject the request if the username is already present in the database.
    existing_user = await users.get_user_by_username(db, user_data.username)
    if existing_user:
        return Result.error("用户已存在", 400)

    # Create the user record.
    user = await users.create_user(db, user_data)

    # Issue a token for the new user.
    token = await users.create_token(db, user.id)

    # NOTE(review): ``user`` attributes are read after ``create_token`` has
    # committed the session; this presumably relies on the session not
    # expiring objects on commit -- confirm against the session factory.
    return {
        "code": 200,
        "message": "注册成功",
        "data": {
            "token": token,
            "userInfo": {
                "id": user.id,
                "username": user_data.username,
                "bio": user.bio,
                "avatar": user.avatar,
            },
        },
    }