接上篇。

graphrag-query

graphrag-query 包含 Microsoft GraphRAG 和Neo4j GraphRAG 的混合检索,根据用户的问题,选择合适本地知识库获取实时的结果。

和 additional-query 节点一样, graphrag-query 节点也包含一个安全护栏,用于判断用户的问题是否属于经营范围。如果属于经营范围,则继续执行后续的逻辑,否则直接结束当前会话。因为用户如果最初提问的问题类似:请问我的订单xxxx现在送到哪里了,这种问题包含了详细的信息,所以不会进入到 additional-query 去追问补充的信息。这部分的处理逻辑和 additional query 节点是一样的,这里就不再赘述了。


接下来,当用户的问题被判断为属于经营范围后,会实际的去查询对应的工具,比如不同的知识库,但在执行实际的查询之前,我们需要先通过任务分解组件去拆分用户输入进来的问题,因为用户的问题可能包含多个子任务,比如:请问你家的冰箱和电视都有哪些型号,这明显是两个任务,分别为:冰箱的型号和电视的型号。除此以外,对于多跳的查询,也需要通过任务分解组件去拆分。


在具体的实现上,任务分解组件会根据用户的问题,生成一个任务列表,然后根据任务列表中的每一个子任务的性质,选择合适的工具去执行。

PLANNER_SYSTEM_PROMPT = """
你是一个电商平台智能客服系统中的任务规划组件。
你的职责是分析用户的查询,并将其拆分为独立的子任务。
请遵循以下规则:
1. 如果问题可以拆分为多个独立子任务,返回这些子任务列表
2. 如果问题很简单,不需要拆分,则返回一个只包含原问题的列表
3. 子任务之间应该是独立的,不要相互依赖
4. 确保子任务不会返回重复或相似的信息
5. 将相互依赖的任务合并为一个问题
6. 将返回相同信息的任务合并为一个问题
示例:
- 问题:"有哪些饮料类产品?它们的价格是多少?"
子任务:["有哪些饮料类产品?", "饮料类产品的价格是多少?"]
- 问题:"谁是处理了订单10248的员工?这个订单发往哪里?"
子任务:["谁是处理了订单10248的员工?", "订单10248的配送信息是什么?"]
- 问题:"供应商Exotic Liquids提供了哪些产品?这些产品的库存情况如何?"
子任务:["供应商Exotic Liquids提供了哪些产品?", "Exotic Liquids供应的产品库存情况如
何?"]
- 问题:"订单10248包含哪些产品?这些产品是哪家供应商提供的?"
子任务:["订单10248包含哪些产品?", "订单10248中的产品分别由哪些供应商提供?"]
- 问题:"茶叶产品的使用说明手册在哪里可以找到?"
子任务:["茶叶产品的使用说明手册在哪里?"]
- 问题:"客户对巧克力产品的评价如何?"
子任务:["客户对巧克力产品的评价?"]
"""
对拆分出的每一个子任务,替换后续流程中工具调用时接收到的 query 参数,即:让接下来的工
具调用,执行的查询语句,是根据子任务生成的。伪代码如下:
planner_task_decomposition = {
"next_action": next_action,
"tasks": planner_output.tasks
or [
Task(
question=state.get("question", ""),
parent_task=state.get("question", ""),
     )
   ]
}

有了子任务后,接下来就是根据子任务的性质,选择合适的工具去执行。因此我们需要用到LangGraph 的条件边,在条件边中,定义都有哪些子任务,以及每个子任务都是做什么的。

async def tool_selection(
state: ToolSelectionInputState,
) -> Command[Literal["cypher_query", "predefined_cypher",
"customer_tools"]]:
"""
Choose the appropriate tool for the given task.
"""
# 调用工具选择链,生成针对每个任务要调用的工具名称和参数
tool_selection_output: BaseModel = await
tool_selection_chain.ainvoke(
{"question": state.get("question", "")}
)
# 根据路由到对应的工具节点
if tool_selection_output is not None:
tool_name: str =
tool_selection_output.model_json_schema().get("title", "")
tool_args: Dict[str, Any] = tool_selection_output.model_dump()
if tool_name == "predefined_cypher":
return Command(
goto=Send(
"predefined_cypher",
{
"task": state.get("question", ""),
"query_name": tool_name,
"query_parameters": tool_args,
"steps": ["tool_selection"],
},
)
)
elif tool_name == "cypher_query":
return Command(
goto=Send(
"cypher_query",
{
"task": state.get("question", ""),
"query_name": tool_name,
"query_parameters": tool_args,
"steps": ["tool_selection"],
},
)
)
else:
return Command(
goto=Send(
"customer_tools",
{
"task": state.get("question", ""),
"query_name": tool_name,
"query_parameters": tool_args,
"steps": ["tool_selection"],
},
                )
)

