|
前言
学无止境,无止境学。坚持每天学点编程知识,坚持每天写点小文章,坚持每天进步一点点。大家好,我是张大鹏,喜欢学习和分享,希望能够通过此公众号,将自己学到的东西分享给大家,和大家一起交流,一起成长,一起进步。
乾坤未定,你我皆是黑马。大鹏一日同风起,扶摇直上九万里。宝剑锋从磨砺出,梅花香自苦寒来。不积跬步,无以至千里,不积小流无以成江海。
如果有多余的时间,就坚持学习吧,小时候学习改变命运,长大了学习丰富内涵,老了学习带来宁静。活到老,学到老,我能行!
课程介绍
收费说明
《FastAPI零基础入门班》收费说明:
- 时长:10节课
- 价格:1999元
- 上课方式:腾讯会议小班教学
- 联系电话(同微信):18010070052
课程大纲
本篇文章是《FastAPI零基础入门班》的第5篇文章,前面还有:
- 《FastAPI快速入门》
- 《FastAPI实现商品管理API》
- 《FastAPI整合MySQL》
- 《FastAPI实现OAuth2和JWT登录》
- 《FastAPI组件化开发》
基本实现
目录结构
创建如下的目录结构:

安装依赖
pip install fastapi[all]pip install uvicorn[standard]pip install python-jose[cryptography]pip install passlib[bcrypt]pip install peeweepip install pymysql
配置
目标
1、引入pymysql并创建MySQL的引擎
2、引入peewee的连接方法
3、创建MySQL的数据库连接
实现方案
1、创建引擎
import pymysqlpymysql.install_as_MySQLdb()
2、创建MySQL连接
import pymysqldb = connect('mysql://root:root@localhost:3306/gaoxiang_shop_admin')
完整代码
config/db.py
import pymysqlfrom playhouse.db_url import connectpymysql.install_as_MySQLdb()db = connect('mysql://root:root@localhost:3306/gaoxiang_shop_admin')
密码工具模块
目标
1、实现密码加密的方法
2、实现校验密码是否正确的方法
实现方案
1、实现密码加密的方法
from passlib.context import CryptContext# 密码加密对象pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def get_password(password): """密码加密""" return pwd_context.hash(password)
2、实现校验密码是否正确的方法
def check_password(plain_password, hashed_password): """校验密码是否正确""" return pwd_context.verify(plain_password, hashed_password)
完整代码
from passlib.context import CryptContextpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def check_password(password_text, password_hash): """密码校验""" return pwd_context.verify(password_text, password_hash)def get_password(password_text): """密码加密""" return pwd_context.hash(password_text)
用户模型
目标
1、创建用户表的ORM模型
2、创建根据用户名查询用户的方法
3、创建校验用户名和密码是否正确的方法
实现方案
1、创建用户表的ORM模型
from peewee import *from config import dbfrom utils.password import check_passwordclass UserModel(Model): username = CharField() password = CharField() class Meta: database = db.db db_table = "user"
2、创建根据用户名查询用户的方法
def get_user(username: str): """从数据库获取用户信息""" try: return UserModel.get(UserModel.username == username) except Exception as e: print(e)
3、创建校验用户名和密码是否正确的方法
def authenticate_user(username: str, password: str): """校验用户""" # 从数据库获取用户 user = get_user(username) # 校验用户是否存在 if user is None: return False # 校验用户密码是否正确 if not check_password(password, user.password): return False # 校验通过,返回数据库查到的用户信息 return user
完整代码
db/user.py
from peewee import *from config import dbfrom utils.password import check_passwordclass UserModel(Model): username = CharField() password = CharField() class Meta: database = db.db db_table = "user"def get_user(username: str): """从数据库获取用户信息""" try: return UserModel.get(UserModel.username == username) except Exception as e: print(e)def authenticate_user(username: str, password: str): """校验用户""" # 从数据库获取用户 user = get_user(username) # 校验用户是否存在 if user is None: return False # 校验用户密码是否正确 if not check_password(password, user.password): return False # 校验通过,返回数据库查到的用户信息 return user
用户依赖模块
目标
1、确保用户要登录以后才能够访问到Doc文档的带权限接口
2、确保token能够解析出用户信息
实现方案
1、确保用户要登录以后才能够访问到Doc文档的带权限接口
from fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearerfrom jose import JWTError, jwtfrom db.user import get_userfrom utils.jwt import SECRET_KEY, ALGORITHM# 文档OAuth2登录oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
2、确保token能够解析出用户信息
async def get_current_user(token: str = Depends(oauth2_scheme)): """获取当前登录用户""" # 异常对象 credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="权限校验失败", headers={"WWW-Authenticate": "Bearer"}, ) username: str = "" try: # 解析客户端传过来的JWT Token payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # 获取用户名 username = payload.get("username") # 校验用户名是否存在 if username is None: raise credentials_exception except JWTError: raise credentials_exception # 从数据库获取用户 user = get_user(username=username) # 校验用户是否存在 if user is None: raise credentials_exception # 返回获取到的用户 return user
完整代码
dependencies/user.py
from fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearerfrom jose import JWTError, jwtfrom db.user import get_userfrom utils.jwt import SECRET_KEY, ALGORITHM# 文档OAuth2登录oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")async def get_current_user(token: str = Depends(oauth2_scheme)): """获取当前登录用户""" # 异常对象 credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="权限校验失败", headers={"WWW-Authenticate": "Bearer"}, ) username: str = "" try: # 解析客户端传过来的JWT Token payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # 获取用户名 username = payload.get("username") # 校验用户名是否存在 if username is None: raise credentials_exception except JWTError: raise credentials_exception # 从数据库获取用户 user = get_user(username=username) # 校验用户是否存在 if user is None: raise credentials_exception # 返回获取到的用户 return user
用户请求Schema
目标
1、规范用户登录接口的请求
2、规范用户注册的请求
实现方案
1、规范用户登录接口的请求
from pydantic import BaseModelclass UserSchema(BaseModel): """用户模型""" username: str # 用户名class UserLoginSchema(UserSchema): """用户登录模型""" password: str # 密码
2、规范用户注册的请求
class UserRegisterSchema(UserLoginSchema): """用户注册模型""" re_password: str # 确认密码
完整代码
schemas/user.py
from pydantic import BaseModelclass UserSchema(BaseModel): """用户模型""" username: str # 用户名class UserLoginSchema(UserSchema): """用户登录模型""" password: str # 密码class UserRegisterSchema(UserLoginSchema): """用户注册模型""" re_password: str # 确认密码
JWT工具模块
目标
1、创建JWT Token
2、解析JWT Token
实现方案
1、创建JWT Token
from datetime import timedelta, datetimefrom typing import Unionfrom jose import jwt# 生成私钥的命令:openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256" # 加密算法ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 3 # token过期时间(分钟)def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): """创建JWT Token""" # 提交要打包的token内部数据 to_encode = data.copy() # 设置token的过期时间ta if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) # 生成JWT Token encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # 返回JWT Token return encoded_jwt
2、解析JWT Token
def get_token(data: dict = {}): """创建token""" access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data=data, expires_delta=access_token_expires ) return access_token
完整代码
utils/jwt.py
from datetime import timedelta, datetimefrom typing import Unionfrom jose import jwt# 生成私钥的命令:openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM = "HS256" # 加密算法ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 3 # token过期时间(分钟)def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): """创建JWT Token""" # 提交要打包的token内部数据 to_encode = data.copy() # 设置token的过期时间ta if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) # 生成JWT Token encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # 返回JWT Token return encoded_jwtdef get_token(data: dict = {}): """创建token""" access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data=data, expires_delta=access_token_expires ) return access_token
用户相关接口
目标
1、实现用户注册接口
2、使用Oauth2用户登录接口
3、实现JSON用户登录接口
4、实现获取用户信息接口
实现方案
1、实现用户注册接口
from fastapi import APIRouterfrom fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordRequestForm# 导入用户相关数据库方法from db.user import authenticate_user, UserModel# 导入依赖注入相关代码from dependencies.user import get_current_user# 导入用户相关schemafrom responses.base import get_response_param_error, get_responsefrom schemas.user import UserRegisterSchema, UserLoginSchema# 导入JWT相关的代码from utils.jwt import get_tokenfrom utils.password import get_passwordrouter = APIRouter()@router.post("/register")async def post_register(user: UserRegisterSchema): """用户注册""" if user.password != user.re_password: return get_response_param_error("两次密码不一致") password = get_password(user.password) user_model = UserModel(username=user.username, password=password) user_model.save() return get_response()
2、使用Oauth2用户登录接口
@router.post("/token")async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): """用户登录获取Token接口""" # 校验用户 auth_user = authenticate_user(form_data.username, form_data.password) if not auth_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码错误", headers={"WWW-Authenticate": "Bearer"}, ) # 生成token data = { "id": auth_user.id, "username": auth_user.username, } token = get_token(data) # 返回token和token类型 return {"access_token": token, "token_type": "bearer"}
3、实现JSON用户登录接口
@router.post("/login")async def post_login(user: UserLoginSchema): """用户登录""" # 校验用户 auth_user = authenticate_user(**user.dict()) if not user: return get_response_param_error("用户名或密码错误") # 生成token data = { "id": auth_user.id, "username": user.username, } token = get_token(data) # 返回响应 result = { "id": auth_user.id, "username": user.username, "token": token, } return get_response(data=result)
4、实现获取用户信息接口
@router.get("/userinfo")async def get_userinfo(current_user: UserModel = Depends(get_current_user)): """获取用户信息""" data = current_user.__data__ del data["password"] return get_response(data=data)
完整代码
routers/user.py
from fastapi import APIRouterfrom fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordRequestForm# 导入用户相关数据库方法from db.user import authenticate_user, UserModel# 导入依赖注入相关代码from dependencies.user import get_current_user# 导入用户相关schemafrom responses.base import get_response_param_error, get_responsefrom schemas.user import UserRegisterSchema, UserLoginSchema# 导入JWT相关的代码from utils.jwt import get_tokenfrom utils.password import get_passwordrouter = APIRouter()@router.post("/token")async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): """用户登录获取Token接口""" # 校验用户 auth_user = authenticate_user(form_data.username, form_data.password) if not auth_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码错误", headers={"WWW-Authenticate": "Bearer"}, ) # 生成token data = { "id": auth_user.id, "username": auth_user.username, } token = get_token(data) # 返回token和token类型 return {"access_token": token, "token_type": "bearer"}@router.post("/register")async def post_register(user: UserRegisterSchema): """用户注册""" if user.password != user.re_password: return get_response_param_error("两次密码不一致") password = get_password(user.password) user_model = UserModel(username=user.username, password=password) user_model.save() return get_response()@router.post("/login")async def post_login(user: UserLoginSchema): """用户登录""" # 校验用户 auth_user = authenticate_user(**user.dict()) if not user: return get_response_param_error("用户名或密码错误") # 生成token data = { "id": auth_user.id, "username": user.username, } token = get_token(data) # 返回响应 result = { "id": auth_user.id, "username": user.username, "token": token, } return get_response(data=result)@router.get("/userinfo")async def get_userinfo(current_user: UserModel = Depends(get_current_user)): """获取用户信息""" data = current_user.__data__ del data["password"] return get_response(data=data)
初始化数据库脚本
目标
1、引入相关的数据库ORM模型
2、根据ORM模型创建相关表
完整代码
init_db.py
from config import dbfrom db.user import UserModeltables = [UserModel]db.db.create_tables(tables)
入口程序
目标
1、引入相关路由
2、注册相关路由
完整代码
main.py
from fastapi import FastAPIfrom routers import userapp = FastAPI()app.include_router(user.router)
启动程序
初始化数据库
python init_db.py
启动服务
uvicorn main:app
测试
访问文档进行测试:http://localhost:8000/docs

