前言

本文发布仓促,后续会出一个详细的博文。

任务介绍

本次开发的核心任务是构建一个智能数据库查询系统,实现以下核心能力:

  1. 接收用户的自然语言问题(如 “技术部有多少个员工?”);
  2. 自动生成符合 MySQL 语法、匹配数据库表结构的 SQL 语句;
  3. 执行生成的 SQL 语句并获取查询结果;
  4. 将执行结果转化为简洁易懂的自然语言总结(先结论、再明细)。

整个系统基于 LangChain 框架实现组件化编排,大模型选用 DeepSeek-chat,确保 SQL 生成的准确性和结果总结的可读性。

任务流程

整个系统的执行流程分为三大步骤,通过 LangChain 的链路编排实现自动化执行:

步骤 1:初始化核心组件

  • 调用init_database_connection()建立数据库连接;
  • 调用init_deepseek_llm()初始化 DeepSeek 大模型;
  • 调用build_text2sql_chain()构建完整的 Text2SQL 链路。

步骤 2:生成 SQL 语句

  • 将数据库元信息(表结构)、用户问题注入 SQL 生成 Prompt;
  • 大模型根据 Prompt 规则生成仅包含 SQL 语句的纯文本;
  • 通过StrOutputParser解析生成结果,得到可执行的 SQL。

步骤 3:执行 SQL 并总结结果

  1. 调用run_sql()函数执行生成的 SQL,捕获执行异常(如语法错误、字段不存在);
  2. 将 “用户问题 + 执行的 SQL + 查询结果” 注入结果总结 Prompt;
  3. 大模型根据 Prompt 规则生成简洁的中文总结(先结论、再明细);
  4. 返回最终的自然语言答案。

LangChain 的核心组件介绍

本次开发用到的 LangChain 核心组件可分为四大类,各组件分工明确、可灵活组合:

1. 大模型交互组件(ChatOpenAI)

用于对接 DeepSeek 大模型(兼容 OpenAI API 格式),核心作用是生成 SQL、总结查询结果:

  • model:指定模型名称(如deepseek-chat);
  • api_key/base_url:配置 DeepSeek 的 API 密钥和请求地址;
  • temperature:设置为 0.1(低随机性),确保 SQL 生成的稳定性;
  • max_tokens/timeout:限制生成内容长度、设置请求超时时间。

2. 提示词模板(ChatPromptTemplate)

用于定义大模型的输入格式,分为 “SQL 生成 Prompt” 和 “结果总结 Prompt”:

  • 支持分角色配置(system/human):system 定义角色和规则,human 传递用户输入;
  • 支持变量插值(如{db.get_table_info()}{question}),动态注入数据库元信息和用户问题。

3. 输出解析器(StrOutputParser)

将大模型返回的 ChatMessage 格式转化为纯字符串,便于后续 SQL 执行、结果处理。

4. 链路编排(RunnablePassthrough)

实现多步骤任务的串联:

  • assign:为上下文字典新增字段(如生成 SQL 后新增sql字段,执行 SQL 后新增result字段);
  • 管道符|:实现组件的链式调用(Prompt → LLM → 解析器)。

5. 数据库工具(SQLDatabase)

封装数据库连接、SQL 执行、表结构查询等能力,核心方法:

  • from_uri:初始化数据库连接;
  • get_table_info():获取数据库表结构元信息(字段名、类型、表注释等);
  • run():执行 SQL 语句并返回结果。

环境配置

安装依赖

python-dotenv>=1.0.0
langchain>=0.1.0
langchain-openai>=0.1.0
langchain-community>=0.1.0
mysql-connector-python>=8.0.36
logging>=0.4.9.6

配置文件

# 数据库配置
DB_USER=root
DB_PASSWORD=your_db_password
DB_HOST=127.0.0.1
DB_PORT=3306
DB_NAME=your_db_name

# DeepSeek大模型配置
DEEPSEEK_API_KEY=your_deepseek_api_key
DEEPSEEK_BASE_URL=https://api.deepseek.com/v1

SQL代码

CREATE DATABASE IF NOT EXISTS company_hr_db DEFAULT CHARACTER SET utf8mb4;

USE company_hr_db;

-- 员工表
CREATE TABLE IF NOT EXISTS employee (
    emp_id INT PRIMARY KEY AUTO_INCREMENT COMMENT '员工ID',
    emp_name VARCHAR(50) NOT NULL COMMENT '员工姓名',
    gender ENUM('男','女') NOT NULL COMMENT '性别',
    age INT NOT NULL COMMENT '年龄',
    department VARCHAR(30) NOT NULL COMMENT '部门',
    salary DECIMAL(10,2) NOT NULL COMMENT '月薪',
    hire_date DATE NOT NULL COMMENT '入职日期'
);

SELECT * FROM employee;

-- 部门表
CREATE TABLE IF NOT EXISTS department (
    dept_id INT PRIMARY KEY AUTO_INCREMENT COMMENT '部门ID',
    dept_name VARCHAR(30) NOT NULL COMMENT '部门名称',
    dept_manager VARCHAR(50) NOT NULL COMMENT '部门经理',
    dept_location VARCHAR(50) NOT NULL COMMENT '部门地址'
);

-- 考勤表
CREATE TABLE IF NOT EXISTS attendance (
    att_id INT PRIMARY KEY AUTO_INCREMENT COMMENT '考勤ID',
    emp_id INT NOT NULL COMMENT '员工ID',
    work_date DATE NOT NULL COMMENT '考勤日期',
    work_hours DECIMAL(4,1) NOT NULL COMMENT '当日工时',
    status ENUM('正常','迟到','早退','旷工') NOT NULL COMMENT '考勤状态',
    FOREIGN KEY (emp_id) REFERENCES employee(emp_id)
);

