Skip to content
On this page

使用 SQLAlchemy 进行数据库操作

在 FastAPI 中使用 SQL 数据库可以使用多个 ORM 工具,例如 SQLAlchemy、Tortoise ORM 等,类似 Java 的 Mybatis 。这些 ORM 工具可以帮助我们方便地与关系型数据库进行交互,如 MySQL 、PostgreSQL等。本篇文章将介绍如何使用 SQLAlchemy 来完成数据库操作,以便让我们在 FastAPI 项目中方便地进行数据存储和查询。

介绍 SQLAlchemy

SQLAlchemy 官方文档 SQL工具包和对象关系映射器是一套用于处理数据库和Python的综合工具,俗称 ORM 工具,提供了灵活的数据模型定义和查询语法,支持多种数据库后端,例如:

  • MySQL
  • SQLite
  • Oracle
  • PostgreSQL
  • Microsoft SQL Server

在 FastAPI 中使用 SQLAlchemy,可以通过安装 SQLAlchemy 和 相应的数据库连接驱动程序(如 mysqlclient、psycopg2 等)来连接数据库,然后使用 SQLAlchemy 提供的模型类定义数据表和字段,以及使用查询语法进行数据操作。

本篇文章中,我将以 MySQL 为例,实现 SQLAlchemy 的数据库进行连接以及操作。

ORM 具有在代码和数据库表中的对象之间转换的工具,简单来说就是将该数据表映射到项目代码中,然后你通常在 SQL 数据库中创建一个代表映射的类,该类的每个属性代表一个列,具有名称和类型。

安装 SQLAlchemy

pip install SQLAlchemy

目录结构

.
├── app
│   ├── controller # 控制层,主要存放各种 API 接口
│   │   ├── __init__.py
│   │   └── user.py # 用户相关接口
│   ├── core
│   │   ├── config
│   │   │   ├── __init__.py
│   │   │   ├── settings.py
│   │   │   └── settings.yaml
│   │   ├── db # 数据库相关文件
│   │   │   ├── __init__.py
│   │   │   └── sqlalchemy.py # sqlalchemy 文件,里面存放数据库引擎等
│   │   ├── depends
│   │   │   ├── __init__.py
│   │   │   ├── db.py # 数据库相关依赖
│   │   │   └── service.py # 服务层相关依赖
│   │   └── __init__.py
│   ├── model # 模型文件夹,跟数据库表一一对应
│   │   ├── __init__.py
│   │   └── user.py # 用户模型(用户表)
│   ├── repository # 数据层,主要存放直接操作数据库的接口
│   │   ├── __init__.py
│   │   └── user.py # 用户相关数据操作
│   ├── schemas # 存放各种个样的模型
│   │   ├── __init__.py
│   │   └── user.py # 用户相关模型,例如创建用户、更新用户等
│   ├── service # 服务层
│   │   ├── __init__.py
│   │   └── user.py # 用户相关服务层接口
│   └── __init__.py
├── README.md
└── main.py

数据库连接

涉及文件 app/core/db/sqlalchemy.py

安装相关依赖

由于是需要使用 MySQL ,并且需要异步支持,那么需要安装以下两个库:

  1. pip install mysqlclient
  2. pip install aiomysql

配置数据库连接

具体代码如下

python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession  
from sqlalchemy.orm import sessionmaker, declarative_base  
  
from app.core.config.settings import settings  
  
# 创建异步数据库引擎  
# echo=True 会打印所有的 SQL 语句  
# future=True 会启用 SQLAlchemy 的异步功能  
async_engine = create_async_engine(settings.mysql.url, echo=True, future=True)  
  
# 创建异步数据库会话  
# expire_on_commit=False 会关闭自动提交,这样就可以在事务中使用 session.commit() 了  
# class_=AsyncSession 会启用 SQLAlchemy 的异步功能  
SessionLocal = sessionmaker(async_engine, expire_on_commit=False, class_=AsyncSession)  
  
