办公问答网

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 85|回复: 0

FastAPI权限管理系统升级版2(附完整代码)

[复制链接]

4

主题

7

帖子

15

积分

新手上路

Rank: 1

积分
15
发表于 2023-2-10 13:38:44 | 显示全部楼层 |阅读模式
前言

学无止境,无止境学。坚持每天学点编程知识,坚持每天写点小文章,坚持每天进步一点点。大家好,我是张大鹏,喜欢学习和分享,希望能够通过此公众号,将自己学到的东西分享给大家,和大家一起交流,一起成长,一起进步。
乾坤未定,你我皆是黑马。大鹏一日同风起,扶摇直上九万里。宝剑锋从磨砺出,梅花香自苦寒来。不积跬步,无以至千里,不积小流无以成江海。
如果有多余的时间,就坚持学习吧,小时候学习改变命运,长大了学习丰富内涵,老了学习带来宁静。活到老,学到老,我能行!

课程介绍

收费说明

《FastAPI零基础入门班》收费说明:

  • 时长:10节课
  • 价格:1999元
  • 上课方式:腾讯会议小班教学
  • 联系电话(同微信):18010070052

课程大纲

本篇文章是《FastAPI零基础入门班》的第5篇文章,前面还有:

  • 《FastAPI快速入门》
  • 《FastAPI实现商品管理API》
  • 《FastAPI整合MySQL》
  • 《FastAPI实现OAuth2和JWT登录》
  • 《FastAPI组件化开发》

基本实现

目录结构

创建如下的目录结构:




安装依赖

pip install fastapi[all]pip install uvicorn[standard]pip install python-jose[cryptography]pip install passlib[bcrypt]pip install peeweepip install pymysql
配置

目标

1、引入pymysql并创建MySQL的引擎
2、引入peewee的连接方法
3、创建MySQL的数据库连接

实现方案

1、创建引擎
import pymysqlpymysql.install_as_MySQLdb()
2、创建MySQL连接
import pymysqldb = connect('mysql://root:root@localhost:3306/gaoxiang_shop_admin')
完整代码

config/db.py
import pymysqlfrom playhouse.db_url import connectpymysql.install_as_MySQLdb()db = connect('mysql://root:root@localhost:3306/gaoxiang_shop_admin')
密码工具模块

目标

1、实现密码加密的方法
2、实现校验密码是否正确的方法

实现方案

1、实现密码加密的方法
from passlib.context import CryptContext# 密码加密对象pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def get_password(password):    """密码加密"""    return pwd_context.hash(password)
2、实现校验密码是否正确的方法
def check_password(plain_password, hashed_password):    """校验密码是否正确"""    return pwd_context.verify(plain_password, hashed_password)
完整代码

from passlib.context import CryptContextpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def check_password(password_text, password_hash):    """密码校验"""    return pwd_context.verify(password_text, password_hash)def get_password(password_text):    """密码加密"""    return pwd_context.hash(password_text)
用户模型

目标

1、创建用户表的ORM模型
2、创建根据用户名查询用户的方法
3、创建校验用户名和密码是否正确的方法

实现方案

1、创建用户表的ORM模型
from peewee import *from config import dbfrom utils.password import check_passwordclass UserModel(Model):    username = CharField()    password = CharField()    class Meta:        database = db.db        db_table = "user"
2、创建根据用户名查询用户的方法
def get_user(username: str):    """从数据库获取用户信息"""    try:        return UserModel.get(UserModel.username == username)    except Exception as e:        print(e)
3、创建校验用户名和密码是否正确的方法
def authenticate_user(username: str, password: str):    """校验用户"""    # 从数据库获取用户    user = get_user(username)    # 校验用户是否存在    if user is None:        return False    # 校验用户密码是否正确    if not check_password(password, user.password):        return False    # 校验通过,返回数据库查到的用户信息    return user
完整代码