这里面用到的 Send 是 LangGraph 中的一个 API ,主要是通过 Map-Reduce 对子任务执行并行的处理,然后汇总所有已完成的子任务结果。因为 Nodes 和 Edges 都是预先定义的,并且基于相同的共享状态进行操作。但对这种动态的执行过程,我们不知道会切分出多少个子任务,也不知道每个子任务会使用哪个工具,所以如果想要多个子任务都能独立应用全局的共享状态,就需要使用 Map-Reduce 的执行方式。 

这一步里结构化数据走的是Neo4j Text2Cypher。

核心是将用户的自然语言问题先转化为 Cypher 查询语句,然后再去实际的 Neo4j 数据库中执行查询语句,最后将查询结果返回给用户。

关于如何借助大模型根据用户的问题生成较为准确的 Cypher 查询语句,这里采用了两种方法,其一是预构建 Cypher 字典,其二是直接借助大模型生成 Cypher 查询语句。

预构建 Cypher 字典的本质是根据自有数据的情况,人工定义出一些可以正常运行,且能正确返回结果的 Cypher 查询语句,这类查询语句可以基于业务类型进行分类,也可以基于数据类型进行分类。

另外一种借助大模型自动化生成 Cypher 查询语句的方法,需要给到大模型的核心信息就是 Neo4j 的 Schema 信息。

上线初期,Cypher字典是不完善的,还是要从业务日志中进行数据抽取,同时存储到向量数据库中,通过向量检索进行 Cypher 字典的长期更新和维护。

实际在跑的时候发现,无论是采用哪种生成 Cypher 语句的方案,其最终生成的 Cypher 语句都可能存在问题,因此做了一个包含校验-自我纠正-执行-反馈的闭环。

Text2Cypher 是通过大模型生成 Cypher 语句的, 但是发现大模型生成的 Cypher 语句耗时比较长,所以后来预构建了一个 Cypher 工具节点,这个预构建的 Cypher 工具节点和 Text2Cypher 的 Cypher 字典不同, Text2Cypher 的 Cypher 字典是用来作为 Few-shot 的示例填充到提示模版中,从而引导大模型生成正确的 Cypher 语句,而预构建的 Cypher 工具节点是用来直接获取到对应的 Cypher 语句进行执行,它作为工具节点来使用。

图像识别的话就需要接入视觉大模型,把图片里的文字识别出来转换成自然语言,然后进行后续问答,这个用一个LangGraph节点就可以快速实现。代码还是付一下吧。

payload = {
"model": vision_model,
"messages": [
{
"role": "system",
"content": "你是一个专业的图像分析助手。请详细分析图片中的内容,特别关注产品细节、品牌、型号等信息。"
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}"
}
}
]
}
],
"max_tokens": 4000,
"temperature": 0.7
}
# 发送API请求
async with aiohttp.ClientSession() as session:
async with session.post(
f"{base_url}/chat/completions",
headers=headers,
json=payload,
timeout=60 # 增加超时时间
) as response:
if response.status == 200:
result = await response.json()
image_description = result["choices"][0]["message"]
["content"]
logger.info(f"Successfully processed image and generated
description")
# 使用图片描述和用户问题生成最终回复
# 从lg_prompts导入电商客服模板
# 构建回复请求
if settings.AGENT_SERVICE == ServiceType.DEEPSEEK:
model = ChatDeepSeek(api_key=settings.DEEPSEEK_API_KEY,
model_name=settings.DEEPSEEK_MODEL, temperature=0.7, tags=["image_query"])
else:
model = ChatOllama(model=settings.OLLAMA_AGENT_MODEL,
base_url=settings.OLLAMA_BASE_URL, temperature=0.7, tags=["image_query"])
# 使用专门的图片查询提示模板
system_prompt = GET_IMAGE_SYSTEM_PROMPT.format(
image_description=image_description
)
messages = [{"role": "system", "content": system_prompt}] +
state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
else:
error_text = await response.text()
logger.error(f"Vision API Request Failed: {response.status}
- {error_text}")
return {"messages": [AIMessage(content=f"抱歉,我无法查看这张图片,请重新上传。")]}


 

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