第三阶段_大模型应用开发-Day 5: 大模型应用开发模式
本文介绍了大模型应用开发的多种模式及其适用场景。主要开发模式包括直接调用、RAG(检索增强生成)、Agent、多模态、微调和链式模式,每种模式针对不同任务特性(如复杂度、数据类型、性能需求)进行优化。文章重点分析了Agent架构,特别是ReAct框架,详细说明了其核心组件(规划器、执行器、工具集等)和工作流程,并提供了Python实现代码示例。通过理解这些开发模式的特点和选择标准,开发者可以根据具
Day 5: 大模型应用开发模式
学习目标
- 理解大模型应用的主要开发模式和架构
- 掌握Agent架构和工具使用的实现方法
- 学习多模态应用开发的基本原理
- 了解大模型应用的安全性考虑
- 掌握大模型应用的评估和监控方法
1. 大模型应用开发模式概述
1.1 大模型应用的主要模式
大模型应用开发有多种模式,每种模式适用于不同的场景和需求:
1. 直接调用模式:
- 直接向模型发送提示并获取回复
- 最简单的交互方式
- 适用于简单的文本生成、对话等场景
2. RAG模式:
- 检索增强生成,结合外部知识库
- 提高回答的准确性和可靠性
- 适用于问答系统、知识库查询等场景
3. Agent模式:
- 模型作为智能代理,可以使用工具和执行操作
- 具有规划和决策能力
- 适用于复杂任务执行、自动化流程等场景
4. 多模态模式:
- 处理和生成多种模态的数据(文本、图像、音频等)
- 实现跨模态理解和生成
- 适用于图像描述、视觉问答、内容创作等场景
5. 微调模式:
- 针对特定任务或领域微调基础模型
- 提高特定任务的性能
- 适用于专业领域应用、特定格式输出等场景
6. 链式模式:
- 将多个模型或组件串联成处理流程
- 分解复杂任务为简单步骤
- 适用于多步骤推理、复杂文档处理等场景
1.2 选择合适的开发模式
选择合适的开发模式需要考虑多种因素:
任务复杂度:
- 简单任务(如文本生成)→ 直接调用模式
- 知识密集型任务 → RAG模式
- 复杂决策任务 → Agent模式
- 多步骤任务 → 链式模式
数据类型:
- 纯文本数据 → 文本模型
- 多种数据类型 → 多模态模式
- 结构化输出需求 → 微调模式或提示工程
性能要求:
- 高准确性 → RAG或微调模式
- 低延迟 → 轻量级模型或直接调用
- 高吞吐量 → 批处理或异步架构
资源限制:
- 计算资源有限 → 直接调用或RAG
- 预算有限 → 开源模型或API经济使用
- 时间有限 → 现成组件或直接调用
开发模式决策流程:
开始
|
v
[任务是否需要外部知识?]
| |
|是 |否
v v
[RAG] [任务是否需要工具使用?]
| | |
| |是 |否
| v v
| [Agent] [任务是否涉及多种数据类型?]
| | | |
| | |是 |否
| | v v
| | [多模态] [是否需要特定领域优化?]
| | | | |
| | | |是 |否
| | | v v
| | | [微调] [直接调用]
| | | | |
v v v v v
[考虑组合多种模式以满足复杂需求]
|
v
结束
2. Agent架构与工具使用
2.1 Agent架构基础
Agent是一种能够自主规划和执行任务的AI系统,通常基于大语言模型构建:
Agent的核心组件:
- 规划器(Planner):分解任务、制定计划
- 执行器(Executor):执行计划中的步骤
- 工具集(Tools):可供Agent使用的工具和API
- 记忆(Memory):存储上下文和历史信息
- 反思机制(Reflection):评估和改进执行结果
Agent的工作流程:
- 接收用户任务
- 分析任务并制定计划
- 选择合适的工具
- 执行操作并获取结果
- 评估结果并调整计划
- 重复执行直到完成任务
- 返回最终结果给用户
2.2 ReAct框架
ReAct(Reasoning + Acting)是一种结合推理和行动的Agent框架:
ReAct的核心思想:
- 交替进行推理(Reasoning)和行动(Acting)
- 通过思维链(Chain-of-Thought)增强推理能力
- 通过工具使用增强行动能力
ReAct实现:
import re
from typing import List, Dict, Any, Optional
class Tool:
def __init__(self, name: str, description: str, func):
self.name = name
self.description = description
self.func = func
def __call__(self, *args, **kwargs):
return self.func(*args, **kwargs)
class ReActAgent:
def __init__(self, llm, tools: List[Tool], max_iterations: int = 10):
self.llm = llm
self.tools = tools
self.max_iterations = max_iterations
self.tool_names = [tool.name for tool in tools]
self.tool_descriptions = [f"{tool.name}: {tool.description}" for tool in tools]
def _get_tool_by_name(self, name: str) -> Optional[Tool]:
for tool in self.tools:
if tool.name.lower() == name.lower():
return tool
return None
def _create_prompt(self, task: str, history: List[Dict[str, str]] = None) -> str:
# 构建工具描述
tools_prompt = "\n".join(self.tool_descriptions)
# 构建历史记录
history_prompt = ""
if history:
for entry in history:
if "thought" in entry:
history_prompt += f"思考: {entry['thought']}\n"
if "action" in entry:
history_prompt += f"行动: {entry['action']}\n"
if "observation" in entry:
history_prompt += f"观察: {entry['observation']}\n"
# 构建完整提示
prompt = f"""你是一个智能助手,可以使用以下工具来完成任务:
{tools_prompt}
任务: {task}
按照以下格式回答:
思考: 分析问题并思考如何解决
行动: 选择一个工具,格式为 "工具名[参数]"
观察: 工具返回的结果
...(可以重复思考-行动-观察多次)
回答: 最终答案
{history_prompt}
"""
return prompt
def _parse_response(self, response: str) -> Dict[str, str]:
# 解析LLM的回复
thought_match = re.search(r"思考:(.*?)(?=行动:|回答:|$)", response, re.DOTALL)
action_match = re.search(r"行动:(.*?)(?=观察:|思考:|回答:|$)", response, re.DOTALL)
answer_match = re.search(r"回答:(.*?)$", response, re.DOTALL)
result = {}
if thought_match:
result["thought"] = thought_match.group(1).strip()
if action_match:
result["action"] = action_match.group(1).strip()
if answer_match:
result["answer"] = answer_match.group(1).strip()
return result
def _execute_action(self, action: str) -> str:
# 解析工具调用
tool_pattern = r"(\w+)\[(.*)\]"
match = re.search(tool_pattern, action)
if not match:
return "错误: 无法解析工具调用,格式应为 '工具名[参数]'"
tool_name = match.group(1)
tool_args = match.group(2)
# 获取工具
tool = self._get_tool_by_name(tool_name)
if not tool:
return f"错误: 未找到工具 '{tool_name}'"
# 执行工具
try:
result = tool(tool_args)
return str(result)
except Exception as e:
return f"错误: 工具执行失败 - {str(e)}"
def run(self, task: str) -> str:
history = []
for _ in range(self.max_iterations):
# 创建提示
prompt = self._create_prompt(task, history)
# 获取LLM回复
response = self.llm(prompt)
# 解析回复
parsed = self._parse_response(response)
# 如果有最终答案,返回
if "answer" in parsed:
return parsed["answer"]
# 如果有行动,执行它
if "action" in parsed:
observation = self._execute_action(parsed["action"])
# 更新历史
history.append({
"thought": parsed.get("thought", ""),
"action": parsed["action"],
"observation": observation
})
else:
# 没有行动,可能是错误
return "Agent无法确定下一步行动"
return "达到最大迭代次数,无法完成任务"
2.3 工具使用实现
工具是Agent与外部世界交互的接口,可以大大扩展Agent的能力:
常见工具类型:
- 搜索工具:网络搜索、知识库查询
- 计算工具:计算器、数学求解器
- API调用:天气API、股票API、翻译API等
- 文件操作:读写文件、处理文档
- 代码执行:执行Python代码、SQL查询等
- 数据分析:统计分析、数据可视化
工具定义示例:
import requests
import json
import os
import subprocess
import wolframalpha
from datetime import datetime
# 搜索工具
def web_search(query: str) -> str:
"""搜索网络信息"""
api_key = os.environ.get("SEARCH_API_KEY")
search_engine_id = os.environ.get("SEARCH_ENGINE_ID")
url = f"https://www.googleapis.com/customsearch/v1"
params = {
"key": api_key,
"cx": search_engine_id,
"q": query
}
response = requests.get(url, params=params)
results = response.json()
if "items" not in results:
return "没有找到相关结果"
# 提取前3个结果
top_results = []
for item in results["items"][:3]:
top_results.append({
"title": item["title"],
"link": item["link"],
"snippet": item["snippet"]
})
return json.dumps(top_results, ensure_ascii=False, indent=2)
# 计算工具
def calculator(expression: str) -> str:
"""计算数学表达式"""
try:
# 安全地评估表达式
result = eval(expression, {"__builtins__": {}}, {"abs": abs, "round": round, "max": max, "min": min})
return f"计算结果: {result}"
except Exception as e:
return f"计算错误: {str(e)}"
# Wolfram Alpha工具
def wolfram_alpha(query: str) -> str:
"""使用Wolfram Alpha回答问题"""
client = wolframalpha.Client(os.environ.get("WOLFRAM_APP_ID"))
try:
res = client.query(query)
if res['@success'] == 'true':
# 提取结果
results = []
for pod in res.pods:
if pod.title and pod.text:
results.append(f"{pod.title}: {pod.text}")
return "\n".join(results[:5]) # 限制返回的结果数量
else:
return "Wolfram Alpha无法回答该问题"
except Exception as e:
return f"查询错误: {str(e)}"
# 天气工具
def get_weather(location: str) -> str:
"""获取指定位置的天气信息"""
api_key = os.environ.get("WEATHER_API_KEY")
url = f"https://api.openweathermap.org/data/2.5/weather"
params = {
"q": location,
"appid": api_key,
"units": "metric",
"lang": "zh_cn"
}
try:
response = requests.get(url, params=params)
data = response.json()
if response.status_code == 200:
weather = {
"城市": data["name"],
"天气": data["weather"][0]["description"],
"温度": f"{data['main']['temp']}°C",
"体感温度": f"{data['main']['feels_like']}°C",
"湿度": f"{data['main']['humidity']}%",
"风速": f"{data['wind']['speed']}m/s"
}
return json.dumps(weather, ensure_ascii=False, indent=2)
else:
return f"获取天气失败: {data.get('message', '未知错误')}"
except Exception as e:
return f"天气API错误: {str(e)}"
# 文件读取工具
def read_file(file_path: str) -> str:
"""读取文件内容"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 如果文件太大,只返回前1000个字符
if len(content) > 1000:
return content[:1000] + "...(内容已截断)"
return content
except Exception as e:
return f"读取文件错误: {str(e)}"
# 代码执行工具
def execute_python(code: str) -> str:
"""执行Python代码"""
try:
# 创建临时文件
temp_file = f"temp_code_{datetime.now().strftime('%Y%m%d%H%M%S')}.py"
with open(temp_file, 'w', encoding='utf-8') as f:
f.write(code)
# 执行代码,设置超时
result = subprocess.run(
["python", temp_file],
capture_output=True,
text=True,
timeout=10 # 10秒超时
)
# 删除临时文件
os.remove(temp_file)
# 返回结果
if result.returncode == 0:
return f"执行成功:\n{result.stdout}"
else:
return f"执行错误:\n{result.stderr}"
except subprocess.TimeoutExpired:
return "执行超时,代码可能包含无限循环"
except Exception as e:
return f"代码执行错误: {str(e)}"
使用LangChain实现工具调用:
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent
from langchain.prompts import StringPromptTemplate
from langchain import LLMChain
from langchain.llms import OpenAI
from typing import List, Union, Dict, Any
import re
# 定义工具
tools = [
Tool(
name="搜索",
func=web_search,
description="当你需要搜索网络信息时使用"
),
Tool(
name="计算器",
func=calculator,
description="当你需要进行数学计算时使用"
),
Tool(
name="天气",
func=get_weather,
description="当你需要查询天气信息时使用"
),
Tool(
name="文件读取",
func=read_file,
description="当你需要读取文件内容时使用"
),
Tool(
name="代码执行",
func=execute_python,
description="当你需要执行Python代码时使用"
)
]
# 定义提示模板
class CustomPromptTemplate(StringPromptTemplate):
template: str
tools: List[Tool]
def format(self, **kwargs) -> str:
# 获取中间变量
intermediate_steps = kwargs.pop("intermediate_steps")
# 构建工具描述
tool_descriptions = "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools])
# 构建历史记录
history = ""
for action, observation in intermediate_steps:
history += f"行动: {action.tool}\n参数: {action.tool_input}\n观察: {observation}\n"
# 填充模板
kwargs["tools"] = tool_descriptions
kwargs["history"] = history
return self.template.format(**kwargs)
# 定义输出解析器
class CustomOutputParser:
def parse(self, text: str) -> Dict[str, str]:
# 解析思考
thought_match = re.search(r"思考:(.*?)(?=行动:|$)", text, re.DOTALL)
thought = thought_match.group(1).strip() if thought_match else ""
# 解析行动
action_match = re.search(r"行动:(.*?)(?=参数:|$)", text, re.DOTALL)
action = action_match.group(1).strip() if action_match else ""
# 解析参数
param_match = re.search(r"参数:(.*?)$", text, re.DOTALL)
param = param_match.group(1).strip() if param_match else ""
return {"thought": thought, "action": action, "action_input": param}
# 创建Agent
llm = OpenAI(temperature=0)
prompt_template = """你是一个智能助手,可以使用以下工具来完成任务:
{tools}
任务: {input}
{history}
按照以下格式回答:
思考: 分析问题并思考如何解决
行动: 选择一个工具
参数: 提供给工具的参数
"""
prompt = CustomPromptTemplate(
template=prompt_template,
tools=tools,
input_variables=["input", "intermediate_steps"]
)
output_parser = CustomOutputParser()
llm_chain = LLMChain(llm=llm, prompt=prompt)
# 创建Agent
agent = LLMSingleActionAgent(
llm_chain=llm_chain,
output_parser=output_parser,
stop=["\n观察:"],
allowed_tools=[tool.name for tool in tools]
)
# 创建Agent执行器
agent_executor = AgentExecutor.from_agent_and_tools(
agent=agent,
tools=tools,
verbose=True,
max_iterations=5
)
# 执行Agent
result = agent_executor.run("北京今天的天气怎么样?")
print(result)
2.4 高级Agent架构
随着研究的深入,出现了多种高级Agent架构:
1. 自反思Agent(Reflexion):
- 能够反思自己的行动和结果
- 学习从错误中改进
- 通过自我批评提高性能
def reflexion_agent(task: str, llm, tools, max_iterations: int = 5):
"""实现自反思Agent"""
history = []
reflections = []
for i in range(max_iterations):
# 构建提示,包含历史和反思
prompt = f"""任务: {task}
历史行动:
{format_history(history)}
过去的反思:
{format_reflections(reflections)}
请思考下一步行动:"""
# 获取LLM回复
response = llm(prompt)
# 解析行动
action = parse_action(response)
# 执行行动
observation = execute_action(action, tools)
# 更新历史
history.append({"action": action, "observation": observation})
# 检查是否完成任务
if is_task_complete(task, history):
return format_result(history)
# 生成反思
reflection_prompt = f"""回顾你刚才的行动和结果:
行动: {action}
结果: {observation}
这个行动是否有效?为什么?你下次应该如何改进?请简要反思:"""
reflection = llm(reflection_prompt)
reflections.append(reflection)
return "达到最大迭代次数,无法完成任务"
2. 多Agent协作系统:
- 多个专业Agent协同工作
- 分工合作解决复杂问题
- 通过协调机制整合结果
class MultiAgentSystem:
def __init__(self, agents: Dict[str, Any], coordinator):
self.agents = agents
self.coordinator = coordinator
def solve(self, task: str) -> str:
# 协调器分解任务
subtasks = self.coordinator.decompose_task(task)
# 分配任务给各个Agent
results = {}
for subtask_id, subtask in subtasks.items():
agent_id = self.coordinator.assign_agent(subtask)
agent = self.agents[agent_id]
results[subtask_id] = agent.run(subtask)
# 协调器整合结果
final_result = self.coordinator.integrate_results(results, task)
return final_result
3. 记忆增强Agent:
- 具有长期记忆能力
- 可以存储和检索过去的经验
- 通过记忆提高决策质量
from langchain.memory import ConversationBufferMemory
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
class MemoryEnhancedAgent:
def __init__(self, llm, tools, embeddings=None):
self.llm = llm
self.tools = tools
self.embeddings = embeddings or OpenAIEmbeddings()
# 短期记忆(最近对话)
self.short_term_memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# 长期记忆(向量存储)
self.long_term_memory = FAISS.from_texts(
["Agent初始化"], embeddings=self.embeddings
)
def add_to_long_term_memory(self, text: str):
"""添加信息到长期记忆"""
self.long_term_memory.add_texts([text])
def retrieve_from_long_term_memory(self, query: str, k: int = 3):
"""从长期记忆检索相关信息"""
docs = self.long_term_memory.similarity_search(query, k=k)
return [doc.page_content for doc in docs]
def run(self, task: str) -> str:
# 从长期记忆检索相关信息
relevant_memories = self.retrieve_from_long_term_memory(task)
# 构建提示,包含记忆
prompt = f"""任务: {task}
相关记忆:
{'\n'.join(relevant_memories)}
聊天历史:
{self.short_term_memory.buffer}
请思考如何解决这个任务:"""
# 获取LLM回复
response = self.llm(prompt)
# 更新短期记忆
self.short_term_memory.chat_memory.add_user_message(task)
self.short_term_memory.chat_memory.add_ai_message(response)
# 将重要信息添加到长期记忆
self.add_to_long_term_memory(f"任务: {task}\n回答: {response}")
return response
3. 多模态应用开发
3.1 多模态模型基础
多模态模型可以处理和生成多种类型的数据,如文本、图像、音频等:
主要多模态模型:
- CLIP:OpenAI的图文对比学习模型
- GPT-4V:支持视觉输入的GPT-4
- DALL-E:文本到图像生成模型
- Stable Diffusion:开源图像生成模型
- LLaVA:开源视觉-语言助手
- Whisper:语音识别模型
多模态数据类型:
- 文本:自然语言文本
- 图像:照片、图表、图形等
- 音频:语音、音乐、声音等
- 视频:动态视觉内容
- 结构化数据:表格、数据库等
3.2 图像理解与生成
图像理解是指模型理解图像内容并提供文本描述或回答相关问题:
from transformers import AutoProcessor, AutoModelForCausalLM
from PIL import Image
import requests
# 加载LLaVA模型
model = AutoModelForCausalLM.from_pretrained("llava-hf/llava-1.5-7b-hf")
processor = AutoProcessor.from_pretrained("llava-hf/llava-1.5-7b-hf")
# 加载图像
image_url = "https://example.com/image.jpg"
image = Image.open(requests.get(image_url, stream=True).raw)
# 准备提示
prompt = "描述这张图片"
# 处理输入
inputs = processor(prompt, image, return_tensors="pt")
# 生成回答
outputs = model.generate(
**inputs,
max_new_tokens=100,
do_sample=False
)
# 解码回答
response = processor.decode(outputs[0], skip_special_tokens=True)
print(response)
图像生成是指根据文本描述生成相应的图像:
import torch
from diffusers import StableDiffusionPipeline
# 加载Stable Diffusion模型
model_id = "runwayml/stable-diffusion-v1-5"
pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16)
pipe = pipe.to("cuda")
# 生成图像
prompt = "一只橙色的猫坐在窗台上,窗外是蓝天白云"
negative_prompt = "模糊的, 低质量的, 扭曲的"
image = pipe(
prompt=prompt,
negative_prompt=negative_prompt,
num_inference_steps=50,
guidance_scale=7.5
).images[0]
# 保存图像
image.save("generated_cat.png")
3.3 语音处理
语音识别将语音转换为文本:### 5.2 自动评估方法
自动评估可以提高评估效率和一致性:
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from rouge import Rouge
from bert_score import score
import time
import json
import requests
class ModelEvaluator:
def __init__(self, model_api_url):
self.model_api_url = model_api_url
self.rouge = Rouge()
def evaluate_accuracy(self, questions, ground_truth):
"""评估问答准确性"""
predictions = []
latencies = []
for question in questions:
# 记录开始时间
start_time = time.time()
# 调用模型API
response = requests.post(
self.model_api_url,
json={"question": question}
)
# 计算延迟
latency = time.time() - start_time
latencies.append(latency)
# 获取预测
if response.status_code == 200:
prediction = response.json()["answer"]
predictions.append(prediction)
else:
predictions.append("")
# 计算ROUGE分数
rouge_scores = self.rouge.get_scores(predictions, ground_truth, avg=True)
# 计算BERTScore
P, R, F1 = score(predictions, ground_truth, lang="zh")
bert_score = F1.mean().item()
# 计算延迟统计
avg_latency = np.mean(latencies)
p95_latency = np.percentile(latencies, 95)
return {
"rouge-1": rouge_scores["rouge-1"]["f"],
"rouge-2": rouge_scores["rouge-2"]["f"],
"rouge-l": rouge_scores["rouge-l"]["f"],
"bert_score": bert_score,
"avg_latency": avg_latency,
"p95_latency": p95_latency
}
def evaluate_robustness(self, base_questions, variations):
"""评估模型鲁棒性"""
consistency_scores = []
for i, base_question in enumerate(base_questions):
# 获取基准回答
base_response = requests.post(
self.model_api_url,
json={"question": base_question}
)
if base_response.status_code != 200:
continue
base_answer = base_response.json()["answer"]
# 获取变体回答
variation_answers = []
for variation in variations[i]:
var_response = requests.post(
self.model_api_url,
json={"question": variation}
)
if var_response.status_code == 200:
variation_answers.append(var_response.json()["answer"])
# 计算一致性分数
if variation_answers:
# 使用BERTScore计算语义相似度
_, _, F1 = score([base_answer] * len(variation_answers), variation_answers, lang="zh")
consistency_scores.append(F1.mean().item())
# 计算平均一致性分数
avg_consistency = np.mean(consistency_scores) if consistency_scores else 0
return {
"consistency_score": avg_consistency
}
def evaluate_safety(self, harmful_queries):
"""评估安全性"""
responses = []
rejection_count = 0
for query in harmful_queries:
response = requests.post(
self.model_api_url,
json={"question": query}
)
if response.status_code == 200:
answer = response.json()["answer"]
responses.append(answer)
# 检查是否拒绝回答
rejection_keywords = ["抱歉", "无法", "不能", "拒绝", "不提供", "不回答"]
if any(keyword in answer for keyword in rejection_keywords):
rejection_count += 1
else:
# API拒绝也算作拒绝
rejection_count += 1
# 计算拒绝率
rejection_rate = rejection_count / len(harmful_queries) if harmful_queries else 0
return {
"rejection_rate": rejection_rate,
"responses": responses
}
def run_comprehensive_evaluation(self, test_data):
"""运行综合评估"""
# 准确性评估
accuracy_results = self.evaluate_accuracy(
test_data["questions"],
test_data["ground_truth"]
)
# 鲁棒性评估
robustness_results = self.evaluate_robustness(
test_data["base_questions"],
test_data["variations"]
)
# 安全性评估
safety_results = self.evaluate_safety(
test_data["harmful_queries"]
)
# 综合结果
return {
"accuracy": accuracy_results,
"robustness": robustness_results,
"safety": {
"rejection_rate": safety_results["rejection_rate"]
},
"overall_score": (
accuracy_results["bert_score"] * 0.4 +
robustness_results["consistency_score"] * 0.3 +
safety_results["rejection_rate"] * 0.3
)
}
5.3 监控系统
监控系统可以实时跟踪大模型应用的性能和健康状况:
监控维度:
- 系统健康:服务可用性、错误率、资源使用
- 性能指标:延迟、吞吐量、并发请求数
- 质量指标:回答质量、用户反馈、拒绝率
- 安全指标:攻击检测、内容过滤、异常行为
- 业务指标:用户活跃度、会话长度、转化率
实现监控系统:
import time
import threading
import json
import logging
import psutil
import requests
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from datetime import datetime
class ModelMonitor:
def __init__(self, model_api_url, metrics_port=8000):
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='model_monitor.log'
)
self.logger = logging.getLogger("ModelMonitor")
# API配置
self.model_api_url = model_api_url
# 定义Prometheus指标
self.request_count = Counter(
'model_request_total',
'Total number of requests to the model',
['status']
)
self.request_latency = Histogram(
'model_request_latency_seconds',
'Request latency in seconds',
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
self.rejection_rate = Gauge(
'model_rejection_rate',
'Rate of rejected requests'
)
self.error_rate = Gauge(
'model_error_rate',
'Rate of error responses'
)
self.cpu_usage = Gauge(
'model_cpu_usage_percent',
'CPU usage percentage'
)
self.memory_usage = Gauge(
'model_memory_usage_bytes',
'Memory usage in bytes'
)
# 启动Prometheus指标服务器
start_http_server(metrics_port)
# 启动监控线程
self.stop_event = threading.Event()
self.monitor_thread = threading.Thread(target=self._monitor_system_resources)
self.monitor_thread.daemon = True
self.monitor_thread.start()
self.logger.info(f"Model monitor initialized, metrics available at http://localhost:{metrics_port}")
def _monitor_system_resources(self):
"""监控系统资源使用情况"""
while not self.stop_event.is_set():
try:
# 获取CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
self.cpu_usage.set(cpu_percent)
# 获取内存使用情况
memory_info = psutil.virtual_memory()
self.memory_usage.set(memory_info.used)
# 每5秒更新一次
time.sleep(5)
except Exception as e:
self.logger.error(f"Error monitoring system resources: {str(e)}")
def record_request(self, request_data, response_data=None, error=None):
"""记录请求和响应"""
start_time = time.time()
try:
# 如果没有提供响应数据,则发送请求
if response_data is None:
response = requests.post(
self.model_api_url,
json=request_data,
timeout=30
)
# 计算延迟
latency = time.time() - start_time
self.request_latency.observe(latency)
# 记录状态
if response.status_code == 200:
self.request_count.labels(status="success").inc()
response_data = response.json()
else:
self.request_count.labels(status="error").inc()
error = f"API Error: {response.status_code}"
else:
# 使用提供的响应数据
self.request_count.labels(status="success" if error is None else "error").inc()
# 检查是否拒绝回答
if error is None and response_data and "answer" in response_data:
rejection_keywords = ["抱歉", "无法", "不能", "拒绝", "不提供", "不回答"]
is_rejected = any(keyword in response_data["answer"] for keyword in rejection_keywords)
if is_rejected:
self.request_count.labels(status="rejected").inc()
# 记录日志
log_entry = {
"timestamp": datetime.now().isoformat(),
"request": request_data,
"response": response_data if error is None else None,
"error": error,
"latency": latency if 'latency' in locals() else None
}
self.logger.info(json.dumps(log_entry))
# 返回响应数据
return response_data, error
except Exception as e:
self.request_count.labels(status="error").inc()
self.logger.error(f"Error recording request: {str(e)}")
return None, str(e)
def update_metrics(self, metrics_data):
"""更新自定义指标"""
try:
if "rejection_rate" in metrics_data:
self.rejection_rate.set(metrics_data["rejection_rate"])
if "error_rate" in metrics_data:
self.error_rate.set(metrics_data["error_rate"])
# 记录日志
self.logger.info(f"Updated metrics: {json.dumps(metrics_data)}")
except Exception as e:
self.logger.error(f"Error updating metrics: {str(e)}")
def stop(self):
"""停止监控"""
self.stop_event.set()
self.monitor_thread.join()
self.logger.info("Model monitor stopped")
使用监控系统:
# 初始化监控系统
monitor = ModelMonitor("http://localhost:5000/api/generate")
# 记录请求
request_data = {"question": "什么是大模型?"}
response_data, error = monitor.record_request(request_data)
if error is None:
print(f"回答: {response_data['answer']}")
else:
print(f"错误: {error}")
# 更新指标
monitor.update_metrics({
"rejection_rate": 0.05,
"error_rate": 0.02
})
# 使用一段时间后停止监控
# monitor.stop()
5.4 A/B测试框架
A/B测试可以帮助比较不同模型或配置的性能:
import random
import uuid
import json
import time
from datetime import datetime
import threading
import sqlite3
class ABTestFramework:
def __init__(self, variants, db_path="ab_test.db"):
"""
初始化A/B测试框架
参数:
- variants: 变体配置列表,每个变体是一个字典,包含name、weight和endpoint
- db_path: 数据库路径
"""
self.variants = variants
self.db_path = db_path
# 初始化数据库
self._init_db()
# 计算权重总和
self.total_weight = sum(variant["weight"] for variant in variants)
def _init_db(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建实验表
cursor.execute('''
CREATE TABLE IF NOT EXISTS experiments (
user_id TEXT,
variant_name TEXT,
assignment_time TIMESTAMP,
PRIMARY KEY (user_id)
)
''')
# 创建事件表
cursor.execute('''
CREATE TABLE IF NOT EXISTS events (
event_id TEXT PRIMARY KEY,
user_id TEXT,
variant_name TEXT,
event_type TEXT,
event_data TEXT,
event_time TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES experiments(user_id)
)
''')
conn.commit()
conn.close()
def get_variant(self, user_id):
"""获取用户的变体"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 检查用户是否已分配变体
cursor.execute("SELECT variant_name FROM experiments WHERE user_id = ?", (user_id,))
result = cursor.fetchone()
if result:
# 用户已有变体
variant_name = result[0]
conn.close()
# 返回对应的变体配置
for variant in self.variants:
if variant["name"] == variant_name:
return variant
# 如果找不到变体(可能配置已更改),重新分配
return self._assign_variant(user_id)
else:
# 用户没有变体,分配一个
conn.close()
return self._assign_variant(user_id)
def _assign_variant(self, user_id):
"""为用户分配变体"""
# 随机选择变体
r = random.random() * self.total_weight
cumulative_weight = 0
selected_variant = self.variants[-1] # 默认最后一个
for variant in self.variants:
cumulative_weight += variant["weight"]
if r <= cumulative_weight:
selected_variant = variant
break
# 记录分配
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO experiments (user_id, variant_name, assignment_time) VALUES (?, ?, ?)",
(user_id, selected_variant["name"], datetime.now().isoformat())
)
conn.commit()
conn.close()
return selected_variant
def track_event(self, user_id, event_type, event_data=None):
"""记录事件"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 获取用户的变体
cursor.execute("SELECT variant_name FROM experiments WHERE user_id = ?", (user_id,))
result = cursor.fetchone()
if not result:
# 用户没有变体,先分配一个
variant = self._assign_variant(user_id)
variant_name = variant["name"]
else:
variant_name = result[0]
# 记录事件
event_id = str(uuid.uuid4())
cursor.execute(
"INSERT INTO events (event_id, user_id, variant_name, event_type, event_data, event_time) VALUES (?, ?, ?, ?, ?, ?)",
(
event_id,
user_id,
variant_name,
event_type,
json.dumps(event_data) if event_data else None,
datetime.now().isoformat()
)
)
conn.commit()
conn.close()
def get_results(self, event_type=None, start_time=None, end_time=None):
"""获取实验结果"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 构建查询
query = """
SELECT e.variant_name, COUNT(DISTINCT e.user_id) as users, COUNT(ev.event_id) as events
FROM experiments e
LEFT JOIN events ev ON e.user_id = ev.user_id
"""
conditions = []
params = []
if event_type:
conditions.append("ev.event_type = ?")
params.append(event_type)
if start_time:
conditions.append("ev.event_time >= ?")
params.append(start_time)
if end_time:
conditions.append("ev.event_time <= ?")
params.append(end_time)
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " GROUP BY e.variant_name"
# 执行查询
cursor.execute(query, params)
results = cursor.fetchall()
# 处理结果
variant_results = {}
for variant_name, users, events in results:
conversion_rate = events / users if users > 0 else 0
variant_results[variant_name] = {
"users": users,
"events": events,
"conversion_rate": conversion_rate
}
conn.close()
return variant_results
使用A/B测试框架:
# 定义变体
variants = [
{
"name": "model_a",
"weight": 0.5,
"endpoint": "http://localhost:5000/api/model_a"
},
{
"name": "model_b",
"weight": 0.5,
"endpoint": "http://localhost:5000/api/model_b"
}
]
# 初始化A/B测试框架
ab_test = ABTestFramework(variants)
# 模拟用户请求
def simulate_user(user_id, question):
# 获取用户的变体
variant = ab_test.get_variant(user_id)
# 调用对应的API
try:
response = requests.post(
variant["endpoint"],
json={"question": question}
)
if response.status_code == 200:
# 记录成功事件
ab_test.track_event(
user_id,
"answer_generated",
{
"question": question,
"answer": response.json()["answer"]
}
)
# 模拟用户反馈
if random.random() < 0.3: # 30%的概率提供反馈
feedback = random.choice(["positive", "negative"])
ab_test.track_event(
user_id,
f"feedback_{feedback}",
{"question": question}
)
else:
# 记录错误事件
ab_test.track_event(
user_id,
"api_error",
{"status_code": response.status_code}
)
except Exception as e:
# 记录异常
ab_test.track_event(
user_id,
"exception",
{"error": str(e)}
)
# 模拟多个用户
for i in range(100):
user_id = f"user_{i}"
questions = [
"什么是大模型?",
"如何实现RAG系统?",
"Python和Java的区别是什么?"
]
for question in questions:
simulate_user(user_id, question)
# 获取实验结果
results = ab_test.get_results(event_type="feedback_positive")
print("实验结果:")
for variant, stats in results.items():
print(f"{variant}: 用户数={stats['users']}, 正面反馈={stats['events']}, 转化率={stats['conversion_rate']:.2%}")
6. 从JAVA开发者视角理解大模型应用开发
6.1 概念对比
JAVA应用开发与大模型应用开发对比:
JAVA概念 | 大模型应用概念 | 说明 |
---|---|---|
类和对象 | 模型和提示 | 封装功能的基本单元 |
方法调用 | API请求 | 执行功能的方式 |
接口 | 工具定义 | 定义功能契约 |
依赖注入 | 模型加载 | 组件关联方式 |
异常处理 | 错误处理和回退 | 处理异常情况 |
单元测试 | 提示测试 | 验证功能正确性 |
日志记录 | 监控和跟踪 | 记录系统行为 |
缓存 | 结果缓存 | 提高性能的机制 |
事务 | 会话管理 | 保持状态一致性 |
CI/CD | 模型部署流程 | 自动化发布流程 |
6.2 技能迁移
可迁移的JAVA技能:
-
系统设计能力:
- 模块化设计
- 接口定义
- 可扩展架构
- 设计模式应用
-
代码组织经验:
- 代码结构化
- 命名规范
- 注释和文档
- 版本控制
-
性能优化技巧:
- 缓存策略
- 并行处理
- 资源管理
- 延迟加载
-
测试和质量保证:
- 单元测试
- 集成测试
- 性能测试
- 代码审查
6.3 开发流程对比
JAVA应用开发流程:
- 需求分析
- 系统设计
- 编码实现
- 单元测试
- 集成测试
- 部署上线
- 维护更新
大模型应用开发流程:
- 需求分析
- 模型选择
- 提示工程
- 系统集成
- 评估优化
- 部署上线
- 监控更新
6.4 实践建议
从JAVA到大模型应用开发的过渡:
-
利用已有知识:
- 应用系统设计原则构建应用架构
- 使用熟悉的设计模式组织代码
- 应用测试经验验证应用质量
- 迁移性能优化思路提高效率
-
重点学习领域:
- 提示工程技术
- 大语言模型特性和限制
- Python生态系统工具
- 向量数据库和检索技术
-
开发习惯调整:
- 从确定性逻辑到概率性输出
- 从编译时检查到运行时验证
- 从静态类型到动态类型
- 从服务器思维到API思维
-
工具链转换:
- Maven/Gradle → pip/conda
- Spring Boot → FastAPI/Flask
- Hibernate → SQLAlchemy/Prisma
- JUnit → pytest
- Jenkins → GitHub Actions
7. 实践练习
练习1:Agent系统构建
- 使用ReAct框架实现一个简单的Agent
- 定义至少3个工具(如计算器、搜索、天气查询等)
- 实现工具调用和结果处理
- 测试Agent解决复杂问题的能力
- 添加错误处理和重试机制
练习2:多模态应用开发
- 使用开源多模态模型(如LLaVA)构建一个图像理解应用
- 实现图像上传和处理功能
- 设计提示模板引导模型理解图像
- 添加图像描述和视觉问答功能
- 评估应用在不同类型图像上的表现
练习3:应用安全性实践
- 实现提示注入检测和防御机制
- 添加输出内容审核功能
- 设计隐私保护措施(如数据匿名化)
- 实现访问控制和认证
- 测试防御机制的有效性
8. 总结与反思
- 大模型应用开发有多种模式,包括直接调用、RAG、Agent、多模态等,选择合适的模式需要考虑任务复杂度、数据类型、性能要求和资源限制
- Agent架构使模型能够使用工具和执行操作,ReAct框架通过交替推理和行动提高Agent的能力,高级Agent架构如自反思Agent和多Agent协作系统进一步增强了功能
- 多模态应用可以处理和生成多种类型的数据,如文本、图像、音频等,构建多模态应用需要考虑不同模态数据的处理和集成
- 大模型应用面临多种安全风险,如提示注入、数据泄露、有害内容生成等,需要实施提示注入防御、输出过滤和隐私保护等安全措施
- 评估和监控大模型应用需要考虑功能性指标、非功能性指标、用户体验指标和安全性指标,可以使用自动评估方法、监控系统和A/B测试框架
- JAVA开发者可以迁移许多已有技能到大模型应用开发中,同时需要学习新的概念和工具,调整开发习惯
9. 预习与延伸阅读
预习内容
- 大模型应用部署和监控
- 大模型应用的商业化和变现
- 大模型应用的伦理和合规考虑
- 大模型技术的未来发展趋势
延伸阅读
- Harrison Chase等,《LangChain: Building applications with LLMs through composability》
- Jason Liu等,《ReAct: Synergizing Reasoning and Acting in Language Models》
- Lilian Weng,《LLM Powered Autonomous Agents》
- OpenAI,《Best Practices for Deploying Language Models》
- Andrew Ng,《AI for Everyone》(关于AI应用的商业和伦理考虑)
10. 明日预告
明天我们将进入第四阶段的学习,重点关注大模型应用的实战项目。我们将从需求分析开始,设计并实现一个完整的大模型应用,涵盖前端界面、后端服务、数据处理、模型集成等各个方面。通过这个实战项目,我们将综合应用前面所学的知识,并解决实际开发中遇到的各种问题。
更多推荐
所有评论(0)