引言

经过前两篇文章的铺垫,我们已经掌握了openGauss的部署安装和使用Data Studio进行可视化管理。现在,我们来到了本系列文章的终章,将目光聚焦于开发者最关心的环节——如何在应用程序中与openGauss进行交互,并探索其在AI领域的应用潜力。

本文将以目前最流行的编程语言之一Python为例,详细演示如何连接openGauss数据库,并围绕两个典型的业务场景——“用户管理系统”和“订单支付流程”,构建完整的CRUD(创建、读取、更新、删除)与事务处理代码示例。更进一步,我们将结合业界热点,探讨如何利用openGauss的向量计算能力,构建一个简单的以文搜图RAG(检索增强生成)应用,充分展现openGauss作为一款AI原生数据库的魅力。


image.png

本文的后端环境为“CentOS 7.9 + openGauss 极简版(simpleInstall,6.x)”,数据库运行在云服务器上,应用在本地开发机(Windows/macOS/Linux)运行并远程连接到数据库。若远程连接失败,请参考第二篇中关于listen_addressespg_hba.conf的配置说明。

Python连接openGauss:psycopg2驱动

openGauss兼容PostgreSQL生态,因此,我们可以使用PostgreSQL最成熟的Python驱动psycopg2来连接和操作openGauss数据库。

1. 安装psycopg2

首先,安装Python驱动。请根据环境选择以下命令:

  • Python 3.8 及以上(推荐):
python3 -m pip install --user psycopg2-binary
  • CentOS 7 + Python 3.6(需固定版本并使用国内镜像):
python3 -m pip install --upgrade "pip==21.3.1" --user -i https://pypi.tuna.tsinghua.edu.cn/simple
python3 -m pip install --user -i https://mirrors.aliyun.com/pypi/simple "psycopg2-binary==2.8.6"
# 如网络较慢,可追加 --default-timeout=120

e1821a45e4b7bcadd15800e7f81934cf.png

提示:尽量使用 python3 -m pip(或 pip3),并在非虚拟环境下加 --user 以避免权限冲突;若需源码编译,可改为安装 psycopg2==2.8.6,并提前安装构建依赖(如 gccpython3-develpostgresql-libs/postgresql-devel)。

常见安装问题排障:

  • 下载超时:加入 --default-timeout=120 或切换镜像源(如清华、阿里)。
  • 证书/代理:在公司网络需配置 HTTPS 代理或临时关闭 MITM 检查;也可离线下载 .whl 后本地安装。
  • 权限警告:在 root 环境下建议使用 --user 或创建虚拟环境。
  • Windows 与多版本 Python 映射:在 PowerShell 执行 python -Vpip -Vpy -0p,确保 pippython 指向同一解释器;优先使用 python -m pip install --user psycopg2-binary
  • 路径检查:where pythonwhere pip 查看可执行路径;必要时使用目标解释器绝对路径,例如 C:\Python39\python.exe -m pip install psycopg2-binary
  • 快速验证安装:python -c "import psycopg2, sys; print(psycopg2.__version__, sys.executable)",确认驱动版本与解释器路径匹配。
  • 源码编译提示:若报缺少 pg_config 或开发头文件,优先使用 psycopg2-binary;或先安装 postgresql-libs/postgresql-develpython3-devel 后再编译 psycopg2

2. 建立数据库连接

连接openGauss数据库非常简单,只需提供主机、端口、用户名、密码和数据库名即可。下面的示例与入门篇默认参数保持一致(请替换为实际的服务器IP与密码)

实操步骤(本地开发机):

  • 在本地任意工作目录创建项目并新建连接测试脚本:
# Windows PowerShell
mkdir opengauss_demo
cd opengauss_demo
# Linux/macOS 可用:
# mkdir -p opengauss_demo
# 如尚未安装驱动,请先执行:
python3 -m pip install --user psycopg2-binary

image.png

在该目录中新建文件 db_connect_test.py,内容如下:

# db_connect_test.py
import psycopg2

def get_db_connection():
    try:
        conn = psycopg2.connect(
            host="YOUR_SERVER_IP",   # 替换为云服务器公网IP
            port=5432,
            user="omm",
            password="Gauss@123456", # 替换为实际密码
            database="postgres",
            connect_timeout=5
        )
        print("数据库连接成功!")
        return conn
    except psycopg2.OperationalError as e:
        print(f"数据库连接失败: {e}")
        return None