# 创建基本映射类  
Base = declarative_base()
  • 数据库连接 URLsettings.mysql.url 这里是直接读取了配置文件中的内容,后续更改只需要更改配置文件中的 mysql.url 即可, 如果想使用其他数据库,例如 PostgreSQL 数据库,则需要设置为 postgresql://user:password@postgresserver/db

  • SessionLocal:它是一个本地线程存储(thread-local storage)的单例类,用来创建数据库会话。简单来说,SessionLocal 类的主要作用是为了每个请求创建一个数据库会话,并且确保这个会话在整个请求期间都是唯一的。这样我们就可以在不同的函数中使用同一会话,从而避免了在不同函数中反复创建会话的麻烦。

  • declarative_base():declarative_base() 是 SQLAlchemy 中提供的一个函数,用于创建一个基类,然后通过继承这个基类来定义数据表模型。它可以让我们更加方便地定义数据表模型,而不需要关注底层的SQL语句。具体作用:

    1. 自动创建对应的数据表:我们定义了数据表模型之后,可以调用 create_all() 方法来创建对应的数据表。
    2. 自动映射数据表和类属性:我们只需要定义类属性,SQLAlchemy 可以自动将这些属性映射到对应的数据表字段。
    3. 提供了更加易读易懂的代码:使用 declarative_base() 可以让我们更加方便地定义类,使代码更加清晰易读。

创建模型

接下来便是创建和数据表映射的数据库模型以及 Pydantic 模型数据库模型用以对接数据表,Pydantic 模型则用来作为响应模型(response_model)及请求体。

数据库模型

涉及文件 app/model/user.py

具体代码如下

python
from sqlalchemy import Column, BigInteger, String, Boolean  
  
from app.core.db.sqlalchemy import Base  
  
class UserModel(Base):  
    """ 用户表 """    # __tablename__ 表名  
    __tablename__ = 'user'  
    # id 字段,primary_key=True 为主键,autoincrement=True 自增,index=True 索引  
    id = Column(BigInteger, primary_key=True, autoincrement=True, index=True)  
    # username 字段,nullable=False 不为空,unique=True 唯一  
    username = Column(String(32), unique=True, nullable=False, index=True)  
    # hashed_password 字段,nullable=False 不为空  
    hashed_password = Column(String(255), nullable=False)  
    # is_active 字段,default=True 默认为 True    is_active = Column(Boolean, default=True)

🌟 用 Base 类来创建 SQLAlchemy 模型

Base 类也就是数据库连接时的 declarative_base(),从 sqlalchemy(来自上面的 sqlalchemy.py 文件)导入 **Base,**那么它将自动映射数据表和类属性(原因在前边)。

🌟 tablename

该属性是给模型映射的数据表的名称,比如 User 类的 tablename 为 user,那么它映射的数据表名即为 user

🌟 模型属性/列

python
id = Column(BigInteger, primary_key=True, autoincrement=True, index=True)  
username = Column(String(32), unique=True, nullable=False, index=True)  
hashed_password = Column(String(255), nullable=False)  

Column 表示这些属性中的每一个都代表其相应数据库表中的一列,Column 中的第一个参数,如Integer、String 和 Boolean,它定义了数据库中的类型。

  • primary_key =True 表示该字段为主键;
  • autoincrement=Ture 表示自增字段;
  • index=True 表示该字段为索引,提升查询性能;
  • unique=True 则是唯一约束,以确保在 username 列中的每个值都是唯一的;
  • default=True 表示is_active 的默认值为 True;

Pydantic 模型

涉及文件 app/scemas/user.py

具体代码如下

python
from pydantic import BaseModel  
  
  
class UserBase(BaseModel):  
    """ 用户基础模型 """    username: str  
    is_active: bool  
  
  
class UserCreate(UserBase):  
    """ 创建用户模型 """    hashed_password: str  
  
  
class UserUpdate(UserBase):  
    """ 更新用户模型 """    hashed_password: str  
  
  
class User(UserBase):  
    """ 用户模型 """    id: int  
  
    class Config:  
        orm_mode = True

注意,SQLAlchemy 模型和 Pydantic 声明属性的方式不一样,前者是 = ,而后者是 

