Appearance
使用 SQLAlchemy 进行数据库操作
在 FastAPI 中使用 SQL 数据库可以使用多个 ORM 工具,例如 SQLAlchemy、Tortoise ORM 等,类似 Java 的 Mybatis 。这些 ORM 工具可以帮助我们方便地与关系型数据库进行交互,如 MySQL 、PostgreSQL等。本篇文章将介绍如何使用 SQLAlchemy 来完成数据库操作,以便让我们在 FastAPI 项目中方便地进行数据存储和查询。
介绍 SQLAlchemy
SQLAlchemy 官方文档 SQL工具包和对象关系映射器是一套用于处理数据库和Python的综合工具,俗称 ORM 工具,提供了灵活的数据模型定义和查询语法,支持多种数据库后端,例如:
- MySQL
- SQLite
- Oracle
- PostgreSQL
- Microsoft SQL Server
在 FastAPI 中使用 SQLAlchemy,可以通过安装 SQLAlchemy 和 相应的数据库连接驱动程序(如 mysqlclient、psycopg2 等)来连接数据库,然后使用 SQLAlchemy 提供的模型类定义数据表和字段,以及使用查询语法进行数据操作。
本篇文章中,我将以 MySQL 为例,实现 SQLAlchemy 的数据库进行连接以及操作。
ORM 具有在代码和数据库表中的对象之间转换的工具,简单来说就是将该数据表映射到项目代码中,然后你通常在 SQL 数据库中创建一个代表映射的类,该类的每个属性代表一个列,具有名称和类型。
安装 SQLAlchemy
pip install SQLAlchemy
目录结构
.
├── app
│ ├── controller # 控制层,主要存放各种 API 接口
│ │ ├── __init__.py
│ │ └── user.py # 用户相关接口
│ ├── core
│ │ ├── config
│ │ │ ├── __init__.py
│ │ │ ├── settings.py
│ │ │ └── settings.yaml
│ │ ├── db # 数据库相关文件
│ │ │ ├── __init__.py
│ │ │ └── sqlalchemy.py # sqlalchemy 文件,里面存放数据库引擎等
│ │ ├── depends
│ │ │ ├── __init__.py
│ │ │ ├── db.py # 数据库相关依赖
│ │ │ └── service.py # 服务层相关依赖
│ │ └── __init__.py
│ ├── model # 模型文件夹,跟数据库表一一对应
│ │ ├── __init__.py
│ │ └── user.py # 用户模型(用户表)
│ ├── repository # 数据层,主要存放直接操作数据库的接口
│ │ ├── __init__.py
│ │ └── user.py # 用户相关数据操作
│ ├── schemas # 存放各种个样的模型
│ │ ├── __init__.py
│ │ └── user.py # 用户相关模型,例如创建用户、更新用户等
│ ├── service # 服务层
│ │ ├── __init__.py
│ │ └── user.py # 用户相关服务层接口
│ └── __init__.py
├── README.md
└── main.py
数据库连接
涉及文件 app/core/db/sqlalchemy.py
安装相关依赖
由于是需要使用 MySQL ,并且需要异步支持,那么需要安装以下两个库:
pip install mysqlclient
pip install aiomysql
配置数据库连接
具体代码如下
python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from app.core.config.settings import settings
# 创建异步数据库引擎
# echo=True 会打印所有的 SQL 语句
# future=True 会启用 SQLAlchemy 的异步功能
async_engine = create_async_engine(settings.mysql.url, echo=True, future=True)
# 创建异步数据库会话
# expire_on_commit=False 会关闭自动提交,这样就可以在事务中使用 session.commit() 了
# class_=AsyncSession 会启用 SQLAlchemy 的异步功能
SessionLocal = sessionmaker(async_engine, expire_on_commit=False, class_=AsyncSession)
# 创建基本映射类
Base = declarative_base()
数据库连接 URL
:settings.mysql.url
这里是直接读取了配置文件中的内容,后续更改只需要更改配置文件中的mysql.url
即可, 如果想使用其他数据库,例如 PostgreSQL 数据库,则需要设置为postgresql://user:password@postgresserver/db
。SessionLocal
:它是一个本地线程存储(thread-local storage)的单例类,用来创建数据库会话。简单来说,SessionLocal
类的主要作用是为了每个请求创建一个数据库会话,并且确保这个会话在整个请求期间都是唯一的。这样我们就可以在不同的函数中使用同一会话,从而避免了在不同函数中反复创建会话的麻烦。declarative_base()
:declarative_base() 是 SQLAlchemy 中提供的一个函数,用于创建一个基类,然后通过继承这个基类来定义数据表模型。它可以让我们更加方便地定义数据表模型,而不需要关注底层的SQL语句。具体作用:- 自动创建对应的数据表:我们定义了数据表模型之后,可以调用 create_all() 方法来创建对应的数据表。
- 自动映射数据表和类属性:我们只需要定义类属性,SQLAlchemy 可以自动将这些属性映射到对应的数据表字段。
- 提供了更加易读易懂的代码:使用 declarative_base() 可以让我们更加方便地定义类,使代码更加清晰易读。
创建模型
接下来便是创建和数据表映射的数据库模型以及 Pydantic 模型,数据库模型用以对接数据表,Pydantic 模型则用来作为响应模型(response_model)及请求体。
数据库模型
涉及文件 app/model/user.py
具体代码如下
python
from sqlalchemy import Column, BigInteger, String, Boolean
from app.core.db.sqlalchemy import Base
class UserModel(Base):
""" 用户表 """ # __tablename__ 表名
__tablename__ = 'user'
# id 字段,primary_key=True 为主键,autoincrement=True 自增,index=True 索引
id = Column(BigInteger, primary_key=True, autoincrement=True, index=True)
# username 字段,nullable=False 不为空,unique=True 唯一
username = Column(String(32), unique=True, nullable=False, index=True)
# hashed_password 字段,nullable=False 不为空
hashed_password = Column(String(255), nullable=False)
# is_active 字段,default=True 默认为 True is_active = Column(Boolean, default=True)
🌟 用 Base 类来创建 SQLAlchemy 模型
Base 类也就是数据库连接时的 declarative_base(),从 sqlalchemy(来自上面的 sqlalchemy.py 文件)导入 **Base,**那么它将自动映射数据表和类属性(原因在前边)。
🌟 tablename
该属性是给模型映射的数据表的名称,比如 User 类的 tablename 为 user,那么它映射的数据表名即为 user。
🌟 模型属性/列
python
id = Column(BigInteger, primary_key=True, autoincrement=True, index=True)
username = Column(String(32), unique=True, nullable=False, index=True)
hashed_password = Column(String(255), nullable=False)
Column 表示这些属性中的每一个都代表其相应数据库表中的一列,Column 中的第一个参数,如Integer、String 和 Boolean,它定义了数据库中的类型。
- primary_key =True 表示该字段为主键;
- autoincrement=Ture 表示自增字段;
- index=True 表示该字段为索引,提升查询性能;
- unique=True 则是唯一约束,以确保在 username 列中的每个值都是唯一的;
- default=True 表示is_active 的默认值为 True;
Pydantic 模型
涉及文件 app/scemas/user.py
具体代码如下
python
from pydantic import BaseModel
class UserBase(BaseModel):
""" 用户基础模型 """ username: str
is_active: bool
class UserCreate(UserBase):
""" 创建用户模型 """ hashed_password: str
class UserUpdate(UserBase):
""" 更新用户模型 """ hashed_password: str
class User(UserBase):
""" 用户模型 """ id: int
class Config:
orm_mode = True
注意,SQLAlchemy 模型和 Pydantic 声明属性的方式不一样,前者是 = ,而后者是 :。
🌟 orm_mode
此类 Config 用于为 Pydantic 提供配置。
python
class User(UserBase):
""" 用户模型 """ id: int
class Config:
orm_mode = True
Pydantic orm_mode 将告诉 Pydantic 模型读取数据,即它不是一个 dict,而是一个 ORM 模型。这样该 Pydantic 模型就会尝试从属性中获取它,如 id = data.id。
有了这个,Pydantic 模型与 ORM 兼容,您只需在路径操作 response_model 的参数中声明它,即可返回一个数据库模型,并从中读取数据。
SQLAlchemy 和许多其他默认情况下是延迟加载。这意味着,除非您尝试访问包含该数据的属性,否则它们不会从数据库中获取关系数据。
数据库操作
涉及文件 app/repository/user.py
、app/service/user.py
在这里将代码分为了 3 层,Controller
、Service
、Repository
。
Repository 层
这层是直接和数据库打交道,使用 SQLAlchemy 对数据库进行操作时,是需要引入 Session 数据库会话,还需要在函数中使用到之前定义好的数据库模型 app/model/user.py 中的 User (SQLAlchemy 模型)和 app/schemas/user.py 中需要用到模型 (Pydantic模型)。
以下是详细代码
python
from typing import Sequence
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from app.schemas.user import UserCreate, UserUpdate, User
from app.model.user import UserModel
class UserRepository:
""" 用户仓库类 """
def __init__(self, db_session: AsyncSession):
self.db_session = db_session
async def create_user(self, user: UserCreate) -> User:
""" 创建用户 """ db_user = UserModel(username=user.username, hashed_password=user.password)
self.db_session.add(db_user)
await self.db_session.commit()
await self.db_session.refresh(db_user)
return db_user
async def query_user(self, user_id: int) -> User:
""" 根据id查询用户 """ stmt = select(UserModel).where(UserModel.id == user_id)
result = await self.db_session.execute(stmt)
return result.scalars().first()
async def query_user_by_username(self, username: str) -> User:
""" 根据username查询用户"""
stmt = select(UserModel).where(UserModel.username == username)
result = await self.db_session.execute(stmt)
return result.scalars().first()
async def query_users(self, skip: int = 0, limit: int = 100) -> Sequence[User]:
""" 查询用户列表 """ stmt = select(UserModel).offset(skip).limit(limit)
result = await self.db_session.execute(stmt)
return result.scalars().all()
async def update_user(self, user_id: int, user: UserUpdate) -> User:
""" 更新用户 """ stmt = update(UserModel).where(UserModel.id == user_id).values(**user.dict(exclude_unset=True))
await self.db_session.execute(stmt)
await self.db_session.commit()
stmt = select(UserModel).where(UserModel.id == user_id)
result = await self.db_session.execute(stmt)
return result.scalars().first()
async def delete_user(self, user_id: int) -> None:
""" 删除用户(物理删除) """ smtm = delete(UserModel).where(UserModel.id == user_id)
await self.db_session.execute(smtm)
await self.db_session.commit()
Service 层
详细代码如下
python
from typing import Sequence
from sqlalchemy.ext.asyncio import AsyncSession
from app.schemas.user import UserCreate, User, UserUpdate
from app.repository.user import UserRepository
class UserService:
""" 用户服务类 """
def __init__(self, db_session: AsyncSession):
self.user_repository = UserRepository(db_session)
async def create_user(self, user: UserCreate) -> User:
return await self.user_repository.create_user(user)
async def query_user(self, user_id: int) -> User:
return await self.user_repository.query_user(user_id)
async def query_user_by_username(self, username: str) -> User:
return await self.user_repository.query_user_by_username(username)
async def query_users(self, skip: int = 0, limit: int = 100) -> Sequence[User]:
return await self.user_repository.query_users(skip, limit)
async def update_user(self, user_id: int, user: UserUpdate) -> User:
return await self.user_repository.update_user(user_id, user)
async def delete_user(self, user_id: int) -> None:
return await self.user_repository.delete_user(user_id)
Controller 层
python
from typing import Union, List
from fastapi import APIRouter, Depends
from app.schemas.user import User, UserCreate, UserUpdate
from app.core.depends.service import get_user_service
from app.service.user import UserService
class UserController:
router = APIRouter()
@staticmethod
@router.post('/users/', response_model=User)
async def create_user(user: UserCreate, user_service: UserService = Depends(get_user_service)):
return await user_service.create_user(user)
@staticmethod
@router.get('/users/id/{user_id}', response_model=Union[User, None])
async def get_user(user_id: int, user_service: UserService = Depends(get_user_service)):
return await user_service.query_user(user_id)
@staticmethod
@router.get('/users/username/{username}', response_model=Union[User, None])
async def get_user_by_username(username: str, user_service: UserService = Depends(get_user_service)):
return await user_service.query_user_by_username(username)
@staticmethod
@router.get('/users', response_model=Union[List[User], None])
async def get_users(skip: int = 0, limit: int = 100, user_service: UserService = Depends(get_user_service)):
return await user_service.query_users(skip, limit)
@staticmethod
@router.put('/users/{user_id}', response_model=User)
async def update_user(user_id: int, user: UserUpdate, user_service: UserService = Depends(get_user_service)):
return await user_service.update_user(user_id, user)
@staticmethod
@router.delete('/users/{user_id}')
async def delete_user(user_id: int, user_service: UserService = Depends(get_user_service)):
return await user_service.delete_user(user_id)
@classmethod
def get_router(cls):
return cls.router
项目运行
python
import os
from fastapi import FastAPI
from app.controller.user import UserController
app = FastAPI()
app.include_router(UserController.get_router(), prefix='/api')
if __name__ == '__main__':
# 设置 ENV_FOR_DYNACONF 为 development os.environ['ENV_FOR_DYNACONF'] = 'development'
# os.environ['ENV_FOR_DYNACONF'] = 'production'
import uvicorn
uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True)
查看文档
当项目运行后直接在浏览器输入 http://localhost:8000/docs
即可查看接口文档。
其他文件
涉及文件 app/core/depends/db.py
、 app/core/depends/service.py
在 depends 文件夹中主要负责存放依赖注入相关的文件,详细内容如下
python
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db.sqlalchemy import SessionLocal
async def get_db_session() -> AsyncSession:
""" 获取数据库会话 """ db_session = SessionLocal()
try:
yield db_session
finally:
await db_session.close()
python
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.depends.db import get_db_session
from app.service.user import UserService
async def get_user_service(db_session: AsyncSession = Depends(get_db_session)) -> UserService:
""" 获取用户服务类 """
yield UserService(db_session)
数据库脚本
sql
CREATE TABLE `user` (
`id` BIGINT AUTO_INCREMENT PRIMARY KEY,
`username` VARCHAR(32) NOT NULL UNIQUE,
`hashed_password` VARCHAR(255) NOT NULL,
`is_active` BOOLEAN DEFAULT TRUE
);