Agent Platform: End-to-End Evaluation and Implementation Plan
1. Project Overview
1.1 Background
This project evaluates the agent development experience end to end, from creation through deployment, combining multi-agent collaboration with MCP (Model Context Protocol) service integration.
1.2 Technology Stack
- Backend framework: FastAPI + SQLAlchemy
- Agent framework: LangChain + OpenAI
- Frontend: Streamlit (demo UI)
- Data stores: PostgreSQL + Redis
- Containerization: Docker + Docker Compose
- Version control: Git
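A requirements.txt consistent with this stack might look like the following. The package list is inferred from the code in section 4 and is an assumption, not the project's actual file; version pins are deliberately omitted.

```text
# requirements.txt (illustrative; inferred from the imports used below)
fastapi
uvicorn[standard]
sqlalchemy
pydantic-settings
langchain
langchain-openai
langchain-community
qdrant-client
httpx
redis
psycopg2-binary
streamlit
```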
2. UML Modeling
2.1 Use Case Diagram
2.2 Class Diagram
2.3 Sequence Diagram: Multi-Agent Collaboration Flow
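The diagrams in this section did not survive the export. As a plain-text sketch, the collaboration sequence implied by the orchestrator in section 4.4 is roughly:

```text
User ──▶ Orchestrator: coordinate_task(task)
Orchestrator ──▶ ResearchAgent: research_topic(task)
ResearchAgent ──▶ Orchestrator: research findings
Orchestrator ──▶ WritingAgent: draft report from findings
WritingAgent ──▶ Orchestrator: report content
Orchestrator ──▶ ReviewAgent: review the report
ReviewAgent ──▶ Orchestrator: review feedback
Orchestrator ──▶ User: aggregated result + quality score
```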
3. Project File Structure
agent-platform/
├── README.md
├── docker-compose.yml
├── .env.example
├── requirements.txt
├── .gitignore
├── src/
│ ├── __init__.py
│ ├── main.py
│ ├── api/
│ │ ├── __init__.py
│ │ ├── routes/
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── knowledge_base.py
│ │ │ ├── mcp.py
│ │ │ └── prompts.py
│ │ └── dependencies.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── database.py
│ │ └── security.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── schemas.py
│ │ ├── agent.py
│ │ ├── knowledge.py
│ │ └── user.py
│ ├── services/
│ │ ├── __init__.py
│ │ ├── agent_service.py
│ │ ├── knowledge_service.py
│ │ ├── prompt_service.py
│ │ ├── mcp_service.py
│ │ └── multi_agent_service.py
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── base_agent.py
│ │ ├── research_agent.py
│ │ ├── writing_agent.py
│ │ ├── review_agent.py
│ │ └── orchestrator.py
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── web_search.py
│ │ ├── calculator.py
│ │ ├── file_processor.py
│ │ └── mcp_client.py
│ ├── prompts/
│ │ ├── __init__.py
│ │ ├── templates.py
│ │ ├── generator.py
│ │ └── optimizer.py
│ ├── knowledge/
│ │ ├── __init__.py
│ │ ├── vector_store.py
│ │ ├── document_processor.py
│ │ └── summary_generator.py
│ └── utils/
│ ├── __init__.py
│ ├── logger.py
│ ├── helpers.py
│ └── validators.py
├── tests/
│ ├── __init__.py
│ ├── test_agents.py
│ ├── test_knowledge.py
│ └── test_mcp.py
├── scripts/
│ ├── setup.sh
│ ├── deploy.sh
│ └── test.sh
└── docs/
├── api.md
├── architecture.md
└── user_guide.md
4. Core Implementation
4.1 Base Configuration (src/core/config.py)
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # API configuration
    API_V1_STR: str = "/api/v1"
    PROJECT_NAME: str = "Agent Platform"

    # Database configuration
    DATABASE_URL: str
    REDIS_URL: str

    # OpenAI configuration
    OPENAI_API_KEY: str
    OPENAI_MODEL: str = "gpt-4-turbo"

    # Vector database configuration
    QDRANT_HOST: str = "localhost"
    QDRANT_PORT: int = 6333

    # MCP configuration
    MCP_SERVICES: dict = {
        "web_search": "http://localhost:8001",
        "calculator": "http://localhost:8002",
        "weather": "http://localhost:8003"
    }

    class Config:
        env_file = ".env"


settings = Settings()
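A matching .env.example for the Settings class above might look like this. The values are placeholders mirroring the Docker Compose defaults in section 4.10, not real credentials:

```text
DATABASE_URL=postgresql://agent_user:agent_password@localhost:5432/agent_platform
REDIS_URL=redis://localhost:6379/0
OPENAI_API_KEY=your-openai-api-key
QDRANT_HOST=localhost
QDRANT_PORT=6333
```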
4.2 Base Agent Class (src/agents/base_agent.py)
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import BaseTool
from langchain.memory import ConversationBufferMemory
import logging

logger = logging.getLogger(__name__)


class BaseAgent(ABC):
    def __init__(
        self,
        name: str,
        description: str,
        tools: List[BaseTool],
        model_name: str = "gpt-4-turbo",
        temperature: float = 0.1
    ):
        self.name = name
        self.description = description
        self.tools = tools
        self.llm = ChatOpenAI(
            model_name=model_name,
            temperature=temperature
        )
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        self.agent_executor: Optional[AgentExecutor] = None

    def initialize(self):
        """Initialize the agent."""
        prompt = ChatPromptTemplate.from_messages([
            ("system", self._get_system_prompt()),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad")
        ])
        agent = create_openai_tools_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt
        )
        self.agent_executor = AgentExecutor(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            handle_parsing_errors=True
        )
        logger.info(f"Agent '{self.name}' initialized successfully")

    @abstractmethod
    def _get_system_prompt(self) -> str:
        """Return the system prompt."""
        ...

    async def execute(self, input_text: str) -> Dict[str, Any]:
        """Run a task through the agent."""
        if not self.agent_executor:
            raise RuntimeError("Agent not initialized. Call initialize() first.")
        try:
            result = await self.agent_executor.ainvoke({"input": input_text})
            return {
                "success": True,
                "output": result.get("output", ""),
                "intermediate_steps": result.get("intermediate_steps", [])
            }
        except Exception as e:
            logger.error(f"Agent execution failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "output": ""
            }

    def add_tool(self, tool: BaseTool):
        """Register an additional tool."""
        self.tools.append(tool)
        if self.agent_executor:
            self.initialize()  # re-initialize so the executor picks up the new tool

    def clear_memory(self):
        """Clear conversation memory."""
        self.memory.clear()
4.3 Research Agent (src/agents/research_agent.py)
from .base_agent import BaseAgent
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from typing import Dict, Any
import json


class ResearchAgent(BaseAgent):
    def __init__(self):
        search_tool = DuckDuckGoSearchRun()
        tools = [
            Tool(
                name="web_search",
                func=search_tool.run,
                description="Search the web for current information"
            ),
            Tool(
                name="analyze_data",
                func=self._analyze_data,
                description="Analyze and summarize research data"
            )
        ]
        super().__init__(
            name="Research Agent",
            description="Specialized in researching and gathering information from various sources",
            tools=tools
        )

    def _get_system_prompt(self) -> str:
        return """You are a research assistant specialized in gathering, analyzing, and summarizing information.

Your responsibilities:
1. Search for relevant information using available tools
2. Analyze search results and extract key insights
3. Organize information in a structured format
4. Provide citations and sources when possible
5. Identify knowledge gaps and suggest further research areas

Always maintain objectivity and verify information from multiple sources when possible.
"""

    def _analyze_data(self, data: str) -> str:
        """Analyze research data."""
        try:
            # More sophisticated analysis logic can be plugged in here
            data_dict = json.loads(data) if isinstance(data, str) else data
            analysis = {
                "key_findings": [],
                "sources": [],
                "confidence_level": "medium",
                "recommendations": []
            }
            # Simple baseline analysis
            if isinstance(data_dict, dict):
                for key, value in data_dict.items():
                    if isinstance(value, (str, int, float)):
                        analysis["key_findings"].append(f"{key}: {value}")
            return json.dumps(analysis, ensure_ascii=False, indent=2)
        except Exception as e:
            return f"Analysis error: {str(e)}"

    async def research_topic(self, topic: str, depth: str = "overview") -> Dict[str, Any]:
        """Research a specific topic."""
        prompt = f"""
        Please research the following topic: {topic}
        Research depth: {depth}

        Provide:
        1. Comprehensive overview
        2. Key facts and data points
        3. Recent developments
        4. Relevant sources and references
        5. Potential areas for further investigation

        Format the response in a structured JSON format.
        """
        result = await self.execute(prompt)
        if result["success"]:
            try:
                # Attempt to parse the output as JSON
                result["parsed_output"] = json.loads(result["output"])
            except json.JSONDecodeError:
                # Not valid JSON; keep the raw output
                result["parsed_output"] = {"raw_output": result["output"]}
        return result
4.4 Multi-Agent Orchestrator (src/agents/orchestrator.py)
from typing import Dict, Any
from .research_agent import ResearchAgent
from .writing_agent import WritingAgent
from .review_agent import ReviewAgent
import logging

logger = logging.getLogger(__name__)


class Orchestrator:
    def __init__(self):
        self.agents = {
            "research": ResearchAgent(),
            "writing": WritingAgent(),
            "review": ReviewAgent()
        }
        # Initialize all agents
        for agent in self.agents.values():
            agent.initialize()

    async def coordinate_task(
        self,
        task_description: str,
        workflow_type: str = "research_report"
    ) -> Dict[str, Any]:
        """Coordinate multiple agents to complete a task."""
        if workflow_type == "research_report":
            return await self._research_report_workflow(task_description)
        elif workflow_type == "content_creation":
            return await self._content_creation_workflow(task_description)
        else:
            return await self._default_workflow(task_description)

    async def _research_report_workflow(self, task: str) -> Dict[str, Any]:
        """Research report workflow: research, then writing, then review."""
        logger.info(f"Starting research report workflow for: {task}")
        results = {}

        # Phase 1: research
        research_result = await self.agents["research"].research_topic(task, "comprehensive")
        results["research"] = research_result
        if not research_result["success"]:
            return {
                "success": False,
                "error": "Research phase failed",
                "results": results
            }

        # Phase 2: writing
        writing_prompt = f"""
        Based on the following research findings, write a comprehensive report:

        Research Findings:
        {research_result.get('output', '')}

        Topic: {task}

        Requirements:
        1. Create a well-structured report with introduction, body, and conclusion
        2. Include key findings and insights
        3. Use clear and professional language
        4. Add section headings for better readability
        """
        writing_result = await self.agents["writing"].execute(writing_prompt)
        results["writing"] = writing_result
        if not writing_result["success"]:
            return {
                "success": False,
                "error": "Writing phase failed",
                "results": results
            }

        # Phase 3: review
        review_prompt = f"""
        Please review the following report:

        {writing_result.get('output', '')}

        Check for:
        1. Factual accuracy
        2. Logical coherence
        3. Grammar and style issues
        4. Structure and organization
        5. Overall quality and completeness

        Provide specific feedback and suggestions for improvement.
        """
        review_result = await self.agents["review"].execute(review_prompt)
        results["review"] = review_result

        # Aggregate the final result
        final_output = self._aggregate_results(results)
        return {
            "success": True,
            "task": task,
            "workflow": "research_report",
            "final_output": final_output,
            "phase_results": results
        }

    def _aggregate_results(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregate the results from each phase."""
        return {
            "research_summary": results.get("research", {}).get("parsed_output", {}),
            "report_content": results.get("writing", {}).get("output", ""),
            "review_feedback": results.get("review", {}).get("output", ""),
            "quality_score": self._calculate_quality_score(results)
        }

    def _calculate_quality_score(self, results: Dict[str, Any]) -> float:
        """Compute a weighted quality score from phase successes."""
        score = 0.0
        if results.get("research", {}).get("success"):
            score += 0.3
        if results.get("writing", {}).get("success"):
            score += 0.4
        if results.get("review", {}).get("success"):
            score += 0.3
        return score
4.5 MCP Service Client (src/tools/mcp_client.py)
import httpx
from typing import Dict, Any, Optional
import logging

logger = logging.getLogger(__name__)


class MCPClient:
    def __init__(self, service_url: str, service_name: str):
        self.service_url = service_url
        self.service_name = service_name
        self.client = httpx.AsyncClient(timeout=30.0)

    async def invoke(
        self,
        method: str,
        params: Dict[str, Any],
        endpoint: str = "/invoke"
    ) -> Dict[str, Any]:
        """Call an MCP service over JSON-RPC."""
        url = f"{self.service_url}{endpoint}"
        payload = {
            "method": method,
            "params": params,
            "jsonrpc": "2.0",
            "id": 1
        }
        try:
            response = await self.client.post(
                url,
                json=payload,
                headers={"Content-Type": "application/json"}
            )
            if response.status_code == 200:
                result = response.json()
                if "error" in result:
                    logger.error(f"MCP service error: {result['error']}")
                    return {
                        "success": False,
                        "error": result["error"]
                    }
                return {
                    "success": True,
                    "result": result.get("result", {})
                }
            else:
                logger.error(f"MCP service HTTP error: {response.status_code}")
                return {
                    "success": False,
                    "error": f"HTTP {response.status_code}"
                }
        except Exception as e:
            logger.error(f"MCP service call failed: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    async def discover_capabilities(self) -> Dict[str, Any]:
        """Discover the service's capabilities."""
        return await self.invoke("discover", {})

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()


class MCPServiceRegistry:
    def __init__(self):
        self.services: Dict[str, MCPClient] = {}

    def register_service(self, name: str, url: str) -> MCPClient:
        """Register an MCP service."""
        client = MCPClient(url, name)
        self.services[name] = client
        logger.info(f"Registered MCP service: {name} at {url}")
        return client

    def get_service(self, name: str) -> Optional[MCPClient]:
        """Look up an MCP service client by name."""
        return self.services.get(name)

    async def discover_all_services(self) -> Dict[str, Any]:
        """Discover the capabilities of all registered services."""
        capabilities = {}
        for name, client in self.services.items():
            try:
                capabilities[name] = await client.discover_capabilities()
            except Exception as e:
                logger.error(f"Failed to discover capabilities for {name}: {e}")
                capabilities[name] = {"error": str(e)}
        return capabilities
4.6 Knowledge Base Management (src/knowledge/vector_store.py)
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from langchain_openai import OpenAIEmbeddings
from typing import List, Dict, Any, Optional
import uuid
import logging

logger = logging.getLogger(__name__)


class KnowledgeVectorStore:
    def __init__(
        self,
        host: str = "localhost",
        port: int = 6333,
        collection_name: str = "knowledge_base"
    ):
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = collection_name
        self.embeddings = OpenAIEmbeddings()
        # Make sure the collection exists
        self._ensure_collection()

    def _ensure_collection(self):
        """Create the vector collection if it does not already exist."""
        try:
            collections = self.client.get_collections().collections
            collection_names = [col.name for col in collections]
            if self.collection_name not in collection_names:
                self.client.create_collection(
                    collection_name=self.collection_name,
                    vectors_config=VectorParams(
                        size=1536,  # OpenAI embedding size
                        distance=Distance.COSINE
                    )
                )
                logger.info(f"Created collection: {self.collection_name}")
        except Exception as e:
            logger.error(f"Failed to ensure collection: {e}")
            raise

    async def add_documents(
        self,
        documents: List[Dict[str, Any]],
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[str]:
        """Add documents to the knowledge base."""
        doc_ids = []
        points = []
        for doc in documents:
            doc_id = str(uuid.uuid4())
            content = doc.get("content", "")
            # Generate the embedding vector
            try:
                embedding = await self.embeddings.aembed_query(content)
                # Build the point structure
                point = PointStruct(
                    id=doc_id,
                    vector=embedding,
                    payload={
                        "content": content,
                        "metadata": metadata or {},
                        "source": doc.get("source", ""),
                        "title": doc.get("title", ""),
                        "doc_type": doc.get("type", "general")
                    }
                )
                points.append(point)
                doc_ids.append(doc_id)
            except Exception as e:
                logger.error(f"Failed to embed document: {e}")
                continue
        if points:
            # Upsert in one batch
            self.client.upsert(
                collection_name=self.collection_name,
                points=points
            )
            logger.info(f"Added {len(doc_ids)} documents to knowledge base")
        return doc_ids

    async def search(
        self,
        query: str,
        top_k: int = 5,
        score_threshold: float = 0.7
    ) -> List[Dict[str, Any]]:
        """Search for similar documents."""
        try:
            # Embed the query
            query_embedding = await self.embeddings.aembed_query(query)
            # Search the vector database
            search_result = self.client.search(
                collection_name=self.collection_name,
                query_vector=query_embedding,
                limit=top_k,
                score_threshold=score_threshold
            )
            results = []
            for hit in search_result:
                results.append({
                    "id": hit.id,
                    "score": hit.score,
                    "content": hit.payload.get("content", ""),
                    "source": hit.payload.get("source", ""),
                    "title": hit.payload.get("title", ""),
                    "metadata": hit.payload.get("metadata", {})
                })
            return results
        except Exception as e:
            logger.error(f"Search failed: {e}")
            return []

    def delete_document(self, doc_id: str) -> bool:
        """Delete a document."""
        try:
            self.client.delete(
                collection_name=self.collection_name,
                points_selector=[doc_id]
            )
            return True
        except Exception as e:
            logger.error(f"Failed to delete document: {e}")
            return False

    def get_collection_info(self) -> Dict[str, Any]:
        """Return basic information about the collection."""
        try:
            collection_info = self.client.get_collection(self.collection_name)
            return {
                "name": self.collection_name,
                "vectors_count": collection_info.vectors_count,
                "status": collection_info.status,
                "config": {
                    "params": dict(collection_info.config.params),
                    "hnsw_config": dict(collection_info.config.hnsw_config) if collection_info.config.hnsw_config else {}
                }
            }
        except Exception as e:
            logger.error(f"Failed to get collection info: {e}")
            return {}
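The collection above uses Distance.COSINE over 1536-dimensional OpenAI embeddings. As a self-contained illustration of what the score returned by search() means (pure Python, not part of the platform code):

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity; the COSINE distance metric ranks hits by this value."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Identical directions score 1.0; orthogonal vectors score 0.0,
# which is why the store discards hits below score_threshold=0.7.
print(cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]))  # → 1.0
```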
4.7 Automatic Knowledge Base Summarization (src/knowledge/summary_generator.py)
from langchain_openai import ChatOpenAI
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from typing import List, Dict, Any
import asyncio
import logging

logger = logging.getLogger(__name__)


class SummaryGenerator:
    def __init__(self, model_name: str = "gpt-4-turbo"):
        self.llm = ChatOpenAI(
            model_name=model_name,
            temperature=0.1
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=4000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

    async def generate_summary(
        self,
        content: str,
        summary_type: str = "concise",
        max_length: int = 500
    ) -> Dict[str, Any]:
        """Generate a summary of the given content."""
        try:
            # Split the text into chunks
            texts = self.text_splitter.split_text(content)
            docs = [Document(page_content=text) for text in texts]

            # Choose a chain type per summary style. The templates document
            # the intended instructions for each type; the chain below
            # currently runs with load_summarize_chain's default prompts.
            if summary_type == "detailed":
                chain_type = "refine"
                prompt_template = """
                Please provide a detailed summary of the following text:
                {text}
                Include:
                1. Main topics and themes
                2. Key arguments and evidence
                3. Important examples and case studies
                4. Conclusions and recommendations
                5. Any significant limitations or controversies
                """
            elif summary_type == "bullet_points":
                chain_type = "map_reduce"
                prompt_template = """
                Extract key points from the following text as bullet points:
                {text}
                Format as:
                • Point 1
                • Point 2
                • Point 3
                """
            else:  # concise
                chain_type = "stuff"
                prompt_template = f"""
                Provide a concise summary of the following text in {max_length} words or less:
                {{text}}
                Focus on the most important information.
                """

            # Load the summarization chain
            chain = load_summarize_chain(
                self.llm,
                chain_type=chain_type,
                verbose=True
            )

            # Generate the summary
            summary = await chain.arun(docs)

            # Extract keywords
            keywords = await self._extract_keywords(content)

            return {
                "success": True,
                "summary": summary.strip(),
                "keywords": keywords,
                "summary_type": summary_type,
                "original_length": len(content),
                "summary_length": len(summary)
            }
        except Exception as e:
            logger.error(f"Summary generation failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "summary": "",
                "keywords": []
            }

    async def _extract_keywords(self, content: str, top_n: int = 10) -> List[str]:
        """Extract keywords from the content."""
        # Truncate the content to keep the prompt within limits
        prompt = f"""
        Extract the top {top_n} most important keywords or key phrases from the following text:

        {content[:2000]}

        Return as a comma-separated list.
        """
        try:
            response = await self.llm.ainvoke(prompt)
            keywords_text = response.content.strip()
            keywords = [k.strip() for k in keywords_text.split(",") if k.strip()]
            return keywords[:top_n]
        except Exception as e:
            logger.error(f"Keyword extraction failed: {e}")
            return []

    async def batch_summarize(
        self,
        documents: List[Dict[str, Any]],
        summary_type: str = "concise"
    ) -> List[Dict[str, Any]]:
        """Generate summaries for a batch of documents concurrently."""
        tasks = [
            self.generate_summary(
                content=doc.get("content", ""),
                summary_type=summary_type
            )
            for doc in documents
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    "success": False,
                    "error": str(result),
                    "document_index": i
                })
            else:
                result["document_index"] = i
                processed_results.append(result)
        return processed_results
4.8 Automatic Prompt Generation and Optimization (src/prompts/generator.py)
from langchain_openai import ChatOpenAI
from typing import Dict, Any, List, Optional
import re
import logging

logger = logging.getLogger(__name__)


class PromptGenerator:
    def __init__(self, model_name: str = "gpt-4-turbo"):
        self.llm = ChatOpenAI(
            model_name=model_name,
            temperature=0.7
        )
        self.templates = self._load_default_templates()

    def _load_default_templates(self) -> Dict[str, Any]:
        """Load the built-in templates."""
        return {
            "analysis": {
                "description": "For analyzing and interpreting information",
                "template": """As an expert analyst, please analyze the following:

CONTEXT:
{context}

ANALYSIS REQUEST:
{request}

Please provide:
1. Key insights and observations
2. Patterns or trends identified
3. Potential implications
4. Recommendations or next steps

Format your response clearly with appropriate headings."""
            },
            "creative": {
                "description": "For creative writing and brainstorming",
                "template": """As a creative assistant, please help with:

TOPIC/REQUEST:
{topic}

ADDITIONAL CONTEXT:
{context}

Please generate creative content that is:
- Engaging and original
- Appropriate for the target audience: {audience}
- Aligned with the tone: {tone}

Length: {length} words"""
            },
            "technical": {
                "description": "For technical documentation and explanations",
                "template": """As a technical expert, please explain:

TECHNICAL TOPIC:
{topic}

AUDIENCE LEVEL:
{audience_level}

Please provide:
1. Clear explanation of concepts
2. Relevant examples or code snippets
3. Common use cases
4. Best practices or warnings
5. Further reading/resources

Ensure accuracy and clarity."""
            }
        }

    async def generate_prompt(
        self,
        prompt_type: str,
        variables: Dict[str, Any],
        custom_template: Optional[str] = None
    ) -> Dict[str, Any]:
        """Generate a prompt from a template."""
        if custom_template:
            template = custom_template
        elif prompt_type in self.templates:
            template = self.templates[prompt_type]["template"]
        else:
            template = "{input}"  # fallback template

        # Fill in the template variables
        try:
            prompt = template.format(**variables)
            # Generate optimization suggestions
            optimization_suggestions = await self._generate_optimization_suggestions(prompt)
            return {
                "success": True,
                "prompt": prompt,
                "prompt_type": prompt_type,
                "variables_used": list(variables.keys()),
                "optimization_suggestions": optimization_suggestions,
                "metadata": {
                    "template_source": "custom" if custom_template else "builtin",
                    "variable_count": len(variables)
                }
            }
        except KeyError as e:
            missing_var = str(e).strip("'")
            return {
                "success": False,
                "error": f"Missing variable: {missing_var}",
                "available_variables": list(variables.keys()),
                "required_variables": self._extract_template_variables(template)
            }
        except Exception as e:
            logger.error(f"Prompt generation failed: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    async def _generate_optimization_suggestions(self, prompt: str) -> List[str]:
        """Ask the LLM for optimization suggestions."""
        analysis_prompt = f"""
        Analyze the following prompt and provide optimization suggestions:

        PROMPT:
        {prompt}

        Please evaluate:
        1. Clarity and specificity
        2. Potential ambiguities
        3. Missing context or constraints
        4. Opportunities for better structure
        5. Suggestions for improved results

        Provide 3-5 specific, actionable suggestions.
        """
        try:
            response = await self.llm.ainvoke(analysis_prompt)
            suggestions_text = response.content.strip()
            suggestions = []

            # Try splitting on bullet markers first
            for line in suggestions_text.split("\n"):
                line = line.strip()
                if line.startswith(("-", "•", "*", "1.", "2.", "3.", "4.", "5.")):
                    # Strip the bullet marker
                    clean_line = re.sub(r'^[•\-\*\d\.\s]+', '', line)
                    if clean_line:
                        suggestions.append(clean_line)

            # Fall back to sentence splitting if no bullets were found
            if not suggestions:
                sentences = re.split(r'[.!?]+', suggestions_text)
                suggestions = [s.strip() for s in sentences if s.strip()][:5]

            return suggestions[:5]  # return at most five suggestions
        except Exception as e:
            logger.error(f"Optimization suggestions generation failed: {e}")
            return []

    def _extract_template_variables(self, template: str) -> List[str]:
        """Extract the {placeholder} variables used by a template."""
        variables = re.findall(r'\{(\w+)\}', template)
        return list(set(variables))  # deduplicate

    async def create_custom_template(
        self,
        description: str,
        examples: List[Dict[str, Any]],
        constraints: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Create a custom template from a description and examples."""
        examples_text = "\n".join([
            f"Example {i+1}:\nInput: {ex.get('input', '')}\nOutput: {ex.get('output', '')}"
            for i, ex in enumerate(examples[:3])  # use at most three examples
        ])
        constraints_text = ""
        if constraints:
            constraints_text = "\nConstraints:\n" + "\n".join(f"- {c}" for c in constraints)

        generation_prompt = f"""
        Create a robust prompt template for the following task:

        TASK DESCRIPTION:
        {description}

        EXAMPLES:
        {examples_text}
        {constraints_text}

        Create a reusable template with placeholders for variables in {{curly_braces}}.
        Include clear instructions and structure for best results.
        Return ONLY the template text, no explanations.
        """
        try:
            response = await self.llm.ainvoke(generation_prompt)
            template = response.content.strip()
            # Validate the template format
            variables = self._extract_template_variables(template)
            return {
                "success": True,
                "template": template,
                "variables": variables,
                "description": description,
                "example_count": len(examples)
            }
        except Exception as e:
            logger.error(f"Template creation failed: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    async def evaluate_prompt_effectiveness(
        self,
        prompt: str,
        expected_output: str,
        actual_output: str
    ) -> Dict[str, Any]:
        """Evaluate how effective a prompt was."""
        evaluation_prompt = f"""
        Evaluate the effectiveness of the following prompt:

        PROMPT:
        {prompt}

        EXPECTED OUTPUT:
        {expected_output}

        ACTUAL OUTPUT:
        {actual_output}

        Please rate on a scale of 1-10 for:
        1. Clarity of instructions
        2. Specificity and detail
        3. Alignment with expected output
        4. Potential for improvement

        Also provide specific suggestions for improving the prompt.
        """
        try:
            response = await self.llm.ainvoke(evaluation_prompt)
            evaluation_text = response.content.strip()

            # Try to extract the numeric scores
            scores = {}
            score_pattern = r'(\d+)\s*[\.:]\s*(?:Clarity|Specificity|Alignment|Improvement)'
            matches = re.findall(score_pattern, evaluation_text, re.IGNORECASE)
            if matches:
                scores = {
                    "clarity": int(matches[0]) if len(matches) > 0 else 0,
                    "specificity": int(matches[1]) if len(matches) > 1 else 0,
                    "alignment": int(matches[2]) if len(matches) > 2 else 0,
                    "improvement_potential": int(matches[3]) if len(matches) > 3 else 0,
                }

            return {
                "success": True,
                "evaluation": evaluation_text,
                "scores": scores,
                "average_score": sum(scores.values()) / len(scores) if scores else 0
            }
        except Exception as e:
            logger.error(f"Prompt evaluation failed: {e}")
            return {
                "success": False,
                "error": str(e)
            }
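The `{placeholder}` extraction used by `_extract_template_variables` can be exercised standalone; this is the same regex, wrapped in a free function for illustration:

```python
import re

def extract_template_variables(template: str) -> list:
    """Find the unique {placeholder} names in a prompt template."""
    return sorted(set(re.findall(r'\{(\w+)\}', template)))

vars_found = extract_template_variables(
    "As a creative assistant, write about {topic} for {audience} in {tone} tone."
)
# → ['audience', 'tone', 'topic']
```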
4.9 Main API Application (src/main.py)
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import logging

from src.core.config import settings
from src.core.database import engine, Base
from src.api.routes import agents, knowledge_base, mcp, prompts
from src.services.multi_agent_service import MultiAgentService
from src.services.mcp_service import MCPServiceRegistry

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Global service instances
multi_agent_service = None
mcp_registry = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management."""
    # Startup
    logger.info("Starting Agent Platform...")

    # Create the database tables
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Initialize the services
    global multi_agent_service, mcp_registry
    multi_agent_service = MultiAgentService()
    mcp_registry = MCPServiceRegistry()

    # Register the default MCP services
    for name, url in settings.MCP_SERVICES.items():
        mcp_registry.register_service(name, url)

    logger.info("Agent Platform started successfully")
    yield

    # Shutdown
    logger.info("Shutting down Agent Platform...")
    if mcp_registry:
        for client in mcp_registry.services.values():
            await client.close()


# Create the FastAPI application
app = FastAPI(
    title=settings.PROJECT_NAME,
    version="1.0.0",
    lifespan=lifespan
)

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Dependency injection helpers
def get_multi_agent_service():
    return multi_agent_service


def get_mcp_registry():
    return mcp_registry


# Mount the routers
app.include_router(
    agents.router,
    prefix=f"{settings.API_V1_STR}/agents",
    tags=["agents"]
)
app.include_router(
    knowledge_base.router,
    prefix=f"{settings.API_V1_STR}/knowledge",
    tags=["knowledge"]
)
app.include_router(
    mcp.router,
    prefix=f"{settings.API_V1_STR}/mcp",
    tags=["mcp"]
)
app.include_router(
    prompts.router,
    prefix=f"{settings.API_V1_STR}/prompts",
    tags=["prompts"]
)


@app.get("/")
async def root():
    return {
        "message": "Welcome to Agent Platform",
        "version": "1.0.0",
        "docs": "/docs",
        "health": "/health"
    }


@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "services": {
            "database": "connected",
            "mcp_registry": "initialized" if mcp_registry else "not_initialized",
            "multi_agent": "initialized" if multi_agent_service else "not_initialized"
        }
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "src.main:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )
4.10 Docker Compose Configuration
version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: agent_user
      POSTGRES_PASSWORD: agent_password
      POSTGRES_DB: agent_platform
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U agent_user"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_storage:/qdrant/storage

  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgresql://agent_user:agent_password@postgres:5432/agent_platform
      REDIS_URL: redis://redis:6379/0
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      QDRANT_HOST: qdrant
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      qdrant:
        condition: service_started
    volumes:
      - ./src:/app/src
    command: uvicorn src.main:app --host 0.0.0.0 --port 8000 --reload

  mcp-web-search:
    image: python:3.11-slim
    ports:
      - "8001:8000"
    volumes:
      - ./mcp_services/web_search:/app
    working_dir: /app
    command: python service.py
    environment:
      SERPAPI_API_KEY: ${SERPAPI_API_KEY}

  mcp-calculator:
    image: python:3.11-slim
    ports:
      - "8002:8000"
    volumes:
      - ./mcp_services/calculator:/app
    working_dir: /app
    command: python service.py

volumes:
  postgres_data:
  redis_data:
  qdrant_storage:
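The api service builds from the repository root (`build: .`), but no Dockerfile appears in the project file tree. A minimal Dockerfile consistent with the compose command would look roughly like this; it is a sketch, not the project's actual file:

```dockerfile
FROM python:3.11-slim
WORKDIR /app

# Install dependencies first to take advantage of layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code (also bind-mounted in development via compose)
COPY src/ ./src/

EXPOSE 8000
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
```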
5. Usage Examples and Evaluation
5.1 Agent Creation Example
# Create the multi-agent system
from src.services.multi_agent_service import MultiAgentService

service = MultiAgentService()

# Execute a complex task
result = await service.execute_workflow(
    workflow_type="research_report",
    task_description="Current applications and future trends of AI in medical diagnosis",
    parameters={
        "depth": "comprehensive",
        "include_sources": True,
        "format": "academic"
    }
)

print(f"Task succeeded: {result['success']}")
print(f"Quality score: {result['quality_score']}")
print(f"Research report:\n{result['final_output']['report_content'][:500]}...")
5.2 Knowledge Base Example
# Upload a document and auto-generate a summary
from src.services.knowledge_service import KnowledgeService

kb_service = KnowledgeService()

# Upload the document
doc_ids = await kb_service.add_documents([
    {
        "title": "AI in Healthcare",
        "content": "Artificial intelligence in healthcare...",
        "source": "research_paper.pdf"
    }
])

# Generate a summary
summary = await kb_service.generate_document_summary(
    document_id=doc_ids[0],
    summary_type="detailed"
)

# Search for similar content
search_results = await kb_service.search_knowledge(
    query="medical AI diagnosis",
    top_k=5
)
5.3 MCP Service Integration Example
# Use an MCP service
from src.services.mcp_service import MCPServiceRegistry

registry = MCPServiceRegistry()
# Register the service first (URL from settings.MCP_SERVICES)
registry.register_service("web_search", "http://localhost:8001")
web_search = registry.get_service("web_search")

# Invoke the MCP service
result = await web_search.invoke(
    method="search",
    params={"query": "latest AI developments", "num_results": 5}
)
6. Evaluation Summary
6.1 Platform Strengths
- Full-lifecycle coverage: supports the entire flow from agent creation and debugging to deployment
- Multi-agent collaboration: built-in research, writing, and review agents working together
- Intelligent knowledge base management: automatic summarization and vector retrieval
- Prompt optimization: automatic generation and optimization of prompt templates
- MCP integration: flexible Model Context Protocol service integration
- Extensible architecture: modular design that is easy to extend with new features
6.2 Competitor Comparison
| Feature | ModelEngine | Dify | Coze | Versatile |
|---|---|---|---|---|
| Multi-agent collaboration | ✅ Built-in | ⚠️ Limited | ⚠️ Limited | ⚠️ Limited |
| Automatic KB summarization | ✅ | ⚠️ Manual | ❌ | ⚠️ Basic |
| Automatic prompt optimization | ✅ | ⚠️ Basic | ❌ | ⚠️ Basic |
| MCP service support | ✅ | ❌ | ❌ | ❌ |
| Visual orchestration | ⚠️ In development | ✅ | ✅ | ✅ |
| Deployment flexibility | ✅ | ✅ | ⚠️ Cloud-first | ⚠️ Cloud-first |
6.3 Suggested Improvements
- Richer visual interface: add a graphical UI for agent orchestration
- Performance optimization: batch and cache agent inference
- Monitoring and logging: strengthen distributed tracing and performance monitoring
- Security hardening: add API key management and access control
- Template marketplace: build a sharing platform for prompt and agent templates
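As a sketch of the caching suggestion above: a minimal in-memory TTL cache for agent responses, keyed by prompt text. The class and names are illustrative, not part of the platform code; in production, the Redis instance already in the stack would be the natural backing store.

```python
import time
from typing import Any, Dict, Optional, Tuple

class TTLCache:
    """Minimal in-memory cache with per-entry expiry."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() > expires_at:
            # Entry expired; drop it so the caller recomputes
            del self._store[key]
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (time.monotonic() + self.ttl, value)

cache = TTLCache(ttl_seconds=60.0)
cache.set("summarize: AI in healthcare", {"output": "cached summary"})
hit = cache.get("summarize: AI in healthcare")   # the cached dict
miss = cache.get("unseen prompt")                # None
```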
7. Deployment Guide
7.1 Environment Setup
# Clone the project
git clone https://github.com/yourusername/agent-platform.git
cd agent-platform

# Copy the environment file
cp .env.example .env
# Edit .env to configure API keys and other settings

# Install dependencies
pip install -r requirements.txt
7.2 Deploying with Docker
# Start all services
docker-compose up -d

# Tail the API logs
docker-compose logs -f api

# Stop the services
docker-compose down
7.3 API Usage
# Create an agent
curl -X POST "http://localhost:8000/api/v1/agents/" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Research Assistant",
    "description": "A professional research and analysis agent",
    "agent_type": "research"
  }'

# Execute a task
curl -X POST "http://localhost:8000/api/v1/agents/execute" \
  -H "Content-Type: application/json" \
  -d '{
    "agent_id": "agent_123",
    "input": "Analyze applications of AI in education"
  }'
8. Conclusion
This project implements a complete agent platform, demonstrating the full flow from creation to deployment. With multi-agent collaboration, automatic knowledge base management, prompt optimization, and MCP service integration, it provides strong capabilities for building AI applications. Compared with competing products, the platform has clear advantages in multi-agent collaboration and intelligent knowledge base processing, giving developers a more flexible and powerful toolset.
The platform's modular design makes it easy to extend and maintain, and it can serve as a foundation for enterprise-grade AI application development. Adding a visual orchestration interface and strengthening monitoring and security would further improve its competitiveness.