if __name__ == "__main__":
    conn = get_db_connection()
    if conn:
        conn.close()

运行与验证(Windows/ macOS/ Linux):

python3 db_connect_test.py
# 若 Windows 上没有 python3 命令,可用:
# python db_connect_test.py

预期输出:出现“数据库连接成功!”并退出;如报错,按下方“远程连接失败排查要点”逐项检查。

若出现连接超时或认证失败,请回到第二篇确认:

  • postgresql.conflisten_addresses是否为'*'或对应IP;
  • pg_hba.conf是否添加了允许远程访问的host条目(md5认证);
  • 云安全组是否开放TCP 5432入站规则到您的本地IP范围。

连接失败快速排查:

  • 替换占位符:确认已将 YOUR_SERVER_IPGauss@123456 替换为真实值;若脚本报 NoneType 错误,通常是连接失败导致返回 None
  • 端口连通性:Windows 执行 Test-NetConnection YOUR_SERVER_IP -Port 5432;Linux/macOS 执行 telnet YOUR_SERVER_IP 5432nc -vz YOUR_SERVER_IP 5432
  • 认证规则:在数据库侧 pg_hba.conf 添加合适的 host 条目(如 md5),修改后 gs_ctl reload 或重启实例。
  • 初始用户远程连接说明:不建议用初始用户(如 omm)作为应用远程账号;推荐新建业务用户(如 xsc)并授予表/序列权限,见下文“用户管理”授权方案。
  • 数据库/Schema:确认连接的 database="postgres"search_path 包含 public;使用限定名 public.t_user 可避免路径问题。

场景一:用户管理系统的CRUD实战

用户管理是几乎所有应用的标配。下面,我们将实现一个简单的用户管理模块,包含增、删、改、查功能。

1. 创建用户表

首先,在数据库中创建t_user表。

CREATE TABLE t_user (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

实操步骤(数据库侧):

  • 使用 gsql 远程连接到 openGauss:
gsql -d postgres -h YOUR_SERVER_IP -p 5432 -U omm -W
  • 执行上述 CREATE TABLE 语句创建表。
  • 验证:
\dt
SELECT * FROM t_user;

image.png

2. Python代码实现

在本地开发机创建目录(例如 opengauss_demo),新增文件 user_crud.py,内容如下:

# user_crud.py

import psycopg2
from contextlib import closing


def get_db_connection():
    try:
        conn = psycopg2.connect(
            host="YOUR_SERVER_IP",
            port=5432,
            user="omm",
            password="Gauss@123456",
            database="postgres",
            connect_timeout=5,
        )
        print("数据库连接成功!")
        return conn
    except psycopg2.OperationalError as e:
        print(f"数据库连接失败: {e}")
        return None


def create_user(username, email):
    with closing(get_db_connection()) as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO t_user (username, email) VALUES (%s, %s)",
                (username, email),
            )
            conn.commit()
            print(f"用户 '{username}' 创建成功。")


def get_user_by_username(username):
    with closing(get_db_connection()) as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, username, email FROM t_user WHERE username = %s",
                (username,),
            )
            return cursor.fetchone()


def update_user_email(username, new_email):
    with closing(get_db_connection()) as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "UPDATE t_user SET email = %s WHERE username = %s",
                (new_email, username),
            )
            conn.commit()
            print(f"用户 '{username}' 的邮箱更新成功。")


def delete_user(username):
    with closing(get_db_connection()) as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                "DELETE FROM t_user WHERE username = %s",
                (username,),
            )
            conn.commit()
            print(f"用户 '{username}' 删除成功。")


if __name__ == "__main__":
    create_user("john_doe", "john.doe@example.com")
    user = get_user_by_username("john_doe")
    print(f"查询到用户: {user}")
    update_user_email("john_doe", "john.doe.new@example.com")
    user = get_user_by_username("john_doe")
    print(f"更新后用户: {user}")
    delete_user("john_doe")

运行与验证(本地开发机):

cd opengauss_demo
python3 user_crud.py

预期输出:依次打印“数据库连接成功”、“创建成功”、“查询到用户”、“更新后用户”、“删除成功”。
如需数据库侧二次确认:

