智慧出行助手项目(agent)
本文介绍了一个基于Agent2Agent架构的智能旅行助手系统,该系统整合了天气查询、票务查询与预订、景点推荐等功能。系统采用模块化设计,包括数据获取、数据库管理、MCP工具开发和Agent服务实现等核心模块。天气数据通过和风API获取并存储到MySQL数据库,票务信息则直接查询数据库。系统通过意图识别Agent分析用户查询,并路由到相应的专业Agent处理任务。各Agent使用LLM+Promp
第一章项目架构
1.1重要概念
数据是怎么获取的?
使用和风天气api,发送requestion请求拉去城市数据
数据怎么更新
使用on duplicate key updata 更新数据库数据
1.1.1查询改写
根据上下文,结合用户新的问题改写问题获取答案
举个例子:
用户:今天天气怎么样
system:今天天气....
用户:那明天呢?
system需要根据历史对话结合用户的新问题改写查询为明天的天气怎么样
铜锅列表保存上下文.保存五轮(每轮是用户query和agent的answer)保存在内存中,每轮对话通过session_id区分.
也可以使用异步的方法把历史对话保存到mysql数据库中实现持久化.使用异步的方法是为了不阻塞对话交互
1.1.2task执行流程
拿天气查询agent举例子:
意图识别agent把task传给天气查询agent,天气查询agent会解析task,
天气查询agent检索自己有哪些工具分别是干什么的,然后知道调用这个工具需要哪些参数
我们使用爬虫在天气网里爬取数据到mysql数据库里面,然后封装查询工具,
用户查询数据的时候agent把用户query封装成sql,然后调用工具就可以获取数据
原理如下:

1.2项目架构

优化输出agent:优化agent输出,提高用户体验
如果超出我们的服务范围:我们使用LLM提示用户超出了服务范围
虚线指的是如果用户输入信息缺失,Agent返回信息给用户提示补充信息
1.3代码架构

