1、安装依赖库

pip install fastapi sqlachemy databases uvicorn

SQLite无需额外驱动,Python已经内置支持

2、导入模块

import uvicorn
from fastapi import FastAPI,HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine,Column,Integer,String
from sqlalchemy.orm import sessionmaker,declarative_base

2.1 uvicorn

UVicorn 是一个 ASGI(Asynchronous Server Gateway Interface)服务器,用于运行异步 Python 网络应用。它专门为 FastAPI 等异步框架提供高性能的服务器支持,能够处理高并发请求。

2.2 fastapi

FastAPI 是一个现代、快速(高性能)的 Python Web 框架,用于构建 API。它基于标准 Python 类型提示,支持异步请求处理,并自动生成交互式 API 文档(如 Swagger UI 和 ReDoc)。

2.3 pydantic

Pydantic 是一个数据验证和设置管理的库,使用 Python 类型注解进行数据校验。在 FastAPI 中,Pydantic 的 BaseModel 用于定义请求和响应的数据模型,确保输入输出的数据符合预期格式。

2.4 sqlalchemy

SQLAlchemy 是 Python 中一个强大的 ORM(对象关系映射)工具库,用于与数据库交互。它允许通过 Python 类和对象操作数据库,而不需要直接编写 SQL 语句。

2.5 sqlalchemy.create_engine

create_engine 用于创建数据库引擎实例,连接数据库。它接受数据库连接字符串作为参数,并管理数据库连接池。

2.6 sqlalchemy.Column

Column 用于定义数据库表中的列。通过指定列的类型(如 IntegerString)和其他属性(如主键、唯一性约束),可以映射 Python 类属性到数据库表的列。

2.7 sqlalchemy.Integer 和 sqlalchemy.String

Integer 和 String 是 SQLAlchemy 提供的列类型,分别对应数据库中的整数和字符串类型。它们用于定义表中列的数据类型。

2.8 sqlalchemy.orm.sessionmaker

sessionmaker 是一个工厂函数,用于生成数据库会话(Session)类。会话是 SQLAlchemy 与数据库交互的主要接口,用于执行查询、插入、更新和删除操作。

2.9 sqlalchemy.orm.declarative_base

declarative_base 是一个基类工厂函数,用于创建 ORM 模型的基类。通过继承这个基类,可以定义数据库表的 Python 类模型,实现类与表的映射关系。

3、初始化FastAPI应用

app = FastAPI()

创建一个FastAPI应用实例

4、数据库配置

DATABASE_URL = "sqlite:///./my_school.db"
# 创建数据库连接
engine = create_engine(DATABASE_URL)
# 创建数据库会话
SessionLocal = sessionmaker(autocommit=False,autoflush=False,bind=engine)
# 创建数据库模型
Base = declarative_base()
  • DATABASE_URL:数据库连接 URL,使用 SQLite 数据库文件 province.db
  • engine:创建 SQLite 数据库引擎,check_same_thread=False 允许多线程操作。
  • SessionLocal:数据库会话工厂,用于生成会话实例。
  • Base:ORM 模型的基类。

5、定义数据模型

class Userbase(Base): # 继承Base
    __tablename__ = "user_base"
    id = Column(Integer,primary_key=True,index=True)
    username = Column(String,unique=True)
    password = Column(String)

class Peoplebase(Base):
    __tablename__ = "people_base"
    id = Column(Integer,primary_key=True,index=True)
    name = Column(String,nullable=False)
    age = Column(Integer,nullable=False)
    address = Column(String,nullable=False)
    phone = Column(String,nullable=False)
  • Userbase:用户基础数据模型,映射到数据库中的 user_base 表。

    • id:主键,唯一标识每条用户记录,并创建索引以提高查询效率。
    • username:用户名,要求在数据库中唯一(unique=True),用于登录和身份识别。
    • password:密码,存储用户的加密密码。
  • Peoplebase:人员基础数据模型,映射到数据库中的 people_base 表。

    • id:主键,唯一标识每条人员记录,并创建索引以提高查询效率。
    • name:姓名,不允许为空(nullable=False)。
    • age:年龄,不允许为空(nullable=False)。
    • address:地址,不允许为空(nullable=False)。
    • phone:电话号码,不允许为空(nullable=False)。