db/user.py
from peewee import *from config import dbfrom utils.password import check_passwordclass UserModel(Model):    username = CharField()    password = CharField()    class Meta:        database = db.db        db_table = "user"def get_user(username: str):    """从数据库获取用户信息"""    try:        return UserModel.get(UserModel.username == username)    except Exception as e:        print(e)def authenticate_user(username: str, password: str):    """校验用户"""    # 从数据库获取用户    user = get_user(username)    # 校验用户是否存在    if user is None:        return False    # 校验用户密码是否正确    if not check_password(password, user.password):        return False    # 校验通过,返回数据库查到的用户信息    return user
用户依赖模块

目标

1、确保用户要登录以后才能够访问到Doc文档的带权限接口
2、确保token能够解析出用户信息

实现方案

1、确保用户要登录以后才能够访问到Doc文档的带权限接口
from fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearerfrom jose import JWTError, jwtfrom db.user import get_userfrom utils.jwt import SECRET_KEY, ALGORITHM# 文档OAuth2登录oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
2、确保token能够解析出用户信息
async def get_current_user(token: str = Depends(oauth2_scheme)):    """获取当前登录用户"""    # 异常对象    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="权限校验失败",        headers={"WWW-Authenticate": "Bearer"},    )    username: str = ""    try:        # 解析客户端传过来的JWT Token        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        # 获取用户名        username = payload.get("username")        # 校验用户名是否存在        if username is None:            raise credentials_exception    except JWTError:        raise credentials_exception    # 从数据库获取用户    user = get_user(username=username)    # 校验用户是否存在    if user is None:        raise credentials_exception    # 返回获取到的用户    return user
完整代码

dependencies/user.py
from fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearerfrom jose import JWTError, jwtfrom db.user import get_userfrom utils.jwt import SECRET_KEY, ALGORITHM# 文档OAuth2登录oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")async def get_current_user(token: str = Depends(oauth2_scheme)):    """获取当前登录用户"""    # 异常对象    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="权限校验失败",        headers={"WWW-Authenticate": "Bearer"},    )    username: str = ""    try:        # 解析客户端传过来的JWT Token        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])        # 获取用户名        username = payload.get("username")        # 校验用户名是否存在        if username is None:            raise credentials_exception    except JWTError:        raise credentials_exception    # 从数据库获取用户    user = get_user(username=username)    # 校验用户是否存在    if user is None:        raise credentials_exception    # 返回获取到的用户    return user
用户请求Schema

目标

1、规范用户登录接口的请求
2、规范用户注册的请求

实现方案

1、规范用户登录接口的请求
from pydantic import BaseModelclass UserSchema(BaseModel):    """用户模型"""    username: str  # 用户名class UserLoginSchema(UserSchema):    """用户登录模型"""    password: str  # 密码
2、规范用户注册的请求
class UserRegisterSchema(UserLoginSchema):    """用户注册模型"""    re_password: str  # 确认密码
完整代码

schemas/user.py
from pydantic import BaseModelclass UserSchema(BaseModel):    """用户模型"""    username: str  # 用户名class UserLoginSchema(UserSchema):    """用户登录模型"""    password: str  # 密码class UserRegisterSchema(UserLoginSchema):    """用户注册模型"""    re_password: str  # 确认密码
JWT工具模块

目标

1、创建JWT Token
2、解析JWT Token

实现方案

1、创建JWT Token
from datetime import timedelta, datetimefrom typing import Unionfrom jose import jwt# 生成私钥的命令:openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"  # 加密算法ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 3  # token过期时间(分钟)def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):    """创建JWT Token"""    # 提交要打包的token内部数据    to_encode = data.copy()    # 设置token的过期时间ta    if expires_delta:        expire = datetime.utcnow() + expires_delta    else:        expire = datetime.utcnow() + timedelta(minutes=15)    to_encode.update({"exp": expire})    # 生成JWT Token    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)    # 返回JWT Token    return encoded_jwt
2、解析JWT Token
def get_token(data: dict = {}):    """创建token"""    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)    access_token = create_access_token(        data=data, expires_delta=access_token_expires    )    return access_token
完整代码

