Python 下用 SQLAlchemy 库连接 PostgreSQL 数据库
原来 Heroku 的 PostgreSQL 是免费的,刚好在学《数据库基础》,顺路看看怎么用关系型数据库。
依赖
FastAPI :RESTful 的 API 框架
uvicorn:HTTP Server,之前用惯 gunicorn,应该差不多
SQLAlchemy:数据库操作
psycopg2:PostgreSQL 驱动(Driver)
Alembic:用于做 ORM 模型与数据库的迁移与映射(?
1 pip install fastapi uvicorn SQLAlchemy psycopg2-binary alembic
FastAPI 在 src 文件夹下创建 main.py。
1 2 3 from fastapi import FastAPIapp = FastAPI()
启动数据库 视频中,作者懒得记启动数据库的指令,于是用 Makefile 帮忙记住。
1 2 run-db: docker run --name youtube_postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysuperpassword -e POSTGRES_DB=youtube -v ${PWD}/db_data:/var/lib/postgresql/data -d postgres
然后之后就用 make run-db
启动即可。相当于创建了一个 alias,有点像 npm 的 npm run serve
。
连接到数据库 接下来是 database.py 文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmakerSQLALCHEMY_DATABASE_URL = "postgresql://postgres:mysuperpassword@localhost/youtube" engine = create_engine(SQLALCHEMY_DATABASE_URL) SessionLocal = sessionmaker(autocommit=False , autoflush=False , bind=engine) Base = declarative_base() def get_db (): db = SessionLocal() try : yield db except : db.close()
创建数据库 Model models.py
1 2 3 4 5 6 7 8 9 10 from sqlalchemy import Integer, Stringfrom sqlalchemy.sql.schema import Columnfrom .database import Baseclass Job (Base ): __tablename__ = 'jobs' id = Column(Integer, primary_key=True ) title = Column(String, nullable=False ) description = Column(String, nullable=False )
接下来我们要用 migration 工具(也就是 Alembic)来修改(Update)数据库(即创建表)。
使用 Alembic 执行:
1 2 alembic init alembic alembic revision -m "init"
编辑 alembic.ini
,修改 sqlalchemy.url
配置项。【从环境变量中获取数据库链接,详见下方小章节】
修改 alembic/versions/xxx_init.py。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def upgrade (): op.create_table( 'jobs' , sa.Column('id' , sa.Integer, primary_key=True ), sa.Column('title' , sa.String, nullable=False ), sa.Column('description' , sa.String, nullable=False ) ) def downgrade (): op.drop_table('jobs' )
执行:
以后要更新表列 执行:
1 alembic revision -m "update_msg"
修改对应的 py 文件:
1 2 3 4 5 6 7 8 9 10 def upgrade (): op.add_column('jobs' , sa.Column('userId' , sa.Integer, nullable=False )) op.alter_column('users' , 'id' , nullable=False , new_column_name='userId' ) def downgrade (): op.drop_column('jobs' , 'userId' ) op.alter_column('users' , 'userId' , nullable=False , new_column_name='id' )
执行:
从环境变量中获取数据库 URL alembic/env.py,将
改为(其实就是加了两行)
1 2 3 4 5 config = context.config import osconfig.set_section_option( config.config_ini_section, "sqlalchemy.url" , os.environ.get('DATABASE_URL' ) )
创建 CRUD APIs 创建 Schemas schemas.py
1 2 3 4 5 from pydantic import BaseModelclass CreateJobRequest (BaseModel ): title: str description: str
增加 FastAPI 配置 main.py
fastapi.Depends:Allow us to use dependency injection on the getdb function to make sure it’s provided each time the endpoint is hit.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from fastapi import FastAPI, Dependsfrom sqlalchemy.orm import Sessionfrom .schemas import CreateJobRequestfrom .database import get_dbfrom .models import Jobapp = FastAPI() @app.post("/" ) def create (details: CreateJobRequest, db: Session = Depends(get_db ) ): to_create = Job( title=details.title, description=details.description ) db.add(to_create) db.commit() return { "success" : True , "created_id" : to_create.id } @app.get("/" ) def get_by_id (id : int , db: Session = Depends(get_db ) ): return db.query(Job).filter (Job.id == id ).first() @app.delete("/" ) def delete (id : int , db: Session = Depends(get_db ) ): db.query(Job).filter (Job.id == id ).delete() db.commit() return { "success" : True }
运行 1 uvicorn src.main:app --reload
测试 API 创建一条记录:
1 curl --request POST --data '{"title: "Hello", "description": "World"}' localhost:8000
获取一条记录:
1 curl --request GET "localhost:8000?id=1"
删除一条记录:
1 curl --request DELETE "localhost:8000?id=1"
Heroku 配置 参考 用 GitHub Actions 把项目部署到 Heroku - 酱瓜
1 web: uvicorn src.main:app --host=0.0.0.0 --port=${PORT:-5000}
参考