6、创建数据库表

# 创建数据库表
Base.metadata.create_all(bind=engine)

根据Userbase和Peoplebase模型创建数据库表(如果表不存在)

7、Pydantic模型

class User(BaseModel):
    username: str
    password: str

class People(BaseModel):
    name: str
    age: int
    address: str
    phone: str

class PeopleQuery(BaseModel):
    name: str

class PeopleDelete(BaseModel):
    name: str
    phone: str
  • User:用于接收创建用户请求的数据。

    • username:用户名,字符串类型。
    • password:密码,字符串类型。
  • People:用于接收创建或更新人员信息的请求数据。

    • name:姓名,字符串类型。
    • age:年龄,整数类型。
    • address:地址,字符串类型。
    • phone:电话号码,字符串类型。
  • PeopleQuery:用于接收查询人员信息的请求数据。

    • name:姓名,字符串类型,作为查询条件。
  • PeopleDelete:用于接收删除人员信息的请求数据。

    • name:姓名,字符串类型,作为删除条件。
    • phone:电话号码,字符串类型,作为删除条件(通常与 name 联合使用以确保唯一性)。

说明

  • BaseModel:这是 Pydantic 的基类,定义了字段及其类型,并提供了数据验证功能。
  • 用途区分
    • User 和 People 是 创建/更新 数据时使用的请求模型。
    • PeopleQuery 是 查询 数据时使用的请求模型。
    • PeopleDelete 是 删除 数据时使用的请求模型。
  • 命名规范:这种命名方式(xxxCreatexxxUpdatexxxQueryxxxDelete)是 API 设计中的常见模式,有助于清晰地表达每个模型的职责。

8、API接口实现CRUD

(1) 登录功能

@app.post("/get_login")
def get_login(user: User):
    try:
        session = SessionLocal()
        res = session.query(Userbase).filter((Userbase.username == user.username) & (Userbase.password == user.password)).first()
        if res:
            return {"message": "登录成功"}
        else:
            raise HTTPException(status_code=400, detail="用户名或密码错误")
    except HTTPException:
        # 抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        print(f"登录功能出错:{e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="服务器内部错误")
    finally:
        if session:
            session.close()

(2)注册功能

