基于FastAPI和SQLAIchemy实现CRUD接口
本文介绍了一个基于FastAPI和SQLite的完整CRUD应用实现。系统采用前后端分离架构,包含用户认证(登录/注册/修改)和人员信息管理(增删改查)功能。后端使用FastAPI框架搭建RESTful API,通过SQLAlchemy ORM操作SQLite数据库,并采用Pydantic进行数据验证。前端通过requests库实现API调用,提供命令行交互界面。文章详细说明了环境配置、数据库模型
1、安装依赖库
pip install fastapi sqlachemy databases uvicorn
SQLite无需额外驱动,Python已经内置支持
2、导入模块
import uvicorn
from fastapi import FastAPI,HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine,Column,Integer,String
from sqlalchemy.orm import sessionmaker,declarative_base
2.1 uvicorn
UVicorn 是一个 ASGI(Asynchronous Server Gateway Interface)服务器,用于运行异步 Python 网络应用。它专门为 FastAPI 等异步框架提供高性能的服务器支持,能够处理高并发请求。
2.2 fastapi
FastAPI 是一个现代、快速(高性能)的 Python Web 框架,用于构建 API。它基于标准 Python 类型提示,支持异步请求处理,并自动生成交互式 API 文档(如 Swagger UI 和 ReDoc)。
2.3 pydantic
Pydantic 是一个数据验证和设置管理的库,使用 Python 类型注解进行数据校验。在 FastAPI 中,Pydantic 的 BaseModel 用于定义请求和响应的数据模型,确保输入输出的数据符合预期格式。
2.4 sqlalchemy
SQLAlchemy 是 Python 中一个强大的 ORM(对象关系映射)工具库,用于与数据库交互。它允许通过 Python 类和对象操作数据库,而不需要直接编写 SQL 语句。
2.5 sqlalchemy.create_engine
create_engine 用于创建数据库引擎实例,连接数据库。它接受数据库连接字符串作为参数,并管理数据库连接池。
2.6 sqlalchemy.Column
Column 用于定义数据库表中的列。通过指定列的类型(如 Integer、String)和其他属性(如主键、唯一性约束),可以映射 Python 类属性到数据库表的列。
2.7 sqlalchemy.Integer 和 sqlalchemy.String
Integer 和 String 是 SQLAlchemy 提供的列类型,分别对应数据库中的整数和字符串类型。它们用于定义表中列的数据类型。
2.8 sqlalchemy.orm.sessionmaker
sessionmaker 是一个工厂函数,用于生成数据库会话(Session)类。会话是 SQLAlchemy 与数据库交互的主要接口,用于执行查询、插入、更新和删除操作。
2.9 sqlalchemy.orm.declarative_base
declarative_base 是一个基类工厂函数,用于创建 ORM 模型的基类。通过继承这个基类,可以定义数据库表的 Python 类模型,实现类与表的映射关系。
3、初始化FastAPI应用
app = FastAPI()
创建一个FastAPI应用实例
4、数据库配置
DATABASE_URL = "sqlite:///./my_school.db"
# 创建数据库连接
engine = create_engine(DATABASE_URL)
# 创建数据库会话
SessionLocal = sessionmaker(autocommit=False,autoflush=False,bind=engine)
# 创建数据库模型
Base = declarative_base()
- DATABASE_URL:数据库连接 URL,使用 SQLite 数据库文件
province.db。 - engine:创建 SQLite 数据库引擎,
check_same_thread=False允许多线程操作。 - SessionLocal:数据库会话工厂,用于生成会话实例。
- Base:ORM 模型的基类。
5、定义数据模型
class Userbase(Base): # 继承Base
__tablename__ = "user_base"
id = Column(Integer,primary_key=True,index=True)
username = Column(String,unique=True)
password = Column(String)
class Peoplebase(Base):
__tablename__ = "people_base"
id = Column(Integer,primary_key=True,index=True)
name = Column(String,nullable=False)
age = Column(Integer,nullable=False)
address = Column(String,nullable=False)
phone = Column(String,nullable=False)
-
Userbase:用户基础数据模型,映射到数据库中的
user_base表。- id:主键,唯一标识每条用户记录,并创建索引以提高查询效率。
- username:用户名,要求在数据库中唯一(
unique=True),用于登录和身份识别。 - password:密码,存储用户的加密密码。
-
Peoplebase:人员基础数据模型,映射到数据库中的
people_base表。- id:主键,唯一标识每条人员记录,并创建索引以提高查询效率。
- name:姓名,不允许为空(
nullable=False)。 - age:年龄,不允许为空(
nullable=False)。 - address:地址,不允许为空(
nullable=False)。 - phone:电话号码,不允许为空(
nullable=False)。
6、创建数据库表
# 创建数据库表
Base.metadata.create_all(bind=engine)
根据Userbase和Peoplebase模型创建数据库表(如果表不存在)
7、Pydantic模型
class User(BaseModel):
username: str
password: str
class People(BaseModel):
name: str
age: int
address: str
phone: str
class PeopleQuery(BaseModel):
name: str
class PeopleDelete(BaseModel):
name: str
phone: str
-
User:用于接收创建用户请求的数据。
- username:用户名,字符串类型。
- password:密码,字符串类型。
-
People:用于接收创建或更新人员信息的请求数据。
- name:姓名,字符串类型。
- age:年龄,整数类型。
- address:地址,字符串类型。
- phone:电话号码,字符串类型。
-
PeopleQuery:用于接收查询人员信息的请求数据。
- name:姓名,字符串类型,作为查询条件。
-
PeopleDelete:用于接收删除人员信息的请求数据。
- name:姓名,字符串类型,作为删除条件。
- phone:电话号码,字符串类型,作为删除条件(通常与 name 联合使用以确保唯一性)。
说明
- BaseModel:这是 Pydantic 的基类,定义了字段及其类型,并提供了数据验证功能。
- 用途区分:
User和People是 创建/更新 数据时使用的请求模型。PeopleQuery是 查询 数据时使用的请求模型。PeopleDelete是 删除 数据时使用的请求模型。
- 命名规范:这种命名方式(
xxxCreate,xxxUpdate,xxxQuery,xxxDelete)是 API 设计中的常见模式,有助于清晰地表达每个模型的职责。
8、API接口实现CRUD
(1) 登录功能
@app.post("/get_login")
def get_login(user: User):
try:
session = SessionLocal()
res = session.query(Userbase).filter((Userbase.username == user.username) & (Userbase.password == user.password)).first()
if res:
return {"message": "登录成功"}
else:
raise HTTPException(status_code=400, detail="用户名或密码错误")
except HTTPException:
# 抛出HTTP异常,保持状态码
raise
except Exception as e:
print(f"登录功能出错:{e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="服务器内部错误")
finally:
if session:
session.close()
(2)注册功能
@app.post("/get_register")
def get_register(user: User):
session = None
try:
session = SessionLocal()
res = session.query(Userbase).filter(Userbase.username == user.username).first()
if res:
raise HTTPException(status_code=400, detail="用户已存在")
else:
new_user = Userbase(username=user.username, password=user.password)
session.add(new_user)
session.commit()
session.refresh(new_user)
return {"message": "注册成功"}
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise # 重新抛出,FastAPI 能正确处理状态码
except Exception as e:
if session:
session.rollback() # 回滚事务
print(f"注册功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
(3) 修改用户名密码功能
@app.post("/get_update")
def get_update(user: User):
try:
session = SessionLocal()
res = session.query(Userbase).filter(Userbase.username == user.username).first()
if res:
res.username = user.username
res.password = user.password
session.commit()
return {"message": "更新成功"}
else:
raise HTTPException(status_code=400, detail="用户不存在")
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback() # 回滚事务
print(f"更新功能出错:{e}")
finally:
if session:
session.close()
(4) 增加人员功能
@app.post("/get_add_people")
def get_add_people(people: People):
try:
session = SessionLocal()
new_people = Peoplebase(name=people.name, age=people.age, address=people.address, phone=people.phone)
session.add(new_people)
session.commit()
session.refresh(new_people)
return {"message": "添加成功"}
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback()
print(f"添加功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
(5) 查询人员功能
@app.post("/get_query_people")
def get_query_people(people: PeopleQuery):
try:
session = SessionLocal()
res = session.query(Peoplebase).filter(Peoplebase.name == people.name).all()
if res:
return {"message": "查询成功", "data": res}
else:
raise HTTPException(status_code=400, detail="用户不存在")
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback()
print(f"查询功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
(6) 修改人员功能
@app.post("/get_update_people")
def get_update_people(people: People):
try:
session = SessionLocal()
res = session.query(Peoplebase).filter((Peoplebase.name == people.name) & (Peoplebase.phone == people.phone)).first()
if res:
res.name = people.name
res.age = people.age
res.address = people.address
res.phone = people.phone
session.commit()
session.refresh(res)
return {"message": "修改成功"}
else:
raise HTTPException(status_code=400, detail="用户不存在")
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback()
print(f"修改功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
(7) 删除人员功能
@app.post("/get_delete_people")
def get_delete_people(people: PeopleDelete):
try:
session = SessionLocal()
res = session.query(Peoplebase).filter((Peoplebase.name == people.name) & (Peoplebase.phone == people.phone)).first()
if res:
session.delete(res)
session.commit()
return {"message": "删除成功"}
else:
raise HTTPException(status_code=400, detail="用户不存在")
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback()
print(f"删除功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
(8)代码启动
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8088)
使用uvicorn启动FastAPI应用,默认运行在http://127.0.0.1:8088
9、整体源码
9.1 服务端
import uvicorn
from fastapi import FastAPI,HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine,Column,Integer,String
from sqlalchemy.orm import sessionmaker,declarative_base
from starlette import status
app = FastAPI()
DATABASE_URL = "sqlite:///./my_school.db"
# 创建数据库连接
engine = create_engine(DATABASE_URL)
# 创建数据库会话
SessionLocal = sessionmaker(autocommit=False,autoflush=False,bind=engine)
# 创建数据库模型
Base = declarative_base()
class Userbase(Base): # 继承Base
__tablename__ = "user_base"
id = Column(Integer,primary_key=True,index=True)
username = Column(String,unique=True)
password = Column(String)
class Peoplebase(Base):
__tablename__ = "people_base"
id = Column(Integer,primary_key=True,index=True)
name = Column(String,unique=True,nullable=False)
age = Column(Integer,nullable=False)
address = Column(String,nullable=False)
phone = Column(String,nullable=False)
# 创建数据库表
Base.metadata.create_all(bind=engine)
# 创建数据模型
class User(BaseModel):
username: str
password: str
class People(BaseModel):
name: str
age: int
address: str
phone: str
class PeopleQuery(BaseModel):
name: str
class PeopleDelete(BaseModel):
name: str
phone: str
@app.get("/")
def root():
return {"message": "Hello World"}
@app.post("/get_login")
def get_login(user: User):
try:
session = SessionLocal()
res = session.query(Userbase).filter((Userbase.username == user.username) & (Userbase.password == user.password)).first()
if res:
return {"message": "登录成功"}
else:
raise HTTPException(status_code=400, detail="用户名或密码错误")
except HTTPException:
# 抛出HTTP异常,保持状态码
raise
except Exception as e:
print(f"登录功能出错:{e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="服务器内部错误")
finally:
if session:
session.close()
@app.post("/get_register")
def get_register(user: User):
session = None
try:
session = SessionLocal()
res = session.query(Userbase).filter(Userbase.username == user.username).first()
if res:
raise HTTPException(status_code=400, detail="用户已存在")
else:
new_user = Userbase(username=user.username, password=user.password)
session.add(new_user)
session.commit()
session.refresh(new_user)
return {"message": "注册成功"}
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise # 重新抛出,FastAPI 能正确处理状态码
except Exception as e:
if session:
session.rollback() # 回滚事务
print(f"注册功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
@app.post("/get_update")
def get_update(user: User):
try:
session = SessionLocal()
res = session.query(Userbase).filter(Userbase.username == user.username).first()
if res:
res.username = user.username
res.password = user.password
session.commit()
return {"message": "更新成功"}
else:
raise HTTPException(status_code=400, detail="用户不存在")
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback() # 回滚事务
print(f"更新功能出错:{e}")
finally:
if session:
session.close()
@app.post("/get_add_people")
def get_add_people(people: People):
try:
session = SessionLocal()
new_people = Peoplebase(name=people.name, age=people.age, address=people.address, phone=people.phone)
session.add(new_people)
session.commit()
session.refresh(new_people)
return {"message": "添加成功"}
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback()
print(f"添加功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
# 写一个查询people功能
@app.post("/get_query_people")
def get_query_people(people: PeopleQuery):
try:
session = SessionLocal()
res = session.query(Peoplebase).filter(Peoplebase.name == people.name).all()
if res:
return {"message": "查询成功", "data": res}
else:
raise HTTPException(status_code=400, detail="用户不存在")
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback()
print(f"查询功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
# 写一个修改people功能
@app.post("/get_update_people")
def get_update_people(people: People):
try:
session = SessionLocal()
res = session.query(Peoplebase).filter((Peoplebase.name == people.name) & (Peoplebase.phone == people.phone)).first()
if res:
res.name = people.name
res.age = people.age
res.address = people.address
res.phone = people.phone
session.commit()
session.refresh(res)
return {"message": "修改成功"}
else:
raise HTTPException(status_code=400, detail="用户不存在")
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback()
print(f"修改功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
@app.post("/get_delete_people")
def get_delete_people(people: PeopleDelete):
try:
session = SessionLocal()
res = session.query(Peoplebase).filter((Peoplebase.name == people.name) & (Peoplebase.phone == people.phone)).first()
if res:
session.delete(res)
session.commit()
return {"message": "删除成功"}
else:
raise HTTPException(status_code=400, detail="用户不存在")
except HTTPException:
# 重新抛出HTTP异常,保持状态码
raise
except Exception as e:
if session:
session.rollback()
print(f"删除功能出错:{e}")
raise HTTPException(status_code=500, detail="服务器内部错误")
finally:
if session:
session.close()
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8088)
9.2 客户端
import requests
url = ""
while True:
input_type = int(input("请输入您的操作:1、登录\t\t2、注册\t\t3、修改\t\t4、退出:"))
if input_type == 1:
username = input("请输入您的用户名:")
userpass = input("请输入您的密码:")
url = f"http://127.0.0.1:8088/get_login"
params = {"username":username,"password":userpass}
res = requests.post(url,json=params)
if res.status_code == 200:
print('注册成功')
while True:
input_choice = int(input("请输入您的操作:1、添加\t\t2、查询\t\t3、修改\t\t4、删除\t\t5、退出:"))
if input_choice == 1:
p_username = input("请输入您要添加的姓名:")
p_age = int(input("请输入您的年龄:"))
p_address = input("请输入您的地址:")
p_phone = input("请输入您的手机号:")
p_url = f"http://127.0.0.1:8088/get_add_people"
params = {"name": p_username, "age": p_age, "address": p_address, "phone": p_phone}
res = requests.post(p_url, json=params)
if res.status_code == 200:
print(res.text)
continue
elif res.status_code == 400:
print(res.text)
continue
else:
print(f"服务器内部错误:{res.text}")
elif input_choice == 2:
p_url = "http://127.0.0.1:8088/get_query_people"
p_name = input("请输入您要查询的姓名:")
params = {"name": p_name}
res = requests.post(p_url, json=params)
if res.status_code == 200:
print(res.text)
continue
elif res.status_code == 400:
print(res.text)
continue
else:
print(f"服务器内部错误:{res.text}")
elif input_choice == 3:
p_url = "http://127.0.0.1:8088/get_update_people"
p_name = input("请输入您要修改的姓名:")
p_age = int(input("请输入您的年龄:"))
p_address = input("请输入您的地址:")
p_phone = input("请输入您的手机号:")
params = {"name": p_name, "age": p_age, "address": p_address, "phone": p_phone}
res = requests.post(p_url, json=params)
if res.status_code == 200:
print(res.text)
continue
elif res.status_code == 400:
print(res.text)
continue
else:
print(f"服务器内部错误:{res.text}")
elif input_choice == 4:
p_url = "http://127.0.0.1:8088/get_delete_people"
p_name = input("请输入您要删除的姓名:")
p_phone = input("请输入您要删除的手机号:")
params = {"name": p_name, "phone": p_phone}
res = requests.post(p_url, json=params)
if res.status_code == 200:
print(res.text)
continue
elif res.status_code == 400:
print(res.text)
continue
else:
print(f"服务器内部错误:{res.text}")
else:
print("退出程序")
break
continue
elif res.status_code == 400:
print(res.text)
continue
else:
print(f"服务器内部错误:{res.text}")
break
elif input_type == 2:
url = "http://127.0.0.1:8088/get_register"
input_name = input("请输入您要注册的用户名:")
input_pass = input("请输入您要注册的密码:")
params = {"username": input_name, "password": input_pass}
res = requests.post(url, json=params)
if res.status_code == 200:
print(res.text)
continue
elif res.status_code == 400:
print(res.text)
continue
else:
print(f"服务器内部错误:{res.text}")
break
elif input_type == 3:
url = "http://127.0.0.1:8088/get_update"
input_name = input("请输入您要修改的用户名:")
input_pass = input("请输入您要修改的密码:")
params = {"username": input_name, "password": input_pass}
res = requests.post(url, json=params)
print(res)
print(res.text)
continue
elif input_type == 4:
break
10、面向对象版本
10.1 服务端
import uvicorn
import sqlite3
from fastapi import FastAPI, Request,HTTPException
app = FastAPI()
class DBHelper:
def __init__(self, my_file):
"""
:param my_file: 数据库的名称
self.connection:与数据库连接的对象
"""
self.my_file = my_file
self.connection = None
# 创建数据库 并连接数据
def create_database(self):
try:
self.connection = sqlite3.connect(self.my_file, check_same_thread=False)
except sqlite3.Error as e:
print(f"创建数据库错误:{e}")
# 创建表
def create_table(self):
try:
# 创建数据库游标
crs = self.connection.cursor()
# 创建表数据
crs.execute("""
create table if not exists user_base(
id integer primary key autoincrement,
username TEXT not null,
password TEXT not null
)
""")
crs.execute("""
create table if not exists people(
id integer primary key autoincrement,
name TEXT not null,
age INTEGER not null,
address TEXT not null,
phone text not null
)
""")
self.connection.commit()
except sqlite3.Error as e:
print(f"创建表错误:{e}")
# 注册
def register_table(self, username, password):
try:
# 创建数据库游标
crs = self.connection.cursor()
# 先查询数据库中是否有相同的用户名
crs.execute("select * from user_base where username = ?", (username,))
sd_tuple = crs.fetchall()
# 当返回的数据为空,则代表可以新增数据,不为空则用户名以存在
if len(sd_tuple) == 0:
crs.execute("insert into user_base(username,password) "
"values(?,?)",(username,password,))
# 先提交事务,再返回数据
self.connection.commit()
return {"message": "注册成功"}
else:
return {"message": "当前用户已存在"}
except sqlite3.Error as e:
return {"message": f"注册失败:{e}"}
# 登录
def login_table(self, username, password):
try:
# 创建数据库游标
crs = self.connection.cursor()
# 先查询数据库中是否有相同的用户名
crs.execute("select * from user_base where username = ? and password = ?",(username,password,))
list_data = crs.fetchall()
if len(list_data) > 0:
self.connection.commit()
return {"message": "登录成功"}
else:
return {"message": "用户名或密码不正确"}
except sqlite3.Error as e:
return {"message": f"登录异常:{e}"}
# 添加用户
def add_people(self, name, age, address, phone):
try:
crs = self.connection.cursor()
crs.execute("insert into people(name,age,address,phone) values(?,?,?,?)",(name,age,address,phone,))
self.connection.commit()
return {"message": "添加成功"}
except sqlite3.Error as e:
return {"message": f"添加用户异常:{e}"}
# 查询用户
# 在服务端.py中修改select_people方法
def select_people(self, name):
try:
crs = self.connection.cursor()
if name:
crs.execute("select * from people where name=?", (name,))
list_data = crs.fetchall()
else:
crs.execute("select * from people")
list_data = crs.fetchall()
return {"message": list_data} # 返回实际数据
except sqlite3.Error as e:
return {"message": f"查询用户异常:{e}"}
# 修改用户
def update_people(self,id,name,age,address,phone):
try:
crs = self.connection.cursor()
crs.execute("update people set name=?,age=?,address=?,phone=? where id=?",(name,age,address,phone,id,))
self.connection.commit()
return {"message": "修改成功"}
except sqlite3.Error as e:
return {"message": f"修改用户异常:{e}"}
# 删除用户
def delete_people(self,name,phone):
try:
crs = self.connection.cursor()
crs.execute("delete from people where name=? and phone=?",(name,phone,))
self.connection.commit()
return {"message": "删除成功"}
except sqlite3.Error as e:
return {"message": f"删除用户异常:{e}"}
# 关闭与数据库的连接
def close_connection(self):
if self.connection:
self.connection.close()
db = DBHelper("user.db")
db.create_database()
db.create_table()
@app.get("/")
def root():
return {"message": "Hello World"}
@app.get("/get_login")
def get_login(request: Request):
username = request.query_params["username"]
password = request.query_params["password"]
response = db.login_table(username, password)
return {"message": response["message"]}
@app.post("/get_register")
def get_register(request: Request):
username = request.query_params["username"]
password = request.query_params["password"]
response = db.register_table(username, password)
return {"message": response["message"]}
@app.post("/get_add_people")
def get_add_people(request: Request):
name = request.query_params["name"]
age = request.query_params["age"]
address = request.query_params["address"]
phone = request.query_params["phone"]
response = db.add_people(name, age, address, phone)
return {"message": response["message"]}
@app.get("/get_select_people")
def get_select_people(request: Request):
name = request.query_params["name"]
response = db.select_people(name)
return {"查询结果": response["message"]}
@app.put("/get_update_people")
def get_update_people(request: Request):
id = request.query_params["id"]
name = request.query_params["name"]
age = request.query_params["age"]
address = request.query_params["address"]
phone = request.query_params["phone"]
response = db.update_people(id,name, age, address, phone)
return {"message": response["message"]}
@app.delete("/get_delete_people")
def get_delete_people(request: Request):
name = request.query_params["name"]
phone = request.query_params["phone"]
response = db.delete_people(name, phone)
return {"message": response["data"]}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8088)
10.2 客户端
import requests
url = ""
while True:
input_type = int(input("请输入您的操作:1、登录\t\t2、注册\t\t3、退出:"))
if input_type == 1:
try:
username = input("请输入您的用户名:")
userpass = input("请输入您的用户名:")
url = f"http://127.0.0.1:8088/get_login?username={username}&password={userpass}"
res = requests.get(url)
if res.status_code == 200:
while True:
input_choice = int(input("请输入您的操作:1、查询\t\t2、添加\t\t3、修改\t\t4、删除\t\t5、退出:"))
if input_choice == 1:
try:
name = input("请输入您要查询的姓名:")
url = f"http://127.0.0.1:8088/get_select_people?name={name}"
res = requests.get(url)
if res.status_code == 200:
result = res.json()
# 服务器返回的是 {"查询结果": [元组数据]} 格式
if "查询结果" in result:
data = result["查询结果"]
if data: # 如果有数据
# 遍历每个元组并打印
for row in data:
print(f"{row[0]:<5} {row[1]:<10} {row[2]:<5} {row[3]:<15} {row[4]:<15}")
print(f"共查询到 {len(data)} 条记录")
else:
print("未找到相关记录")
else:
print("查询失败:", result)
continue
except Exception as e:
print(f"查询功能出错:{e}")
elif input_choice == 2:
try:
name = input("请输入您要添加的姓名:")
age = input("请输入您要添加的年龄:")
address = input("请输入您要添加的地址:")
phone = input("请输入您要添加的手机号:")
url = f"http://127.0.0.1:8088/get_add_people?name={name}&age={age}&address={address}&phone={phone}"
# params = {"name": name, "age": age, "address": address, "phone": phone}
res = requests.post(url)
if res.status_code == 200:
print(res.text)
continue
else:
print("添加失败")
continue
except Exception as e:
print(f"添加手机号出错:{e}")
elif input_choice == 3:
try:
p_id = int(input("请输入要修改的id:"))
p_name = input("请输入您的姓名:")
p_age = int(input("请输入您的年龄:"))
p_address = input("请输入您的地址:")
p_phone = input("请输入您的手机号:")
url = f"http://127.0.0.1:8088/get_update_people?id={p_id}&name={p_name}&age={p_age}&address={p_address}&phone={p_phone}"
# params = {"name": p_name, "age": p_age, "address": p_address, "phone": p_phone}
response = requests.put(url)
if response.status_code == 200:
print(response.text)
continue
else:
print("修改失败")
continue
except Exception as e:
print(f"修改功能出错:{e}")
elif input_choice == 4:
try:
name = input("请输入要删除的姓名:")
phone = input("请输入要删除的手机号:")
url = f"http://127.0.0.1:8088/get_delete_people?name={name}&phone={phone}"
# params = {"name": name, "phone": phone}
res = requests.delete(url)
if res.status_code == 200:
print("删除成功")
continue
else:
print("删除失败")
continue
except Exception as e:
print(f"删除功能出错:{e}")
else:
print("退出程序")
break
else:
print("登录失败")
except Exception as e:
print(f"登录功能出错:{e}")
elif input_type == 2:
try:
url = "http://127.0.0.1:8088/get_register"
input_name = input("请输入您要注册的用户名:")
input_pass = input("请输入您要注册的密码:")
params = {"username": input_name, "password": input_pass}
res = requests.post(url, params=params)
print(res)
print(res.text)
except Exception as e:
print(f"注册功能出错:{e}")
elif input_type == 3:
break
注意:面向对象版本好像有点小问题,懒得改了,有兴趣的小伙伴可以改改,么么
更多推荐

所有评论(0)