SELECT * FROM t_user WHERE username='john_doe';

image.png

常见报错与解决(用户管理):

我用应用账号 xsc 远程连库,但表 t_user 是初始用户 omm 创建和拥有;默认权限下, xsc 没有对该表的操作权限,因此出现 “permission denied for relation t_user”。另外 openGauss 不建议用初始用户做远程应用连接,这是安全最佳实践。
image.png

  • 报错 psycopg2.errors.InsufficientPrivilege: permission denied for relation t_user:业务用户(如 xsc)缺少权限。请在服务器上以所有者/管理员(通常为 omm)执行:

    • GRANT USAGE ON SCHEMA public TO xsc;
    • GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE public.t_user TO xsc;
    • GRANT USAGE, SELECT ON SEQUENCE public.t_user_id_seq TO xsc;
      序列授权是因为 SERIAL 会创建隐式序列,插入时需读写序列。

    image.png

  • 可选(需谨慎):将所有者变更为业务用户以简化权限管理:

    • ALTER TABLE public.t_user OWNER TO xsc;
    • ALTER SEQUENCE public.t_user_id_seq OWNER TO xsc;
  • 代码健壮性建议:在每个操作前判断连接是否成功,并使用表的限定名:

def create_user(username, email):
    conn = get_db_connection()  # 可将 user 改为业务账号,如 xsc
    if not conn:
        print("连接失败,已跳过 insert")
        return
    try:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO public.t_user (username, email) VALUES (%s, %s)",
                (username, email),
            )
        conn.commit()
    finally:
        conn.close()

def get_user_by_username(username):
    conn = get_db_connection()
    if not conn:
        return None
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, username, email FROM public.t_user WHERE username = %s",
                (username,),
            )
            return cur.fetchone()
    finally:
        conn.close()
  • 授权快速验证:
    • gsql -d postgres -h YOUR_SERVER_IP -p 5432 -U xsc -W
    • 依次执行:INSERT/SELECT/UPDATE/DELETEpublic.t_user,确认无权限报错。

场景二:订单支付流程的事务处理

在电商等金融敏感场景中,数据一致性至关重要。我们将模拟一个“用户下单扣减库存”的场景,来演示如何使用事务保证操作的原子性。

1. 创建商品表和订单表

CREATE TABLE t_product (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    stock INT NOT NULL
);