@app.post("/get_register")
def get_register(user: User):
    session = None
    try:
        session = SessionLocal()
        res = session.query(Userbase).filter(Userbase.username == user.username).first()
        if res:
            raise HTTPException(status_code=400, detail="用户已存在")
        else:
            new_user = Userbase(username=user.username, password=user.password)
            session.add(new_user)
            session.commit()
            session.refresh(new_user)
            return {"message": "注册成功"}
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise # 重新抛出,FastAPI 能正确处理状态码
    except Exception as e:
        if session:
            session.rollback()  # 回滚事务
        print(f"注册功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

(3) 修改用户名密码功能

@app.post("/get_update")
def get_update(user: User):
    try:
        session = SessionLocal()
        res = session.query(Userbase).filter(Userbase.username == user.username).first()
        if res:
            res.username = user.username
            res.password = user.password
            session.commit()
            return {"message": "更新成功"}
        else:
            raise HTTPException(status_code=400, detail="用户不存在")
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        if session:
            session.rollback() # 回滚事务
        print(f"更新功能出错:{e}")
    finally:
        if session:
           session.close()

(4) 增加人员功能

@app.post("/get_add_people")
def get_add_people(people: People):
    try:
        session = SessionLocal()
        new_people = Peoplebase(name=people.name, age=people.age, address=people.address, phone=people.phone)
        session.add(new_people)
        session.commit()
        session.refresh(new_people)
        return {"message": "添加成功"}
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        if session:
            session.rollback()
        print(f"添加功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

(5) 查询人员功能

@app.post("/get_query_people")
def get_query_people(people: PeopleQuery):
    try:
        session = SessionLocal()
        res = session.query(Peoplebase).filter(Peoplebase.name == people.name).all()
        if res:
            return {"message": "查询成功", "data": res}
        else:
            raise HTTPException(status_code=400, detail="用户不存在")
    except HTTPException:
            # 重新抛出HTTP异常,保持状态码
            raise
    except Exception as e:
        if session:
            session.rollback()
        print(f"查询功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

(6) 修改人员功能

@app.post("/get_update_people")
def get_update_people(people: People):
    try:
        session = SessionLocal()
        res = session.query(Peoplebase).filter((Peoplebase.name == people.name) & (Peoplebase.phone == people.phone)).first()
        if res:
            res.name = people.name
            res.age = people.age
            res.address = people.address
            res.phone = people.phone
            session.commit()
            session.refresh(res)
            return {"message": "修改成功"}
        else:
            raise HTTPException(status_code=400, detail="用户不存在")
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        if session:
            session.rollback()
        print(f"修改功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

(7) 删除人员功能

@app.post("/get_delete_people")
def get_delete_people(people: PeopleDelete):
    try:
        session = SessionLocal()
        res = session.query(Peoplebase).filter((Peoplebase.name == people.name) & (Peoplebase.phone == people.phone)).first()
        if res:
            session.delete(res)
            session.commit()
            return {"message": "删除成功"}
        else:
            raise HTTPException(status_code=400, detail="用户不存在")
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        if session:
            session.rollback()
        print(f"删除功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

(8)代码启动


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8088)

使用uvicorn启动FastAPI应用,默认运行在http://127.0.0.1:8088

9、整体源码

9.1 服务端

import uvicorn
from fastapi import FastAPI,HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine,Column,Integer,String
from sqlalchemy.orm import sessionmaker,declarative_base
from starlette import status

app = FastAPI()

DATABASE_URL = "sqlite:///./my_school.db"
# 创建数据库连接
engine = create_engine(DATABASE_URL)
# 创建数据库会话
SessionLocal = sessionmaker(autocommit=False,autoflush=False,bind=engine)
# 创建数据库模型
Base = declarative_base()

class Userbase(Base): # 继承Base
    __tablename__ = "user_base"
    id = Column(Integer,primary_key=True,index=True)
    username = Column(String,unique=True)
    password = Column(String)

class Peoplebase(Base):
    __tablename__ = "people_base"
    id = Column(Integer,primary_key=True,index=True)
    name = Column(String,unique=True,nullable=False)
    age = Column(Integer,nullable=False)
    address = Column(String,nullable=False)
    phone = Column(String,nullable=False)
# 创建数据库表
Base.metadata.create_all(bind=engine)
# 创建数据模型
class User(BaseModel):
    username: str
    password: str

class People(BaseModel):
    name: str
    age: int
    address: str
    phone: str

class PeopleQuery(BaseModel):
    name: str

class PeopleDelete(BaseModel):
    name: str
    phone: str

@app.get("/")
def root():
    return {"message": "Hello World"}


@app.post("/get_login")
def get_login(user: User):
    try:
        session = SessionLocal()
        res = session.query(Userbase).filter((Userbase.username == user.username) & (Userbase.password == user.password)).first()
        if res:
            return {"message": "登录成功"}
        else:
            raise HTTPException(status_code=400, detail="用户名或密码错误")
    except HTTPException:
        # 抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        print(f"登录功能出错:{e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="服务器内部错误")
    finally:
        if session:
            session.close()

@app.post("/get_register")
def get_register(user: User):
    session = None
    try:
        session = SessionLocal()
        res = session.query(Userbase).filter(Userbase.username == user.username).first()
        if res:
            raise HTTPException(status_code=400, detail="用户已存在")
        else:
            new_user = Userbase(username=user.username, password=user.password)
            session.add(new_user)
            session.commit()
            session.refresh(new_user)
            return {"message": "注册成功"}
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise # 重新抛出,FastAPI 能正确处理状态码
    except Exception as e:
        if session:
            session.rollback()  # 回滚事务
        print(f"注册功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

@app.post("/get_update")
def get_update(user: User):
    try:
        session = SessionLocal()
        res = session.query(Userbase).filter(Userbase.username == user.username).first()
        if res:
            res.username = user.username
            res.password = user.password
            session.commit()
            return {"message": "更新成功"}
        else:
            raise HTTPException(status_code=400, detail="用户不存在")
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        if session:
            session.rollback() # 回滚事务
        print(f"更新功能出错:{e}")
    finally:
        if session:
           session.close()

@app.post("/get_add_people")
def get_add_people(people: People):
    try:
        session = SessionLocal()
        new_people = Peoplebase(name=people.name, age=people.age, address=people.address, phone=people.phone)
        session.add(new_people)
        session.commit()
        session.refresh(new_people)
        return {"message": "添加成功"}
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        if session:
            session.rollback()
        print(f"添加功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

# 写一个查询people功能
@app.post("/get_query_people")
def get_query_people(people: PeopleQuery):
    try:
        session = SessionLocal()
        res = session.query(Peoplebase).filter(Peoplebase.name == people.name).all()
        if res:
            return {"message": "查询成功", "data": res}
        else:
            raise HTTPException(status_code=400, detail="用户不存在")
    except HTTPException:
            # 重新抛出HTTP异常,保持状态码
            raise
    except Exception as e:
        if session:
            session.rollback()
        print(f"查询功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

# 写一个修改people功能
@app.post("/get_update_people")
def get_update_people(people: People):
    try:
        session = SessionLocal()
        res = session.query(Peoplebase).filter((Peoplebase.name == people.name) & (Peoplebase.phone == people.phone)).first()
        if res:
            res.name = people.name
            res.age = people.age
            res.address = people.address
            res.phone = people.phone
            session.commit()
            session.refresh(res)
            return {"message": "修改成功"}
        else:
            raise HTTPException(status_code=400, detail="用户不存在")
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        if session:
            session.rollback()
        print(f"修改功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

@app.post("/get_delete_people")
def get_delete_people(people: PeopleDelete):
    try:
        session = SessionLocal()
        res = session.query(Peoplebase).filter((Peoplebase.name == people.name) & (Peoplebase.phone == people.phone)).first()
        if res:
            session.delete(res)
            session.commit()
            return {"message": "删除成功"}
        else:
            raise HTTPException(status_code=400, detail="用户不存在")
    except HTTPException:
        # 重新抛出HTTP异常,保持状态码
        raise
    except Exception as e:
        if session:
            session.rollback()
        print(f"删除功能出错:{e}")
        raise HTTPException(status_code=500, detail="服务器内部错误")
    finally:
        if session:
            session.close()

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8088)

9.2 客户端

import requests

url = ""

while True:
    input_type = int(input("请输入您的操作:1、登录\t\t2、注册\t\t3、修改\t\t4、退出:"))
    if input_type == 1:
        username = input("请输入您的用户名:")
        userpass = input("请输入您的密码:")
        url = f"http://127.0.0.1:8088/get_login"
        params  = {"username":username,"password":userpass}
        res = requests.post(url,json=params)
        if res.status_code == 200:
            print('注册成功')
            while True:
                input_choice = int(input("请输入您的操作:1、添加\t\t2、查询\t\t3、修改\t\t4、删除\t\t5、退出:"))
                if input_choice == 1:
                    p_username = input("请输入您要添加的姓名:")
                    p_age = int(input("请输入您的年龄:"))
                    p_address = input("请输入您的地址:")
                    p_phone = input("请输入您的手机号:")
                    p_url = f"http://127.0.0.1:8088/get_add_people"
                    params = {"name": p_username, "age": p_age, "address": p_address, "phone": p_phone}
                    res = requests.post(p_url, json=params)
                    if res.status_code == 200:
                        print(res.text)
                        continue
                    elif res.status_code == 400:
                        print(res.text)
                        continue
                    else:
                        print(f"服务器内部错误:{res.text}")
                elif input_choice == 2:
                    p_url = "http://127.0.0.1:8088/get_query_people"
                    p_name = input("请输入您要查询的姓名:")
                    params = {"name": p_name}
                    res = requests.post(p_url, json=params)
                    if res.status_code == 200:
                        print(res.text)
                        continue
                    elif res.status_code == 400:
                        print(res.text)
                        continue
                    else:
                        print(f"服务器内部错误:{res.text}")
                elif input_choice == 3:
                    p_url = "http://127.0.0.1:8088/get_update_people"
                    p_name = input("请输入您要修改的姓名:")
                    p_age = int(input("请输入您的年龄:"))
                    p_address = input("请输入您的地址:")
                    p_phone = input("请输入您的手机号:")
                    params = {"name": p_name, "age": p_age, "address": p_address, "phone": p_phone}
                    res = requests.post(p_url, json=params)
                    if res.status_code == 200:
                        print(res.text)
                        continue
                    elif res.status_code == 400:
                        print(res.text)
                        continue
                    else:
                        print(f"服务器内部错误:{res.text}")
                elif input_choice == 4:
                    p_url = "http://127.0.0.1:8088/get_delete_people"
                    p_name = input("请输入您要删除的姓名:")
                    p_phone = input("请输入您要删除的手机号:")
                    params = {"name": p_name, "phone": p_phone}
                    res = requests.post(p_url, json=params)
                    if res.status_code == 200:
                        print(res.text)
                        continue
                    elif res.status_code == 400:
                        print(res.text)
                        continue
                    else:
                        print(f"服务器内部错误:{res.text}")
                else:
                    print("退出程序")
                    break
            continue
        elif res.status_code == 400:
            print(res.text)
            continue
        else:
            print(f"服务器内部错误:{res.text}")
            break
    elif input_type == 2:
        url = "http://127.0.0.1:8088/get_register"
        input_name = input("请输入您要注册的用户名:")
        input_pass = input("请输入您要注册的密码:")
        params = {"username": input_name, "password": input_pass}
        res = requests.post(url, json=params)
        if res.status_code == 200:
            print(res.text)
            continue
        elif res.status_code == 400:
            print(res.text)
            continue
        else:
            print(f"服务器内部错误:{res.text}")
            break
    elif input_type == 3:
        url = "http://127.0.0.1:8088/get_update"
        input_name = input("请输入您要修改的用户名:")
        input_pass = input("请输入您要修改的密码:")
        params = {"username": input_name, "password": input_pass}
        res = requests.post(url, json=params)
        print(res)
        print(res.text)
        continue
    elif input_type == 4:
        break

10、面向对象版本

10.1 服务端

import uvicorn

import sqlite3

from fastapi import FastAPI, Request,HTTPException

app = FastAPI()


class DBHelper:
    def __init__(self, my_file):
        """
        :param my_file: 数据库的名称
        self.connection:与数据库连接的对象
        """
        self.my_file = my_file
        self.connection = None

    # 创建数据库  并连接数据
    def create_database(self):
        try:
            self.connection = sqlite3.connect(self.my_file, check_same_thread=False)
        except sqlite3.Error as e:
            print(f"创建数据库错误:{e}")

    # 创建表
    def create_table(self):
        try:
            # 创建数据库游标
            crs = self.connection.cursor()
            # 创建表数据
            crs.execute("""
                create table if not exists user_base(
                    id integer primary key autoincrement,
                    username TEXT not null,
                    password TEXT not null
                )
            """)
            crs.execute("""
                create table if not exists people(
                    id integer primary key autoincrement,
                    name TEXT not null,
                    age INTEGER not null,
                    address TEXT not null,
                    phone text not null
                )
            """)
            self.connection.commit()
        except sqlite3.Error as e:
            print(f"创建表错误:{e}")

    # 注册
    def register_table(self, username, password):
        try:
            # 创建数据库游标
            crs = self.connection.cursor()
            # 先查询数据库中是否有相同的用户名
            crs.execute("select * from user_base where username = ?", (username,))
            sd_tuple = crs.fetchall()
            # 当返回的数据为空,则代表可以新增数据,不为空则用户名以存在
            if len(sd_tuple) == 0:
                crs.execute("insert into user_base(username,password) "
                            "values(?,?)",(username,password,))
                # 先提交事务,再返回数据
                self.connection.commit()
                return {"message": "注册成功"}
            else:
                return {"message": "当前用户已存在"}
        except sqlite3.Error as e:
            return {"message": f"注册失败:{e}"}

    # 登录
    def login_table(self, username, password):
        try:
            # 创建数据库游标
            crs = self.connection.cursor()
            # 先查询数据库中是否有相同的用户名
            crs.execute("select * from user_base where username = ? and password = ?",(username,password,))
            list_data = crs.fetchall()
            if len(list_data) > 0:
                self.connection.commit()
                return {"message": "登录成功"}
            else:
                return {"message": "用户名或密码不正确"}
        except sqlite3.Error as e:
            return {"message": f"登录异常:{e}"}

    # 添加用户
    def add_people(self, name, age, address, phone):
        try:
            crs = self.connection.cursor()
            crs.execute("insert into people(name,age,address,phone) values(?,?,?,?)",(name,age,address,phone,))
            self.connection.commit()
            return {"message": "添加成功"}
        except sqlite3.Error as e:
            return {"message": f"添加用户异常:{e}"}

    # 查询用户
    # 在服务端.py中修改select_people方法
    def select_people(self, name):
        try:
            crs = self.connection.cursor()
            if name:
                crs.execute("select * from people where name=?", (name,))
                list_data = crs.fetchall()
            else:
                crs.execute("select * from people")
                list_data = crs.fetchall()
            return {"message": list_data}  # 返回实际数据
        except sqlite3.Error as e:
            return {"message": f"查询用户异常:{e}"}

    # 修改用户
    def update_people(self,id,name,age,address,phone):
        try:
            crs = self.connection.cursor()
            crs.execute("update people set name=?,age=?,address=?,phone=? where id=?",(name,age,address,phone,id,))
            self.connection.commit()
            return {"message": "修改成功"}
        except sqlite3.Error as e:
            return {"message": f"修改用户异常:{e}"}

    # 删除用户
    def delete_people(self,name,phone):
        try:
            crs = self.connection.cursor()
            crs.execute("delete from people where name=? and phone=?",(name,phone,))
            self.connection.commit()
            return {"message": "删除成功"}
        except sqlite3.Error as e:
            return {"message": f"删除用户异常:{e}"}

    # 关闭与数据库的连接
    def close_connection(self):
        if self.connection:
            self.connection.close()


db = DBHelper("user.db")
db.create_database()
db.create_table()

@app.get("/")
def root():
    return {"message": "Hello World"}


@app.get("/get_login")
def get_login(request: Request):
    username = request.query_params["username"]
    password = request.query_params["password"]
    response = db.login_table(username, password)
    return {"message": response["message"]}


@app.post("/get_register")
def get_register(request: Request):
    username = request.query_params["username"]
    password = request.query_params["password"]
    response = db.register_table(username, password)
    return {"message": response["message"]}

@app.post("/get_add_people")
def get_add_people(request: Request):
    name = request.query_params["name"]
    age = request.query_params["age"]
    address = request.query_params["address"]
    phone = request.query_params["phone"]
    response = db.add_people(name, age, address, phone)
    return {"message": response["message"]}

@app.get("/get_select_people")
def get_select_people(request: Request):
    name = request.query_params["name"]
    response = db.select_people(name)
    return {"查询结果": response["message"]}

@app.put("/get_update_people")
def get_update_people(request: Request):
    id = request.query_params["id"]
    name = request.query_params["name"]
    age = request.query_params["age"]
    address = request.query_params["address"]
    phone = request.query_params["phone"]
    response = db.update_people(id,name, age, address, phone)
    return {"message": response["message"]}

@app.delete("/get_delete_people")
def get_delete_people(request: Request):
    name = request.query_params["name"]
    phone = request.query_params["phone"]
    response = db.delete_people(name, phone)
    return {"message": response["data"]}

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8088)

10.2 客户端

import requests

url = ""

while True:
    input_type = int(input("请输入您的操作:1、登录\t\t2、注册\t\t3、退出:"))
    if input_type == 1:
        try:
            username = input("请输入您的用户名:")
            userpass = input("请输入您的用户名:")
            url = f"http://127.0.0.1:8088/get_login?username={username}&password={userpass}"
            res = requests.get(url)
            if res.status_code == 200:
                while True:
                    input_choice = int(input("请输入您的操作:1、查询\t\t2、添加\t\t3、修改\t\t4、删除\t\t5、退出:"))
                    if input_choice == 1:
                        try:
                            name = input("请输入您要查询的姓名:")
                            url = f"http://127.0.0.1:8088/get_select_people?name={name}"
                            res = requests.get(url)
                            if res.status_code == 200:
                                result = res.json()
                                # 服务器返回的是 {"查询结果": [元组数据]} 格式
                                if "查询结果" in result:
                                    data = result["查询结果"]
                                    if data:  # 如果有数据
                                        # 遍历每个元组并打印
                                        for row in data:
                                            print(f"{row[0]:<5} {row[1]:<10} {row[2]:<5} {row[3]:<15} {row[4]:<15}")
                                        print(f"共查询到 {len(data)} 条记录")
                                    else:
                                        print("未找到相关记录")
                                else:
                                    print("查询失败:", result)
                                continue
                        except Exception as e:
                            print(f"查询功能出错:{e}")
                    elif input_choice == 2:
                        try:
                            name = input("请输入您要添加的姓名:")
                            age = input("请输入您要添加的年龄:")
                            address = input("请输入您要添加的地址:")
                            phone = input("请输入您要添加的手机号:")
                            url = f"http://127.0.0.1:8088/get_add_people?name={name}&age={age}&address={address}&phone={phone}"
                            # params = {"name": name, "age": age, "address": address, "phone": phone}
                            res = requests.post(url)
                            if res.status_code == 200:
                                print(res.text)
                                continue
                            else:
                                print("添加失败")
                                continue
                        except Exception as e:
                            print(f"添加手机号出错:{e}")
                    elif input_choice == 3:
                        try:
                            p_id = int(input("请输入要修改的id:"))
                            p_name = input("请输入您的姓名:")
                            p_age = int(input("请输入您的年龄:"))
                            p_address = input("请输入您的地址:")
                            p_phone = input("请输入您的手机号:")
                            url = f"http://127.0.0.1:8088/get_update_people?id={p_id}&name={p_name}&age={p_age}&address={p_address}&phone={p_phone}"
                            # params = {"name": p_name, "age": p_age, "address": p_address, "phone": p_phone}
                            response = requests.put(url)
                            if response.status_code == 200:
                                print(response.text)
                                continue
                            else:
                                print("修改失败")
                                continue
                        except Exception as e:
                            print(f"修改功能出错:{e}")
                    elif input_choice == 4:
                        try:
                            name = input("请输入要删除的姓名:")
                            phone = input("请输入要删除的手机号:")
                            url = f"http://127.0.0.1:8088/get_delete_people?name={name}&phone={phone}"
                            # params = {"name": name, "phone": phone}
                            res = requests.delete(url)
                            if res.status_code == 200:
                                print("删除成功")
                                continue
                            else:
                                print("删除失败")
                                continue
                        except Exception as e:
                            print(f"删除功能出错:{e}")
                    else:
                        print("退出程序")
                        break
            else:
                print("登录失败")
        except Exception as e:
            print(f"登录功能出错:{e}")
    elif input_type == 2:
        try:
            url = "http://127.0.0.1:8088/get_register"
            input_name = input("请输入您要注册的用户名:")
            input_pass = input("请输入您要注册的密码:")
            params = {"username": input_name, "password": input_pass}
            res = requests.post(url, params=params)
            print(res)
            print(res.text)
        except Exception as e:
            print(f"注册功能出错:{e}")
    elif input_type == 3:
        break

注意:面向对象版本好像有点小问题,懒得改了,有兴趣的小伙伴可以改改,么么

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