utils/jwt.py
from datetime import timedelta, datetimefrom typing import Unionfrom jose import jwt# 生成私钥的命令:openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256"  # 加密算法ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 3  # token过期时间(分钟)def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):    """创建JWT Token"""    # 提交要打包的token内部数据    to_encode = data.copy()    # 设置token的过期时间ta    if expires_delta:        expire = datetime.utcnow() + expires_delta    else:        expire = datetime.utcnow() + timedelta(minutes=15)    to_encode.update({"exp": expire})    # 生成JWT Token    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)    # 返回JWT Token    return encoded_jwtdef get_token(data: dict = {}):    """创建token"""    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)    access_token = create_access_token(        data=data, expires_delta=access_token_expires    )    return access_token
用户相关接口

目标

1、实现用户注册接口
2、使用Oauth2用户登录接口
3、实现JSON用户登录接口
4、实现获取用户信息接口

实现方案

1、实现用户注册接口
from fastapi import APIRouterfrom fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordRequestForm# 导入用户相关数据库方法from db.user import authenticate_user, UserModel# 导入依赖注入相关代码from dependencies.user import get_current_user# 导入用户相关schemafrom responses.base import get_response_param_error, get_responsefrom schemas.user import UserRegisterSchema, UserLoginSchema# 导入JWT相关的代码from utils.jwt import get_tokenfrom utils.password import get_passwordrouter = APIRouter()@router.post("/register")async def post_register(user: UserRegisterSchema):    """用户注册"""    if user.password != user.re_password:        return get_response_param_error("两次密码不一致")    password = get_password(user.password)    user_model = UserModel(username=user.username, password=password)    user_model.save()    return get_response()
2、使用Oauth2用户登录接口
@router.post("/token")async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):    """用户登录获取Token接口"""    # 校验用户    auth_user = authenticate_user(form_data.username, form_data.password)    if not auth_user:        raise HTTPException(            status_code=status.HTTP_401_UNAUTHORIZED,            detail="用户名或密码错误",            headers={"WWW-Authenticate": "Bearer"},        )    # 生成token    data = {        "id": auth_user.id,        "username": auth_user.username,    }    token = get_token(data)    # 返回token和token类型    return {"access_token": token, "token_type": "bearer"}
3、实现JSON用户登录接口
@router.post("/login")async def post_login(user: UserLoginSchema):    """用户登录"""    # 校验用户    auth_user = authenticate_user(**user.dict())    if not user:        return get_response_param_error("用户名或密码错误")    # 生成token    data = {        "id": auth_user.id,        "username": user.username,    }    token = get_token(data)    # 返回响应    result = {        "id": auth_user.id,        "username": user.username,        "token": token,    }    return get_response(data=result)
4、实现获取用户信息接口
@router.get("/userinfo")async def get_userinfo(current_user: UserModel = Depends(get_current_user)):    """获取用户信息"""    data = current_user.__data__    del data["password"]    return get_response(data=data)
完整代码

routers/user.py
from fastapi import APIRouterfrom fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordRequestForm# 导入用户相关数据库方法from db.user import authenticate_user, UserModel# 导入依赖注入相关代码from dependencies.user import get_current_user# 导入用户相关schemafrom responses.base import get_response_param_error, get_responsefrom schemas.user import UserRegisterSchema, UserLoginSchema# 导入JWT相关的代码from utils.jwt import get_tokenfrom utils.password import get_passwordrouter = APIRouter()@router.post("/token")async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):    """用户登录获取Token接口"""    # 校验用户    auth_user = authenticate_user(form_data.username, form_data.password)    if not auth_user:        raise HTTPException(            status_code=status.HTTP_401_UNAUTHORIZED,            detail="用户名或密码错误",            headers={"WWW-Authenticate": "Bearer"},        )    # 生成token    data = {        "id": auth_user.id,        "username": auth_user.username,    }    token = get_token(data)    # 返回token和token类型    return {"access_token": token, "token_type": "bearer"}@router.post("/register")async def post_register(user: UserRegisterSchema):    """用户注册"""    if user.password != user.re_password:        return get_response_param_error("两次密码不一致")    password = get_password(user.password)    user_model = UserModel(username=user.username, password=password)    user_model.save()    return get_response()@router.post("/login")async def post_login(user: UserLoginSchema):    """用户登录"""    # 校验用户    auth_user = authenticate_user(**user.dict())    if not user:        return get_response_param_error("用户名或密码错误")    # 生成token    data = {        "id": auth_user.id,        "username": user.username,    }    token = get_token(data)    # 返回响应    result = {        "id": auth_user.id,        "username": user.username,        "token": token,    }    return get_response(data=result)@router.get("/userinfo")async def get_userinfo(current_user: UserModel = Depends(get_current_user)):    """获取用户信息"""    data = current_user.__data__    del data["password"]    return get_response(data=data)
初始化数据库脚本