CREATE TABLE t_order (
    id SERIAL PRIMARY KEY,
    product_id INT NOT NULL,
    quantity INT NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

-- 插入一个商品
INSERT INTO t_product (name, stock) VALUES ('openGauss T-shirt', 100);

实操步骤(数据库侧):

  • 连接数据库(同上):
gsql -d postgres -h YOUR_SERVER_IP -p 5432 -U omm -W
  • 依次执行创建两张表与初始化数据的 SQL;可用下列查询验证:
\dt
SELECT id, name, stock FROM t_product;  -- 应看到一条库存为100的商品
SELECT COUNT(*) FROM t_order;           -- 初始为0

2. Python代码实现

在本地开发机创建文件 transaction_example.py,内容如下:

# transaction_example.py

import psycopg2


def get_db_connection():
    try:
        return psycopg2.connect(
            host="YOUR_SERVER_IP",
            port=5432,
            user="omm",
            password="Gauss@123456",
            database="postgres",
            connect_timeout=5,
        )
    except psycopg2.OperationalError as e:
        print(f"数据库连接失败: {e}")
        return None


def place_order(product_id, quantity):
    conn = get_db_connection()
    if not conn:
        return

    try:
        with conn.cursor() as cursor:
            # 1. 检查库存并加行锁,避免并发下读到旧库存
            cursor.execute(
                "SELECT stock FROM t_product WHERE id = %s FOR UPDATE",
                (product_id,),
            )
            row = cursor.fetchone()
            if not row:
                raise ValueError("商品不存在!")
            stock = row[0]
            if stock < quantity:
                raise ValueError("库存不足!")

            # 2. 扣减库存
            cursor.execute(
                "UPDATE t_product SET stock = stock - %s WHERE id = %s",
                (quantity, product_id),
            )

            # 3. 创建订单
            cursor.execute(
                "INSERT INTO t_order (product_id, quantity) VALUES (%s, %s)",
                (product_id, quantity),
            )

            conn.commit()
            print("下单成功!")

    except (Exception, psycopg2.Error) as error:
        print(f"下单失败: {error}")
        conn.rollback()
    finally:
        conn.close()


if __name__ == "__main__":
    place_order(1, 5)   # 成功下单,库存从100减到95
    place_order(1, 100) # 超卖示例,将抛出“库存不足!”并回滚

运行与验证:

python3 transaction_example.py

可在数据库侧验证:

SELECT id, stock FROM t_product WHERE id=1;   -- 应为95
SELECT id, product_id, quantity FROM t_order; -- 应新增一条 quantity=5 的订单

说明:FOR UPDATE 会在查询到的行上加排他锁,确保并发事务不会在你提交前修改该行,避免超卖;一旦发生异常,代码会执行 rollback() 保证库存与订单的一致性。

这段代码确保了“扣减库存”和“创建订单”这两个操作要么同时成功,要么同时失败,避免了数据不一致的问题。

常见报错与处理(事务):

  • 权限问题:业务用户对 t_product 需至少 SELECT, UPDATE,对 t_orderINSERT, SELECT,并为隐式序列授予 USAGE, SELECT(例如 t_product_id_seqt_order_id_seq)。
  • 并发冲突:若遇到 could not serialize access due to concurrent update,说明并发更新冲突;保留 FOR UPDATE,在应用侧捕获并重试,或确认隔离级别为 READ COMMITTED
  • 异常处理:任何异常均应 rollback() 保证一致性,提交成功后再打印“下单成功”。

场景三:AI赋能 - 基于向量的以文搜图(RAG)

现在,让我们进入最激动人心的部分。openGauss 5.0版本后引入了对向量数据类型和向量计算的内置支持,使其成为构建AI应用的理想选择。我们将构建一个简单的RAG应用,通过文本描述来搜索相似的图片。

1. 开启向量插件并创建表

首先,需要在数据库中开启vectors插件

CREATE EXTENSION vectors;

说明:部分发行版或构建中扩展名可能为 vector(pgvector),若上面的命令报未找到扩展,可尝试:

CREATE EXTENSION vector;

然后,创建一个用于存储图片向量的表。本文使用clip-ViT-B-32模型,其默认输出维度为512,因此示例使用VECTOR(512)

CREATE TABLE t_image_vectors (
    id SERIAL PRIMARY KEY,
    image_path VARCHAR(255) NOT NULL,
    image_vector VECTOR(512)
);

验证与注意:

  • 使用 \dx 查看已启用的扩展;若显示 vectors/vector 即成功。
  • \d t_image_vectors 可查看表结构;向量字段类型为 vector(512)

2. Python代码实现

这个场景需要借助一个能够将文本和图片转换为向量的模型。这里我们使用sentence-transformers库作为示例。

python3 -m pip install --user -i https://mirrors.aliyun.com/pypi/simple sentence-transformers Pillow

前置说明:该步骤建议在本地开发机的 Python 3.8+ 环境执行(sentence-transformers 对 Py3.6 支持不佳)。首次运行会从 HuggingFace 下载模型,需联网。

在本地准备工程与图片:

mkdir -p opengauss_demo/assets/images
# 将两张示例图片保存为:opengauss_demo/assets/images/cat.jpg、dog.jpg
# rag_example.py

import psycopg2
from sentence_transformers import SentenceTransformer
from PIL import Image


def get_db_connection():
    try:
        return psycopg2.connect(
            host="YOUR_SERVER_IP",
            port=5432,
            user="omm",
            password="Gauss@123456",
            database="postgres",
            connect_timeout=5,
        )
    except psycopg2.OperationalError as e:
        print(f"数据库连接失败: {e}")
        return None


model = SentenceTransformer("clip-ViT-B-32")


def insert_image_vector(image_path: str) -> None:
    conn = get_db_connection()
    if not conn:
        return

    try:
        img_embedding = model.encode(Image.open(image_path))  # 512维
        vector_str = "[" + ",".join(str(float(x)) for x in img_embedding.tolist()) + "]"

        with conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO t_image_vectors (image_path, image_vector) VALUES (%s, %s::vector)",
                (image_path, vector_str),
            )
            conn.commit()
            print(f"已入库向量:{image_path}")
    except Exception as e:
        print(f"处理图片失败: {e}")
        conn.rollback()
    finally:
        conn.close()