完整代码
目录结构

config配置相关
config/__init__.py
config/server.py
服务器相关配置:
# 接口前缀API_PREFIX = "/api/v1"
config/jwt.py
JWT Token相关配置:
# 秘钥:openssl rand -hex 32SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"# 加密算法ALGORITHM = "HS256"# JWT Token的过期时间ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 20
config/db.py
参数说明:
- DB_HOST:数据库IP地址
- DB_PORT:数据库服务端口号
- DB_USER:数据库用户名
- DB_PASSWORD:数据库密码
- DB_DATABASE:数据库名
- db_config:数据库配置信息
数据库相关配置:
DB_HOST = 'localhost'DB_PORT = 3306DB_USER = 'root'DB_PASSWORD = 'zhangdapeng520'DB_DATABASE = 'gaoxiang_shop_admin'db_config = { 'host': DB_HOST, 'port': DB_PORT, 'user': DB_USER, 'password': DB_PASSWORD, 'database': DB_DATABASE,}
utils工具相关
utils/__init__.py
utils/common.py
判断两个变量是否相等的方法:
def is_equal(a, b): """判断是否相等""" return a == b
utils/jwt.py
方法说明:
- get_token:生成JWT Token字符串
- parse_token:解析JWT Token字符串
代码:
from datetime import datetime, timedeltafrom jose import jwtfrom config.jwt import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTESdef get_token(data: dict, expires_minute: int = ACCESS_TOKEN_EXPIRE_MINUTES): """ 生成token :param data: 要打包的数据 :param expires_delta: 过期时间 :return: JWT Token """ to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=expires_minute) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtdef parse_token(token)->dict: """ 解析Token """ return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
utils/password.py
方法说明:
- get_password:密码加密
- check_password:检查密码是否正确
代码:
from passlib.context import CryptContextpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def check_password(password_text, password_hash): """密码校验""" return pwd_context.verify(password_text, password_hash)def get_password(password_text): """密码加密""" return pwd_context.hash(password_text)
utils/response.py
方法说明:
- get_response:获取通用的响应,默认是成功的响应信息
- get_error_response:获取错误响应信息
- get_param_error_response:获取参数错误响应信息
- get_not_found_response:获取数据不存在响应信息
- get_exitsts_response:获取数据已存在响应信息
代码:
def get_response(msg="success", code=10000, status=True, data=None): """成功响应""" response = {"status": status, "code": code, "msg": msg} if data is not None: response["data"] = data return responsedef get_error_response(msg="服务器内部错误", code=10003): """错误响应""" return get_response(code=code, status=False, msg=msg)def get_param_error_response(msg): """参数错误响应""" return get_error_response(msg, code=10005)def get_not_found_response(msg): """数据不存在错误响应""" return get_error_response(msg, code=10004)def get_exists_response(msg): """数据已存在错误响应""" return get_error_response(msg, code=10006)
db数据库相关
db/__init__.py
方法说明:
代码:
from db.user import Userfrom db.base import dbdef init_db(): tables = [User] db.create_tables(tables)
db/base.py
核心功能:提供了断线重连机制,解决了MySQL重启后,API接口失效的问题。
import pymysqlfrom peewee import *from playhouse.shortcuts import ReconnectMixinfrom config.db import db_configpymysql.install_as_MySQLdb()# 同步数据库# 同步数据库断线重连类class ReconnectMySQLDatabase(ReconnectMixin, MySQLDatabase): pass# 数据库实例db = ReconnectMySQLDatabase(**db_config)# 基础模型class BaseModel(Model): class Meta: database = dbdb.connect()
db/user.py
方法说明:
- db_register:创建一个新的用户
- db_get_user_by_username:根据用户名获取用户信息
代码:
from peewee import *from db.base import BaseModelfrom utils.password import get_password# 创建用户表class User(BaseModel): username = CharField() password = CharField() role_id = IntegerField()def db_register(username,password,role_id=0): """用户注册""" hashed_password = get_password(password) user = User(username=username,password=hashed_password, role_id=role_id) user.save()def db_get_user_by_username(username): """根据用户名获取用户信息""" users = list(User.select().filter(User.username==username)) if len(users) > 0: return users[0] return None
schemas校验相关
schemas/__init__.py
schemas/user.py
对象说明:
- LoginSchema:用户登录对象
- RegisterSchema:用户注册对象
- UpdatePasswordSchema:更新密码对象
代码:
from pydantic import BaseModelclass LoginSchema(BaseModel): username: str password: strclass RegisterSchema(LoginSchema): re_password:strclass UpdatePasswordSchema(RegisterSchema): old_password: str
dependes依赖相关
dependes/__init__.py
dependes/user.py
from fastapi import Depends, HTTPException, status, Headerfrom fastapi.security import OAuth2PasswordBearerfrom jose import JWTErrorfrom utils.jwt import parse_tokenfrom db.user import db_get_user_by_usernamefrom config.server import API_PREFIX# OAuth2登录,文档登录oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{API_PREFIX}/token")def get_user(token): """根据Token解析登录用户信息""" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="权限校验失败", headers={"WWW-Authenticate": "Bearer"}, ) if not token: raise credentials_exception try: # 解析token payload = parse_token(token) # 校验用户名 username: str = payload.get("username") if username is None: raise credentials_exception except JWTError: raise credentials_exception # 获取用户 user = None try: user = db_get_user_by_username(username) if user is None: raise credentials_exception except Exception as e: print(e) raise credentials_exception return userdef get_login_user(token: str = Depends(oauth2_scheme)): """获取登录用户""" return get_user(token)def get_json_login_user(zdppy_jwt_token: str = Header()): """JSON接口获取登录用户""" return get_user(zdppy_jwt_token)
routers路由相关
routers/__init__.py
routers/user.py
from fastapi import APIRouter, Dependsfrom fastapi.security import OAuth2PasswordRequestFormfrom db.user import db_register, db_get_user_by_usernamefrom utils.common import is_equalfrom utils.password import check_passwordfrom utils.response import *from schemas.user import RegisterSchema, LoginSchema, UpdatePasswordSchemafrom config.jwt import ACCESS_TOKEN_EXPIRE_MINUTESfrom utils.jwt import get_tokenfrom utils.password import get_passwordfrom depends.user import get_login_userrouter = APIRouter()# 注册接口@router.post("/register", summary="用户注册")async def register(schema: RegisterSchema): """ 注册接口 """ # 校验两次密码是否一致 if not is_equal(schema.password, schema.re_password): return get_param_error_response("两次密码不一致") # 校验用户名是否已存在 try: user = db_get_user_by_username(schema.username) if user is not None: return get_exists_response("该用户已存在") except Exception as e: print(e) return get_error_response(msg="连接MySQL服务失败") # 新增用户 try: db_register(schema.username, schema.password) return get_response() except Exception as e: print(e) return get_error_response()def get_user_data(username, password): # 获取用户 user = None try: user = db_get_user_by_username(username) if user is None: return get_not_found_response("该用户不存在") except Exception as e: print(e) return get_error_response(msg="连接MySQL服务失败") # 校验密码是否正确 if not check_password(password, user.password): return get_param_error_response("用户名或密码错误") # 生成token data = { "id": user.id, "username": user.username, "role_id": user.role_id, } data["token"] = get_token(data=data) return get_response(data = data)@router.post("/token", summary="Form登录")async def login_form(form_data: OAuth2PasswordRequestForm = Depends()): response = get_user_data(form_data.username,form_data.password) data = response.get("data") return {"access_token": data.get("token"), "token_type": "bearer"}@router.post("/login", summary="JSON登录")async def login_json(json_data: LoginSchema): return get_user_data(json_data.username,json_data.password)@router.put("/password", summary="密码修改")async def put_password(schema: UpdatePasswordSchema): # 获取用户get_login_user user = None try: user = db_get_user_by_username(schema.username) if user is None: return get_not_found_response("该用户不存在") except Exception as e: print(e) return get_error_response(msg="连接MySQL服务失败") # 校验旧密码 if not check_password(schema.old_password, user.password): return get_param_error_response("用户名或密码错误") # 校验两次密码是否一致 if not is_equal(schema.password, schema.re_password): return get_param_error_response("两次密码不一致") # 修改密码 user.password = get_password(schema.password) try: user.save() except Exception as e: print(e) return get_error_response(msg="连接MySQL服务失败") return get_response()@router.get("/userinfo", summary="获取用户信息")async def get_userinfo(user=Depends(get_login_user)): """ 获取用户信息 """ data = { "id":user.id, "username":user.username, "role_id":user.role_id, } return get_response(data=data)
routers/goods.py
from fastapi import APIRouterfrom utils.response import *router = APIRouter()# 注册接口@router.get("/goods")async def get_goods(page:int=1, size:int=20): """获取商品信息""" return get_response()
入口
main.py
入口程序主要引入子路由并挂载到APP。
from fastapi import FastAPI, Dependsfrom config.server import API_PREFIXfrom depends.user import get_json_login_userfrom router.user import router as user_routerfrom router.goods import router as good_routerapp = FastAPI()app.include_router(user_router, prefix=API_PREFIX, tags=["用户管理"])app.include_router(good_router, prefix=API_PREFIX, tags=["商品管理"], dependencies=[Depends(get_json_login_user)])if __name__ == "__main__": import uvicorn uvicorn.run("main:app")
测试
启动服务
python main.py
访问接口文档:http://127.0.0.1:8000/docs
接口文档预览

总结
以上就是本文要分享的全部内容了,如果您对跨端开发小程序和APP感兴趣,想要查看更多的文章,欢迎关注我的公众号“Python私教”。若其他平台代码显示混乱,大家也可以通过公众号查看排版正确的源码哈。
打赏20元然后评论“已打赏”,可以获取本文的所有源码哦。
我是大鹏,专注于IT领域的编程知识分享,提供付费的个人IT技能提升服务,若有相关需求,欢迎留言或私信我。
咱们下篇文章再见~ |
|