目标

1、引入相关的数据库ORM模型
2、根据ORM模型创建相关表

完整代码

init_db.py
from config import dbfrom db.user import UserModeltables = [UserModel]db.db.create_tables(tables)
入口程序

目标

1、引入相关路由
2、注册相关路由

完整代码

main.py
from fastapi import FastAPIfrom routers import userapp = FastAPI()app.include_router(user.router)
启动程序

初始化数据库

python init_db.py
启动服务

uvicorn main:app
测试

访问文档进行测试:http://localhost:8000/docs




完整代码

目录结构





config配置相关

config/__init__.py

config/server.py

服务器相关配置:
# 接口前缀API_PREFIX = "/api/v1"
config/jwt.py

JWT Token相关配置:
# 秘钥:openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"# 加密算法ALGORITHM = "HS256"# JWT Token的过期时间ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 20
config/db.py

参数说明:

  • DB_HOST:数据库IP地址
  • DB_PORT:数据库服务端口号
  • DB_USER:数据库用户名
  • DB_PASSWORD:数据库密码
  • DB_DATABASE:数据库名
  • db_config:数据库配置信息

数据库相关配置:
DB_HOST = 'localhost'DB_PORT = 3306DB_USER = 'root'DB_PASSWORD = 'zhangdapeng520'DB_DATABASE = 'gaoxiang_shop_admin'db_config = {    'host': DB_HOST,    'port': DB_PORT,    'user': DB_USER,    'password': DB_PASSWORD,    'database': DB_DATABASE,}
utils工具相关

utils/__init__.py

utils/common.py

判断两个变量是否相等的方法:
def is_equal(a, b):    """判断是否相等"""    return a == b
utils/jwt.py

方法说明:

  • get_token:生成JWT Token字符串
  • parse_token:解析JWT Token字符串

代码:
from datetime import datetime, timedeltafrom jose import jwtfrom config.jwt import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTESdef get_token(data: dict, expires_minute: int = ACCESS_TOKEN_EXPIRE_MINUTES):    """    生成token    :param data: 要打包的数据    :param expires_delta: 过期时间    :return: JWT Token    """    to_encode = data.copy()    expire = datetime.utcnow() + timedelta(minutes=expires_minute)    to_encode.update({"exp": expire})    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)    return encoded_jwtdef parse_token(token)->dict:    """    解析Token    """    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
utils/password.py

方法说明:

  • get_password:密码加密
  • check_password:检查密码是否正确

代码:
from passlib.context import CryptContextpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def check_password(password_text, password_hash):    """密码校验"""    return pwd_context.verify(password_text, password_hash)def get_password(password_text):    """密码加密"""    return pwd_context.hash(password_text)
utils/response.py

方法说明:

  • get_response:获取通用的响应,默认是成功的响应信息
  • get_error_response:获取错误响应信息
  • get_param_error_response:获取参数错误响应信息
  • get_not_found_response:获取数据不存在响应信息
  • get_exitsts_response:获取数据已存在响应信息

代码:
def get_response(msg="success", code=10000, status=True, data=None):    """成功响应"""    response = {"status": status, "code": code, "msg": msg}    if data is not None:        response["data"] = data    return responsedef get_error_response(msg="服务器内部错误", code=10003):    """错误响应"""    return get_response(code=code, status=False, msg=msg)def get_param_error_response(msg):    """参数错误响应"""    return get_error_response(msg, code=10005)def get_not_found_response(msg):    """数据不存在错误响应"""    return get_error_response(msg, code=10004)def get_exists_response(msg):    """数据已存在错误响应"""    return get_error_response(msg, code=10006)
db数据库相关