def search_images_by_text(text_query: str, top_k: int = 3):
    conn = get_db_connection()
    if not conn:
        return []

    try:
        query_embedding = model.encode(text_query)
        vector_str = "[" + ",".join(str(float(x)) for x in query_embedding.tolist()) + "]"

        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT image_path, image_vector <-> %s::vector AS dist FROM t_image_vectors ORDER BY dist LIMIT %s",
                (vector_str, top_k),
            )
            return cursor.fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    # 将两张图片向量化并入库
    insert_image_vector("assets/images/cat.jpg")
    insert_image_vector("assets/images/dog.jpg")

    # 文本检索
    query = "A photo of a cute animal sitting on the grass"
    results = search_images_by_text(query, top_k=3)

    print(f"\n查询:{query}")
    for path, dist in results:
        print(f"- 图片: {path},L2距离: {dist:.4f}")

运行与验证:

cd opengauss_demo
python3 rag_example.py

预期输出:打印两条“已入库向量:…”后,返回按距离排序的图片列表。距离越小表示越相似。

补充说明:

  • <-> 是向量相似度的距离操作符(L2距离)。若发行版提供 l2_distance(v1,v2),也可替换为该函数。
  • 如需加速检索,可建立向量索引(例如 ivfflat/hnsw),不同版本支持情况不一,配置前请确认。

这个例子展示了openGauss如何无缝集成到AI工作流中。通过在数据库层面进行向量检索,可以简化RAG应用的架构,降低开发复杂度,并利用数据库的事务性和可靠性来管理AI数据。

小贴士:

  • 若您的版本提供l2_distance(v1, v2)函数,也可将示例中的image_vector <-> %s::vector替换为l2_distance(image_vector, %s::vector)
  • 向量索引(如ivfflathnsw)可显著提升检索性能,但不同版本支持情况可能不同,建议在功能确认后再配置索引。

常见问题与解决(RAG 向量检索):

  • 扩展不存在:ERROR: extension "vectors" does not existtype "vector" does not exist → 执行 CREATE EXTENSION vectors;,若无则尝试 CREATE EXTENSION vector;,并用 \dx 检查扩展已启用。
  • 操作符缺失:operator does not exist: vector <-> vector → 未启用向量扩展或版本不匹配;启用扩展或改用 l2_distance(...)
  • 向量文本格式错误:invalid input syntax for type vector → 确保是类似 [0.1,0.2,...] 的 JSON 风格列表,并在 SQL 中使用 %s::vector 显式类型转换。
  • 模型下载失败:受公司网络限制可配置代理或离线下载模型;也可切换 pip 镜像安装依赖(示例已使用阿里镜像)。
  • Pillow 依赖:Linux 发行版可能需安装 libjpeg/zlib 等系统库;Windows/macOS 通常无需额外配置。

常见错误速查表:

  • ModuleNotFoundError: No module named psycopg2 → 用目标解释器执行 python -m pip install --user psycopg2-binary,并用 python -c "import psycopg2" 验证。
  • Windows python3 不识别 → 直接用 pythonpy -3,优先 python -m pip
  • psycopg2.OperationalError: could not connect to server/超时 → 检查 listen_addressespg_hba.conf 与安全组;本机用 Test-NetConnection/telnet 验证 5432 端口连通。
  • permission denied for relation/sequence ... → 按上文为业务用户授予 SCHEMATABLE 与隐式序列权限。
  • relation "t_user" does not exist → 确认已创建表、连接的数据库正确,或在 SQL 中使用 public.t_user 限定名。
  • extension "vectors" does not exist/type "vector" does not exist → 安装并启用向量扩展,或使用兼容扩展名 vector

总结

从基础的CRUD操作到复杂的事务管理,再到前沿的AI向量应用,openGauss通过其强大的功能和对主流开发生态的良好兼容性,证明了其在现代应用开发中的核心价值。特别是其原生的向量数据库能力,为企业在AI时代构建智能应用提供了坚实、高效且易于管理的数据底座。

希望这个系列的文章能帮助您对openGauss有一个全面而深入的了解。数据库的世界广阔无垠,而openGauss的探索之旅,才刚刚开始。欢迎您继续深入研究,发掘它更多的可能性!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