-- 插入测试数据
INSERT INTO employee (emp_name, gender, age, department, salary, hire_date) VALUES
('张三', '男', 28, '技术部', 25000.00, '2020-05-10'),
('李四', '女', 32, '财务部', 18000.00, '2018-09-01'),
('王五', '男', 35, '技术部', 32000.00, '2017-03-15'),
('赵六', '女', 26, '人事部', 15000.00, '2021-07-20'),
('钱七', '男', 40, '财务部', 28000.00, '2015-11-05');

INSERT INTO department (dept_name, dept_manager, dept_location) VALUES
('技术部', '王五', '研发大楼A座'),
('财务部', '钱七', '行政大楼B层'),
('人事部', '赵六', '行政大楼A层');

INSERT INTO attendance (emp_id, work_date, work_hours, status) VALUES
(1, '2026-01-10', 8.0, '正常'),
(1, '2026-01-11', 7.5, '早退'),
(2, '2026-01-10', 8.0, '正常'),
(3, '2026-01-10', 9.0, '正常'),
(4, '2026-01-11', 8.0, '正常');

代码展示

请在执行了SQL代码并配置好环境在运行,否则会有各种报错。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import logging
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_community.utilities import SQLDatabase
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# 基础配置
load_dotenv()
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


# 1. 数据库连接初始化
def init_database_connection() -> SQLDatabase:
    try:
        db_uri = f"mysql+mysqlconnector://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
        db = SQLDatabase.from_uri(db_uri)
        logger.info("✅ 数据库连接成功!")
        logger.info(f"📚 数据库包含表: {db.get_usable_table_names()}")
        return db
    except Exception as e:
        logger.error(f"❌ 数据库连接失败: {str(e)}", exc_info=True)
        raise SystemExit(1)


# 2. DeepSeek大模型初始化
def init_deepseek_llm() -> ChatOpenAI:
    try:
        llm = ChatOpenAI(
            model="deepseek-chat",
            api_key=os.getenv("DEEPSEEK_API_KEY"),
            base_url=os.getenv("DEEPSEEK_BASE_URL"),
            temperature=0.1,
            max_tokens=2048,
            timeout=30
        )
        logger.info("✅ DeepSeek大模型初始化成功!")
        return llm
    except Exception as e:
        logger.error(f"❌ DeepSeek模型初始化失败: {str(e)}", exc_info=True)
        raise SystemExit(1)


# 3. 自定义SQL生成Prompt + 构建Text2SQL链路
def build_text2sql_chain(llm: ChatOpenAI, db: SQLDatabase):
    # 自定义SQL生成的Prompt(核心替代create_sql_query_chain)
    sql_generation_prompt = ChatPromptTemplate.from_messages([
        ("system", f"""
        你是专业的SQL生成专家,需要根据用户问题生成可执行的MySQL语句。
        数据库元信息:{db.get_table_info()}
        生成规则:
        1. 仅生成SQL语句,不包含任何解释、注释、换行符
        2. 严格匹配数据库表结构,字段名、表名必须准确
        3. 避免使用SELECT *,只查询必要字段
        4. 数值计算、过滤条件要精准(如入职时间2018年之前用 < '2018-01-01')
        """),
        ("human", "请根据以下问题生成MySQL语句:{question}")
    ])

    # SQL执行函数
    def run_sql(sql: str) -> str:
        try:
            logger.info(f"📝 即将执行SQL语句: {sql}")
            result = db.run(sql)
            logger.info(f"✅ SQL执行成功,结果: {result}")
            return result
        except Exception as e:
            logger.error(f"❌ SQL执行失败: {str(e)}", exc_info=True)
            return f"SQL执行异常: {str(e)}"

    # 结果总结Prompt
    answer_prompt = ChatPromptTemplate.from_messages([
        ("system", """
        你是数据库结果分析专家,任务是:
        1. 接收用户问题、生成的SQL、执行结果
        2. 用简洁中文总结结果,先结论再明细
        """),
        ("human", "用户问题: {question}\n执行的SQL: {sql}\n查询结果: {result}\n请总结答案:")
    ])

    # 构建完整链路:生成SQL → 执行SQL → 总结结果
    full_chain = (
        # 第一步:生成SQL
        RunnablePassthrough.assign(
            sql=sql_generation_prompt | llm | StrOutputParser()
        )
        # 第二步:执行SQL
        .assign(result=lambda x: run_sql(x["sql"]))
        # 第三步:总结结果
        | answer_prompt
        | llm
        | StrOutputParser()
    )
    return full_chain


# 4. 主函数(测试用例)
def main():
    # 初始化核心组件
    db = init_database_connection()
    llm = init_deepseek_llm()
    text2sql_chain = build_text2sql_chain(llm, db)

    logger.info("\n=====================================")
    logger.info("💡 智能数据库查询系统(硬编码测试用例)")
    logger.info("=====================================\n")

    # 测试问题
    test_questions = [
        "技术部有多少个员工?",
        "财务部员工的平均工资是多少?",
        "入职时间在2018年之前的技术部员工姓名和薪资"
    ]

    # 执行测试
    for idx, question in enumerate(test_questions, 1):
        logger.info(f"\n【测试用例{idx}】正在处理查询问题: {question}")
        try:
            answer = text2sql_chain.invoke({"question": question})
            print("\n" + "=" * 50)
            print(f"✅ 测试用例{idx} - 最终查询答案:")
            print(answer)
            print("=" * 50 + "\n")
        except Exception as e:
            logger.error(f"❌ 测试用例{idx}处理失败: {str(e)}", exc_info=True)
            print(f"\n⚠️  测试用例{idx}查询出错:{str(e)}\n")


if __name__ == "__main__":
    main()
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