# fastapp_app **Repository Path**: shadaileng/fastapp_app ## Basic Information - **Project Name**: fastapp_app - **Description**: No description available - **Primary Language**: Python - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-11-03 - **Last Updated**: 2025-02-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## FastAPI Demo ### 1. 安装 ```shell pip install "fastapi[all]" ``` ### 2. 创建项目 ```shell mkdir fastapi cd fastapi touch main.py ``` ### 3. 编写代码 ```python from fastapi import FastAPI app = FastAPI() @app.get("/") def read_root(): return {"Hello": "World"} ``` ### 4. 启动项目 ```shell uvicorn main:app --reload ``` ### 5. 测试 ## 使用uv构建项目 ### 1. 安装`uv` ```bash # ubuntu-22.04全局安装需要使用--break-system-packages强制安装 # 或者在虚拟环境中安装uv pip insatll uv ``` ### 2. 创建项目 ```bash mkdir fastapi && cd $_ uv init --app uv add fastapi ``` ### 3. 编写代码 ```python from fastapi import FastAPI app = FastAPI() @app.get("/") def read_root(): return {"Hello": "World"} ``` ### 4. 启动项目 ```bash uv run fastapi dev ``` ## 配置 ### 1. 配置文件 ```python from pydantic import BaseSettings class Settings(BaseSettings): app_name: str = "FastAPI Demo" app_version: str = "1.0.0" app_debug: bool = True class Config: env_file = ".env" env_file_encoding = "utf-8" settings = Settings() ``` ### 2. 读取配置文件 ```python from fastapi import FastAPI from config import settings app = FastAPI() @app.get("/") def read_root(): return {"Hello": settings.app_name} ``` ### 3. 创建环境变量文件 ```shell touch .env ``` ### 4. 添加环境变量 ```shell APP_NAME="FastAPI Demo" APP_VERSION="1.0.0" APP_DEBUG=True ``` ### 5. 启动项目 ```shell uvicorn main:app --reload ``` ### 6. 测试 ## 跨域 ### 1. 安装 ```shell pip install fastapi-cors ``` ### 2. 编写代码 ```python from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") def read_root(): return {"Hello": "World"} ``` ### 3. 启动项目 ```shell uvicorn main:app --reload ``` ### 4. 测试 ## 统一返回格式 ### 1. 安装 ```shell pip install pydantic ``` ### 2. 编写代码 ```python from typing import TypeVar, Generic, Any from pydantic import BaseModel, Field # 定义一个 TypeVar,用于表示泛型类型 T = TypeVar('T', int, str, dict, list, BaseModel) class ResponseModel(BaseModel, Generic[T]): code: int = Field(default=200, description="状态码") success: bool = Field(default=True, description="是否成功") message: str = Field(default=True, description="信息") data: T = Field(default=None, description="数据") class Config: from_attributes = True class Page(BaseModel, Generic[T]): pageNo: int = Field(default=1, description="页码") pageSize: int = Field(default=10, description="每页条数") totalPage: int = Field(default=1, description="总页数") totalRows: int = Field(default=0, description="总条数") rows: list[T] = Field(default=[], description="数据") class Config: from_attributes = True class ResponsePage(ResponseModel, Generic[T]): data: Page[T] = Field(default=None, description="数据") class Config: from_attributes = True def resp(success: bool, code: int, message: str, data: dict={})->ResponseModel: return ResponseModel(code=code, success=success, message=message, data=data) def success(code: int=0, message: str="操作成功", data: dict={})->ResponseModel: return resp(True, code, message, data) def page(page_no=1, page_size=10, total_page = 0, total_rows=0, data: list=[])->ResponseModel: data = Page.from_orm(dict(pageNo=page_no, pageSize=page_size, totalPage=total_page, totalRows=total_rows, rows=data)) return ResponsePage(code=200, success=True, message="查询成功", data=data) def fail(code: int=-1, message: str="操作失败", data: dict={})->ResponseModel: return resp(False, code, message, data) @app.get("/", response_model=ResponseModel) def read_root(): return success(data={"Hello": "World"}) @app.get("/user", response_model=ResponsePage) def read_root(pageNo:int=1, pageSize:int=10, db: Session = Depends(get_db)): skip = (pageNo - 1) * pageSize total_rows = db.query(SysUser).count() users = db.query(SysUser).offset(skip).limit(pageSize).all() users = [User.from_orm(user) for user in users] total_page = total_rows // pageSize if total_rows % pageSize else total_rows // pageSize + 1 return page(page_no=pageNo, page_size=pageSize, total_page = total_page, total_rows=total_rows, data=users) ``` ### 3. 启动项目 ```shell uvicorn main:app --reload ``` ### 4. 测试 ## 全局异常处理 ```python # -*- coding:utf-8 -*- import uvicorn from fastapi import FastAPI, APIRouter, Request, status from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError from starlette.exceptions import HTTPException as StarletteHTTPException from fastapi.responses import JSONResponse app = FastAPI( openapi_url=settings.SWAGGER_UI_OPENAPI_URL, docs_url=settings.SWAGGER_UI_DOCS_URL, redoc_url=settings.SWAGGER_UI_REDOC_RL, ) async def exception_handlers(request:Request, exc: Exception): return JSONResponse({ "code": 500, "message": str(exc), "success": False, "data": None, }, 200) async def http_exception_handler(request: Request, exc: StarletteHTTPException): return JSONResponse({ "code": exc.status_code, "message": str(exc.detail), "success": False, "data": None, }, 200) async def validation_exception_handler(request: Request, exc: RequestValidationError): return JSONResponse({ "code": 422, "message": jsonable_encoder({"detail": exc.errors(), "body": exc.body}), "success": False, "data": None, }, 200) app.add_exception_handler(StarletteHTTPException, http_exception_handler) app.add_exception_handler(RequestValidationError, validation_exception_handler) app.add_exception_handler(500, exception_handlers) if __name__ == '__main__': uvicorn.run(app='main:app', host='0.0.0.0', port=8000, reload=settings.RELOAD) ``` ## 数据库 ### 1. 安装 ```shell pip install sqlalchemy pip install databases pip install asyncpg ``` ### 2. 编写代码 ```python from fastapi import FastAPI from databases import Database from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String app = FastAPI() DATABASE_URL = "postgresql://user:password@localhost:5432/database" database = Database(DATABASE_URL) metadata = MetaData() users = Table( "users", metadata, Column("id", Integer, primary_key=True), Column("name", String), Column("email", String), ) engine = create_engine(DATABASE_URL) @app.on_event("startup") async def startup(): await database.connect() @app.on_event("shutdown") async def shutdown(): await database.disconnect() @app.get("/") async def read_root(): query = users.select() return await database.fetch_all(query) ``` ### 3. 启动项目 ```shell uvicorn main:app --reload ``` ### 4. 测试 ## jwt认证 1. 安装依赖 ```bash uv add bcrypt==4.0.1 passlib[bcrypt]>=1.7.4 pyjwt>=2.10.1 ``` 2. 编写代码 ```python from fastapi import Depends, HTTPException, status, APIRouter from fastapi.security import OAuth2PasswordBearer from fastapi.responses import JSONResponse from pydantic import BaseModel from passlib.context import CryptContext from jwt.exceptions import InvalidTokenError import jwt from datetime import datetime, timedelta oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login") JWT_SECRET_KEY = "your_secret_key" JWT_ALGORITHM = "HS256" # 密码 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def password_hash(password: str): ''' 密码加密 ''' return pwd_context.hash(password) def virify_password(plain_password, hashed_password): ''' 验证密码 plain_password: 明文密码 hashed_password: 加密后的密码 ''' return pwd_context.verify(plain_password, hashed_password) def create_token(data: dict, expires_delta: int | None = None): ''' 创建token ''' to_encode_data = data.copy() if expires_delta: expire = datetime.utcnow() + timedelta(minutes=expires_delta) else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode_data.update({"exp": expire}) return jwt.encode(to_encode_data, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM) def create_access_token(data: dict): ''' 创建 access token ''' return create_token(data, expires_delta=10) def create_refresh_token(data: dict): ''' 创建 refresh token ''' return create_token(data, expires_delta=7600) async def get_token_data(token: str = Depends(oauth2_scheme)): ''' 获取token数据 ''' try: return jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM]) except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="凭证已证过期", headers={"WWW-Authenticate": f"Bearer {token}"}, ) except jwt.InvalidTokenError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效凭证", headers={"WWW-Authenticate": f"Bearer {token}"}, ) except (PyJWTError, ValidationError): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效凭证", headers={"WWW-Authenticate": f"Bearer {token}"}, ) router = APIRouter(prefix="/auth") class User(BaseModel): username: str password: str fake_db = { "user1": User(username="user1", password=password_hash("password1")), "user2": User(username="user2", password=password_hash("password2")) } @router.get("/password", response_model=str) async def password_test(password:str, token_data: dict=Depends(get_token_data)): db_user = fake_db.get(token_data["sub"]) if db_user is None: return "not login" return password_hash(password) @router.post("/oauth2", summary="登录") async def login(form_data: OAuth2PasswordRequestForm = Depends()): db_user = fake_db.get(form_data.username) if db_user is None: # raise HTTPException(status_code=400, detail="Incorrect username or password") return JSONResponse(status_code=400, content={"message": "Incorrect username or password"}) if not virify_password(form_data.password, db_user.password): # raise HTTPException(status_code=400, detail="Incorrect username or password") return JSONResponse(status_code=400, content={"message": "Incorrect username or password"}) access_token = create_access_token({"sub": form_data.username}) refresh_token = create_refresh_token({"sub": form_data.username}) # return success(data={"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}) return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"} @router.get("/refresh", summary="token 刷新") async def refresh_token(token_data: dict=Depends(get_token_data), repo: UserRepository = Depends(get_user_repo)): db_user = fake_db.get(token_data["sub"]) if db_user is None: raise HTTPException(status_code=400, detail="Incorrect username or password") access_token = create_access_token({"sub": form_data.username}) refresh_token = create_refresh_token({"sub": form_data.username}) # return success(data={"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}) return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"} ``` ## redis缓存 ### 1. 安装依赖 ```bash uv add redis ``` ### 2. 配置 ```python import redis.asyncio as redis from redis.exceptions import ConnectionError, TimeoutError redis_pool = redis.ConnectionPool( host="127.0.0.1", # Redis服务器的地址 port=6789, # Redis服务器的端口 password="password", # Redis服务器的密码 db=0, # Redis数据库的索引 decode_responses=True, # 解码响应 encoding="utf-8" # 编码请求 ) async def redis_connect(): try: client = redis.Redis(connection_pool=redis_pool) if await client.ping(): print("Redis connected") return client except (ConnectionError): raise Exception("Redis connection error") except (TimeoutError): raise Exception("Redis connect timeout") except Exception as e: raise Exception(f"Redis error: {e}") ``` ### 3. 通过Lifespan(生命周期事件)加载服务 ```python from fastapi import FastAPI from fastapi_lifespan import LifespanManager from contextlib import asynccontextmanager app = FastAPI() @asynccontextmanager async def lifespan(app: FastAPI): print("FastAPI 启动中") app.state.redis = await redis_connect() yield print("FastAPI 已启动") await app.state.redis.close() print("FastAPI 已停止") @app.get("/redis") async def redis_get(): value = await app.state.redis.get("key") if value is None: value = "value" sleep(5) await app.state.redis.set("key", ) return value if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000, lifespan=lifespan) ``` ## 数据库 ### 使用SQLAlchemy #### 1. 安装依赖 ```shell uv add sqlalchemy, pymysql ``` #### 2. 创建数据库连接(mysql) ```python # -*- coding:utf-8 -*- from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker SessionLocal = None Base = declarative_base() isInit = False engine = None async def register_db(dsn="mysql+pymysql://root:123456@127.0.0.1:3306/test", echo=False): global isInit, SessionLocal, engine if isInit: return isInit = True engine = create_engine( dsn, echo=echo, isolation_level="READ COMMITTED" ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # Base.metadata.create_all(bind=engine) # Dependency def get_db(): session = SessionLocal() try: yield session session.commit() except: session.rollback() raise finally: session.close() ``` #### 3. 创建数据库模型 ```python # -*- coding:utf-8 -*- from sqlalchemy import Column, Integer, String from sqlalchemy.orm import Session from .db import Base, get_db class User(Base): __tablename__ = "user" id = Column(Integer, primary_key=True, index=True) name = Column(String, index=True) ``` #### 4. 创建路由 ```python # -*- coding:utf-8 -*- from fastapi import APIRouter, Depends from sqlalchemy.orm import Session from . import crud, schemas from .db import get_db router = APIRouter() @router.post("/users/", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): return crud.create_user(db=db, user=user) @router.get("/users/{user_id}", response_model=schemas.User) def read_user(user_id: int, db: Session = Depends(get_db)): return crud.get_user(db=db, user_id=user_id) ``` ## 定时任务 ## 异步任务 ## 邮件发送 ### 1. 安装依赖 ```bash uv add fastapi_mail ``` ### 2. 配置 ```python # -*- coding:utf-8 -*- from fastapi_mail import FastMail, ConnectionConfig conf = ConnectionConfig( MAIL_USERNAME = "test@qq.com" # 邮箱账号 MAIL_PASSWORD = "test" # 邮箱密码, 注意不是邮箱登录密码,而是授权码 MAIL_FROM = "test@qq.com" # 发送者邮箱, 注意不是邮箱账号 MAIL_PORT = 465 # 邮箱端口, 587/465(SSL) MAIL_SERVER = "smtp.qq.com" # 邮箱服务器 MAIL_SSL_TLS = True # 是否使用SSL: 465 MAIL_STARTTLS = False # 是否使用TLS: 587 USE_CREDENTIALS = True # 是否使用认证 VALIDATE_CERTS = False # 是否验证证书 ) fm = FastMail(conf) ``` ### 3. 发送邮件 ```python # -*- coding:utf-8 -*- from fastapi_mail import FastMail, ConnectionConfig, MessageSchema async def send_email(): message = MessageSchema( subject="Test", recipients=["xxx"], # List of recipients, as many as you can pass body="Test email", subtype="plain" ) await fm.send_message(message) ``` ## 验证 ### 验证码 ```python # -*- coding:utf-8 -*- import uuid from pydantic import BaseModel, EmailStr from fastapi import APIRouter, Depends, HTTPException, Security, Request, BackgroundTasks from core.response import success_page, ResponsePage, ResponseModel, success from app.logger import getLogger from app.config import settings class VerifyEmailCode(BaseModel): email: EmailStr code: str @router.get("/code", response_model=ResponseModel[str], summary="生成验证码") async def generate_code(request: Request, email: EmailStr): code = str(uuid.uuid4().int)[:4] await request.app.state.redis.set(f'code:{email}', code, ex=60) return success(data=code, message="验证码生成成功") @router.post("/verify", response_model=ResponseModel[bool], summary="验证验证码") async def verify_code(request: Request, email_code: VerifyEmailCode): redis_code = await request.app.state.redis.get(f'code:{email_code.email}') print(redis_code) if redis_code is None: raise HTTPException(status_code=400, detail="验证码已过期") if redis_code != email_code.code: raise HTTPException(status_code=400, detail="验证码错误") return success(data=True, message="验证码验证成功") ``` ### 激活验证 #### 安装依赖 ```bash uv add itsdangerous ``` #### 原理 ```python from fastapi import APIRouter, Request, HTTPException from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature router = APIRouter() serializer = URLSafeTimedSerializer(settings.SECRET_KEY) @router.post("/generate_link", response_model=ResponseModel[str], summary="生成激活链接") async def generate_link(email: EmailStr): token = serializer.dumps({'email': email}, salt="activate") # 将token存入redis # 发送邮件 return success(data=token) @router.get("/activate/{token}", response_model=ResponseModel[str], summary="激活账号") async def verify_link(token: str): try: data = serializer.loads(token, salt="activate", max_age=3600) except SignatureExpired: raise HTTPException(status_code=400, detail="链接已过期") except BadSignature: raise HTTPException(status_code=400, detail="链接无效") # TODO: 激活账号 return success(data=data['email'], message="激活成功") ``` #### 验证 ```python from fastapi import APIRouter, Depends, HTTPException, Request, BackgroundTasks from core.response import ResponseModel, success from app.models.request.user import User, UserCreate, from app.repository.user import UserRepository from app.service.auth import password_hash from app.service.verify import generate_link, verify_link from app.service.email import send_message, Email router = APIRouter(prefix="/user") @router.post("/register", response_model=ResponseModel[User], summary="用户注册") async def do_register(user: UserCreate, request: Request, tasks: BackgroundTasks, repo: UserRepository = Depends(UserRepository)): check_user = await repo.get_user_by_username(user.account) if check_user is not None: raise HTTPException(status_code=400, detail="User already exists") check_user = await repo.get_user_by_email(user.email) if check_user is not None: raise HTTPException(status_code=400, detail="User already exists") check_user = await repo.get_user_by_mobile(user.mobile) if check_user is not None: raise HTTPException(status_code=400, detail="User already exists") user.password = password_hash(user.password) user = await repo.save(user, create_user=request.state.user_id if hasattr(request.state, "user_id") else None) link = await generate_link(user.email) # 将token存入redis # 发送邮件 tasks.add_task(send_message, Email(subject="激活账号", addresses=[user.email], content=link)) return success(data=User.from_orm(user)) ``` ## 初始账号 1. 使用系统邀请码注册成为管理员