🌟 orm_mode

此类 Config 用于为 Pydantic 提供配置。

python
class User(UserBase):  
    """ 用户模型 """    id: int  
  
    class Config:  
        orm_mode = True

Pydantic orm_mode 将告诉 Pydantic 模型读取数据,即它不是一个 dict,而是一个 ORM 模型。这样该 Pydantic 模型就会尝试从属性中获取它,如 id = data.id

有了这个,Pydantic 模型与 ORM 兼容,您只需在路径操作 response_model 的参数中声明它,即可返回一个数据库模型,并从中读取数据。

SQLAlchemy 和许多其他默认情况下是延迟加载。这意味着,除非您尝试访问包含该数据的属性,否则它们不会从数据库中获取关系数据。

数据库操作

涉及文件 app/repository/user.pyapp/service/user.py

在这里将代码分为了 3 层,ControllerServiceRepository

Repository 层

这层是直接和数据库打交道,使用 SQLAlchemy 对数据库进行操作时,是需要引入 Session 数据库会话,还需要在函数中使用到之前定义好的数据库模型 app/model/user.py 中的 User (SQLAlchemy 模型)和 app/schemas/user.py 中需要用到模型 (Pydantic模型)。

以下是详细代码

python
from typing import Sequence  
  
from sqlalchemy.ext.asyncio import AsyncSession  
from sqlalchemy import select, update, delete  
  
from app.schemas.user import UserCreate, UserUpdate, User  
from app.model.user import UserModel  
  
  
class UserRepository:  
    """ 用户仓库类 """  
    def __init__(self, db_session: AsyncSession):  
        self.db_session = db_session  
  
    async def create_user(self, user: UserCreate) -> User:  
        """ 创建用户 """        db_user = UserModel(username=user.username, hashed_password=user.password)  
        self.db_session.add(db_user)  
        await self.db_session.commit()  
        await self.db_session.refresh(db_user)  
        return db_user  
  
    async def query_user(self, user_id: int) -> User:  
        """ 根据id查询用户 """        stmt = select(UserModel).where(UserModel.id == user_id)  
        result = await self.db_session.execute(stmt)  
        return result.scalars().first()  
  
    async def query_user_by_username(self, username: str) -> User:  
        """ 根据username查询用户"""  
        stmt = select(UserModel).where(UserModel.username == username)  
        result = await self.db_session.execute(stmt)  
        return result.scalars().first()  
  
    async def query_users(self, skip: int = 0, limit: int = 100) -> Sequence[User]:  
        """ 查询用户列表 """        stmt = select(UserModel).offset(skip).limit(limit)  
        result = await self.db_session.execute(stmt)  
        return result.scalars().all()  
  
    async def update_user(self, user_id: int, user: UserUpdate) -> User:  
        """ 更新用户 """        stmt = update(UserModel).where(UserModel.id == user_id).values(**user.dict(exclude_unset=True))  
        await self.db_session.execute(stmt)  
        await self.db_session.commit()  
        stmt = select(UserModel).where(UserModel.id == user_id)  
        result = await self.db_session.execute(stmt)  
        return result.scalars().first()  
  
    async def delete_user(self, user_id: int) -> None:  
        """ 删除用户(物理删除) """        smtm = delete(UserModel).where(UserModel.id == user_id)  
        await self.db_session.execute(smtm)  
        await self.db_session.commit()

Service 层

详细代码如下

python
from typing import Sequence  
  
from sqlalchemy.ext.asyncio import AsyncSession  
  
from app.schemas.user import UserCreate, User, UserUpdate  
from app.repository.user import UserRepository  
  
  
class UserService:  
    """ 用户服务类 """  
    def __init__(self, db_session: AsyncSession):  
        self.user_repository = UserRepository(db_session)  
  
    async def create_user(self, user: UserCreate) -> User:  
        return await self.user_repository.create_user(user)  
  
    async def query_user(self, user_id: int) -> User:  
        return await self.user_repository.query_user(user_id)  
  
    async def query_user_by_username(self, username: str) -> User:  
        return await self.user_repository.query_user_by_username(username)  
  
    async def query_users(self, skip: int = 0, limit: int = 100) -> Sequence[User]:  
        return await self.user_repository.query_users(skip, limit)  
  
    async def update_user(self, user_id: int, user: UserUpdate) -> User:  
        return await self.user_repository.update_user(user_id, user)  
  
    async def delete_user(self, user_id: int) -> None:  
        return await self.user_repository.delete_user(user_id)

