智能问数 —— 学校数据中台 AI 查询助手 技术方案设计文档
1. 项目概述
1.1 项目背景
学校数据中台积累了大量结构化数据(学生信息、教务管理、科研项目、财务数据、人事信息等),共计 47 张业务表、330 个字段、18 个业务指标。传统的数据查询需要编写 SQL,对非技术人员存在较高门槛。
智能问数 系统旨在让用户通过自然语言提问(如"统计各学院的学生人数"),系统自动理解意图、生成 SQL、执行查询并返回结构化结果,实现 NL2SQL(Natural Language to SQL) 的完整闭环。
1.2 核心能力
| 能力 | 说明 |
|---|---|
| 自然语言理解 | 从用户问题中提取关键词、识别查询意图 |
| 多路召回 | 字段(向量)+ 指标(向量)+ 字段取值(全文)三路并行召回 |
| LLM 精确过滤 | 利用大模型对召回结果进行语义裁剪,保留最相关的表和字段 |
| SQL 生成与校验 | LLM 生成 SQL → 数据库 EXPLAIN 验证 → 错误时 LLM 自动纠正 |
| 实时流式响应 | 基于 SSE(Server-Sent Events)将各节点执行进度实时推送到前端 |
1.3 整体架构图
┌─────────────────────────────────────────────────────────────────────────┐
│ Frontend Layer │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ PC (3000) │ │ H5 (3001) │ Vue 3 + Vite │
│ └──────┬───────┘ └──────┬───────┘ │
│ └───────────┬───────┘ │
│ │ POST /api/query (SSE) │
├─────────────────────┼───────────────────────────────────────────────────┤
│ ▼ │
│ FastAPI (18000) │
│ ┌───────────────────────┐ │
│ │ API / Router Layer │ │
│ │ ┌─────────────────┐ │ │
│ │ │ QueryService │ │ │
│ │ └────────┬────────┘ │ │
│ └───────────┼───────────┘ │
│ ▼ │
│ LangGraph Agent Pipeline │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ extract recall_column ─┐ │ │
│ │ keywords → recall_value ─┼→ merge → filter_table ─┐ │ │
│ │ recall_metric ─┘ filter_metric ─┼→ add_extra │ │
│ │ → generate │ │
│ │ → validate ──┬─→ execute (success) │ │
│ │ └─→ correct → execute (retry) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────┐ ┌────────┐ ┌────────┐ ┌─────────┐ │
│ │ LLM │ │ Qdrant │ │ ES │ │ MySQL │ │
│ │(qwen-max)│ │(向量DB)│ │(全文) │ │(Meta+DW)│ │
│ └──────────┘ └────────┘ └────────┘ └─────────┘ │
│ ▲ │
│ ┌──────────┐ │
│ │Embedding │ │
│ │ (TEI) │ │
│ └──────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
2. 技术栈总览
| 层次 | 技术 | 版本/说明 |
|---|---|---|
| 后端框架 | FastAPI + Uvicorn | Python 异步 Web 框架,端口 18000 |
| Agent 编排 | LangGraph | 基于状态图的 Agent 工作流编排引擎 |
| LLM 调用 | LangChain + langchain-openai | 统一的 LLM 调用层 |
| 大语言模型 | 阿里云通义千问 qwen-max | 通过 DashScope OpenAI 兼容接口调用 |
| 文本向量化 | BAAI/bge-large-zh-v1.5 | 通过 TEI(Text Embeddings Inference)本地部署,1024 维 |
| 向量数据库 | Qdrant | 字段信息和指标信息的向量索引与语义检索 |
| 全文搜索引擎 | Elasticsearch | 字段取值的全文索引与中文分词检索 |
| 关系型数据库 | MySQL(阿里云 RDS) | Meta 库(元数据)+ DW 库(数据仓库) |
| ORM | SQLAlchemy 2.0 (async) + asyncmy | 异步数据库访问 |
| 配置管理 | OmegaConf | 结构化 YAML 配置与 dataclass 映射 |
| 日志 | Loguru | 结构化日志,支持 request_id 链路追踪 |
| 前端 | Vue 3 + Vite | PC 端 + H5 移动端 |
3. 外部依赖与中间件详解
3.1 大语言模型 —— 阿里云通义千问(qwen-max)
作用:系统的"大脑",承担所有需要语义理解和推理的任务。
调用方式:通过阿里云 DashScope 提供的 OpenAI 兼容接口,使用 LangChain 的 init_chat_model 统一初始化。
# app/agent/llm.py
llm = init_chat_model(
model=app_config.llm.model_name, # qwen-max
model_provider="openai",
api_key=app_config.llm.api_key,
base_url=app_config.llm.base_url, # https://dashscope.aliyuncs.com/compatible-mode/v1
temperature=0 # 确保输出确定性
)
在 Agent 中的使用场景(共 6 次 LLM 调用):
| 节点 | 调用目的 | 输出格式 |
|---|---|---|
| extract_keywords | 从自然语言中提取关键词 | JSON 数组 |
| recall_column | 扩展字段召回关键词 | JSON 数组 |
| recall_metric | 扩展指标召回关键词 | JSON 数组 |
| recall_value | 扩展字段取值召回关键词 | JSON 数组 |
| filter_table | 从候选表和字段中裁剪出必需的 | JSON 对象 |
| filter_metric | 从候选指标中筛选出必需的 | JSON 数组 |
| generate_sql | 根据上下文生成最终 SQL | SQL 文本 |
| correct_sql | 当 SQL 验证失败时纠正 | SQL 文本 |
配置位置:conf/app_config.yaml → llm 节
llm:
model_name: qwen-max
api_key: sk-xxxxx
base_url: https://dashscope.aliyuncs.com/compatible-mode/v1
3.2 文本向量化服务 —— TEI + bge-large-zh-v1.5
作用:将文本转换为 1024 维向量,用于语义检索。向量化后的字段信息和指标信息存入 Qdrant,查询时将用户关键词向量化后进行相似度搜索。
部署方式:本地部署 HuggingFace TEI(Text Embeddings Inference)服务,加载 BAAI/bge-large-zh-v1.5 中文向量模型。
调用方式:通过 LangChain 的 HuggingFaceEndpointEmbeddings 客户端。
# app/clients/embedding_client_manager.py
class EmbeddingClientManager:
def init(self):
self.client = HuggingFaceEndpointEmbeddings(
model=f"http://{config.host}:{config.port}" # http://localhost:8888
)
使用场景:
- 知识库构建时:将字段名称、描述、别名分别向量化后写入 Qdrant;将指标名称、描述、别名向量化后写入 Qdrant
- 查询时:将用户关键词向量化后在 Qdrant 中检索相似的字段/指标
配置位置:conf/app_config.yaml → embedding 节
embedding:
host: localhost
port: 8888
model: BAAI/bge-large-zh-v1.5
3.3 向量数据库 —— Qdrant
作用:存储字段信息和指标信息的向量索引,支持基于语义相似度的快速检索(余弦距离)。
部署位置:远程服务器(8.136.50.72:6333),通过 API Key 认证。
客户端:qdrant_client.AsyncQdrantClient(异步客户端)。
# app/clients/qdrant_client_manager.py
class QdrantClientManager:
def init(self):
self.client = AsyncQdrantClient(
host=config.host, # 8.136.50.72
port=config.port, # 6333
api_key=config.api_key
)
包含两个 Collection:
| Collection | 存储内容 | 向量来源 | Payload 结构 |
|---|---|---|---|
test-smart-data-query-column |
字段信息 | 字段名/描述/别名的向量 | ColumnInfo(id, name, type, role, examples, description, alias, table_id) |
test-smart-data-query-metric |
指标信息 | 指标名/描述/别名的向量 | MetricInfo(id, name, description, relevant_columns, alias) |
Repository 层:
ColumnQdrantRepository—— 字段向量的写入与检索MetricQdrantRepository—— 指标向量的写入与检索
检索参数:
await client.query_points(
collection_name=collection_name,
query=embedding, # 查询向量
score_threshold=0.6, # 最低相似度阈值
limit=5 # 每次检索最多返回 5 条
)
配置位置:conf/app_config.yaml → qdrant 节
qdrant:
host: 8.136.50.72
port: 6333
embedding_size: 1024
api_key: xxxxx
column_collection: test-smart-data-query-column
metric_collection: test-smart-data-query-metric
3.4 全文搜索引擎 —— Elasticsearch
作用:存储字段取值的全文索引,支持基于中文分词的模糊匹配检索。当用户提问中包含具体的实体名称(如"中医学院""针灸推拿学院"等),通过 ES 全文检索可以匹配到对应字段及其取值。
部署位置:阿里云 Elasticsearch 服务(公网访问),通过 basic_auth 认证。
客户端:elasticsearch.AsyncElasticsearch。
# app/clients/es_client_manager.py
class ESClientManager:
def init(self):
self.client = AsyncElasticsearch(
hosts=[f"http://{config.host}:{config.port}"],
basic_auth=(config.user, config.password)
)
索引设计:
| 索引名 | 字段 | 类型 | 分析器 |
|---|---|---|---|
test-smart-data-query |
id | keyword | — |
| value | text | ik_max_word(中文分词) | |
| column_id | keyword | — |
ik_max_word 分析器会将中文文本进行最大粒度分词,例如"针灸推拿学院"会被分为"针灸"“推拿”“学院”“针灸推拿”"针灸推拿学院"等 token,从而提高召回率。
数据来源:meta_config.yaml 中标记 sync: true 的字段,从数据仓库中查询实际取值后写入 ES。共有 93 个字段被标记为需要同步。
检索方式:
await client.search(
index=index_name,
query={"match": {"value": keyword}},
min_score=0.6, # 最低分数阈值
size=5 # 每次最多返回 5 条
)
配置位置:conf/app_config.yaml → es 节
es:
host: es-cn-xxxxx.public.elasticsearch.aliyuncs.com
port: 9200
index_name: test-smart-data-query
user: elastic
password: xxxxx
3.5 关系型数据库 —— MySQL(阿里云 RDS)
作用:项目使用两个 MySQL 数据库实例(同一 RDS 实例的不同 database):
| 数据库 | 名称 | 作用 |
|---|---|---|
| Meta 库 | smart-data-query-meta |
存储元数据知识库(表信息、字段信息、指标信息、字段-指标关联关系) |
| DW 库 | edu_core |
学校数据仓库,包含 47 张业务数据表,是最终 SQL 的执行目标 |
客户端:SQLAlchemy 2.0 异步引擎 + asyncmy 驱动。
# app/clients/mysql_client_manager.py
class MySQLClientManager:
def init(self):
self.engine = create_async_engine(
f"mysql+asyncmy://{config.user}:{config.password}@{config.host}:{config.port}/{config.database}"
)
self.session_factory = async_sessionmaker(self.engine)
Meta 库表结构(4 张表):
table_info 字段表信息(id, name, role, description)
column_info 字段信息(id, name, type, role, examples, description, alias, table_id)
metric_info 指标信息(id, name, description, relevant_columns, alias)
column_metric 字段-指标关联表(column_id, metric_id)
DW 库使用场景:
| 操作 | 节点/脚本 | 说明 |
|---|---|---|
SHOW COLUMNS FROM table |
知识库构建 | 获取字段类型 |
SELECT DISTINCT column FROM table |
知识库构建 | 获取字段样本值 |
SELECT VERSION() |
add_extra_context | 获取数据库版本信息 |
EXPLAIN sql |
validate_sql | 验证生成的 SQL 语法 |
执行最终 SQL |
execute_sql | 执行查询并返回结果 |
Repository 层:
MetaMySQLRepository—— 元数据的 CRUD(表信息、字段信息、指标信息、关联关系)DWMySQLRepository—— 数据仓库操作(字段类型查询、取值查询、SQL 验证与执行)
配置位置:conf/app_config.yaml → db_meta / db_dw 节
db_meta:
host: rm-xxxxx.mysql.rds.aliyuncs.com
port: 3306
user: test1
password: xxxxx
database: smart-data-query-meta
db_dw:
host: rm-xxxxx.mysql.rds.aliyuncs.com
port: 3306
user: test1
password: xxxxx
database: edu_core
3.6 LangGraph —— Agent 工作流编排引擎
作用:LangGraph 是 LangChain 生态中的有状态图执行引擎,本项目用它来编排 NL2SQL 的完整 Agent 工作流。
核心概念:
| 概念 | 在本项目中的实现 |
|---|---|
| State(状态) | DataAgentState —— 一个 TypedDict,各节点通过读写 State 传递数据 |
| Context(上下文) | DataAgentContext —— 注入外部依赖(数据库连接、向量客户端等) |
| Node(节点) | 12 个异步函数,每个处理一个子任务 |
| Edge(边) | 定义节点间的执行顺序和并行关系 |
| Conditional Edge | validate_sql 后的条件分支(成功→执行,失败→纠正) |
| stream_writer | 各节点通过它向前端推送 SSE 进度消息 |
State 定义(app/agent/state.py):
class DataAgentState(TypedDict):
query: str # 用户原始问题
keywords: Annotated[list, operator.add] # 提取的关键词(可追加)
retrieved_columns: list[ColumnInfo] # 召回的字段列表
retrieved_values: list[ValueInfo] # 召回的字段取值
retrieved_metrics: list[MetricInfo] # 召回的指标列表
table_infos: list[TableInfoState] # 合并后的表结构信息
metric_infos: list[MetricInfoState] # 合并后的指标信息
date_info: DateInfoState # 当前日期上下文
db_info: dict # 数据库版本信息
sql: str # 生成的 SQL
error: Optional[str] # SQL 验证错误信息
Context 定义(app/agent/context.py):
class DataAgentContext(TypedDict):
embedding_client: HuggingFaceEndpointEmbeddings
column_qdrant_repository: ColumnQdrantRepository
value_es_repository: ValueESRepository
metric_qdrant_repository: MetricQdrantRepository
meta_mysql_repository: MetaMySQLRepository
dw_mysql_repository: DWMySQLRepository
3.7 LangChain —— LLM 调用与链式编排
作用:提供统一的 LLM 调用接口和 Prompt → LLM → OutputParser 的链式编排。
调用模式:
# 典型的 LangChain 链
prompt = PromptTemplate(template=load_prompt("xxx"), input_variables=[...])
output_parser = JsonOutputParser() # 或 StrOutputParser
chain = prompt | llm | output_parser
result = await chain.ainvoke({...})
组件说明:
| 组件 | 说明 |
|---|---|
PromptTemplate |
加载 .prompt 文件,填充变量后构造提示词 |
JsonOutputParser |
将 LLM 输出解析为 JSON(用于关键词提取、过滤等) |
StrOutputParser |
将 LLM 输出保留为字符串(用于 SQL 生成) |
init_chat_model |
统一的模型初始化函数,支持 openai/anthropic 等多种 provider |
3.8 OmegaConf —— 结构化配置管理
作用:将 YAML 配置文件与 Python dataclass 绑定,提供类型安全的配置访问。
工作原理:
# app/conf/app_config.py
config_file = Path(__file__).parents[2] / 'conf' / 'app_config.yaml'
context = OmegaConf.load(config_file) # 加载 YAML
schema = OmegaConf.structured(AppConfig) # 创建 schema
app_config: AppConfig = OmegaConf.to_object( # 合并为带类型的对象
OmegaConf.merge(schema, context)
)
配置文件:
| 文件 | 作用 |
|---|---|
conf/app_config.yaml |
应用运行配置(数据库、中间件连接信息、日志配置) |
conf/meta_config.yaml |
元数据配置(47 张表、330 个字段、18 个指标的定义) |
3.9 Loguru —— 结构化日志
作用:提供结构化日志输出,支持 request_id 链路追踪,方便在并发请求中追踪单个查询的完整执行链路。
日志格式:
2026-04-09 10:30:15.123 | INFO | request_id - abc123 | module:function:42 - 消息内容
特色机制:通过 contextvars.ContextVar 实现 request_id 注入——FastAPI 中间件在每个请求开始时生成 UUID,所有后续日志自动携带该 ID。
# app/core/context.py
request_id_ctx_var = ContextVar("request_id", default="1")
# main.py 中间件
@app.middleware("http")
async def add_process_time_header(request, call_next):
request_id_ctx_var.set(uuid.uuid4())
return await call_next(request)
4. 元数据知识库构建流程
在 Agent 查询之前,需要先运行 build_meta_knowledge.py 脚本构建元数据知识库。该过程将 meta_config.yaml 中定义的表/字段/指标信息同步到三个存储中间件。
4.1 构建流程图
meta_config.yaml
│
▼
┌─────────────────┐
│ 1. 清空旧数据 │ DELETE FROM column_metric/column_info/metric_info/table_info
└────────┬────────┘
▼
┌─────────────────┐ ┌──────────────┐
│ 2. 保存表/字段 │────→│ MySQL Meta │ table_info + column_info
│ 到 Meta 库 │ └──────────────┘
└────────┬────────┘
▼
┌─────────────────┐ ┌──────────────┐
│ 3. 字段信息 │────→│ Qdrant │ column collection
│ 建立向量索引 │ │ (向量检索) │
└────────┬────────┘ └──────────────┘
▼
┌─────────────────┐ ┌──────────────┐
│ 4. 字段取值 │────→│Elasticsearch │ value index
│ 建立全文索引 │ │ (全文检索) │
└────────┬────────┘ └──────────────┘
▼
┌─────────────────┐ ┌──────────────┐
│ 5. 保存指标信息 │────→│ MySQL Meta │ metric_info + column_metric
│ 到 Meta 库 │ └──────────────┘
└────────┬────────┘
▼
┌─────────────────┐ ┌──────────────┐
│ 6. 指标信息 │────→│ Qdrant │ metric collection
│ 建立向量索引 │ │ (向量检索) │
└────────┬────────┘ └──────────────┘
▼
构建完成
4.2 各步骤详解
步骤 1 — 清空旧数据
按外键依赖顺序执行 DELETE:column_metric → column_info → metric_info → table_info,确保可重复执行。
步骤 2 — 保存表和字段到 Meta 库
遍历 meta_config.yaml 中所有表定义,为每个字段从数据仓库获取真实的列类型(SHOW COLUMNS)和样本值(SELECT DISTINCT ... LIMIT 10),构造 TableInfo 和 ColumnInfo 实体后批量写入 Meta 库。
步骤 3 — 字段信息建立向量索引
对每个字段,将其名称、描述、所有别名分别向量化(调用 TEI 服务),每个向量作为一个 Point 写入 Qdrant 的 column collection。Payload 中携带完整的 ColumnInfo。这意味着一个字段会产生 2 + N 个向量点(名称 + 描述 + N 个别名)。
步骤 4 — 字段取值建立全文索引
遍历所有标记 sync: true 的字段,从数据仓库查询其全部取值(最多 100000 条),构造 ValueInfo 后批量写入 ES 索引。ES 使用 ik_max_word 中文分析器进行分词索引。
步骤 5 — 保存指标信息到 Meta 库
将 18 个业务指标定义写入 metric_info 表,同时将指标与字段的关联关系写入 column_metric 表。
步骤 6 — 指标信息建立向量索引
与字段向量化类似,将每个指标的名称、描述、所有别名分别向量化后写入 Qdrant 的 metric collection。
5. Agent 节点流转详解
5.1 完整流转图
START
│
▼
┌─────────────────┐
│ extract_keywords │ LLM 提取关键词
└────────┬────────┘
│
┌───────────┼───────────┐
▼ ▼ ▼ ← 三路并行
┌──────────┐ ┌──────────┐ ┌──────────┐
│recall_col│ │recall_val│ │recall_met│
│ (Qdrant)│ │ (ES) │ │ (Qdrant)│
└────┬─────┘ └────┬─────┘ └────┬─────┘
└─────────────┼─────────────┘
▼
┌─────────────────────┐
│ merge_retrieved_info │ 合并+补全+分组
└──────────┬──────────┘
│
┌────────┴────────┐
▼ ▼ ← 两路并行
┌──────────────┐ ┌──────────────┐
│ filter_table │ │ filter_metric│
│ (LLM) │ │ (LLM) │
└───────┬──────┘ └───────┬──────┘
└────────┬────────┘
▼
┌─────────────────────┐
│ add_extra_context │ 添加日期+DB信息
└──────────┬──────────┘
▼
┌─────────────────────┐
│ generate_sql │ LLM 生成 SQL
└──────────┬──────────┘
▼
┌─────────────────────┐
│ validate_sql │ EXPLAIN 验证
└──────────┬──────────┘
│
┌──────┴──────┐
│ error? │
┌───┘ └───┐
▼ (无错误) ▼ (有错误)
┌──────────────┐ ┌──────────────┐
│ execute_sql │ │ correct_sql │ LLM 纠正 SQL
└──────────────┘ └──────┬───────┘
▲ │
└───────────────────┘
│
END
5.2 各节点详解
节点 1:extract_keywords(提取关键词)
| 属性 | 说明 |
|---|---|
| 输入 | state.query(用户原始问题) |
| 输出 | state.keywords(关键词列表) |
| 依赖 | LLM |
| 作用 | 从自然语言问题中提取核心关键词,作为后续三路召回的检索种子 |
工作原理:将用户问题发送给 LLM,要求其提取关键词并以 JSON 数组返回。例如,“统计各学院的学生人数” → ["学院", "学生", "人数"]。
节点 2/3/4:recall_column / recall_value / recall_metric(三路并行召回)
这三个节点并行执行(LangGraph 的 fan-out 机制),从不同存储中间件中召回候选信息。
recall_column(字段召回 — Qdrant 向量检索)
| 属性 | 说明 |
|---|---|
| 输入 | state.query, state.keywords |
| 输出 | state.retrieved_columns |
| 依赖 | LLM, Embedding, Qdrant |
| 作用 | 通过语义相似度从向量数据库中召回相关字段 |
工作流程:
- 调用 LLM 扩展关键词(Prompt:
extend_keywords_for_column_recall.prompt) - 将扩展后的关键词逐一向量化
- 在 Qdrant column collection 中进行向量检索(cosine similarity ≥ 0.6,top 5)
- 对检索结果按 column_id 去重,返回
ColumnInfo列表
recall_value(字段取值召回 — ES 全文检索)
| 属性 | 说明 |
|---|---|
| 输入 | state.query, state.keywords |
| 输出 | state.retrieved_values |
| 依赖 | LLM, Elasticsearch |
| 作用 | 通过中文分词匹配从 ES 中召回相关字段取值 |
工作流程:
- 调用 LLM 扩展关键词(Prompt:
extend_keywords_for_value_recall.prompt),识别实体名称如"中医学院""休学"等 - 将扩展后的关键词逐一在 ES 中全文检索
- 对检索结果按 value_id 去重,返回
ValueInfo列表
recall_metric(指标召回 — Qdrant 向量检索)
| 属性 | 说明 |
|---|---|
| 输入 | state.query, state.keywords |
| 输出 | state.retrieved_metrics |
| 依赖 | LLM, Embedding, Qdrant |
| 作用 | 通过语义相似度从向量数据库中召回相关业务指标 |
工作流程:
- 调用 LLM 扩展关键词(Prompt:
extend_keywords_for_metric_recall.prompt),生成指标概念短语如"平均绩点"“GPA均值” - 将扩展后的关键词逐一向量化
- 在 Qdrant metric collection 中进行向量检索
- 对检索结果按 metric_id 去重,返回
MetricInfo列表
节点 5:merge_retrieved_info(合并召回信息)
| 属性 | 说明 |
|---|---|
| 输入 | state.retrieved_columns, state.retrieved_values, state.retrieved_metrics |
| 输出 | state.table_infos, state.metric_infos |
| 依赖 | MySQL Meta |
| 作用 | 将三路召回结果合并为统一的表结构视图,补充缺失信息 |
工作流程:
- 指标关联字段补全:遍历召回的指标,将其
relevant_columns中引用的字段补充到字段列表 - 字段取值合并:将 ES 召回的字段取值合并到对应字段的
examples中 - 按表分组:将所有字段按
table_id分组,形成表→字段列表的映射 - 主外键补全:为每个涉及的表查询其主键和外键字段,确保后续 JOIN 可用
- 构造输出:将表信息和指标信息封装为
TableInfoState和MetricInfoState
这一步的关键价值在于:将分散在三个中间件中的召回结果,整合为一个完整且自洽的候选 Schema 视图。
节点 6/7:filter_table / filter_metric(LLM 精确过滤,两路并行)
filter_table(过滤表和字段)
| 属性 | 说明 |
|---|---|
| 输入 | state.query, state.table_infos |
| 输出 | state.table_infos(裁剪后) |
| 依赖 | LLM |
| 作用 | 从候选表和字段中裁剪出回答该问题所必需的最小集合 |
Prompt 核心规则(filter_table_info.prompt):
- 只能从候选中选择,禁止新增
- 字段以"是否在本次查询中被实际使用"为唯一标准
- 多表关联时必须包含 JOIN 所需的主外键
- 查询结果必须包含人类可读的名称字段(规则 6):若按实体分组,必须包含名称字段而非仅返回 ID;若名称在维度表中,必须选中维度表
LLM 输出格式:{"表名1": ["字段1", "字段2"], "表名2": [...]}
filter_metric(过滤指标)
| 属性 | 说明 |
|---|---|
| 输入 | state.query, state.metric_infos |
| 输出 | state.metric_infos(裁剪后) |
| 依赖 | LLM |
| 作用 | 从候选指标中筛选出回答该问题所必需的指标 |
Prompt 核心规则(filter_metric_info.prompt):
- 以"是否在本次问题中被实际用于度量或统计"为唯一标准
- 仅用于筛选/分组/限定范围的字段不视为指标
- 不涉及任何度量或统计时返回空数组
[]
节点 8:add_extra_context(添加额外上下文)
| 属性 | 说明 |
|---|---|
| 输入 | — |
| 输出 | state.date_info, state.db_info |
| 依赖 | MySQL DW |
| 作用 | 为 SQL 生成提供时间上下文和数据库环境信息 |
获取内容:
| 字段 | 来源 | 示例值 |
|---|---|---|
date |
datetime.today() |
2026-04-09 |
weekday |
strftime("%A") |
Wednesday |
quarter |
计算公式 | Q2 |
version |
SELECT VERSION() |
8.0.28 |
dialect |
SQLAlchemy engine | mysql |
这些信息帮助 LLM 在生成 SQL 时正确处理"去年""本学期"等时间表达。
节点 9:generate_sql(SQL 生成)
| 属性 | 说明 |
|---|---|
| 输入 | state.query, state.table_infos, state.metric_infos, state.date_info, state.db_info |
| 输出 | state.sql |
| 依赖 | LLM |
| 作用 | 根据完整上下文生成最终 SQL |
Prompt 输入(generate_sql.prompt):
- 用户问题
- 过滤后的表结构信息(YAML 格式)
- 过滤后的指标信息(YAML 格式)
- 日期上下文
- 数据库环境信息
LLM 综合所有上下文信息,生成一条可直接执行的 SQL 语句。
节点 10:validate_sql(SQL 验证)
| 属性 | 说明 |
|---|---|
| 输入 | state.sql |
| 输出 | state.error(None 表示通过,否则为错误信息) |
| 依赖 | MySQL DW |
| 作用 | 通过 EXPLAIN 验证 SQL 语法正确性 |
工作原理:在数据仓库上执行 EXPLAIN {sql}。如果 SQL 语法有误(表不存在、字段拼写错误、语法错误等),EXPLAIN 会抛出异常,节点将错误信息写入 state.error。
节点 11:correct_sql(SQL 纠正)— 条件触发
| 属性 | 说明 |
|---|---|
| 触发条件 | state.error is not None(validate_sql 失败时) |
| 输入 | state.sql, state.error, 以及完整上下文 |
| 输出 | state.sql(纠正后的 SQL) |
| 依赖 | LLM |
| 作用 | 将失败的 SQL 和错误信息交给 LLM 修正 |
Prompt 输入(correct_sql.prompt):包含原始 SQL、错误信息、用户问题、表/指标信息、日期/DB 信息,要求 LLM 修正后重新输出。
纠正后的 SQL 将直接进入 execute_sql(不再二次验证),这是一次重试机制。
节点 12:execute_sql(执行 SQL)
| 属性 | 说明 |
|---|---|
| 输入 | state.sql |
| 输出 | SSE 事件(结果数据流式推送到前端) |
| 依赖 | MySQL DW |
| 作用 | 在数据仓库上执行最终 SQL,返回查询结果 |
工作原理:
- 执行 SQL 语句
- 将结果序列化为字典列表(处理 Decimal→float、datetime→str 等类型转换)
- 通过
stream_writer推送{"type": "result", "data": [...]}事件
5.3 条件分支逻辑
graph_builder.add_conditional_edges(
"validate_sql",
lambda state: "execute_sql" if state["error"] is None else "correct_sql",
{"execute_sql": "execute_sql", "correct_sql": "correct_sql"}
)
- 验证通过(error 为 None)→ 直接执行 SQL
- 验证失败(error 非 None)→ 进入 correct_sql 节点由 LLM 纠正后再执行
6. API 层设计
6.1 应用生命周期(Lifespan)
# app/core/lifespan.py
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动:初始化所有客户端连接
embedding_client_manager.init() # TEI 向量服务
qdrant_client_manager.init() # Qdrant 连接
es_client_manager.init() # ES 连接
meta_mysql_client_manager.init() # Meta 库连接池
dw_mysql_client_manager.init() # DW 库连接池
yield
# 关闭:释放所有连接
await qdrant_client_manager.close()
await es_client_manager.close()
await meta_mysql_client_manager.close()
await dw_mysql_client_manager.close()
6.2 依赖注入链
FastAPI 的 Depends 机制实现了从连接池到 Service 的完整依赖注入链:
get_meta_session ─────────→ get_meta_mysql_repository ──┐
get_dw_session ───────────→ get_dw_mysql_repository ────┤
get_embedding_client ───────────────────────────────────┤
get_column_qdrant_repository ───────────────────────────┼→ get_query_service
get_value_es_repository ────────────────────────────────┤
get_metric_qdrant_repository ───────────────────────────┘
6.3 API 接口
| 方法 | 路径 | 请求体 | 响应 |
|---|---|---|---|
| POST | /api/query |
{"query": "统计各学院的学生人数"} |
SSE 流(text/event-stream) |
SSE 事件格式:
# 进度事件(各节点执行状态)
data: {"type": "progress", "step": "提取关键词", "status": "running"}
data: {"type": "progress", "step": "提取关键词", "status": "success"}
data: {"type": "progress", "step": "召回字段", "status": "running"}
...
# 结果事件
data: {"type": "result", "data": [{"college_name": "中医学院", "student_count": 1200}, ...]}
# 错误事件
data: {"type": "error", "message": "执行SQL失败: ..."}
6.4 请求处理流程
HTTP POST /api/query
│
▼
QueryRouter 接收请求
│
▼
FastAPI Depends 注入 QueryService
(创建 DB Session、Repository 实例)
│
▼
QueryService.query() 启动 LangGraph Agent
│
▼
graph.astream(stream_mode="custom") 流式执行
│
▼
各节点通过 stream_writer 推送进度/结果
│
▼
StreamingResponse 以 SSE 格式返回
7. 提示词工程
系统共使用 7 份提示词文件,位于 prompts/ 目录:
| 文件 | 使用节点 | 作用 |
|---|---|---|
extend_keywords_for_column_recall.prompt |
recall_column | 扩展字段召回关键词(仅输出字段级语义) |
extend_keywords_for_metric_recall.prompt |
recall_metric | 扩展指标召回关键词(指标概念短语) |
extend_keywords_for_value_recall.prompt |
recall_value | 扩展字段取值关键词(实体名、枚举值等) |
filter_table_info.prompt |
filter_table | 从候选表中裁剪必需表和字段 |
filter_metric_info.prompt |
filter_metric | 从候选指标中筛选必需指标 |
generate_sql.prompt |
generate_sql | 根据完整上下文生成 SQL |
correct_sql.prompt |
correct_sql | 根据错误信息纠正 SQL |
每份提示词都经过针对学校数据中台场景的定制优化,包含明确的角色定义、任务描述、规则约束、禁止事项和输出格式要求。
8. 数据流全链路示例
以用户问题 “统计各学院的学生人数” 为例,展示完整的数据流转过程:
步骤 1:extract_keywords
输入: "统计各学院的学生人数"
LLM 输出: ["学院", "学生", "人数", "统计"]
步骤 2:三路并行召回
recall_column(Qdrant 向量检索):
LLM 扩展关键词: ["学院名称", "学院", "学生学号", "学生人数"]
向量检索结果: base_college.college_name, base_college.college_id,
std_student.college_id, std_student.student_id, ...
recall_value(ES 全文检索):
LLM 扩展关键词: ["学院"]
ES 检索结果: "中医学院", "针灸推拿学院", "药学院", ...
recall_metric(Qdrant 向量检索):
LLM 扩展关键词: ["学生人数", "学生总数", "在校生人数"]
向量检索结果: TOTAL_STUDENTS
步骤 3:merge_retrieved_info
合并后的表结构:
- base_college: [college_id(PK), college_name]
- std_student: [student_id(PK), college_id(FK), name, ...]
指标: [TOTAL_STUDENTS]
步骤 4:filter_table + filter_metric(并行)
filter_table LLM 输出:
{
"base_college": ["college_id", "college_name"],
"std_student": ["student_id", "college_id"]
}
filter_metric LLM 输出:
["TOTAL_STUDENTS"]
步骤 5:add_extra_context
date_info: {date: "2026-04-09", weekday: "Wednesday", quarter: "Q2"}
db_info: {version: "8.0.28", dialect: "mysql"}
步骤 6:generate_sql
SELECT bc.college_name AS 学院名称, COUNT(ss.student_id) AS 学生人数
FROM std_student ss
JOIN base_college bc ON ss.college_id = bc.college_id
GROUP BY bc.college_id, bc.college_name
ORDER BY 学生人数 DESC
步骤 7:validate_sql
EXPLAIN SELECT bc.college_name ... → 通过 ✓
error = None
步骤 8:execute_sql
[
{"学院名称": "中医学院", "学生人数": 1200},
{"学院名称": "针灸推拿学院", "学生人数": 980},
{"学院名称": "药学院", "学生人数": 860},
...
]
前端实时展示为表格。
更多推荐


所有评论(0)