db/__init__.py

方法说明:

  • init_db:初始化数据库

代码:
from db.user import Userfrom db.base import dbdef init_db():    tables = [User]    db.create_tables(tables)
db/base.py

核心功能:提供了断线重连机制,解决了MySQL重启后,API接口失效的问题。
import pymysqlfrom peewee import *from playhouse.shortcuts import ReconnectMixinfrom config.db import db_configpymysql.install_as_MySQLdb()# 同步数据库# 同步数据库断线重连类class ReconnectMySQLDatabase(ReconnectMixin, MySQLDatabase):    pass# 数据库实例db = ReconnectMySQLDatabase(**db_config)# 基础模型class BaseModel(Model):    class Meta:        database = dbdb.connect()
db/user.py

方法说明:

  • db_register:创建一个新的用户
  • db_get_user_by_username:根据用户名获取用户信息

代码:
from peewee import *from db.base import BaseModelfrom utils.password import get_password# 创建用户表class User(BaseModel):    username = CharField()    password = CharField()    role_id = IntegerField()def db_register(username,password,role_id=0):    """用户注册"""    hashed_password = get_password(password)    user = User(username=username,password=hashed_password, role_id=role_id)    user.save()def db_get_user_by_username(username):    """根据用户名获取用户信息"""    users = list(User.select().filter(User.username==username))    if len(users) > 0:        return users[0]    return None
schemas校验相关

schemas/__init__.py

schemas/user.py

对象说明:

  • LoginSchema:用户登录对象
  • RegisterSchema:用户注册对象
  • UpdatePasswordSchema:更新密码对象

代码:
from pydantic import BaseModelclass LoginSchema(BaseModel):    username: str    password: strclass RegisterSchema(LoginSchema):    re_password:strclass UpdatePasswordSchema(RegisterSchema):    old_password: str
dependes依赖相关

dependes/__init__.py

dependes/user.py

from fastapi import Depends,  HTTPException, status, Headerfrom fastapi.security import OAuth2PasswordBearerfrom jose import JWTErrorfrom utils.jwt import parse_tokenfrom db.user import db_get_user_by_usernamefrom config.server import API_PREFIX# OAuth2登录,文档登录oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{API_PREFIX}/token")def get_user(token):    """根据Token解析登录用户信息"""    credentials_exception = HTTPException(        status_code=status.HTTP_401_UNAUTHORIZED,        detail="权限校验失败",        headers={"WWW-Authenticate": "Bearer"},    )        if not token:        raise credentials_exception        try:        # 解析token        payload = parse_token(token)        # 校验用户名        username: str = payload.get("username")        if username is None:            raise credentials_exception    except JWTError:        raise credentials_exception    # 获取用户    user = None    try:        user = db_get_user_by_username(username)        if user is None:            raise credentials_exception    except Exception as e:        print(e)        raise credentials_exception    return userdef get_login_user(token: str = Depends(oauth2_scheme)):    """获取登录用户"""    return get_user(token)def get_json_login_user(zdppy_jwt_token: str = Header()):    """JSON接口获取登录用户"""    return get_user(zdppy_jwt_token)
routers路由相关

routers/__init__.py

routers/user.py