Controller 层

python
from typing import Union, List  
  
from fastapi import APIRouter, Depends  
  
from app.schemas.user import User, UserCreate, UserUpdate  
from app.core.depends.service import get_user_service  
from app.service.user import UserService  
  
  
class UserController:  
    router = APIRouter()  
  
    @staticmethod  
    @router.post('/users/', response_model=User)  
    async def create_user(user: UserCreate, user_service: UserService = Depends(get_user_service)):  
        return await user_service.create_user(user)  
  
    @staticmethod  
    @router.get('/users/id/{user_id}', response_model=Union[User, None])  
    async def get_user(user_id: int, user_service: UserService = Depends(get_user_service)):  
        return await user_service.query_user(user_id)  
  
    @staticmethod  
    @router.get('/users/username/{username}', response_model=Union[User, None])  
    async def get_user_by_username(username: str, user_service: UserService = Depends(get_user_service)):  
        return await user_service.query_user_by_username(username)  
  
    @staticmethod  
    @router.get('/users', response_model=Union[List[User], None])  
    async def get_users(skip: int = 0, limit: int = 100, user_service: UserService = Depends(get_user_service)):  
        return await user_service.query_users(skip, limit)  
  
    @staticmethod  
    @router.put('/users/{user_id}', response_model=User)  
    async def update_user(user_id: int, user: UserUpdate, user_service: UserService = Depends(get_user_service)):  
        return await user_service.update_user(user_id, user)  
  
    @staticmethod  
    @router.delete('/users/{user_id}')  
    async def delete_user(user_id: int, user_service: UserService = Depends(get_user_service)):  
        return await user_service.delete_user(user_id)  
  
    @classmethod  
    def get_router(cls):  
        return cls.router

项目运行

python
import os  
  
from fastapi import FastAPI  
  
from app.controller.user import UserController  
  
app = FastAPI()  
  
app.include_router(UserController.get_router(), prefix='/api')  
  
if __name__ == '__main__':  
    # 设置 ENV_FOR_DYNACONF 为 development    os.environ['ENV_FOR_DYNACONF'] = 'development'  
    # os.environ['ENV_FOR_DYNACONF'] = 'production'  
  
    import uvicorn  
  
    uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True)

查看文档

当项目运行后直接在浏览器输入 http://localhost:8000/docs 即可查看接口文档。

其他文件

涉及文件 app/core/depends/db.pyapp/core/depends/service.py

depends 文件夹中主要负责存放依赖注入相关的文件,详细内容如下

db.py

python
from sqlalchemy.ext.asyncio import AsyncSession  
  
from app.core.db.sqlalchemy import SessionLocal  
  
  
async def get_db_session() -> AsyncSession:  
    """ 获取数据库会话 """    db_session = SessionLocal()  
    try:  
        yield db_session  
    finally:  
        await db_session.close()

service.py

python
from fastapi import Depends  
from sqlalchemy.ext.asyncio import AsyncSession  
  
from app.core.depends.db import get_db_session  
  
from app.service.user import UserService  
  
  
async def get_user_service(db_session: AsyncSession = Depends(get_db_session)) -> UserService:  
    """ 获取用户服务类 """    
    yield UserService(db_session)

数据库脚本

sql
CREATE TABLE `user` (  
    `id` BIGINT AUTO_INCREMENT PRIMARY KEY,  
    `username` VARCHAR(32) NOT NULL UNIQUE,  
    `hashed_password` VARCHAR(255) NOT NULL,  
    `is_active` BOOLEAN DEFAULT TRUE  
);

Released under the MIT License.