解释一下单个流程:
用户输入query,意图分析agent把task传给调用的agent
agent把用户的query转换为sql
mcp根据sql在数据库里面检索,返回列表
mcp把列表封装成json返回给agent.
第二章数据库初始化
2.1为什么把数据存到mysql数据库里查询
直接调用api查询开销比较大,而且受网络影响大,会导致查询失败.我们把天气信息存到数据库中可以解决这个问题
票务系统里面存的是:火车票,机票,演唱会票(实际业务中票务信息不会存到mysql里面,因为票务信息更新很快,还是建议使用api获取票务信息)
2.2数据库数据插入和更新
2.2.1创建数据库
DROP DATABASE IF EXISTS travel_rag;
CREATE DATABASE travel_rag CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE travel_rag;
2.2.2创建表
CREATE TABLE train_tickets (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
departure_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '出发城市(如“北京”)',
arrival_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '到达城市(如“上海”)',
departure_time DATETIME NOT NULL COMMENT '出发时间(如“2025-08-12 07:00:00”)',
arrival_time DATETIME NOT NULL COMMENT '到达时间(如“2025-08-12 11:30:00”)',
train_number VARCHAR(20) NOT NULL COMMENT '火车车次(如“G1001”)',
seat_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '座位类型(如“二等座”)',
total_seats INT NOT NULL COMMENT '总座位数(如 1000)',
remaining_seats INT NOT NULL COMMENT '剩余座位数(如 50)',
price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 553.50)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
UNIQUE KEY unique_train (departure_time, train_number) -- 唯一约束,确保同一时间和车次不重复
) COMMENT='火车票信息表';
CREATE TABLE flight_tickets (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
departure_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '出发城市(如“北京”)',
arrival_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '到达城市(如“上海”)',
departure_time DATETIME NOT NULL COMMENT '出发时间(如“2025-08-12 08:00:00”)',
arrival_time DATETIME NOT NULL COMMENT '到达时间(如“2025-08-12 10:30:00”)',
flight_number VARCHAR(20) NOT NULL COMMENT '航班号(如“CA1234”)',
cabin_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '舱位类型(如“经济舱”)',
total_seats INT NOT NULL COMMENT '总座位数(如 200)',
remaining_seats INT NOT NULL COMMENT '剩余座位数(如 10)',
price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 1200.00)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
UNIQUE KEY unique_flight (departure_time, flight_number) -- 唯一约束,确保同一时间和航班号不重复
) COMMENT='航班机票信息表';
CREATE TABLE concert_tickets (
id INT AUTO_INCREMENT PRIMARY KEY, -- 主键,自增,唯一标识每条记录
artist VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, -- 艺人名称(如“周杰伦”)
city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, -- 举办城市(如“上海”)
venue VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, -- 场馆(如“上海体育场”)
start_time DATETIME NOT NULL, -- 开始时间(如“2025-08-12 19:00:00”)
end_time DATETIME NOT NULL, -- 结束时间(如“2025-08-12 22:00:00”)
ticket_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL, -- 票类型(如“VIP”)
total_seats INT NOT NULL, -- 总座位数(如 5000)
remaining_seats INT NOT NULL, -- 剩余座位数(如 100)
price DECIMAL(10, 2) NOT NULL, -- 票价(如 880.00)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 创建时间,自动记录插入时间
UNIQUE KEY unique_concert (start_time, artist, ticket_type) -- 唯一约束,确保同一时间、艺人和票类型不重复
);
2.2.3插入数据更新数据
DROP TABLE IF EXISTS weather_data;
CREATE TABLE IF NOT EXISTS weather_data (
id INT AUTO_INCREMENT PRIMARY KEY,
city VARCHAR(50) NOT NULL COMMENT '城市名称',
fx_date DATE NOT NULL COMMENT '预报日期',
sunrise TIME COMMENT '日出时间',
sunset TIME COMMENT '日落时间',
moonrise TIME COMMENT '月升时间',
moonset TIME COMMENT '月落时间',
moon_phase VARCHAR(20) COMMENT '月相名称',
moon_phase_icon VARCHAR(10) COMMENT '月相图标代码',
temp_max INT COMMENT '最高温度',
temp_min INT COMMENT '最低温度',
icon_day VARCHAR(10) COMMENT '白天天气图标代码',
text_day VARCHAR(20) COMMENT '白天天气描述',
icon_night VARCHAR(10) COMMENT '夜间天气图标代码',
text_night VARCHAR(20) COMMENT '夜间天气描述',
wind360_day INT COMMENT '白天风向360角度',
wind_dir_day VARCHAR(20) COMMENT '白天风向',
wind_scale_day VARCHAR(10) COMMENT '白天风力等级',
wind_speed_day INT COMMENT '白天风速 (km/h)',
wind360_night INT COMMENT '夜间风向360角度',
wind_dir_night VARCHAR(20) COMMENT '夜间风向',
wind_scale_night VARCHAR(10) COMMENT '夜间风力等级',
wind_speed_night INT COMMENT '夜间风速 (km/h)',
precip DECIMAL(5,1) COMMENT '降水量 (mm)',
uv_index INT COMMENT '紫外线指数',
humidity INT COMMENT '相对湿度 (%)',
pressure INT COMMENT '大气压强 (hPa)',
vis INT COMMENT '能见度 (km)',
cloud INT COMMENT '云量 (%)',
update_time DATETIME COMMENT '数据更新时间',
UNIQUE KEY unique_city_date (city, fx_date)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='天气数据表';
第三章天气查询实现
3.1回顾
- agent router基于agent network和LLM选择最合适的agent
- agent经过部署成为agent server,然后可以使用agent client调用agent server
- 实现异步的方法:await weather_client.send_task_async()
- 异步的好处:不会发生线程阻塞防止并发,返回的task对象里面包含信息更多(sessionid,id...)
- 复杂任务分解:prompt+LLM+stroutparser()实现
- 实现并行:使用异步的方法,但是不使用Await,这时候返回的是没有执行的异步对象,把这些对象封装在task列表中,是哦用asyncio.gather()实现并行
- 若果获取天气成功,把获取信息放到task_artifacts里面返回信息.如果没获取到信息,把信息放到task_status里面让用户重试或者提供更多信息.如果获取报错,那么把报错信息封装到task_status里面返回
3.2天气查询api实现
3.2.1信息来源
每一个小时从和风天气里面拉取最新的天气数据到mysql里面,直接从mysql里面获取最新的消息.
3.2.2为什么把数据存到mysql里面,不直接用api查询天气?
访问量大时,每次调用api开销会比较大,存到mysql数据库里面只需要调用一次api就够了
使用api调用由于网络原因可能延迟会比较高,存到mysql里面延迟会很低
3.2.3:gzip压缩
gzip压缩可以极大地减少网络流量,加快请求
3.2.4测试天气查询api
import requests
import mysql.connector
from datetime import datetime, timedelta
import schedule
import time
import json
import gzip
import pytz
from config import Config
conf = Config()
# 配置
API_KEY = conf.weather_api_key
city_codes = {
"北京": "101010100",
"上海": "101020100",
"广州": "101280101",
"深圳": "101280601"
}
BASE_URL = conf.weather_base_url
TZ = pytz.timezone('Asia/Shanghai') # 使用上海时区
# MySQL 配置
db_config = {
"host": conf.host,
"user": conf.user,
"password": conf.password,
"database": conf.database,
"charset": "utf8mb4"
}
def connect_db():
return mysql.connector.connect(**db_config)
#数据爬取与解析
def fetch_weather_data(city, location):
headers = {
"X-QW-Api-Key": API_KEY,
"Accept-Encoding": "gzip"
}
url = f"{BASE_URL}?location={location}"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
if response.headers.get('Content-Encoding') == 'gzip':
data = gzip.decompress(response.content).decode('utf-8')
else:
data = response.text
return json.loads(data)
except requests.RequestException as e:
print(f"请求 {city} 天气数据失败: {e}")
return None
except json.JSONDecodeError as e:
print(f"{city} JSON 解析错误: {e}, 响应内容: {response.text[:500]}...")
return None
except gzip.BadGzipFile:
print(f"{city} 数据未正确解压,尝试直接解析: {response.text[:500]}...")
return json.loads(response.text) if response.text else None
#验证代码
if __name__ == "__main__":
weather_data = fetch_weather_data("北京", city_codes["北京"])
print(weather_data)
print("解析成功!")
3.3更新数据库天气
3.3.1原理
遍历城市查询,获取城市最新天气时间,若和现在时间计算后相差超过一天,强制更新
查询到的天气信息封装在daily列表中
如何实现插入更新数据?
使用ON duplicate key update,若天气存在,则更新天气信息,若果不存在,就插入信息
import requests
import mysql.connector
from datetime import datetime, timedelta
import schedule
import time
import json
import gzip
import pytz
from config import Config
conf = Config()
# 配置
API_KEY =conf.weather_api_key
city_codes = {
"北京": "101010100",
"上海": "101020100",
"广州": "101280101",
"深圳": "101280601"
}
BASE_URL = conf.weather_base_url
TZ = pytz.timezone('Asia/Shanghai') # 使用上海时区
# MySQL 配置
db_config = {
"host": conf.host,
"user": conf.user,
"password": conf.password,
"database": conf.database,
"charset": "utf8mb4"
}
def connect_db():
return mysql.connector.connect(**db_config)
#数据爬取与解析
def fetch_weather_data(city, location):
headers = {
"X-QW-Api-Key": API_KEY,
"Accept-Encoding": "gzip"
}
url = f"{BASE_URL}?location={location}"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
if response.headers.get('Content-Encoding') == 'gzip':
data = gzip.decompress(response.content).decode('utf-8')
else:
data = response.text
return json.loads(data)
except requests.RequestException as e:
print(f"请求 {city} 天气数据失败: {e}")
return None
except json.JSONDecodeError as e:
print(f"{city} JSON 解析错误: {e}, 响应内容: {response.text[:500]}...")
return None
except gzip.BadGzipFile:
print(f"{city} 数据未正确解压,尝试直接解析: {response.text[:500]}...")
return json.loads(response.text) if response.text else None
def get_latest_update_time(cursor, city):
cursor.execute("SELECT MAX(update_time) FROM weather_data WHERE city = %s", (city,))
result = cursor.fetchone()
return result[0] if result[0] else None
def should_update_data(latest_time, force_update=False):
if force_update:
return True
if latest_time is None:
return True
current_time = datetime.now(TZ)
return (current_time - latest_time) > timedelta(days=1)
def store_weather_data(conn, cursor, city, data):
if not data or data.get("code") != "200":
print(f"{city} 数据无效,跳过存储。")
return
daily_data = data.get("daily", [])
print(f"正在处理 {city} 的数据...")
print("=============data==============")
print(data)
print("=============daily_data==============")
print(daily_data)
#ISO 通常特指 ISO 8601 标准
# 处理前 'updateTime': '2025-09-13T17:18+08:00' 处理后:update_time: 2025-09-13 17:18:00+08:06
update_time = datetime.fromisoformat(data.get("updateTime").replace("+08:00", "+08:00")).replace(tzinfo=TZ)
for day in daily_data:
fx_date = datetime.strptime(day["fxDate"], "%Y-%m-%d").date()
values = (
city, fx_date,
day.get("sunrise"), day.get("sunset"),
day.get("moonrise"), day.get("moonset"),
day.get("moonPhase"), day.get("moonPhaseIcon"),
day.get("tempMax"), day.get("tempMin"),
day.get("iconDay"), day.get("textDay"),
day.get("iconNight"), day.get("textNight"),
day.get("wind360Day"), day.get("windDirDay"), day.get("windScaleDay"), day.get("windSpeedDay"),
day.get("wind360Night"), day.get("windDirNight"), day.get("windScaleNight"), day.get("windSpeedNight"),
day.get("precip"), day.get("uvIndex"),
day.get("humidity"), day.get("pressure"),
day.get("vis"), day.get("cloud"),
update_time
)
insert_query = """
INSERT INTO weather_data (
city, fx_date, sunrise, sunset, moonrise, moonset, moon_phase, moon_phase_icon,
temp_max, temp_min, icon_day, text_day, icon_night, text_night,
wind360_day, wind_dir_day, wind_scale_day, wind_speed_day,
wind360_night, wind_dir_night, wind_scale_night, wind_speed_night,
precip, uv_index, humidity, pressure, vis, cloud, update_time
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
sunrise = VALUES(sunrise), sunset = VALUES(sunset), moonrise = VALUES(moonrise),
moonset = VALUES(moonset), moon_phase = VALUES(moon_phase), moon_phase_icon = VALUES(moon_phase_icon),
temp_max = VALUES(temp_max), temp_min = VALUES(temp_min), icon_day = VALUES(icon_day),
text_day = VALUES(text_day), icon_night = VALUES(icon_night), text_night = VALUES(text_night),
wind360_day = VALUES(wind360_day), wind_dir_day = VALUES(wind_dir_day), wind_scale_day = VALUES(wind_scale_day),
wind_speed_day = VALUES(wind_speed_day), wind360_night = VALUES(wind360_night),
wind_dir_night = VALUES(wind_dir_night), wind_scale_night = VALUES(wind_scale_night),
wind_speed_night = VALUES(wind_speed_night), precip = VALUES(precip), uv_index = VALUES(uv_index),
humidity = VALUES(humidity), pressure = VALUES(pressure), vis = VALUES(vis),
cloud = VALUES(cloud), update_time = VALUES(update_time)
"""
try:
cursor.execute(insert_query, values)
print(f"{city} {fx_date} 数据写入/更新成功: {day.get('textDay')}, 影响行数: {cursor.rowcount}")
except mysql.connector.Error as e:
print(f"{city} {fx_date} 数据库错误: {e}")
conn.commit()
print(f"{city} 事务提交完成。")
#验证代码
if __name__ == "__main__":
conn = connect_db()
cursor = conn.cursor()
data = fetch_weather_data("北京", "101010100")
store_weather_data(conn, cursor, "北京", data)
print("数据存储完成。")
3.3.2数据更新
我们设计凌晨一点更新数据库数据,使用schedule模块每一分钟自动检测一次是否到凌晨一点
def setup_scheduler():
# 北京时间 1:00 对应 PDT 前一天的 16:00(夏令时)
schedule.every().day.at("16:00").do(update_weather)
while True:
schedule.run_pending()
time.sleep(60)
3.3.3小结
数据导入小结:
通过爬取和风天气上的数据,将其存储到mysql数据库里面.后续基于这些数据完成天气查询的操作.具体的做法是:每天定时调度,去处理每个城市,获取最新的数据库天气时间,然后判断是否过期(超过一天),如果过期则拉取新的数据upsert到mysql中
注意:
更新周期为什么选择一天更新一次数据?
和风天气中获取的数据是每日天气预报.也可以获取每小时天气预报,这样子精度更准.
怎么判断间隔是否大于一天?
使用on duplicate key upsert模块每一分钟和mysql数据库最新时间计算一次是否大于一天
如何爬取数据?
我们项目使用api获取和风天气的数据
这些只是测试时期的答复
正式答复:
我们考虑到有些城市的数据查询较少.更新频繁消耗较大.我们优化为在用户查询的时后先判断时间是否大于一天,如果大于一天,去使用api拉去数据更新数据库返回给用户.如果小于一天,那么直接返回给用户.
第四章MCP开发
整体开发思路:定义多个工具封装在不同的mcp server里面.一个mcp server可以封装多个tool.
怎么构建mcpserver?
我们使用的是mcp协议定义tools,然后run启动服务器.
4.1创建查询工具
创建通过sql到mysql数据库里查询数据的工具
定义mysql查询工具去查询mysql里面的数据
import mysql.connector
import json
from datetime import date, datetime, timedelta
from decimal import Decimal
from mcp.server.fastmcp import FastMCP
from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
from SmartVoyage.utils.format import DateEncoder, default_encoder
conf = Config()
# 天气服务类
class WeatherService: # 定义天气服务类,封装数据库操作逻辑
def __init__(self):
# 连接数据库
self.conn = mysql.connector.connect(
host=conf.host,
user=conf.user,
password=conf.password,
database=conf.database
)
# 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串
def execute_query(self, sql: str) -> str:
try:
cursor = self.conn.cursor(dictionary=True)
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
# 格式化结果
for result in results: # 遍历每个结果字典
for key, value in result.items():
if isinstance(value, (date, datetime, timedelta, Decimal)): # 检查值是否为特殊类型
result[key] = default_encoder(value) # 使用自定义编码器格式化该值
# 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义
return json.dumps({"status": "success", "data": results} if results else {"status": "no_data", "message": "未找到天气数据,请确认城市和日期。"}, cls=DateEncoder, ensure_ascii=False)
except Exception as e:
logger.error(f"天气查询错误: {str(e)}")
# 返回错误JSON响应
return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)
4.2启动mcp服务器
import mysql.connector
import json
from datetime import date, datetime, timedelta
from decimal import Decimal
from mcp.server.fastmcp import FastMCP
from config import Config
from create_logger import logger
from utils.format import DateEncoder, default_encoder
conf = Config()
# 天气服务类
class WeatherService: # 定义天气服务类,封装数据库操作逻辑
def __init__(self):
# 连接数据库
self.conn = mysql.connector.connect(
host=conf.host,
user=conf.user,
password=conf.password,
database=conf.database
)
# 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串
def execute_query(self, sql: str) -> str:
try:
cursor = self.conn.cursor(dictionary=True)
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
# 格式化结果
for result in results: # 遍历每个结果字典
for key, value in result.items():
if isinstance(value, (date, datetime, timedelta, Decimal)): # 检查值是否为特殊类型
result[key] = default_encoder(value) # 使用自定义编码器格式化该值
# 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义
return json.dumps({"status": "success", "data": results} if results else {"status": "no_data",
"message": "未找到天气数据,请确认城市和日期。"},
cls=DateEncoder, ensure_ascii=False)
except Exception as e:
logger.error(f"天气查询错误: {str(e)}")
# 返回错误JSON响应
return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)
def create_weather_mcp_server():
mcp = FastMCP(name='weather_mcp', instructions='天气查询服务', log_level='ERROR', host='localhost', port=8002)
service = WeatherService()
@mcp.tool(name='get_weather', description='查询天气')
def query_weather(sql: str) -> str:
return service.execute_query(sql)
mcp.run(transport='streamable-http')
if __name__ == '__main__':
create_weather_mcp_server()
调用工具方面,一种是自主调用,在工具少且参数少的时候选择,调用更精准.还有一种agent调用,适合工具多参数多的时候自动选择调用工具.在天气调用方面我们用的是自主调用,因为只有一个查询天气的工具
第四章票务功能实现
4.1票务查询
import mysql.connector
import json
from datetime import date, datetime, timedelta
from decimal import Decimal
from mcp.server.fastmcp import FastMCP
from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
from SmartVoyage.utils.format import DateEncoder, default_encoder
conf = Config()
# 票务服务类
class TicketService: # 定义票务服务类,封装数据库操作逻辑
def __init__(self): # 初始化方法,建立数据库连接
# 连接数据库
self.conn = mysql.connector.connect(
host=conf.host,
user=conf.user,
password=conf.password,
database=conf.database
)
# 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串
def execute_query(self, sql: str) -> str:
try:
cursor = self.conn.cursor(dictionary=True)
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
# 格式化结果
for result in results: # 遍历每个结果字典
for key, value in result.items():
if isinstance(value, (date, datetime, timedelta, Decimal)): # 检查值是否为特殊类型
result[key] = default_encoder(value) # 使用自定义编码器格式化该值
# 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义
return json.dumps({"status": "success", "data": results} if results else {"status": "no_data",
"message": "未找到票务数据,请确认查询条件。"},
cls=DateEncoder, ensure_ascii=False)
except Exception as e:
logger.error(f"票务查询错误: {str(e)}")
# 返回错误JSON响应
return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)
4.2票务预定
from mcp.server.fastmcp import FastMCP
from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
conf = Config()
# 创建FastMCP实例
order_mcp = FastMCP(name="OrderTools",
instructions="票务预定工具,通过调用API完成火车票、飞机票和演唱会票的预定。",
log_level="ERROR",
host="127.0.0.1", port=8003)
@order_mcp.tool(
name="order_train",
description="根据时间、车次、座位类型、数量预定火车票"
)
def order_train(departure_date: str, train_number: str, seat_type: str, number: int) -> str:
'''
Args:
departure_date (str): 出发日期,如 '2025-10-30'
train_number (str): 火车车次,如 'G346'
seat_type (str): 座位类型,如 '二等座'
number (int): 订购张数
'''
logger.info(f"正在订购火车票: {departure_date}, {train_number}, {seat_type}, {number}")
logger.info(f"恭喜,火车票预定成功!")
return "恭喜,火车票预定成功!"
@order_mcp.tool(
name="order_flight",
description="根据时间、班次、座位类型、数量预定飞机票"
)
def order_flight(departure_date: str, flight_number: str, seat_type: str, number: int) -> str:
'''
Args:
departure_date (str): 出发日期,如 '2025-10-30'
flight_number (str): 飞机班次,如 'CA6557'
seat_type (str): 座位类型,如 '经济舱'
number (int): 订购张数
'''
logger.info(f"正在订购飞机票: {departure_date}, {flight_number}, {seat_type}, {number}")
logger.info(f"恭喜,飞机票预定成功!")
return "恭喜,飞机票预定成功!"
@order_mcp.tool(
name="order_concert",
description="根据时间、明星、场地、座位类型、数量预定演出票"
)
def order_concert(start_date: str, aritist: str, venue: str, seat_type: str, number: int) -> str:
'''
Args:
start_date (str): 开始日期,如 '2025-10-30'
aritist (str): 明星,如 '刀郎'
venue (str): 场地,如 '上海体育馆'
seat_type (str): 座位类型,如 '看台'
number (int): 订购张数
'''
logger.info(f"正在订购演出票: {start_date}, {aritist}, {venue}, {seat_type}, {number}")
logger.info(f"恭喜,演出票预定成功!")
return "恭喜,演出票预定成功!"
# 创建票务预定MCP服务器
def create_order_mcp_server():
# 打印服务器信息
logger.info("=== 票务预定MCP服务器信息 ===")
logger.info(f"名称: {order_mcp.name}")
logger.info(f"描述: {order_mcp.instructions}")
# 运行服务器
try:
print("服务器已启动,请访问 http://127.0.0.1:8003/mcp")
order_mcp.run(transport="streamable-http") # 使用 streamable-http 传输方式
except Exception as e:
print(f"服务器启动失败: {e}")
if __name__ == "__main__":
# 调用创建服务器函数
create_order_mcp_server()
第五章天气查询Agent
主要功能:把用户输入的自然语言转换为sql.
怎么判断用户输入的信息是否合规?
我们通过Agent输出格式判断.如果用户输入的信息缺少城市和时间信息,Agent返回json信息给用户需要提供城市和时间信息.如果Agent返回信息是sql说明用户输入信息格式正确,且已处理完成.
如果相关信息可以上下文提取到,那么用户也可以不提供,agent可以自动处理.
实现方式:通过prompt+LLM实现
天气Agent是怎么做的?
定义agent server,在handle_task中实现MCP工具的调用:然后自定义WeatherQueryServer,集成A2Aserver,实现
5.1Agent模式
-
工具调用模式
根据用户query使用调用工具参数完成工具的调用和结果的返回
- react模式
根据工具调用结果分析下一步干什么怎么干
- 反思模式
Agent精度不高可以使用反思模式来修正.他是在调用工具完成一个任务或者整个任务后,对结果进行评估,根据评估结果生成反思,然后根据agent进行修正.
评估的方式有根据提示词自我评估(较简单实现),将用户的反馈作为评估结果(效果比较好),借助外部工具进行评估,记住评估模型进行评估
- 规划模式
先生成一个任务规划,然后逐步完成规划,每个规划的完成可以使用react模式
- 多agent模式
多个agent协作完成任务
5.2整体开发原理