from fastapi import APIRouter, Dependsfrom fastapi.security import OAuth2PasswordRequestFormfrom db.user import db_register, db_get_user_by_usernamefrom utils.common import is_equalfrom utils.password import check_passwordfrom utils.response import *from schemas.user import RegisterSchema, LoginSchema, UpdatePasswordSchemafrom config.jwt import ACCESS_TOKEN_EXPIRE_MINUTESfrom utils.jwt import get_tokenfrom utils.password import get_passwordfrom depends.user import get_login_userrouter = APIRouter()# 注册接口@router.post("/register", summary="用户注册")async def register(schema: RegisterSchema):    """    注册接口    """    # 校验两次密码是否一致    if not is_equal(schema.password, schema.re_password):        return get_param_error_response("两次密码不一致")        # 校验用户名是否已存在    try:        user = db_get_user_by_username(schema.username)        if user is not None:            return get_exists_response("该用户已存在")    except Exception as e:        print(e)        return get_error_response(msg="连接MySQL服务失败")        # 新增用户    try:        db_register(schema.username, schema.password)        return get_response()    except Exception as e:        print(e)    return get_error_response()def get_user_data(username, password):    # 获取用户    user = None    try:        user = db_get_user_by_username(username)        if user is None:            return get_not_found_response("该用户不存在")    except Exception as e:        print(e)        return get_error_response(msg="连接MySQL服务失败")    # 校验密码是否正确    if not check_password(password, user.password):        return get_param_error_response("用户名或密码错误")    # 生成token    data = {        "id": user.id,        "username": user.username,        "role_id": user.role_id,    }    data["token"] = get_token(data=data)    return get_response(data = data)@router.post("/token", summary="Form登录")async def login_form(form_data: OAuth2PasswordRequestForm = Depends()):    response = get_user_data(form_data.username,form_data.password)    data = response.get("data")    return {"access_token": data.get("token"), "token_type": "bearer"}@router.post("/login", summary="JSON登录")async def login_json(json_data: LoginSchema):    return get_user_data(json_data.username,json_data.password)@router.put("/password", summary="密码修改")async def put_password(schema: UpdatePasswordSchema):    # 获取用户get_login_user    user = None    try:        user = db_get_user_by_username(schema.username)        if user is None:            return get_not_found_response("该用户不存在")    except Exception as e:        print(e)        return get_error_response(msg="连接MySQL服务失败")        # 校验旧密码    if not check_password(schema.old_password, user.password):        return get_param_error_response("用户名或密码错误")        # 校验两次密码是否一致    if not is_equal(schema.password, schema.re_password):        return get_param_error_response("两次密码不一致")        # 修改密码    user.password = get_password(schema.password)    try:        user.save()    except Exception as e:        print(e)        return get_error_response(msg="连接MySQL服务失败")        return get_response()@router.get("/userinfo", summary="获取用户信息")async def get_userinfo(user=Depends(get_login_user)):    """    获取用户信息    """    data = {        "id":user.id,        "username":user.username,        "role_id":user.role_id,    }    return get_response(data=data)
routers/goods.py

from fastapi import APIRouterfrom utils.response import *router = APIRouter()# 注册接口@router.get("/goods")async def get_goods(page:int=1, size:int=20):    """获取商品信息"""    return get_response()
入口

main.py

入口程序主要引入子路由并挂载到APP。
from fastapi import FastAPI, Dependsfrom config.server import API_PREFIXfrom depends.user import get_json_login_userfrom router.user import router as user_routerfrom router.goods import router as good_routerapp = FastAPI()app.include_router(user_router, prefix=API_PREFIX, tags=["用户管理"])app.include_router(good_router, prefix=API_PREFIX, tags=["商品管理"],                    dependencies=[Depends(get_json_login_user)])if __name__ == "__main__":    import uvicorn    uvicorn.run("main:app")
测试

启动服务

python main.py
访问接口文档:http://127.0.0.1:8000/docs

接口文档预览





总结

以上就是本文要分享的全部内容了,如果您对跨端开发小程序和APP感兴趣,想要查看更多的文章,欢迎关注我的公众号“Python私教”。若其他平台代码显示混乱,大家也可以通过公众号查看排版正确的源码哈。
打赏20元然后评论“已打赏”,可以获取本文的所有源码哦。
我是大鹏,专注于IT领域的编程知识分享,提供付费的个人IT技能提升服务,若有相关需求,欢迎留言或私信我。
咱们下篇文章再见~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|办公问答网

GMT+8, 2025-7-8 08:47 , Processed in 0.102819 second(s), 23 queries .

Powered by Discuz! X3.4

© 2001-2013 Comsenz Inc. Templated By 【未来科技 www.veikei.com】设计

快速回复 返回顶部 返回列表