Python 下用 SQLAlchemy 库连接 PostgreSQL 数据库

文章目录
  1. 1. 依赖
  2. 2. FastAPI
  3. 3. 启动数据库
  4. 4. 连接到数据库
  5. 5. 创建数据库 Model
  6. 6. 使用 Alembic
    1. 6.1. 以后要更新表列
    2. 6.2. 从环境变量中获取数据库 URL
  7. 7. 创建 CRUD APIs
    1. 7.1. 创建 Schemas
    2. 7.2. 增加 FastAPI 配置
  8. 8. 运行
    1. 8.1. 测试 API
  9. 9. Heroku 配置
  10. 10. 参考

原来 Heroku 的 PostgreSQL 是免费的,刚好在学《数据库基础》,顺路看看怎么用关系型数据库。

依赖

  • FastAPI:RESTful 的 API 框架
  • uvicorn:HTTP Server,之前用惯 gunicorn,应该差不多
  • SQLAlchemy:数据库操作
  • psycopg2:PostgreSQL 驱动(Driver)
  • Alembic:用于做 ORM 模型与数据库的迁移与映射(?
1
pip install fastapi uvicorn SQLAlchemy psycopg2-binary alembic

FastAPI

在 src 文件夹下创建 main.py。

1
2
3
from fastapi import FastAPI

app = FastAPI()

启动数据库

视频中,作者懒得记启动数据库的指令,于是用 Makefile 帮忙记住。

1
2
run-db:
docker run --name youtube_postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysuperpassword -e POSTGRES_DB=youtube -v ${PWD}/db_data:/var/lib/postgresql/data -d postgres

然后之后就用 make run-db 启动即可。相当于创建了一个 alias,有点像 npm 的 npm run serve

连接到数据库

接下来是 database.py 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "postgresql://postgres:[email protected]/youtube"

# 初始化
engine = create_engine(SQLALCHEMY_DATABASE_URL)
# 连接到数据库
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 创建 declarative_base 实例,为了在后面使用 database models
Base = declarative_base()

def get_db():
db = SessionLocal()
try:
yield db
except:
db.close()

创建数据库 Model

models.py

1
2
3
4
5
6
7
8
9
10
from sqlalchemy import Integer, String
from sqlalchemy.sql.schema import Column
from .database import Base

class Job(Base):
__tablename__ = 'jobs'

id = Column(Integer, primary_key=True)
title = Column(String, nullable=False)
description = Column(String, nullable=False)

接下来我们要用 migration 工具(也就是 Alembic)来修改(Update)数据库(即创建表)。

使用 Alembic

执行:

1
2
alembic init alembic
alembic revision -m "init"

编辑 alembic.ini,修改 sqlalchemy.url 配置项。【从环境变量中获取数据库链接,详见下方小章节】

修改 alembic/versions/xxx_init.py。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# ...

# 进行 migration 的动作
def upgrade():
op.create_table(
'jobs',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('title', sa.String, nullable=False),
sa.Column('description', sa.String, nullable=False)
)

# 撤销 migration 的动作
def downgrade():
op.drop_table('jobs')

执行:

1
alembic upgrade head

以后要更新表列

执行:

1
alembic revision -m "update_msg"

修改对应的 py 文件:

1
2
3
4
5
6
7
8
9
10
def upgrade():
# 新增一列
op.add_column('jobs', sa.Column('userId', sa.Integer, nullable=False))
# 改列名
op.alter_column('users', 'id', nullable=False, new_column_name='userId')

# 回退修改
def downgrade():
op.drop_column('jobs', 'userId')
op.alter_column('users', 'userId', nullable=False, new_column_name='id')

执行:

1
alembic upgrade head

从环境变量中获取数据库 URL

alembic/env.py,将

1
config = context.config

改为(其实就是加了两行)

1
2
3
4
5
config = context.config
import os
config.set_section_option(
config.config_ini_section, "sqlalchemy.url", os.environ.get('DATABASE_URL')
)

创建 CRUD APIs

创建 Schemas

schemas.py

1
2
3
4
5
from pydantic import BaseModel

class CreateJobRequest(BaseModel):
title: str
description: str

增加 FastAPI 配置

main.py

fastapi.Depends:Allow us to use dependency injection on the getdb function to make sure it’s provided each time the endpoint is hit.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from .schemas import CreateJobRequest
from .database import get_db
from .models import Job

app = FastAPI()

@app.post("/")
def create(details: CreateJobRequest, db: Session = Depends(get_db)):
to_create = Job(
title=details.title,
description=details.description
)
db.add(to_create)
db.commit()
return {
"success": True,
"created_id": to_create.id
}

@app.get("/")
def get_by_id(id: int, db: Session = Depends(get_db)):
return db.query(Job).filter(Job.id == id).first()

@app.delete("/")
def delete(id: int, db: Session = Depends(get_db)):
db.query(Job).filter(Job.id == id).delete()
db.commit()
return { "success": True }

运行

1
uvicorn src.main:app --reload

测试 API

创建一条记录:

1
curl --request POST --data '{"title: "Hello", "description": "World"}' localhost:8000

获取一条记录:

1
curl --request GET "localhost:8000?id=1"

删除一条记录:

1
curl --request DELETE "localhost:8000?id=1"

Heroku 配置

参考 用 GitHub Actions 把项目部署到 Heroku - 酱瓜

1
web: uvicorn src.main:app --host=0.0.0.0 --port=${PORT:-5000}

参考