5.3调用工具
使用MCP服务端调用工具有三种结果:
调用成功:改变status状态为sucess,提取artifacts内容返回
信息需要完善:改变status状态为input requested,内容放到message里面返回
调用失败:改变状态statusfailed,报错内容放到message里面
import json
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState, Message, TextContent, \
MessageRole, Task
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from config import Config
from datetime import datetime
import pytz
from create_logger import logger
conf = Config()
# 初始化LLM
llm = ChatOpenAI(
model=conf.model_name,
base_url=conf.base_url,
api_key=conf.api_key,
temperature=0.1
)
# 数据表 schema
table_schema_string = """ # 定义天气数据表的SQL schema字符串,用于Prompt上下文
CREATE TABLE IF NOT EXISTS weather_data (
id INT AUTO_INCREMENT PRIMARY KEY,
city VARCHAR(50) NOT NULL COMMENT '城市名称',
fx_date DATE NOT NULL COMMENT '预报日期',
sunrise TIME COMMENT '日出时间',
sunset TIME COMMENT '日落时间',
moonrise TIME COMMENT '月升时间',
moonset TIME COMMENT '月落时间',
moon_phase VARCHAR(20) COMMENT '月相名称',
moon_phase_icon VARCHAR(10) COMMENT '月相图标代码',
temp_max INT COMMENT '最高温度',
temp_min INT COMMENT '最低温度',
icon_day VARCHAR(10) COMMENT '白天天气图标代码',
text_day VARCHAR(20) COMMENT '白天天气描述',
icon_night VARCHAR(10) COMMENT '夜间天气图标代码',
text_night VARCHAR(20) COMMENT '夜间天气描述',
wind360_day INT COMMENT '白天风向360角度',
wind_dir_day VARCHAR(20) COMMENT '白天风向',
wind_scale_day VARCHAR(10) COMMENT '白天风力等级',
wind_speed_day INT COMMENT '白天风速 (km/h)',
wind360_night INT COMMENT '夜间风向360角度',
wind_dir_night VARCHAR(20) COMMENT '夜间风向',
wind_scale_night VARCHAR(10) COMMENT '夜间风力等级',
wind_speed_night INT COMMENT '夜间风速 (km/h)',
precip DECIMAL(5,1) COMMENT '降水量 (mm)',
uv_index INT COMMENT '紫外线指数',
humidity INT COMMENT '相对湿度 (%)',
pressure INT COMMENT '大气压强 (hPa)',
vis INT COMMENT '能见度 (km)',
cloud INT COMMENT '云量 (%)',
update_time DATETIME COMMENT '数据更新时间',
UNIQUE KEY unique_city_date (city, fx_date)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='天气数据表';
"""
# 生成SQL的提示词
sql_prompt = ChatPromptTemplate.from_template(
"""
系统提示:你是一个专业的天气SQL生成器,需要从对话历史(含用户的问题)中提取关键信息,然后基于weather_data表生成SELECT语句。
- 如果用户需要查天气,则至少需要城市和时间信息。如果对话历史中缺乏必要的信息,可以向其追问,输出格式为json格式,如示例所示;如果对话历史中信息齐全,则输出纯SQL即可。
- 如果用户问与天气无关的问题,则模仿最后2个示例回复即可。
示例:
- 对话: user: 北京 2025-07-30
输出: SELECT city, fx_date, temp_max, temp_min, text_day, text_night, humidity, wind_dir_day, precip FROM weather_data WHERE city = '北京' AND fx_date = '2025-07-30'
- 对话: user: 上海未来3天的天气
输出: SELECT city, fx_date, temp_max, temp_min, text_day, text_night, humidity, wind_dir_day, precip FROM weather_data WHERE city = '上海' AND fx_date BETWEEN '2025-07-30' AND '2025-08-01' ORDER BY fx_date
- 对话: user: 北京的天气
输出: {{"status": "input_required", "message": "请提供具体的需要查询的日期,例如 '2025-07-30'。"}}
- 对话: user: 今天\nassistant: 请提供城市。\nuser: 北京
输出: SELECT city, fx_date, temp_max, temp_min, text_day, text_night, humidity, wind_dir_day, precip FROM weather_data WHERE city = '北京' AND fx_date = '2025-07-30'
- 对话: user: 北京明天的天气\nassistant: 多云。\nuser: 后天呢
输出: SELECT city, fx_date, temp_max, temp_min, text_day, text_night, humidity, wind_dir_day, precip FROM weather_data WHERE city = '北京' AND fx_date = '2025-08-01'
- 对话: user: 你好
输出: {{"status": "input_required", "message": "请提供城市和日期,例如 '北京 2025-07-30'。"}}
- 对话: user: 今天有什么好吃的
输出: {{"status": "input_required", "message": "请提供天气相关查询,包括城市和日期。"}}
weather_data表结构:{table_schema_string}
对话历史: {conversation}
当前日期: {current_date} (Asia/Shanghai)
"""
)
# Agent卡片定义
agent_card = AgentCard(
name="WeatherQueryAssistant",
description="基于LangChain提供天气查询服务的助手",
url="http://localhost:5005",
version="1.0.0",
capabilities={"streaming": True, "memory": True}, # 设置能力:支持流式和内存
skills=[ # 定义技能列表
AgentSkill(
name="execute weather query",
description="执行天气查询,返回天气数据库结果,支持自然语言输入",
examples=["北京 2025-07-30 天气", "上海未来5天", "今天天气如何"]
)
]
)
# 创建一个异步函数,来实现天气MCP客户端的创建及使用
async def get_weather(sql):
try:
# 启动mcp server,并通过标准输入输出创建异步链接
async with streamablehttp_client("http://localhost:8002/mcp") as (read, write, _):
# 使用读写流创建MCP会话对象【这个session对象就是客户端】
async with ClientSession(read, write) as session:
# 初始化session
await session.initialize()
try:
# 通过session调用工具
result = await session.call_tool('get_weather', {"sql":sql})
# print(f'MCP tool result-->{result.content[0].text}')
result_data = json.loads(result) if isinstance(result, str) else result
logger.info(f"天气查询结果:{result_data}")
return result_data.content[0].text
except Exception as e:
logger.error(f"天气 MCP 测试出错:{str(e)}")
return {"status": "error", "message": f"天气 MCP 查询出错:{str(e)}"}
except Exception as e:
logger.error(f"连接或会话初始化时发生错误: {e}")
return {"status": "error", "message": "连接或会话初始化时发生错误"}
# 天气查询服务器类
class WeatherQueryServer(A2AServer):
def __init__(self):
super().__init__(agent_card=agent_card)
self.llm = llm
self.sql_prompt = sql_prompt
self.schema = table_schema_string
# 定义生成SQL查询方法,输入对话历史,返回SQL或追问JSON
def generate_sql_query(self, conversation: str) -> dict:
try:
# 组装链
chain = self.sql_prompt | self.llm
# 调用链
out = chain.invoke({"conversation": conversation, "current_date": datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d"), "table_schema_string": self.schema}).content.strip()
logger.info(f"生成SQL结果: {out}")
# 处理生成SQL的结果
if out.startswith("{"): # 如果是JSON格式,则返回
return json.loads(out)
else: # 如果是SQL格式,则执行并返回结果
return {"status": "sql", "sql": out}
except Exception as e:
logger.error(f"SQL生成失败: {str(e)}")
return {"status": "input_required", "message": "查询无效,请提供城市和日期。"} # 返回追问JSON
# 任务处理函数
def handle_task(self, task):
# 1.获取任务内容
query = (task.message or {}).get("content", {}).get("text", "")
logger.info(f"任务内容: {query}")
try:
# 2.基于用户的问题生成SQL
sql_result = self.generate_sql_query(query)
# print(f'sql_result-->{sql_result}')
# 处理生成的结果
if sql_result["status"] == "input_required": # 如果是追问问题,则直接进行返回
task.status = TaskStatus(state=TaskState.INPUT_REQUIRED, message={"role": "agent", "content":{"text": sql_result["message"]}})
return task
else: # 否则,生成了SQL,则执行并返回结果
sql_query = sql_result["sql"]
logger.info(f"SQL查询: {sql_query}")
# 3.调用MCP工具获取查询结果
weather_result = asyncio.run(get_weather(sql_query))
# 4.格式化结果
response = json.loads(weather_result) if isinstance(weather_result, str) else weather_result
logger.info(f"调用MCP工具的结果: {response}")
# 检查响应状态
if response.get("status") == "success":
data = response.get("data", []) # 提取数据列表
response_text = "\n".join([f"{d['city']} {d['fx_date']}: {d['text_day']}(夜间 {d['text_night']}),温度 {d['temp_min']}-{d['temp_max']}°C,湿度 {d['humidity']}%,风向 {d['wind_dir_day']},降水 {d['precip']}mm" for d in data]) # 格式化每个数据项为友好文本,连接成多行
# print(f"天气查询结果:\n{response_text}")
# 将查询结构封装到 artifacts中,并将状态设置成完成
task.artifacts = [{"parts": [{"type": "text", "text": response_text}]}]
task.status = TaskStatus(state=TaskState.COMPLETED)
elif response.get("status") == "no_data":
response_text = response.get("message", "没有找到天气数据,请确认城市和日期。")
task.status = TaskStatus(state=TaskState.INPUT_REQUIRED, message={"role": "agent", "content":{"text": response_text}})
else:
response_text = response.get("message", "查询失败,请重试或者提供详细的城市和日期")
task.status = TaskStatus(state=TaskState.FAILED, message={"role": "agent", "content":{"text": response_text}})
return task
except Exception as e: # 捕获异常
logger.error(f"查询失败: {str(e)}")
# 设置任务状态为失败,添加错误信息
task.status = TaskStatus(state=TaskState.FAILED,
message={"role": "agent",
"content": {"text": f"查询失败: {str(e)} 请重试或提供更多细节。"}})
return task
if __name__ == '__main__':
# # 测试 handle_task
# server = WeatherQueryServer()
# query = "查询一下北京今天的天气"
# # query = "查询一下北京的天气"
# message = Message(content=TextContent(text=query), role=MessageRole.USER)
# task = Task(message=message.to_dict())
# server.handle_task(task)
# 创建并运行服务器
# 实例化天气查询服务器
weather_server = WeatherQueryServer()
# 打印服务器信息
print("\n=== 服务器信息 ===")
print(f"名称: {weather_server.agent_card.name}")
print(f"描述: {weather_server.agent_card.description}")
print("\n技能:")
for skill in weather_server.agent_card.skills:
print(f"- {skill.name}: {skill.description}")
# 运行服务器
run_server(weather_server, host="127.0.0.1", port=5005)
第五章美化输出
若成功获取结果(status为complete),从artifacts里面提取数据进行prompt+LLM进行答案优化.
若获取答案失败,从message里面提取数据
使用异步的优势:
解决高并发,可以获取task完整数据(session id,id......)
5.1调用逻辑流程

第六章票务查询
6.1执行流程
把用户意图转化为sql,到数据库里面查询.
query里面包含的信息是出发地,目的地和时间或者直接提供车次.如果缺少信息,那agent返回信息提示用户补充
6.2代码实现
import json
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState, Message, TextContent, MessageRole, Task
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from datetime import datetime
import pytz
from config import Config
from create_logger import logger
conf = Config()
# 初始化LLM
llm = ChatOpenAI(
model=conf.model_name,
base_url=conf.base_url,
api_key=conf.api_key,
temperature=0.1
)
# 数据表 schema
table_schema_string = """ # 定义票务表SQL schema字符串,用于Prompt上下文
CREATE TABLE train_tickets (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
departure_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '出发城市(如“北京”)',
arrival_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '到达城市(如“上海”)',
departure_time DATETIME NOT NULL COMMENT '出发时间(如“2025-08-12 07:00:00”)',
arrival_time DATETIME NOT NULL COMMENT '到达时间(如“2025-08-12 11:30:00”)',
train_number VARCHAR(20) NOT NULL COMMENT '火车车次(如“G1001”)',
seat_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '座位类型(如“二等座”)',
total_seats INT NOT NULL COMMENT '总座位数(如 1000)',
remaining_seats INT NOT NULL COMMENT '剩余座位数(如 50)',
price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 553.50)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
UNIQUE KEY unique_train (departure_time, train_number)
) COMMENT='火车票信息表';
-- 机票表
CREATE TABLE flight_tickets (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
departure_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '出发城市(如“北京”)',
arrival_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '到达城市(如“上海”)',
departure_time DATETIME NOT NULL COMMENT '出发时间(如“2025-08-12 08:00:00”)',
arrival_time DATETIME NOT NULL COMMENT '到达时间(如“2025-08-12 10:30:00”)',
flight_number VARCHAR(20) NOT NULL COMMENT '航班号(如“CA1234”)',
cabin_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '舱位类型(如“经济舱”)',
total_seats INT NOT NULL COMMENT '总座位数(如 200)',
remaining_seats INT NOT NULL COMMENT '剩余座位数(如 10)',
price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 1200.00)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
UNIQUE KEY unique_flight (departure_time, flight_number)
) COMMENT='航班机票信息表';
-- 演唱会票表
CREATE TABLE concert_tickets (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
artist VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '艺人名称(如“周杰伦”)',
city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '举办城市(如“上海”)',
venue VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '场馆(如“上海体育场”)',
start_time DATETIME NOT NULL COMMENT '开始时间(如“2025-08-12 19:00:00”)',
end_time DATETIME NOT NULL COMMENT '结束时间(如“2025-08-12 22:00:00”)',
ticket_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '票类型(如“VIP”)',
total_seats INT NOT NULL COMMENT '总座位数(如 5000)',
remaining_seats INT NOT NULL COMMENT '剩余座位数(如 100)',
price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 880.00)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
UNIQUE KEY unique_concert (start_time, artist, ticket_type)
) COMMENT='演唱会门票信息表';
"""
# 生成SQL的提示词
sql_prompt = ChatPromptTemplate.from_template(
"""
系统提示:你是一个专业的票务SQL生成器,需要从对话历史(含用户的问题)中提取用户的意图以及关键信息,然后基于train_tickets、flight_tickets、concert_tickets表生成SELECT语句。
根据对话历史:
1. 提取用户的意图,意图有3种(train: 火车/高铁, flight: 机票, concert: 演唱会),输出:{{"type": "train/flight/concert"}};如果无法识别意图,或者意图不在这3种内,则模仿最后1个示例回复即可。
2. 根据用户的意图,生成对应表的 SELECT 语句,仅查询指定字段:
- train_tickets: id, departure_city, arrival_city, departure_time, arrival_time, train_number, seat_type, price, remaining_seats
- flight_tickets: id, departure_city, arrival_city, departure_time, arrival_time, flight_number, cabin_type, price, remaining_seats
- concert_tickets: id, artist, city, venue, start_time, end_time, ticket_type, price, remaining_seats
3. 如果用户在查询票务信息时,缺少必要信息,则输出:{{"status": "input_required", "message": "请提供票务类型(如火车票、机票、演唱会)和必要信息(如城市、日期)。"}} ,如示例所示;如果对话历史中信息齐全,则输出纯SQL即可。
其中,每种意图必要的信息有:
- flight/train: 【departure_city (出发城市), arrival_city (到达城市), date (日期)】 或 【train_number/flight_number (车次)】
- concert: city (城市), artist (艺人), date (日期)。
4. 按要求输出两行数据或一行数据即可,不需要输出其他内容。
示例:
- 对话: user: 火车票 北京 上海 2025-07-31 硬卧
输出:
{{"type": "train"}}
SELECT id, departure_city, arrival_city, departure_time, arrival_time, train_number, seat_type, price, remaining_seats FROM train_tickets WHERE departure_city = '北京' AND arrival_city = '上海' AND DATE(departure_time) = '2025-07-31' AND seat_type = '硬卧'
- 对话: user: 机票 上海 广州 2025-09-11 头等舱
输出:
{{"type": "flight"}}
SELECT id, departure_city, arrival_city, departure_time, arrival_time, flight_number, cabin_type, price, remaining_seats FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '广州' AND DATE(departure_time) = '2025-09-11' AND cabin_type = '头等舱'
- 对话: user: 演唱会 北京 刀郎 2025-08-23 看台
输出:
{{"type": "concert"}}
SELECT id, artist, city, venue, start_time, end_time, ticket_type, price, remaining_seats FROM concert_tickets WHERE city = '北京' AND artist = '刀郎' AND DATE(start_time) = '2025-08-23' AND ticket_type = '看台'
- 对话: user: 火车票
输出:
{{"status": "input_required", "message": "请提供票务类型(如火车票、机票、演唱会)和必要信息(如城市、日期)。"}}
- 对话: user: 你好
输出:
{{"status": "input_required", "message": "请提供票务类型(如火车票、机票、演唱会)和必要信息(如城市、日期)。"}}
表结构:{table_schema_string}
对话历史: {conversation}
当前日期: {current_date} (Asia/Shanghai)
"""
)
# 定义查询函数
async def get_ticket_info(sql):
try:
# 启动 MCP server,通过streamable建立连接
async with streamablehttp_client("http://127.0.0.1:8001/mcp") as (read, write, _):
# 使用读写通道创建 MCP 会话
async with ClientSession(read, write) as session:
try:
await session.initialize()
# 工具调用
result = await session.call_tool("query_tickets", {"sql": sql})
result_data = json.loads(result) if isinstance(result, str) else result
logger.info(f"票务查询结果:{result_data}")
return result_data.content[0].text
except Exception as e:
logger.error(f"票务 MCP 测试出错:{str(e)}")
return {"status": "error", "message": f"票务 MCP 查询出错:{str(e)}"}
except Exception as e:
logger.error(f"连接或会话初始化时发生错误: {e}")
return {"status": "error", "message": "连接或会话初始化时发生错误"}
# Agent 卡片定义
agent_card = AgentCard(
name="TicketQueryAssistant",
description="基于 LangChain 提供票务查询服务的助手",
url="http://localhost:5006",
version="1.0.4",
capabilities={"streaming": True, "memory": True},
skills=[
AgentSkill(
name="execute ticket query",
description="根据客户端提供的输入执行票务查询,返回数据库结果,支持自然语言输入",
examples=["火车票 北京 上海 2025-07-31 硬卧", "机票 北京 上海 2025-07-31 经济舱",
"演唱会 北京 刀郎 2025-08-23 看台"]
)
]
)
# 票务查询服务器类
class TicketQueryServer(A2AServer):
def __init__(self):
super().__init__(agent_card=agent_card)
self.llm = llm
self.sql_prompt = sql_prompt
self.schema = table_schema_string
# 定义生成SQL查询方法,输入对话历史,返回SQL或追问JSON
def generate_sql_query(self, conversation: str) -> dict:
try:
# 组装链
chain = self.sql_prompt | self.llm
# 调用链
current_date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d') # 获取当前日期,格式化为字符串
output = chain.invoke({"conversation": conversation, "current_date": current_date, "table_schema_string": self.schema}).content.strip()
logger.info(f"原始 LLM 输出: {output}")
# 处理结果,返回字典
lines = output.split('\n')
type_line = lines[0].strip()
if type_line.startswith('```json'): # 检查是否以```json开头
type_line = lines[1].strip() # 取下一行为类型行
sql_lines = lines[3:-1] if lines[-1].strip() == '```' else lines[3:] # 提取SQL行,跳过代码块标记
else:
sql_lines = lines[1:] if len(lines) > 1 else [] # 取剩余行为SQL行
# 提取 type 和 SQL
if type_line.startswith('{"type":'): # 如果以{"type":开头
query_type = json.loads(type_line)["type"] # 解析并提取类型
sql_query = ' '.join([line.strip() for line in sql_lines if line.strip() and not line.startswith('```')]) # 连接SQL行,过滤空行和代码块
logger.info(f"分类类型: {query_type}, 生成的 SQL: {sql_query}")
return {"status": "sql", "type": query_type, "sql": sql_query} # 返回SQL状态字典,包括类型
elif type_line.startswith('{"status": "input_required"'): # 检查是否为追问JSON
return json.loads(type_line)
else: # 无效格式
logger.error(f"无效的 LLM 输出格式: {output}")
return {"status": "input_required", "message": "无法解析查询类型或SQL,请提供更明确的信息。"} # 返回默认追问
except Exception as e:
logger.error(f"SQL 生成失败: {str(e)}")
return {"status": "input_required", "message": "查询无效,请提供查询票务的相关信息。"} # 返回追问JSON
# 处理任务:提取输入,生成SQL,调用MCP,格式化结果
def handle_task(self, task):
# 1 提取输入
content = (task.message or {}).get("content", {}) # 从消息中获取内容
# 提取conversation,即客户端发起的任务中的query语句
conversation = content.get("text", "") if isinstance(content, dict) else ""
logger.info(f"对话历史及用户问题: {conversation}")
try:
# 2 基于用户问题生成SQL查询
gen_result = self.generate_sql_query(conversation)
# 检查是否需要追问,如果是则添加追问消息后返回任务
if gen_result["status"] == "input_required":
task.status = TaskStatus(state=TaskState.INPUT_REQUIRED,
message={"role": "agent", "content": {"text": gen_result["message"]}})
return task
# 否则则提取SQL查询,并进行MCP调用
sql_query = gen_result["sql"]
query_type = gen_result["type"]
logger.info(f"执行 SQL 查询: {sql_query} (类型: {query_type})")
# 3 调用MCP
ticket_result = asyncio.run(get_ticket_info(sql_query))
# 4 格式化结果
response = json.loads(ticket_result) if isinstance(ticket_result, str) else ticket_result
logger.info(f"MCP 返回: {response}")
# 检查响应状态
if response.get("status") == "success":
data = response.get("data", []) # 提取数据列表
response_text = "" # 初始化响应文本
for d in data: # 遍历每个数据项
if query_type == "train": # 火车票类型
response_text += f"{d['departure_city']} 到 {d['arrival_city']} {d['departure_time']}: 车次 {d['train_number']},{d['seat_type']},票价 {d['price']}元,剩余 {d['remaining_seats']} 张\n" # 格式化火车票文本
elif query_type == "flight": # 机票类型
response_text += f"{d['departure_city']} 到 {d['arrival_city']} {d['departure_time']}: 航班 {d['flight_number']},{d['cabin_type']},票价 {d['price']}元,剩余 {d['remaining_seats']} 张\n" # 格式化机票文本
elif query_type == "concert": # 演唱会类型
response_text += f"{d['city']} {d['start_time']}: {d['artist']} 演唱会,{d['ticket_type']},场地 {d['venue']},票价 {d['price']}元,剩余 {d['remaining_seats']} 张\n" # 格式化演唱会文本
if not response_text: # 检查文本是否为空
response_text = "无结果。如果需要其他日期,请补充。"
# 设置任务产物为文本部分,并设置任务状态为完成
task.artifacts = [{"parts": [{"type": "text", "text": response_text}]}]
task.status = TaskStatus(state=TaskState.COMPLETED)
elif response.get("status") == "no_data":
response_text = response.get("message", "请输出查询票务的详细信息。")
# 设置任务状态为输入所需,添加追问消息
task.status = TaskStatus(state=TaskState.INPUT_REQUIRED,
message={"role": "agent", "content": {"text": response_text}})
else:
response_text = response.get("message", "查询失败,请重试或提供更多细节。")
# 设置任务状态为失败,添加错误信息
task.status = TaskStatus(state=TaskState.FAILED,
message={"role": "agent", "content": {"text": response_text}})
return task
except Exception as e: # 捕获异常
logger.error(f"查询失败: {str(e)}")
# 设置任务状态为失败,添加错误信息
task.status = TaskStatus(state=TaskState.FAILED,
message={"role": "agent",
"content": {"text": f"查询失败: {str(e)} 请重试或提供更多细节。"}})
return task
if __name__ == "__main__":
# # 测试 handle_task
# server = TicketQueryServer()
# query = "查一下1月6日从北京到上海的火车票"
# # query = "查询一下北京的天气"
# message = Message(content=TextContent(text=query), role=MessageRole.USER)
# task = Task(message=message.to_dict())
# server.handle_task(task)
# 创建并运行服务器
# 实例化票务查询服务器
ticket_server = TicketQueryServer()
# 打印服务器信息
print("\n=== 服务器信息 ===")
print(f"名称: {ticket_server.agent_card.name}")
print(f"描述: {ticket_server.agent_card.description}")
print("\n技能:")
for skill in ticket_server.agent_card.skills:
print(f"- {skill.name}: {skill.description}")
# 运行服务器
run_server(ticket_server, host="127.0.0.1", port=5006)
第七章票务预定
7.1特殊点
票务预定之前要先和票务查询交互(没查到信息提示用户重新输入时间)
参数和工具较多,使用agent调用工具
订票成功:把信息八方artifacts里面,改变状态为complete
订票失败:把信息放到message里面,状态为failed
import json
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState, Message, TextContent, MessageRole, Task
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from datetime import datetime
import pytz
from config import Config
from create_logger import logger
conf = Config()
# 初始化LLM
llm = ChatOpenAI(
model=conf.model_name,
base_url=conf.base_url,
api_key=conf.api_key,
temperature=0.1
)
# 数据表 schema
table_schema_string = """ # 定义票务表SQL schema字符串,用于Prompt上下文
CREATE TABLE train_tickets (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
departure_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '出发城市(如“北京”)',
arrival_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '到达城市(如“上海”)',
departure_time DATETIME NOT NULL COMMENT '出发时间(如“2025-08-12 07:00:00”)',
arrival_time DATETIME NOT NULL COMMENT '到达时间(如“2025-08-12 11:30:00”)',
train_number VARCHAR(20) NOT NULL COMMENT '火车车次(如“G1001”)',
seat_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '座位类型(如“二等座”)',
total_seats INT NOT NULL COMMENT '总座位数(如 1000)',
remaining_seats INT NOT NULL COMMENT '剩余座位数(如 50)',
price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 553.50)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
UNIQUE KEY unique_train (departure_time, train_number)
) COMMENT='火车票信息表';
-- 机票表
CREATE TABLE flight_tickets (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
departure_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '出发城市(如“北京”)',
arrival_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '到达城市(如“上海”)',
departure_time DATETIME NOT NULL COMMENT '出发时间(如“2025-08-12 08:00:00”)',
arrival_time DATETIME NOT NULL COMMENT '到达时间(如“2025-08-12 10:30:00”)',
flight_number VARCHAR(20) NOT NULL COMMENT '航班号(如“CA1234”)',
cabin_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '舱位类型(如“经济舱”)',
total_seats INT NOT NULL COMMENT '总座位数(如 200)',
remaining_seats INT NOT NULL COMMENT '剩余座位数(如 10)',
price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 1200.00)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
UNIQUE KEY unique_flight (departure_time, flight_number)
) COMMENT='航班机票信息表';
-- 演唱会票表
CREATE TABLE concert_tickets (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
artist VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '艺人名称(如“周杰伦”)',
city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '举办城市(如“上海”)',
venue VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '场馆(如“上海体育场”)',
start_time DATETIME NOT NULL COMMENT '开始时间(如“2025-08-12 19:00:00”)',
end_time DATETIME NOT NULL COMMENT '结束时间(如“2025-08-12 22:00:00”)',
ticket_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '票类型(如“VIP”)',
total_seats INT NOT NULL COMMENT '总座位数(如 5000)',
remaining_seats INT NOT NULL COMMENT '剩余座位数(如 100)',
price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 880.00)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
UNIQUE KEY unique_concert (start_time, artist, ticket_type)
) COMMENT='演唱会门票信息表';
"""
# 生成SQL的提示词
sql_prompt = ChatPromptTemplate.from_template(
"""
系统提示:你是一个专业的票务SQL生成器,需要从对话历史(含用户的问题)中提取用户的意图以及关键信息,然后基于train_tickets、flight_tickets、concert_tickets表生成SELECT语句。
根据对话历史:
1. 提取用户的意图,意图有3种(train: 火车/高铁, flight: 机票, concert: 演唱会),输出:{{"type": "train/flight/concert"}};如果无法识别意图,或者意图不在这3种内,则模仿最后1个示例回复即可。
2. 根据用户的意图,生成对应表的 SELECT 语句,仅查询指定字段:
- train_tickets: id, departure_city, arrival_city, departure_time, arrival_time, train_number, seat_type, price, remaining_seats
- flight_tickets: id, departure_city, arrival_city, departure_time, arrival_time, flight_number, cabin_type, price, remaining_seats
- concert_tickets: id, artist, city, venue, start_time, end_time, ticket_type, price, remaining_seats
3. 如果用户在查询票务信息时,缺少必要信息,则输出:{{"status": "input_required", "message": "请提供票务类型(如火车票、机票、演唱会)和必要信息(如城市、日期)。"}} ,如示例所示;如果对话历史中信息齐全,则输出纯SQL即可。
其中,每种意图必要的信息有:
- flight/train: 【departure_city (出发城市), arrival_city (到达城市), date (日期)】 或 【train_number/flight_number (车次)】
- concert: city (城市), artist (艺人), date (日期)。
4. 按要求输出两行数据或一行数据即可,不需要输出其他内容。
示例:
- 对话: user: 火车票 北京 上海 2025-07-31 硬卧
输出:
{{"type": "train"}}
SELECT id, departure_city, arrival_city, departure_time, arrival_time, train_number, seat_type, price, remaining_seats FROM train_tickets WHERE departure_city = '北京' AND arrival_city = '上海' AND DATE(departure_time) = '2025-07-31' AND seat_type = '硬卧'
- 对话: user: 机票 上海 广州 2025-09-11 头等舱
输出:
{{"type": "flight"}}
SELECT id, departure_city, arrival_city, departure_time, arrival_time, flight_number, cabin_type, price, remaining_seats FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '广州' AND DATE(departure_time) = '2025-09-11' AND cabin_type = '头等舱'
- 对话: user: 演唱会 北京 刀郎 2025-08-23 看台
输出:
{{"type": "concert"}}
SELECT id, artist, city, venue, start_time, end_time, ticket_type, price, remaining_seats FROM concert_tickets WHERE city = '北京' AND artist = '刀郎' AND DATE(start_time) = '2025-08-23' AND ticket_type = '看台'
- 对话: user: 火车票
输出:
{{"status": "input_required", "message": "请提供票务类型(如火车票、机票、演唱会)和必要信息(如城市、日期)。"}}
- 对话: user: 你好
输出:
{{"status": "input_required", "message": "请提供票务类型(如火车票、机票、演唱会)和必要信息(如城市、日期)。"}}
表结构:{table_schema_string}
对话历史: {conversation}
当前日期: {current_date} (Asia/Shanghai)
"""
)
# 定义查询函数
async def get_ticket_info(sql):
try:
# 启动 MCP server,通过streamable建立连接
async with streamablehttp_client("http://127.0.0.1:8001/mcp") as (read, write, _):
# 使用读写通道创建 MCP 会话
async with ClientSession(read, write) as session:
try:
await session.initialize()
# 工具调用
result = await session.call_tool("query_tickets", {"sql": sql})
result_data = json.loads(result) if isinstance(result, str) else result
logger.info(f"票务查询结果:{result_data}")
return result_data.content[0].text
except Exception as e:
logger.error(f"票务 MCP 测试出错:{str(e)}")
return {"status": "error", "message": f"票务 MCP 查询出错:{str(e)}"}
except Exception as e:
logger.error(f"连接或会话初始化时发生错误: {e}")
return {"status": "error", "message": "连接或会话初始化时发生错误"}
# Agent 卡片定义
agent_card = AgentCard(
name="TicketQueryAssistant",
description="基于 LangChain 提供票务查询服务的助手",
url="http://localhost:5006",
version="1.0.4",
capabilities={"streaming": True, "memory": True},
skills=[
AgentSkill(
name="execute ticket query",
description="根据客户端提供的输入执行票务查询,返回数据库结果,支持自然语言输入",
examples=["火车票 北京 上海 2025-07-31 硬卧", "机票 北京 上海 2025-07-31 经济舱",
"演唱会 北京 刀郎 2025-08-23 看台"]
)
]
)
# 票务查询服务器类
class TicketQueryServer(A2AServer):
def __init__(self):
super().__init__(agent_card=agent_card)
self.llm = llm
self.sql_prompt = sql_prompt
self.schema = table_schema_string
# 定义生成SQL查询方法,输入对话历史,返回SQL或追问JSON
def generate_sql_query(self, conversation: str) -> dict:
try:
# 组装链
chain = self.sql_prompt | self.llm
# 调用链
current_date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d') # 获取当前日期,格式化为字符串
output = chain.invoke({"conversation": conversation, "current_date": current_date, "table_schema_string": self.schema}).content.strip()
logger.info(f"原始 LLM 输出: {output}")
# 处理结果,返回字典
lines = output.split('\n')
type_line = lines[0].strip()
if type_line.startswith('```json'): # 检查是否以```json开头
type_line = lines[1].strip() # 取下一行为类型行
sql_lines = lines[3:-1] if lines[-1].strip() == '```' else lines[3:] # 提取SQL行,跳过代码块标记
else:
sql_lines = lines[1:] if len(lines) > 1 else [] # 取剩余行为SQL行
# 提取 type 和 SQL
if type_line.startswith('{"type":'): # 如果以{"type":开头
query_type = json.loads(type_line)["type"] # 解析并提取类型
sql_query = ' '.join([line.strip() for line in sql_lines if line.strip() and not line.startswith('```')]) # 连接SQL行,过滤空行和代码块
logger.info(f"分类类型: {query_type}, 生成的 SQL: {sql_query}")
return {"status": "sql", "type": query_type, "sql": sql_query} # 返回SQL状态字典,包括类型
elif type_line.startswith('{"status": "input_required"'): # 检查是否为追问JSON
return json.loads(type_line)
else: # 无效格式
logger.error(f"无效的 LLM 输出格式: {output}")
return {"status": "input_required", "message": "无法解析查询类型或SQL,请提供更明确的信息。"} # 返回默认追问
except Exception as e:
logger.error(f"SQL 生成失败: {str(e)}")
return {"status": "input_required", "message": "查询无效,请提供查询票务的相关信息。"} # 返回追问JSON
# 处理任务:提取输入,生成SQL,调用MCP,格式化结果
def handle_task(self, task):
# 1 提取输入
content = (task.message or {}).get("content", {}) # 从消息中获取内容
# 提取conversation,即客户端发起的任务中的query语句
conversation = content.get("text", "") if isinstance(content, dict) else ""
logger.info(f"对话历史及用户问题: {conversation}")
try:
# 2 基于用户问题生成SQL查询
gen_result = self.generate_sql_query(conversation)
# 检查是否需要追问,如果是则添加追问消息后返回任务
if gen_result["status"] == "input_required":
task.status = TaskStatus(state=TaskState.INPUT_REQUIRED,
message={"role": "agent", "content": {"text": gen_result["message"]}})
return task
# 否则则提取SQL查询,并进行MCP调用
sql_query = gen_result["sql"]
query_type = gen_result["type"]
logger.info(f"执行 SQL 查询: {sql_query} (类型: {query_type})")
# 3 调用MCP
ticket_result = asyncio.run(get_ticket_info(sql_query))
# 4 格式化结果
response = json.loads(ticket_result) if isinstance(ticket_result, str) else ticket_result
logger.info(f"MCP 返回: {response}")
# 检查响应状态
if response.get("status") == "success":
data = response.get("data", []) # 提取数据列表
response_text = "" # 初始化响应文本
for d in data: # 遍历每个数据项
if query_type == "train": # 火车票类型
response_text += f"{d['departure_city']} 到 {d['arrival_city']} {d['departure_time']}: 车次 {d['train_number']},{d['seat_type']},票价 {d['price']}元,剩余 {d['remaining_seats']} 张\n" # 格式化火车票文本
elif query_type == "flight": # 机票类型
response_text += f"{d['departure_city']} 到 {d['arrival_city']} {d['departure_time']}: 航班 {d['flight_number']},{d['cabin_type']},票价 {d['price']}元,剩余 {d['remaining_seats']} 张\n" # 格式化机票文本
elif query_type == "concert": # 演唱会类型
response_text += f"{d['city']} {d['start_time']}: {d['artist']} 演唱会,{d['ticket_type']},场地 {d['venue']},票价 {d['price']}元,剩余 {d['remaining_seats']} 张\n" # 格式化演唱会文本
if not response_text: # 检查文本是否为空
response_text = "无结果。如果需要其他日期,请补充。"
# 设置任务产物为文本部分,并设置任务状态为完成
task.artifacts = [{"parts": [{"type": "text", "text": response_text}]}]
task.status = TaskStatus(state=TaskState.COMPLETED)
elif response.get("status") == "no_data":
response_text = response.get("message", "请输出查询票务的详细信息。")
# 设置任务状态为输入所需,添加追问消息
task.status = TaskStatus(state=TaskState.INPUT_REQUIRED,
message={"role": "agent", "content": {"text": response_text}})
else:
response_text = response.get("message", "查询失败,请重试或提供更多细节。")
# 设置任务状态为失败,添加错误信息
task.status = TaskStatus(state=TaskState.FAILED,
message={"role": "agent", "content": {"text": response_text}})
return task
except Exception as e: # 捕获异常
logger.error(f"查询失败: {str(e)}")
# 设置任务状态为失败,添加错误信息
task.status = TaskStatus(state=TaskState.FAILED,
message={"role": "agent",
"content": {"text": f"查询失败: {str(e)} 请重试或提供更多细节。"}})
return task
if __name__ == "__main__":
# # 测试 handle_task
# server = TicketQueryServer()
# query = "查一下1月6日从北京到上海的火车票"
# # query = "查询一下北京的天气"
# message = Message(content=TextContent(text=query), role=MessageRole.USER)
# task = Task(message=message.to_dict())
# server.handle_task(task)
# 创建并运行服务器
# 实例化票务查询服务器
ticket_server = TicketQueryServer()
# 打印服务器信息
print("\n=== 服务器信息 ===")
print(f"名称: {ticket_server.agent_card.name}")
print(f"描述: {ticket_server.agent_card.description}")
print("\n技能:")
for skill in ticket_server.agent_card.skills:
print(f"- {skill.name}: {skill.description}")
# 运行服务器
run_server(ticket_server, host="127.0.0.1", port=5006)
第八章main程序
8.1模块
意图识别:根据prompt+LLM实现用户意图识别
查询改写:根据上下文结合用户query完善用户query
agent反馈:agent server判断信息是否完善,若不完善,反馈让用户完善query
8.2代码
把agent添加到network当中
import asyncio
import json
import uuid
from datetime import datetime
import pytz
import re
from python_a2a import AgentNetwork, TextContent, Message, MessageRole, Task
from langchain_openai import ChatOpenAI
from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
from SmartVoyage.main_prompts import SmartVoyagePrompts
conf = Config()
# 初始化全局变量,用于模拟会话状态 这些变量替换了Streamlit的session_state
messages = [] # 存储对话历史消息列表,每个元素为字典 {"role": "user/assistant", "content": "消息内容"}
agent_network = None # 代理网络实例
llm = None # 大语言模型实例
agent_urls = {} # 存储代理的URL信息字典
conversation_history = "" # 存储整个对话历史字符串,用于意图识别
# 初始化代理网络和相关组件 此部分在脚本启动时执行一次,模拟Streamlit的初始化
def initialize_system():
"""
初始化系统组件,包括代理网络、路由器、LLM和会话状态
核心逻辑:构建AgentNetwork,添加代理,创建路由器和LLM
"""
global agent_network, llm, agent_urls, conversation_history
# 存储代理URL信息,便于查看
agent_urls = {
"WeatherQueryAssistant": "http://localhost:5005", # 天气代理URL
"TicketQueryAssistant": "http://localhost:5006", # 票务代理URL
"TicketOrderAssistant": "http://localhost:5007" # 票务预定URL
}
# 创建代理网络
network = AgentNetwork(name="旅行助手网络")
network.add("WeatherQueryAssistant", "http://localhost:5005")
network.add("TicketQueryAssistant", "http://localhost:5006")
network.add("TicketOrderAssistant", "http://localhost:5007")
agent_network = network
# 加载配置并创建LLM
llm = ChatOpenAI(
model=conf.model_name,
api_key=conf.api_key,
base_url=conf.base_url,
temperature=0.1
)
# 初始化对话历史为空字符串
conversation_history = ""
# 意图识别agent
def intent_agent(user_input):
global conversation_history, llm
# 创建意图识别链:提示模板 + LLM
chain = SmartVoyagePrompts.intent_prompt() | llm
# 调用chain进行意图识别
current_date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d")
# 为了防止上下文过长,这里只取最近6条对话
result = chain.invoke({
"current_date": current_date,
"conversation_history": '\n'.join(conversation_history.split("\n")[-6:]),
"query": user_input
}).content.strip()
logger.info(f"意图识别结果:{result}")
# 处理意图识别结果
intent_result = json.loads(result)
# 提取意图、改写问题和追问消息
intents = intent_result.get("intents", [])
user_queries = intent_result.get("user_queries", {})
follow_up_message = intent_result.get("follow_up_message", "")
logger.info(f"意图类型:{intents}, 改写问题:{user_queries}, 追问消息:{follow_up_message}")
return intents, user_queries, follow_up_message
# 处理用户输入的核心函数
# 此函数模拟Streamlit的输入处理逻辑,包括意图识别、路由和响应生成
def process_user_input(prompt):
"""
处理用户输入:识别意图、调用代理、生成响应
核心逻辑:使用LLM进行意图识别,根据意图路由到相应代理或直接生成内容
"""
global messages, conversation_history, llm
# 添加用户消息到历史
messages.append({"role": "user", "content": prompt})
conversation_history += f"\nUser: {prompt}"
print("正在分析您的意图...")
try:
# 1.意图识别
intents, user_queries, follow_up_message = intent_agent(prompt)
# 2.根据意图进行判断,进行不同处理
if "out_of_scope" in intents:
# 如果意图超出范围,则返回意图 'follow_up_message' 中的内容
response = follow_up_message
conversation_history += f"\nAssistant: {response}"
elif follow_up_message != "":
# 如果有歧义,也就是追问消息不为空,则返回该消息
response = follow_up_message
conversation_history += f"\nAssistant: {response}"
else: # 处理有效意图
responses = [] # 存储每个意图的响应列表
routed_agents = [] # 记录路由到的代理列表
for intent in intents:
logger.info(f"处理意图:{intent}")
# 根据意图确定代理名称
if intent == "weather":
agent_name = "WeatherQueryAssistant"
elif intent in ["flight", "train", "concert"]:
agent_name = "TicketQueryAssistant"
elif intent == "order":
agent_name = "TicketOrderAssistant"
else:
agent_name = None
# 不同意图处理方式
if intent == "attraction":
# 对于景点推荐,直接使用LLM生成
chain = SmartVoyagePrompts.attraction_prompt() | llm
rec_response = chain.invoke({"query": prompt}).content.strip()
responses.append(rec_response)
elif agent_name: # 对于业务agent对应的意图,进行agent server调用
# 1)获取改写后的问题
query_str = user_queries.get(intent, prompt)
# 2)获取代理实例
agent = agent_network.get_agent(agent_name)
# 3)将历史对话信息+新查询作为输入,调用agent
# 注意:需要将原始的用户查询去掉,然后将改写后的查询添加到历史对话中
chat_history = '\n'.join(conversation_history.split("\n")[-7:-1]) + f'\nUser: {query_str}'
message = Message(content=TextContent(text=chat_history), role=MessageRole.USER)
task = Task(id="task-" + str(uuid.uuid4()), message=message.to_dict())
raw_response = asyncio.run(agent.send_task_async(task))
logger.info(f"{agent_name} 原始响应: {raw_response}") # 记录原始响应日志
# 4)处理响应
if raw_response.status.state == 'completed': # 正常结果
agent_result = raw_response.artifacts[0]['parts'][0]['text']
else: # 异常结果
agent_result = raw_response.status.message['content']['text']
# 调用结果优化agent,优化结果
if agent_name == "WeatherQueryAssistant":
chain = SmartVoyagePrompts.summarize_weather_prompt() | llm
final_response = chain.invoke(
{"query": query_str, "raw_response": agent_result}).content.strip()
elif agent_name == "TicketQueryAssistant":
chain = SmartVoyagePrompts.summarize_ticket_prompt() | llm
final_response = chain.invoke(
{"query": query_str, "raw_response": agent_result}).content.strip()
else:
final_response = agent_result
# 5)添加到历史
responses.append(final_response) # 添加到响应列表
routed_agents.append(agent_name) # 记录路由代理
else:
# 不支持的意图
responses.append("暂不支持此意图。")
# 组合所有响应
response = "\n\n".join(responses)
if routed_agents:
logger.info(f"路由到代理:{routed_agents}")
conversation_history += f"\nAssistant: {response}" # 更新历史
# 输出助手响应(模拟Streamlit的显示)
print(f"\n助手回复:\n{response}\n") # 打印响应
# 添加到消息历史
messages.append({"role": "assistant", "content": response})
except json.JSONDecodeError as json_err:
# 处理JSON解析错误
logger.error(f"意图识别JSON解析失败")
error_message = f"意图识别JSON解析失败:{str(json_err)}。请重试。"
print(f"\n助手回复:\n{error_message}\n") # 打印错误
messages.append({"role": "assistant", "content": error_message})
except Exception as e:
# 处理其他异常
logger.error(f"处理异常: {str(e)}")
error_message = f"处理失败:{str(e)}。请重试。"
print(f"\n助手回复:\n{error_message}\n") # 打印错误
messages.append({"role": "assistant", "content": error_message})
# 显示代理卡片信息
# 此函数模拟Streamlit的右侧Agent Card,打印代理详情
def display_agent_cards():
"""
显示所有代理的卡片信息,包括技能、描述、地址和状态
核心逻辑:遍历代理网络,获取并打印卡片内容
"""
print("\n🛠️ Agent Cards:")
for agent_name in agent_network.agents.keys():
# 获取代理卡片
agent_card = agent_network.get_agent_card(agent_name)
agent_url = agent_urls.get(agent_name, "未知地址")
print(f"\n--- Agent: {agent_name} ---")
print(f"技能: {agent_card.skills}")
print(f"描述: {agent_card.description}")
print(f"地址: {agent_url}")
print(f"状态: 在线") # 固定状态为在线
if __name__ == '__main__':
# initialize_system()
# # intent_agent("查询北京到上海的机票,1月3日的")
#
# process_user_input("查询北京到上海的机票,1月16日的")
# 初始化系统
initialize_system()
print("🤖 基于A2A的SmartVoyage旅行智能助手")
print("欢迎体验智能对话!输入问题,按回车提交;输入'quit'退出;输入'cards'查看代理卡片。")
# 显示初始代理卡片
display_agent_cards()
# 交互循环:模拟Streamlit的连续输入
while True:
# 获取用户输入
prompt = input("\n请输入您的问题: ").strip()
if prompt.lower() == 'quit':
print("感谢使用SmartVoyage!再见!")
break
elif prompt.lower() == 'cards': # 查看卡片条件
display_agent_cards() # 重新显示卡片
continue
elif not prompt: # 空输入跳过
continue
else:
# 处理输入
process_user_input(prompt) # 调用核心处理函数
# 脚本结束时打印页脚信息
print("\n---")
print("Powered by 黑马程序员 | 基于Agent2Agent的旅行助手系统 v2.0")
第九章意图识别
message:向用户展示上下文对话
history_message:给agent传输上下文对话
9.1LLM+prompt实现
from langchain_core.prompts import ChatPromptTemplate
class SmartVoyagePrompts:
# 定义意图识别提示模板
@staticmethod
def intent_prompt():
return ChatPromptTemplate.from_template(
"""
系统提示:您是一个专业的旅行意图识别专家,基于用户查询和对话历史,识别其意图,用于调用专门的agent server来执行;为方便后续的agent server处理,可以基于对话历史对用户查询进行改写,使问题更明确。严格遵守规则:
- 支持意图:['weather' (天气查询), 'flight' (机票查询), 'train' (高铁/火车票查询), 'concert' (演唱会票查询), 'order' (票务预定), 'attraction' (景点推荐)] 或其组合(如 ['weather', 'flight'])。如果意图超出范围,返回意图 'out_of_scope'。
- 注意票务预定和票务查询要区分开,涉及到订票时则为order,只是查询则为flight、train或concert。
- 如果意图为 'out_of_scope'时,此时不需要再进行查询改写,你可以直接根据用户问题进行回复,将回复答案写到follow_up_message中即可。
- 在进行用户查询改写时,不要回答其问题,也不要修改其原意,只需要将对话历史中跟该查询相关的上下文信息取出来,然后整合到一起,使用户查询更明确即可,要仔细分析上下文信息,不要进行过度整合。如果用户查询跟对话历史无关,则输出原始查询。
- 如果用户的意图很不明确或者有歧义,可以向其进行追问,将追问问题填充到follow_up_message中。
- 输出严格为JSON:{{"intents": ["intent1", "intent2"], "user_queries": {{"intent1": "user_query1", "intent2": "user_query2"}}, "follow_up_message": "追问消息"}}。不要添加额外文本!
输出示例:
{{"intents": ["weather"], "user_queries": {{"weather": "今天北京天气如何"}}, "follow_up_message": ""}}
{{"intents": ["weather"], "user_queries": {{}}, "follow_up_message": "你问的是今天北京天气状况吗"}}
{{"intents": ["weather", "flight"], "user_queries": {{"weather": "今天北京天气如何", "flight": "查询一下10月28日,从北京飞往杭州的机票"}}, "follow_up_message": ""}}
{{"intents": ["out_of_scope"], "user_queries": {{}}, "follow_up_message": "你好,我是智能旅行助手,欢迎您向我提问"}}
当前日期:{current_date} (Asia/Shanghai)。
对话历史:{conversation_history}
用户查询:{query}
""")
# 定义天气结果总结提示模板,用于LLM总结天气查询的原始响应
@staticmethod
def summarize_weather_prompt():
return ChatPromptTemplate.from_template(
"""
系统提示:您是一位专业的天气预报员,以生动、准确的风格总结天气信息。基于查询和结果:
- 核心描述点:城市、日期、温度范围、天气描述、湿度、风向、降水等。
- 如果结果为空或者意思为需要补充数据,则委婉提示“未找到数据,请确认城市/日期”
- 语气:专业预报,如“根据最新数据,北京2025-07-31的天气预报为...”。
- 保持中文,100-150字。
- 如果查询无关,返回“请提供天气相关查询。”
查询:{query}
结果:{raw_response}
""")
# 定义票务结果总结提示模板,用于LLM总结票务查询的原始响应
@staticmethod
def summarize_ticket_prompt():
return ChatPromptTemplate.from_template(
"""
系统提示:您是一位专业的票务顾问,以热情、精确的风格总结票务信息。基于查询和结果:
- 核心描述点:出发/到达、时间、类型、价格、剩余座位等。
- 如果结果为空或者意思为需要补充数据,则委婉提示“未找到数据,请确认或修改条件”
- 语气:顾问式,如“为您推荐北京到上海的机票选项...”。
- 保持中文,100-150字。
- 如果查询无关,返回“请提供票务相关查询。”
查询:{query}
结果:{raw_response}
""")
# 定义景点推荐提示模板,用于LLM直接生成景点推荐内容
@staticmethod
def attraction_prompt():
return ChatPromptTemplate.from_template(
"""
系统提示:您是一位旅行专家,基于用户查询生成景点推荐。规则:
- 推荐3-5个景点,包含描述、理由、注意事项。
- 基于槽位:城市、偏好。
- 语气:热情推荐,如“推荐您在北京探索故宫...”。
- 备注:内容生成,仅供参考。
- 保持中文,150-250字。
查询:{query}
""")
if __name__ == '__main__':
print(SmartVoyagePrompts.intent_prompt())
第十章景点推荐
使用LLM+prompt实现
更多推荐



所有评论(0)