项目九:领域LLM高效微调(法律行业专用LLM微调与合同审查平台)
参数配置 · 核心代码实现
===================================
业务场景:律师事务所微调专用法律大模型进行合同智能审查
·
项目原型
⚖️ 法律领域LLM高效微调系统
==================================================
[模型训练状态]
基础模型: chatglm3-6b
训练阶段: 第3轮/共5轮
训练进度: ██████████ 100% (当前轮次)
损失值: 1.2345 → 0.8765 → 0.6543
[训练数据概览]
训练样本: 45,678 条
验证样本: 5,000 条
测试样本: 5,000 条
数据分布: 合同审查(40%) 法律问答(35%) 案例分析(25%)
[模型性能评估]
├── 法律术语准确率: 92.3%
├── 法律条文引用准确率: 87.6%
├── 风险识别F1分数: 0.891
└── 回答满意度: 4.5/5.0
[实时测试界面]
输入法律问题: 合同中不可抗力条款应该注意哪些要点?
🤖 法律AI回答:
不可抗力条款是合同中的重要组成部分,应注意以下要点:
1. 明确不可抗力的定义和范围,参照《民法典》第180条、第590条(原《合同法》第117条)...
2. 规定通知义务和证明责任...
3. 约定责任免除和合同处理方式...
4. 注意与国际贸易术语的衔接...
相关法律依据:
• 《中华人民共和国民法典》第180条
• 《中华人民共和国民法典》第590条(原《合同法》第117-118条,已于2021年1月1日废止)
• 最高人民法院相关司法解释
[微调配置面板]
1. 📊 调整训练参数 2. 📝 编辑提示词模板
3. 🗂️ 管理训练数据 4. 🔧 优化模型架构
5. 🧪 运行A/B测试 6. 💾 导出微调模型
[资源监控]
GPU显存: 12.3/16.0 GB
训练速度: 125 samples/秒
预计剩余时间: 45分钟
请输入选择 [1-6]:
参数配置
# config/llm_finetuning_config.yaml
# LoRA fine-tuning configuration for the legal-domain model.
# Nesting restored: each top-level section owns the keys listed beneath it.
model:
  base_model: "THUDM/chatglm3-6b"
  model_revision: "main"
  torch_dtype: "float16"
  load_in_8bit: true
lora:
  r: 8
  lora_alpha: 32
  lora_dropout: 0.1
  target_modules:
    - "query_key_value"
    - "dense"
    - "dense_h_to_4h"
    - "dense_4h_to_h"
training:
  output_dir: "./finetuned_model"
  num_epochs: 5
  batch_size: 2
  gradient_accumulation_steps: 8
  warmup_steps: 100
  # Written with a decimal point on purpose: YAML 1.1 loaders (e.g. PyYAML)
  # parse a bare "2e-4" as a string, not a float.
  learning_rate: 2.0e-4
  max_length: 1024
  logging_steps: 50
  save_steps: 500
  eval_steps: 500
data:
  train_file: "data/train.json"
  validation_file: "data/validation.json"
  test_file: "data/test.json"
  max_samples: 10000
  data_formats:
    - "qa"
    - "document"
    - "conversation"
legal_domain:
  specialized_terms:
    - "原告"
    - "被告"
    - "诉讼"
    - "仲裁"
    - "合同"
    - "条款"
  prompt_templates:
    contract_review: "请分析以下合同条款的法律风险: {clause}"
    legal_qa: "请回答以下法律问题: {question}"
    document_analysis: "请分析以下法律文档: {document}"
generation:
  max_length: 512
  temperature: 0.7
  top_p: 0.9
  repetition_penalty: 1.1
核心代码实现
import torch
import torch.nn as nn
from transformers import(
AutoTokenizer,AutoModelForCausalLM,
TrainingArguments,Trainer,
DataCollatorForLanguageModeling,
get_linear_schedule_with_warmup
)
from peft import(
LoraConfig,
get_peft_model,
TaskType,
PeftModel,
PeftConfig
)
import datasets
from typing import Dict,List,Any,Optional
import json
import logging
from datetime import datetime
import numpy as np
from sklearn.metrics import accuracy_score,f1_score
import os
from dataclasses import dataclass
@dataclass
class TrainingConfig:
    """Paths and hyper-parameters for one LoRA fine-tuning run.

    Only ``model_name``, ``dataset_path`` and ``output_dir`` are required;
    every other field has a default.
    """

    model_name: str  # model id/path handed to ``from_pretrained``
    dataset_path: str  # JSON file containing the raw training items
    output_dir: str  # directory passed to the trainer as its output dir
    lora_r: int = 8
    lora_alpha: int = 32
    lora_dropout: float = 0.1
    learning_rate: float = 2e-4
    num_epochs: int = 5
    batch_size: int = 4  # per-device batch size for both train and eval
    max_length: int = 1024  # tokenizer truncation length
    warmup_steps: int = 100
    logging_steps: int = 50
    save_steps: int = 500
    # Micro-batches accumulated per optimizer step. The default mirrors the
    # value (4) that DomainLLMTuner.setup_training_args uses.
    gradient_accumulation_steps: int = 4
class DomainLLMTuner:
"""领域LLM高效微调系统,使用QLoRA等高效微调技术"""
def __init__(self,config:TrainingConfig):
self.config = config
self.setup_logging()
#训练组件
self.tokenizer = None
self.model = None
self.lora_config = None
self.trainer = None
#训练状态
self.training_history = []
self.best_metric = None
def setup_logging(self):
    """Configure root logging (file + console) and create ``self.logger``.

    The log file is opened as UTF-8 explicitly: the messages written by this
    class are Chinese, and the platform default encoding is not guaranteed
    to be able to represent them.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('domain_llm_tuning.log', encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )
    self.logger = logging.getLogger(__name__)
def load_base_model(self):
    """Load the tokenizer and the 8-bit base model named in the config.

    Populates ``self.tokenizer`` and ``self.model``.

    Raises:
        Exception: Any loading failure is logged and re-raised unchanged.
    """
    self.logger.info(f"加载基础模型: {self.config.model_name}")
    try:
        # Tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.config.model_name,
            trust_remote_code=True,
            use_fast=False,
        )
        # Use EOS as the padding token when the tokenizer defines none.
        # (Identity test: the previous ``in None`` raised TypeError.)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        # Model
        self.model = AutoModelForCausalLM.from_pretrained(
            self.config.model_name,
            torch_dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True,
            load_in_8bit=True,  # 8-bit quantisation to reduce GPU memory use
        )
        self.logger.info("基础模型加载完成")
    except Exception as e:
        self.logger.error(f"模型加载失败: {e}")
        raise
def setup_lora(self):
    """Build the LoRA configuration and wrap ``self.model`` with it.

    Raises:
        RuntimeError: If the base model has not been loaded yet.
    """
    # Fail early with a clear message rather than deep inside PEFT.
    if self.model is None:
        raise RuntimeError(
            "Base model is not loaded; call load_base_model() before setup_lora()."
        )
    self.logger.info("配置LoRA参数")
    self.lora_config = LoraConfig(
        task_type=TaskType.CAUSAL_LM,
        inference_mode=False,
        r=self.config.lora_r,
        lora_alpha=self.config.lora_alpha,
        lora_dropout=self.config.lora_dropout,
        target_modules=self._get_target_modules(),
    )
    # Apply LoRA to the model.
    self.model = get_peft_model(self.model, self.lora_config)
    # Report how many parameters remain trainable.
    self.model.print_trainable_parameters()
def _get_target_modules(self) -> List[str]:
"""获取目标模块列表"""
# 根据模型架构选择目标模块
model_name_lower = self.config.model_name.lower()
if 'chatglm' in model_name_lower:
return ["query_key_value", "dense", "dense_h_to_4h", "dense_4h_to_h"]
elif 'llama' in model_name_lower or 'baichuan' in model_name_lower:
return ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]
elif 'qwen' in model_name_lower:
return ["c_attn", "c_proj", "w1", "w2", "c_proj"]
else:
# 默认目标模块
return ["query_key_value", "dense"]
def prepare_dataset(self) -> "datasets.DatasetDict":
    """Load, format, tokenize and split the training data.

    Returns:
        A ``DatasetDict`` with ``'train'`` and ``'test'`` splits. With more
        than 1000 samples a seeded 90/10 split is made; otherwise both keys
        refer to the same full dataset.

    Raises:
        ValueError: If the file yields no usable training samples.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    self.logger.info(f"加载训练数据: {self.config.dataset_path}")
    try:
        # Load the domain-specific raw items.
        with open(self.config.dataset_path, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)
        formatted_data = self._format_training_data(raw_data)
        # Stop here with a clear message instead of failing later in training.
        if not formatted_data:
            raise ValueError(
                f"No usable training samples found in {self.config.dataset_path}"
            )
        dataset = datasets.Dataset.from_list(formatted_data)
        tokenized = dataset.map(
            self._tokenize_function,
            batched=True,
            remove_columns=dataset.column_names,
        )
        if len(tokenized) > 1000:
            # Enough data for a held-out split.
            splits = tokenized.train_test_split(
                test_size=0.1,
                shuffle=True,
                seed=42,
            )
        else:
            # Small dataset: train on everything and evaluate on the same
            # samples. Wrapped in DatasetDict so both branches return the
            # same type.
            splits = datasets.DatasetDict({
                'train': tokenized,
                'test': tokenized,
            })
        self.logger.info(f"训练数据准备完成: {len(splits['train'])} 训练样本, "
                         f"{len(splits['test'])} 测试样本")
        return splits
    except Exception as e:
        self.logger.error(f"数据准备失败: {e}")
        raise
def _format_training_data(self, raw_data: List[Dict]) -> List[Dict]:
"""格式化训练数据"""
formatted = []
for item in raw_data:
if item['type'] == 'qa':
# 问答对格式
text = self._format_qa_prompt(item['question'], item['answer'])
elif item['type'] == 'document':
# 文档格式
text = self._format_document_prompt(item['content'])
elif item['type'] == 'conversation':
# 对话格式
text = self._format_conversation_prompt(item['conversation'])
else:
continue
formatted.append({'text': text})
return formatted
def _format_qa_prompt(self, question: str, answer: str) -> str:
"""格式化问答提示词"""
prompt_template = """你是一个专业的AI助手。请回答以下问题:
问题:{question}
回答:{answer}"""
return prompt_template.format(question=question, answer=answer)
def _format_document_prompt(self, content: str) -> str:
"""格式化文档提示词"""
prompt_template = """请学习以下专业知识:
{content}
请记住这些专业知识,并在后续问题中应用。"""
return prompt_template.format(content=content[:2000]) # 限制长度
def _format_conversation_prompt(self, conversation: List[Dict]) -> str:
"""格式化对话提示词"""
formatted_conversation = []
for turn in conversation:
role = "用户" if turn['role'] == 'user' else "助手"
formatted_conversation.append(f"{role}:{turn['content']}")
return "\n".join(formatted_conversation)
def _tokenize_function(self, examples):
"""分词函数"""
return self.tokenizer(
examples['text'],
truncation=True,
padding=False,
max_length=self.config.max_length,
return_tensors=None
)
def setup_training_args(self):
    """Build ``self.training_args`` from the training configuration.

    Evaluation and checkpointing both run every ``config.save_steps`` steps,
    so the best checkpoint by ``eval_loss`` can be reloaded at the end of
    training.
    """
    self.logger.info("配置训练参数")
    self.training_args = TrainingArguments(
        output_dir=self.config.output_dir,
        overwrite_output_dir=True,
        num_train_epochs=self.config.num_epochs,
        per_device_train_batch_size=self.config.batch_size,
        per_device_eval_batch_size=self.config.batch_size,
        # Taken from the config when it provides the field; 4 otherwise.
        gradient_accumulation_steps=getattr(
            self.config, 'gradient_accumulation_steps', 4
        ),
        warmup_steps=self.config.warmup_steps,
        learning_rate=self.config.learning_rate,
        fp16=True,  # mixed-precision training
        logging_steps=self.config.logging_steps,
        save_steps=self.config.save_steps,
        eval_steps=self.config.save_steps,
        evaluation_strategy="steps",
        save_strategy="steps",
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        greater_is_better=False,
        # The string "none" disables external reporters (wandb etc.);
        # passing None leaves the library default in effect.
        report_to="none",
        ddp_find_unused_parameters=False,
    )
def train(self, dataset: "datasets.DatasetDict") -> Dict[str, Any]:
    """Run training and save the resulting model and trainer state.

    Args:
        dataset: Mapping with a ``'train'`` split and an optional ``'test'``
            split used for evaluation during training.

    Returns:
        Dict with ``train_loss``, ``train_metrics``, ``training_time`` and
        ``best_model_path``.

    Raises:
        RuntimeError: If the model, tokenizer or training arguments have not
            been set up yet.
        Exception: Any training failure is logged and re-raised unchanged.
    """
    # Check prerequisites up front so a wrong call order gives a clear error.
    if self.model is None or self.tokenizer is None:
        raise RuntimeError(
            "Model/tokenizer not loaded; call load_base_model() and setup_lora() first."
        )
    if getattr(self, 'training_args', None) is None:
        raise RuntimeError(
            "Training arguments not configured; call setup_training_args() first."
        )
    self.logger.info("开始模型训练")
    try:
        # Collator for causal language modelling (no masked-LM objective).
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False,
        )
        eval_split = dataset.get('test', None)
        self.trainer = Trainer(
            model=self.model,
            args=self.training_args,
            train_dataset=dataset['train'],
            eval_dataset=eval_split,
            data_collator=data_collator,
            tokenizer=self.tokenizer,
            # Metrics are only computed when a non-empty eval split exists.
            compute_metrics=self._compute_metrics if eval_split else None,
        )
        train_result = self.trainer.train()
        # Persist the final model and the trainer state.
        self.trainer.save_model()
        self.trainer.save_state()
        self.training_history = self.trainer.state.log_history
        self.logger.info("模型训练完成")
        return {
            'train_loss': train_result.training_loss,
            'train_metrics': train_result.metrics,
            'training_time': train_result.metrics.get('train_runtime', 0),
            'best_model_path': self.config.output_dir
        }
    except Exception as e:
        self.logger.error(f"训练过程失败: {e}")
        raise
def _compute_metrics(self, eval_pred):
    """Compute next-token accuracy and macro-F1 over non-ignored positions.

    Args:
        eval_pred: ``(predictions, labels)`` pair. ``predictions`` holds
            logits (or a tuple whose first element is the logits);
            ``labels`` marks ignored positions with ``-100``.

    Returns:
        Dict with ``accuracy`` and ``f1``; both are 0.0 when no position is
        left to score.
    """
    predictions, labels = eval_pred
    # Keep only the logits if the model returned extra outputs alongside them.
    if isinstance(predictions, (tuple, list)):
        predictions = predictions[0]
    predictions = np.argmax(predictions, axis=-1)
    # Causal LM: the logits at position t predict the token at t + 1, so
    # predictions[:, :-1] must be compared with labels[:, 1:].
    predictions = predictions[..., :-1]
    labels = labels[..., 1:]
    # Score only positions that carry a real label.
    mask = labels != -100
    predictions = predictions[mask]
    labels = labels[mask]
    if labels.size == 0:
        return {'accuracy': 0.0, 'f1': 0.0}
    accuracy = accuracy_score(labels, predictions)
    f1 = f1_score(labels, predictions, average='macro')
    return {
        'accuracy': accuracy,
        'f1': f1
    }
def evaluate_model(self, test_dataset: datasets.Dataset) -> Dict[str, Any]:
    """Evaluate the trained model on ``test_dataset``.

    Returns the trainer's metrics dict, or an empty dict when no trainer
    exists yet or the evaluation raises (the error is logged).
    """
    self.logger.info("开始模型评估")
    trainer = self.trainer
    if trainer is None:
        self.logger.error("模型未训练,无法评估")
        return {}
    metrics: Dict[str, Any] = {}
    try:
        metrics = trainer.evaluate(eval_dataset=test_dataset)
        self.logger.info(f"评估结果: {metrics}")
    except Exception as err:
        self.logger.error(f"模型评估失败: {err}")
        metrics = {}
    return metrics
def save_model(self, save_path: str):
    """Save the adapter, tokenizer and a ``training_config.json`` summary.

    Args:
        save_path: Target directory (created if missing).

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    self.logger.info(f"保存模型到: {save_path}")

    def _to_jsonable(value):
        # Config objects can hold values json cannot encode natively
        # (e.g. a set of target modules); sets become sorted lists and
        # anything else falls back to its string form.
        if isinstance(value, (set, frozenset)):
            return sorted(value, key=str)
        return str(value)

    try:
        # LoRA adapter and tokenizer.
        self.model.save_pretrained(save_path)
        self.tokenizer.save_pretrained(save_path)
        # Training summary. lora_config is None when the model was loaded
        # with load_finetuned_model() rather than configured via setup_lora().
        lora_info = None if self.lora_config is None else dict(vars(self.lora_config))
        config_info = {
            'base_model': self.config.model_name,
            'lora_config': lora_info,
            'training_config': dict(vars(self.config)),
            'training_date': datetime.now().isoformat()
        }
        os.makedirs(save_path, exist_ok=True)
        summary_file = os.path.join(save_path, 'training_config.json')
        with open(summary_file, 'w', encoding='utf-8') as f:
            json.dump(config_info, f, indent=2, ensure_ascii=False,
                      default=_to_jsonable)
        self.logger.info("模型保存完成")
    except Exception as e:
        self.logger.error(f"模型保存失败: {e}")
        raise
def load_finetuned_model(self, model_path: str):
    """Load a saved LoRA adapter on top of its base model.

    The base model name is read from the adapter's saved PEFT config.
    Populates ``self.model`` and ``self.tokenizer``.

    Args:
        model_path: Directory holding the saved adapter and tokenizer.

    Raises:
        Exception: Any loading failure is logged and re-raised unchanged.
    """
    self.logger.info(f"加载微调模型: {model_path}")
    try:
        # Adapter config tells us which base model to load.
        config = PeftConfig.from_pretrained(model_path)
        base_model = AutoModelForCausalLM.from_pretrained(
            config.base_model_name_or_path,
            torch_dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        # Attach the LoRA adapter.
        self.model = PeftModel.from_pretrained(base_model, model_path)
        # Same trust_remote_code setting as load_base_model(), so tokenizers
        # that ship custom code load the same way in both paths.
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_path,
            trust_remote_code=True,
        )
        self.logger.info("微调模型加载完成")
    except Exception as e:
        self.logger.error(f"微调模型加载失败: {e}")
        raise
def generate_response(self, prompt: str, max_length: int = 512, **kwargs) -> str:
    """Generate a completion for ``prompt`` with the loaded model.

    Args:
        prompt: Input text; tokenized with truncation at 1024 tokens.
        max_length: Maximum number of newly generated tokens.
        **kwargs: Optional ``temperature`` (0.7), ``top_p`` (0.9) and
            ``repetition_penalty`` (1.1) overrides.

    Returns:
        The generated continuation with surrounding whitespace stripped, or
        an empty string when no model is loaded or generation fails (the
        error is logged).
    """
    if self.model is None or self.tokenizer is None:
        self.logger.error("模型未加载")
        return ""
    try:
        # Encode the prompt and move it to the model's device.
        inputs = self.tokenizer(prompt, return_tensors="pt", max_length=1024, truncation=True)
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
        prompt_token_count = inputs["input_ids"].shape[1]
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_length,
                temperature=kwargs.get('temperature', 0.7),
                do_sample=True,
                top_p=kwargs.get('top_p', 0.9),
                pad_token_id=self.tokenizer.eos_token_id,
                repetition_penalty=kwargs.get('repetition_penalty', 1.1)
            )
        # Decode only the tokens after the prompt. Cutting the decoded text
        # by len(prompt) is unreliable: decoding need not reproduce the
        # prompt character-for-character, and the prompt may be truncated.
        new_tokens = outputs[0][prompt_token_count:]
        return self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
    except Exception as e:
        self.logger.error(f"文本生成失败: {e}")
        return ""
class LegalLLMTuner(DomainLLMTuner):
    """Legal-domain tuner: legal prompt templates plus document analysis."""

    def __init__(self, config: TrainingConfig):
        """Initialise the base tuner and load the built-in legal term list."""
        super().__init__(config)
        self.legal_terms = self._load_legal_terms()
        self.legal_knowledge_base = {}

    def _load_legal_terms(self) -> List[str]:
        """Return the built-in list of legal terms."""
        return [
            "原告", "被告", "诉讼", "仲裁", "合同", "条款", "违约",
            "赔偿责任", "不可抗力", "知识产权", "商业秘密", "侵权行为",
            "法律适用", "争议解决", "管辖权", "举证责任", "诉讼时效"
        ]

    def _format_qa_prompt(self, question: str, answer: str) -> str:
        """Render a legal question/answer pair as training text."""
        prompt_template = (
            "你是一名专业的法律AI助手。请根据法律知识回答以下问题:\n"
            "问题:{question}\n"
            "回答:{answer}\n"
            "请确保回答专业、准确,并引用相关法律依据。"
        )
        return prompt_template.format(question=question, answer=answer)

    def _format_document_prompt(self, content: str) -> str:
        """Wrap a legal document in the study prompt.

        The content is capped at 2000 characters, matching the base class, so
        the closing instruction is not lost to tokenizer truncation on long
        documents.
        """
        prompt_template = (
            "请学习以下法律专业知识:\n"
            "{content}\n"
            "请记住这些法律知识,并在后续法律问题中准确应用。"
        )
        return prompt_template.format(content=content[:2000])

    def create_legal_prompt(self, query: str, context: str = "") -> str:
        """Build an inference prompt from the query, context and legal terms.

        The "legal knowledge" section lists the first ten built-in terms, one
        per line, as sample content.
        """
        prompt_template = (
            "你是一名专业的法律AI助手,请根据以下法律知识和上下文,回答用户的问题。\n"
            "法律知识:\n"
            "{legal_knowledge}\n"
            "上下文:\n"
            "{context}\n"
            "用户问题:\n"
            "{query}\n"
            "请以专业、准确的法律语言回答,并引用相关法律依据:"
        )
        return prompt_template.format(
            legal_knowledge="\n".join(self.legal_terms[:10]),
            context=context,
            query=query
        )

    def analyze_legal_document(self, document_text: str) -> Dict[str, Any]:
        """Ask the model to analyse a document and list the legal terms in it.

        Returns:
            Dict with ``document_analysis`` (model output, up to 800 new
            tokens), ``identified_terms`` and ``analysis_timestamp``.
        """
        analysis_prompt = (
            "请分析以下法律文档:\n"
            f"{document_text}\n"
            "请提供:\n"
            "1. 文档类型识别\n"
            "2. 关键条款提取\n"
            "3. 潜在风险点\n"
            "4. 修改建议\n"
            "分析结果:"
        )
        analysis_result = self.generate_response(analysis_prompt, max_length=800)
        return {
            'document_analysis': analysis_result,
            'identified_terms': self._extract_legal_terms(document_text),
            'analysis_timestamp': datetime.now().isoformat()
        }

    def _extract_legal_terms(self, text: str) -> List[str]:
        """Return the built-in legal terms that occur in ``text``, in list order."""
        return [term for term in self.legal_terms if term in text]
class LegalDataGenerator:
    """Template-based generator of synthetic legal question/answer pairs."""

    # Checked in this order so that the hardest matching level wins. The
    # 'easy' keywords are generic question words that occur in nearly every
    # generated question, so they must be tested last.
    _DIFFICULTY_KEYWORDS = (
        ('hard', ('认定', '标准', '纠纷', '权益', '适用')),
        ('medium', ('风险', '案例', '建议', '责任')),
        ('easy', ('什么', '哪些', '如何')),
    )

    def __init__(self, base_model_name: str):
        """Remember the base model name and load its tokenizer."""
        self.base_model_name = base_model_name
        # trust_remote_code matches DomainLLMTuner.load_base_model, so
        # tokenizers that ship custom code load the same way here.
        self.tokenizer = AutoTokenizer.from_pretrained(
            base_model_name,
            trust_remote_code=True,
        )
        self.logger = logging.getLogger(__name__)

    def generate_qa_pairs(self, legal_topics: List[str], num_pairs_per_topic: int = 20) -> List[Dict]:
        """Generate ``num_pairs_per_topic`` QA records for every topic.

        Returns:
            Records with keys ``type`` (always ``'qa'``), ``topic``,
            ``question``, ``answer``, ``difficulty`` and ``source`` (always
            ``'synthetic'``).
        """
        qa_pairs = []
        for topic in legal_topics:
            self.logger.info(f"为主题 '{topic}' 生成问答对")
            for i in range(num_pairs_per_topic):
                question = self._generate_question(topic, i)
                qa_pairs.append({
                    'type': 'qa',
                    'topic': topic,
                    'question': question,
                    'answer': self._generate_answer(question, topic),
                    'difficulty': self._assess_difficulty(question),
                    'source': 'synthetic'
                })
        return qa_pairs

    def _generate_question(self, topic: str, index: int) -> str:
        """Return the question template selected by ``index`` (cycling)."""
        question_templates = [
            f"关于{topic}的法律规定有哪些?",
            f"在{topic}方面,企业需要注意哪些法律风险?",
            f"{topic}相关的典型案例有哪些?",
            f"如何合规地处理{topic}相关事务?",
            f"{topic}的法律责任如何认定?",
            f"违反{topic}规定会有什么后果?",
            f"{topic}的法律适用标准是什么?",
            f"在{topic}纠纷中如何维护自身权益?"
        ]
        return question_templates[index % len(question_templates)]

    def _generate_answer(self, question: str, topic: str) -> str:
        """Return the answer template whose keyword occurs in ``question``.

        Simplified implementation: a real system would have the base LLM
        write richer answers. Every keyword below occurs in exactly one of
        the question templates, so each generated question gets its own
        answer; a generic fallback is returned when nothing matches.
        """
        answer_templates = {
            "法律规定": f"关于{topic},主要依据《相关法律》第X条规定...具体包括以下几个方面:1... 2... 3...",
            "法律风险": f"在{topic}方面,企业需注意以下法律风险:1... 2... 3... 建议采取以下防范措施...",
            "典型案例": f"关于{topic}的典型案例包括:案例1(基本情况...法院观点...)案例2(基本情况...法院观点...)",
            "合规": f"为合规处理{topic},建议采取以下措施:1... 2... 3... 同时需要注意...",
            "法律责任": f"{topic}的法律责任认定需要考虑以下因素:1... 2... 3... 根据《XX法》第X条...",
            "后果": f"违反{topic}规定可能面临以下后果:1... 2... 3... 具体处罚依据《XX法》第X条...",
            "适用标准": f"{topic}的法律适用标准主要包括:1... 2... 3... 法院在判断时会考虑...",
            "权益": f"在{topic}纠纷中维护自身权益需要注意:1... 2... 3... 建议收集以下证据..."
        }
        # Match on the question with the topic removed first, so a keyword
        # inside the topic name cannot select the wrong template; fall back
        # to the full question if that finds nothing.
        stripped = question.replace(topic, "") if topic else question
        for candidate in (stripped, question):
            for key, template in answer_templates.items():
                if key in candidate:
                    return template
        return f"关于{topic}的问题,需要结合具体案情和法律条文进行分析。建议咨询专业律师获取具体建议。"

    def _assess_difficulty(self, question: str) -> str:
        """Classify a question as ``'hard'``, ``'medium'`` or ``'easy'`` by keyword.

        Returns ``'easy'`` when no keyword matches.
        """
        for level, keywords in self._DIFFICULTY_KEYWORDS:
            if any(keyword in question for keyword in keywords):
                return level
        return 'easy'
def run_legal_tuning_pipeline(config: Optional["TrainingConfig"] = None,
                              final_model_path: str = './legal_llm_final'):
    """Run the full legal fine-tuning pipeline: load, train, evaluate, save.

    Args:
        config: Training configuration. When omitted, the built-in ChatGLM3
            configuration below is used.
        final_model_path: Directory where the final adapter is saved.

    Returns:
        ``{'status': 'success', 'training_result': ..., 'model_path': ...}``
        on success, or ``{'status': 'error', 'error': <message>}`` when any
        step raises (the error is logged).
    """
    if config is None:
        config = TrainingConfig(
            model_name='THUDM/chatglm3-6b',
            dataset_path='data/legal_training.json',
            output_dir='./legal_llm_finetuned',
            lora_r=8,
            lora_alpha=32,
            lora_dropout=0.1,
            learning_rate=2e-4,
            num_epochs=5,
            batch_size=2,
            max_length=1024,
            warmup_steps=100,
            logging_steps=50,
            save_steps=500
        )
    legal_tuner = LegalLLMTuner(config)
    try:
        # 1. Load the base model.
        legal_tuner.load_base_model()
        # 2. Attach LoRA.
        legal_tuner.setup_lora()
        # 3. Prepare the training data.
        dataset = legal_tuner.prepare_dataset()
        # 4. Configure training arguments.
        legal_tuner.setup_training_args()
        # 5. Train.
        training_result = legal_tuner.train(dataset)
        # 6. Evaluate when a test split is present.
        if 'test' in dataset:
            eval_result = legal_tuner.evaluate_model(dataset['test'])
            training_result['evaluation'] = eval_result
        # 7. Save the final model.
        legal_tuner.save_model(final_model_path)
        return {
            'status': 'success',
            'training_result': training_result,
            'model_path': final_model_path
        }
    except Exception as e:
        logging.error(f"法律模型微调管道失败: {e}")
        return {
            'status': 'error',
            'error': str(e)
        }
# Usage example
if __name__ == "__main__":
    # Generate synthetic training data.
    data_generator = LegalDataGenerator('THUDM/chatglm3-6b')
    legal_topics = ['合同法', '劳动法', '知识产权', '公司法', '民事诉讼']
    training_data = data_generator.generate_qa_pairs(legal_topics, num_pairs_per_topic=15)
    # Save it where the pipeline expects it. The directory is created first
    # because open() does not create missing parent directories.
    training_file = 'data/legal_training.json'
    os.makedirs(os.path.dirname(training_file), exist_ok=True)
    with open(training_file, 'w', encoding='utf-8') as f:
        json.dump(training_data, f, ensure_ascii=False, indent=2)
    # Run the fine-tuning pipeline.
    result = run_legal_tuning_pipeline()
    print(f"微调结果: {result}")
===================================
Java架构
领域LLM在法律合同审查的微调应用
业务场景:律师事务所微调专用法律大模型进行合同智能审查
┌─ 法律合同智能审查平台 ───────────────────────────────────┐
│ 合同: 《某科技公司技术服务协议》 | 审查人: 张律师 │
├─────────────────────────────────────────────────────────┤
│ 【领域专用模型】 │
│ ■ 基座模型: ChatGLM3-6B │
│ ■ 微调数据: 5,000份法律文书 + 200万条法律条款 │
│ ■ 专业领域: 合同法、知识产权法、劳动法 │
├─────────────────────────────────────────────────────────┤
│ 【合同审查报告】 │
│ │
│ 🔍 关键条款分析: │
│ ✓ 服务内容条款: 明确具体,无歧义 │
│ ⚠️ 付款条款: 缺少具体付款时间节点 │
│ ❌ 知识产权条款: 归属约定不明确,存在风险 │
│ ⚠️ 违约责任: 违约金比例过高(30%),可能不被支持 │
│ ✓ 保密条款: 完备有效 │
│ │
│ 💡 修改建议: │
│ 1. 第5.2条建议增加"甲方验收合格后15个工作日内支付" │
│ 2. 第8.3条建议将违约金调整为不超过实际损失的130%(超出损失部分以30%为限) │
│ 3. 第6.1条建议明确开发成果的知识产权归属 │
│ │
│ 【风险等级评估】 │
│ 总体风险: 中等 ⭐⭐⭐☆☆ │
│ 法律风险: 知识产权条款高风险,其他中低风险 │
│ 商业风险: 付款条款中风险,违约责任中风险 │
│ │
│ 【相似案例参考】 │
│ 📑 (2023)沪01民终1234号 - 类似技术服务合同纠纷 │
│ 📑 (2022)京民申5678号 - 知识产权归属争议 │
│ │
│ [生成修改稿] [导出审查意见] [添加到知识库] │
└─────────────────────────────────────────────────────────┘
环境搭建
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.company</groupId>
<artifactId>domain-llm-finetuning-system</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>Domain LLM Finetuning System</name>
<description>企业级领域大语言模型微调系统</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
<relativePath/>
</parent>
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- AI/ML相关版本 -->
<tensorflow.version>2.13.0</tensorflow.version>
<pytorch.version>2.0.1</pytorch.version>
<transformers.version>4.35.2</transformers.version>
<deeplearning4j.version>1.0.0-M2.1</deeplearning4j.version>
<!-- 工具类版本 -->
<hutool.version>5.8.22</hutool.version>
<guava.version>32.1.3-jre</guava.version>
<apache.commons.version>2.14.0</apache.commons.version>
<!-- Python集成 -->
<jep.version>4.1.1</jep.version>
<py4j.version>0.10.9.7</py4j.version>
</properties>
<dependencies>
<!-- Spring Boot Starters -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- 数据库 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 消息队列 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- AI/机器学习核心依赖 -->
<dependency>
<groupId>org.tensorflow</groupId>
<artifactId>tensorflow-core-api</artifactId>
<version>${tensorflow.version}</version>
</dependency>
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-core</artifactId>
<version>${deeplearning4j.version}</version>
</dependency>
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-native-platform</artifactId>
<version>${deeplearning4j.version}</version>
</dependency>
<!-- Python集成 -->
<dependency>
<groupId>black.ninia</groupId>
<artifactId>jep</artifactId>
<version>${jep.version}</version>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>${py4j.version}</version>
</dependency>
<!-- 文档处理 -->
<dependency>
<groupId>org.apache.pdfbox</groupId>
<artifactId>pdfbox</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>5.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>5.2.4</version>
</dependency>
<!-- 工具类库 -->
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
</dependency>
<!-- Lombok: the entity classes import lombok.* and the spring-boot-maven-plugin
     configuration excludes it from the packaged jar, so it must be declared.
     No version given: it is expected to come from the Spring Boot parent. -->
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <optional>true</optional>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${apache.commons.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<!-- 数学计算 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<!-- 监控和指标 -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>17</source>
<target>17</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
核心实体类
package com.company.llmfinetuning.entity;
import jakarta.persistence.*;
import lombok.*;
import org.hibernate.annotations.CreationTimestamp;
import org.hibernate.annotations.UpdateTimestamp;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* 领域模型实体 - 存储微调模型的基本信息
* 对应数据库表:domain_models
*/
@Entity
@Table(name = "domain_models", indexes = {
@Index(name = "idx_model_name", columnList = "modelName"),
@Index(name = "idx_model_domain", columnList = "domain"),
@Index(name = "idx_model_status", columnList = "status"),
@Index(name = "idx_model_created", columnList = "createdTime")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"trainingJobs", "evaluations", "deployments"})
public class DomainModel {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 模型唯一标识
*/
@Column(nullable = false, unique = true, length = 64)
private String modelId;
/**
* 模型名称
*/
@Column(nullable = false, length = 200)
private String modelName;
/**
* 模型描述
*/
@Column(columnDefinition = "TEXT")
private String description;
/**
* 领域类型:LEGAL, MEDICAL, FINANCE, TECHNICAL
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 50)
private DomainType domain;
/**
* 基座模型名称
*/
@Column(nullable = false, length = 100)
private String baseModel;
/**
* 模型版本
*/
@Column(nullable = false, length = 50)
private String version;
/**
* 模型状态:DRAFT, TRAINING, TRAINED, EVALUATING, DEPLOYED, ARCHIVED
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private ModelStatus status;
/**
* 模型文件路径
*/
@Column(length = 500)
private String modelPath;
/**
* 模型配置(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String modelConfig;
/**
* 训练参数(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String trainingParams;
/**
* 模型大小(MB)
*/
private Long modelSize;
/**
* 参数数量(亿)
*/
private Long parameterCount;
/**
* 评估指标(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String evaluationMetrics;
/**
* 创建人
*/
@Column(nullable = false, length = 50)
private String createdBy;
/**
* 创建时间
*/
@Column(nullable = false)
private LocalDateTime createdTime;
/**
* 最后更新时间
*/
private LocalDateTime lastUpdatedTime;
/**
* 训练任务列表
*/
@OneToMany(mappedBy = "model", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private List<TrainingJob> trainingJobs = new ArrayList<>();
/**
* 评估记录列表
*/
@OneToMany(mappedBy = "model", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private List<ModelEvaluation> evaluations = new ArrayList<>();
/**
* 部署记录列表
*/
@OneToMany(mappedBy = "model", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private List<ModelDeployment> deployments = new ArrayList<>();
@CreationTimestamp
private LocalDateTime createTime;
@UpdateTimestamp
private LocalDateTime updateTime;
/**
* 领域类型枚举
*/
public enum DomainType {
LEGAL, // 法律
MEDICAL, // 医疗
FINANCE, // 金融
TECHNICAL, // 技术
ACADEMIC, // 学术
BUSINESS // 商业
}
/**
* 模型状态枚举
*/
public enum ModelStatus {
DRAFT, // 草稿
TRAINING, // 训练中
TRAINED, // 已训练
EVALUATING, // 评估中
DEPLOYED, // 已部署
ARCHIVED // 已归档
}
}
/**
* 训练数据集实体 - 存储训练数据信息
* 对应数据库表:training_datasets
*/
@Entity
@Table(name = "training_datasets", indexes = {
@Index(name = "idx_dataset_name", columnList = "datasetName"),
@Index(name = "idx_dataset_domain", columnList = "domain"),
@Index(name = "idx_dataset_status", columnList = "status")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"dataItems", "trainingJobs"})
public class TrainingDataset {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 数据集唯一标识
*/
@Column(nullable = false, unique = true, length = 64)
private String datasetId;
/**
* 数据集名称
*/
@Column(nullable = false, length = 200)
private String datasetName;
/**
* 数据集描述
*/
@Column(columnDefinition = "TEXT")
private String description;
/**
* 领域类型
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 50)
private DomainModel.DomainType domain;
/**
* 数据集类型:FULL_TRAINING, INSTRUCTION_TUNING, RLHF, EVALUATION
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private DatasetType datasetType;
/**
* 数据来源
*/
@Column(nullable = false, length = 100)
private String dataSource;
/**
* 数据文件路径
*/
@Column(nullable = false, length = 500)
private String filePath;
/**
* 文件格式:JSON, JSONL, CSV, TXT
*/
@Column(nullable = false, length = 20)
private String fileFormat;
/**
* 数据总量
*/
@Column(nullable = false)
private Long totalSize;
/**
* 样本数量
*/
@Column(nullable = false)
private Long sampleCount;
/**
* 数据质量评分(0-5)
*/
@Column(precision = 3, scale = 2)
private BigDecimal qualityScore;
/**
* 数据状态:COLLECTING, PROCESSING, READY, ARCHIVED
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private DatasetStatus status;
/**
* 数据统计信息(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String statistics;
/**
* 数据预处理配置(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String preprocessingConfig;
/**
* 创建人
*/
@Column(nullable = false, length = 50)
private String createdBy;
/**
* 创建时间
*/
@Column(nullable = false)
private LocalDateTime createdTime;
/**
* 数据项列表
*/
@OneToMany(mappedBy = "dataset", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private List<DataItem> dataItems = new ArrayList<>();
/**
* 训练任务列表
*/
@OneToMany(mappedBy = "dataset", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private List<TrainingJob> trainingJobs = new ArrayList<>();
@CreationTimestamp
private LocalDateTime createTime;
@UpdateTimestamp
private LocalDateTime updateTime;
/**
* 数据集类型枚举
*/
public enum DatasetType {
FULL_TRAINING, // 全量训练
INSTRUCTION_TUNING, // 指令微调
RLHF, // 人类反馈强化学习
EVALUATION, // 评估数据
TEST // 测试数据
}
/**
* 数据集状态枚举
*/
public enum DatasetStatus {
COLLECTING, // 收集中
PROCESSING, // 处理中
READY, // 就绪
ARCHIVED // 已归档
}
}
/**
* 训练任务实体 - 存储模型训练任务信息
* 对应数据库表:training_jobs
*/
@Entity
@Table(name = "training_jobs", indexes = {
@Index(name = "idx_job_model", columnList = "model_id"),
@Index(name = "idx_job_dataset", columnList = "dataset_id"),
@Index(name = "idx_job_status", columnList = "status"),
@Index(name = "idx_job_created", columnList = "createdTime")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TrainingJob {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 训练任务唯一标识
*/
@Column(nullable = false, unique = true, length = 64)
private String jobId;
/**
* 关联的领域模型
*/
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "model_id", nullable = false)
private DomainModel model;
/**
* 关联的训练数据集
*/
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "dataset_id", nullable = false)
private TrainingDataset dataset;
/**
* 任务名称
*/
@Column(nullable = false, length = 200)
private String jobName;
/**
* 训练类型:FULL_FINETUNE, LORA, P_TUNING, PROMPT_TUNING
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private TrainingType trainingType;
/**
* 训练参数(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String trainingParameters;
/**
* 超参数配置(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String hyperparameters;
/**
* 训练状态:PENDING, RUNNING, COMPLETED, FAILED, CANCELLED
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private JobStatus status;
/**
* 训练进度(0-100)
*/
@Column(nullable = false)
private Integer progress;
/**
* 当前训练轮次
*/
private Integer currentEpoch;
/**
* 总训练轮次
*/
private Integer totalEpochs;
/**
* 训练指标(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String trainingMetrics;
/**
* 训练日志路径
*/
@Column(length = 500)
private String logPath;
/**
* 输出模型路径
*/
@Column(length = 500)
private String outputPath;
/**
* 错误信息
*/
@Column(columnDefinition = "TEXT")
private String errorMessage;
/**
* 训练开始时间
*/
private LocalDateTime startTime;
/**
* 训练结束时间
*/
private LocalDateTime endTime;
/**
* 训练时长(秒)
*/
private Long trainingDuration;
/**
* GPU使用情况(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String gpuUsage;
/**
* 创建人
*/
@Column(nullable = false, length = 50)
private String createdBy;
/**
* 创建时间
*/
@Column(nullable = false)
private LocalDateTime createdTime;
@CreationTimestamp
private LocalDateTime createTime;
@UpdateTimestamp
private LocalDateTime updateTime;
/**
* Fine-tuning strategies a training job can use.
*/
public enum TrainingType {
FULL_FINETUNE, // full-parameter fine-tuning
LORA, // LoRA fine-tuning
P_TUNING, // P-Tuning
PROMPT_TUNING, // Prompt Tuning
ADAPTER // Adapter Tuning
}
/**
* Lifecycle states of a training job.
*/
public enum JobStatus {
PENDING, // waiting to start
RUNNING, // currently running
COMPLETED, // finished successfully
FAILED, // failed
CANCELLED // cancelled
}
}
/**
* Model evaluation entity - stores the result of evaluating a model.
* Mapped to database table: model_evaluations
*/
@Entity
@Table(name = "model_evaluations", indexes = {
@Index(name = "idx_eval_model", columnList = "model_id"),
@Index(name = "idx_eval_dataset", columnList = "dataset_id"),
@Index(name = "idx_eval_type", columnList = "evaluationType")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ModelEvaluation {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* Unique business identifier of this evaluation record.
*/
@Column(nullable = false, unique = true, length = 64)
private String evaluationId;
/**
* Domain model that was evaluated (lazily loaded, required).
*/
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "model_id", nullable = false)
private DomainModel model;
/**
* Dataset used for the evaluation (lazily loaded, required).
*/
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "dataset_id", nullable = false)
private TrainingDataset dataset;
/**
* Evaluation type: AUTOMATIC, MANUAL, COMPARATIVE (persisted as the constant name).
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private EvaluationType evaluationType;
/**
* Evaluation metrics (JSON).
*/
@Column(columnDefinition = "JSON")
private String metrics;
/**
* Evaluation report (JSON).
*/
@Column(columnDefinition = "JSON")
private String evaluationReport;
/**
* Evaluation status: RUNNING, COMPLETED, FAILED (persisted as the constant name).
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 20)
private EvaluationStatus status;
/**
* Overall score (documented range 0-100), stored with precision 5 and scale 2.
*/
@Column(precision = 5, scale = 2)
private BigDecimal overallScore;
/**
* Time the evaluation started (required).
*/
@Column(nullable = false)
private LocalDateTime startTime;
/**
* Time the evaluation ended.
*/
private LocalDateTime endTime;
/**
* Evaluation duration in seconds.
*/
private Long evaluationDuration;
/**
* Person or system that performed the evaluation (required).
*/
@Column(nullable = false, length = 50)
private String evaluatedBy;
@CreationTimestamp
private LocalDateTime createTime;
/**
* Kinds of evaluation.
*/
public enum EvaluationType {
AUTOMATIC, // automatic evaluation
MANUAL, // manual (human) evaluation
COMPARATIVE // comparative evaluation
}
/**
* States of an evaluation run.
*/
public enum EvaluationStatus {
RUNNING, // evaluation in progress
COMPLETED, // finished
FAILED // failed
}
}
数据预处理服务实现
package com.company.llmfinetuning.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
/**
* 数据预处理服务
* 负责训练数据的清洗、格式化、增强和质量控制
*/
@Service
@Slf4j
public class DataPreprocessingService {
private final PythonIntegrationService pythonIntegrationService;
private final FileStorageService fileStorageService;
private final DataQualityService dataQualityService;
/**
 * Creates the service with its three collaborators supplied by the caller.
 *
 * @param pythonIntegrationService runs the Python data-processing scripts
 * @param fileStorageService       file storage collaborator
 * @param dataQualityService       performs the data quality check
 */
public DataPreprocessingService(PythonIntegrationService pythonIntegrationService,
                                FileStorageService fileStorageService,
                                DataQualityService dataQualityService) {
    this.dataQualityService = dataQualityService;
    this.fileStorageService = fileStorageService;
    this.pythonIntegrationService = pythonIntegrationService;
}
/**
 * Runs the complete preprocessing pipeline for one dataset: load and validate, clean,
 * format, optionally augment, quality-check, save the output and update the dataset record.
 *
 * NOTE(review): the step details come from {@code toJson()} on the stage result objects;
 * the result classes visible in this file do not declare that method - confirm it exists.
 *
 * @param dataset             dataset to preprocess
 * @param preprocessingConfig settings used by the individual stages
 * @return a successful result carrying the completed steps, output path, processing time
 *         in milliseconds and the overall quality score
 * @throws DataPreprocessingException if any stage fails; the original failure is the cause
 */
public PreprocessingResult preprocessDataset(TrainingDataset dataset,
                                             PreprocessingConfig preprocessingConfig) {
    log.info("开始预处理数据集: {}", dataset.getDatasetName());
    long startTime = System.currentTimeMillis();
    try {
        PreprocessingResult result = PreprocessingResult.builder()
                .datasetId(dataset.getDatasetId())
                .startTime(LocalDateTime.now())
                .steps(new ArrayList<>())
                .build();

        // 1. Load and validate.
        DataLoadingResult loadingResult = loadAndValidateData(dataset);
        // "success" is a Boolean wrapper, so Lombok generates getSuccess() rather than
        // isSuccess(); Boolean.TRUE.equals also treats a missing flag as a failure.
        if (!Boolean.TRUE.equals(loadingResult.getSuccess())) {
            throw new DataPreprocessingException("数据加载失败: " + loadingResult.getErrorMessage());
        }
        result.getSteps().add(completedStep("DATA_LOADING", loadingResult.toJson()));

        // 2. Clean.
        DataCleaningResult cleaningResult = cleanData(loadingResult, preprocessingConfig);
        result.getSteps().add(completedStep("DATA_CLEANING", cleaningResult.toJson()));

        // 3. Format.
        DataFormattingResult formattingResult = formatData(cleaningResult, preprocessingConfig);
        result.getSteps().add(completedStep("DATA_FORMATTING", formattingResult.toJson()));

        // 4. Augment (optional).
        if (preprocessingConfig.isDataAugmentationEnabled()) {
            DataAugmentationResult augmentationResult = augmentData(formattingResult, preprocessingConfig);
            result.getSteps().add(completedStep("DATA_AUGMENTATION", augmentationResult.toJson()));
            // The augmentation result may not carry formatted data; fall back to the
            // pre-augmentation data instead of passing null to the following stages.
            DataFormattingResult augmentedData = augmentationResult.getFormattedData();
            if (augmentedData != null) {
                formattingResult = augmentedData;
            } else {
                log.warn("数据增强结果未包含格式化数据,继续使用增强前的数据: {}", dataset.getDatasetName());
            }
        }

        // 5. Quality check.
        QualityCheckResult qualityResult = performQualityCheck(formattingResult, preprocessingConfig);
        result.getSteps().add(completedStep("QUALITY_CHECK", qualityResult.toJson()));

        // 6. Persist the preprocessed data and refresh the dataset record.
        String outputPath = savePreprocessedData(formattingResult, dataset);
        updateDatasetAfterPreprocessing(dataset, formattingResult, qualityResult, outputPath);

        result.setEndTime(LocalDateTime.now());
        result.setProcessingTime(System.currentTimeMillis() - startTime);
        result.setOutputPath(outputPath);
        result.setSuccess(true);
        result.setQualityScore(qualityResult.getOverallQualityScore());
        log.info("数据集预处理完成: {}, 处理时间: {}ms", dataset.getDatasetName(), result.getProcessingTime());
        return result;
    } catch (Exception e) {
        log.error("数据集预处理失败: {}, 错误: {}", dataset.getDatasetName(), e.getMessage(), e);
        throw new DataPreprocessingException("数据集预处理失败: " + e.getMessage(), e);
    }
}

/**
 * Builds a pipeline step entry marked as completed.
 */
private static PreprocessingStep completedStep(String stepName, String details) {
    return PreprocessingStep.builder()
            .stepName(stepName)
            .status(StepStatus.COMPLETED)
            .details(details)
            .build();
}
/**
 * Loads and validates the raw dataset by running {@code data_loading.py}.
 *
 * Never throws: a script failure is returned as an unsuccessful result, and any exception
 * is converted into {@link DataLoadingResult#error(String)}.
 *
 * @param dataset dataset whose file path, file format and id are passed to the script
 * @return the parsed loading result (successful or not)
 */
private DataLoadingResult loadAndValidateData(TrainingDataset dataset) {
    log.debug("加载和验证数据,数据集: {}", dataset.getDatasetName());
    try {
        Map<String, Object> pythonResult = pythonIntegrationService.executePythonScript(
                "data_loading.py",
                Map.of(
                        "file_path", dataset.getFilePath(),
                        "file_format", dataset.getFileFormat(),
                        "dataset_id", dataset.getDatasetId()
                )
        );
        DataLoadingResult result = DataLoadingResult.fromPythonResult(pythonResult);
        // "success" is a Boolean wrapper: Lombok exposes it as getSuccess(), and a null
        // flag must be treated as a failure.
        if (Boolean.TRUE.equals(result.getSuccess())) {
            log.debug("数据加载成功,样本数量: {}", result.getSampleCount());
        } else {
            log.warn("数据加载失败: {}", result.getErrorMessage());
        }
        return result;
    } catch (Exception e) {
        log.error("数据加载异常: {}", e.getMessage(), e);
        return DataLoadingResult.error("数据加载异常: " + e.getMessage());
    }
}
/**
 * Cleans the loaded raw data by running {@code data_cleaning.py}.
 *
 * @param loadingResult result of the loading stage; its raw data is sent to the script
 * @param config        supplies the cleaning configuration and the domain type
 * @return the parsed cleaning result
 * @throws DataPreprocessingException if the script reports failure or cannot be executed
 */
private DataCleaningResult cleanData(DataLoadingResult loadingResult,
                                     PreprocessingConfig config) {
    log.debug("开始数据清洗,样本数量: {}", loadingResult.getSampleCount());
    try {
        Map<String, Object> pythonResult = pythonIntegrationService.executePythonScript(
                "data_cleaning.py",
                Map.of(
                        "raw_data", loadingResult.getRawData(),
                        "cleaning_config", config.getCleaningConfig(),
                        "domain_type", config.getDomainType().name()
                )
        );
        // A failed script run must stop the pipeline here instead of handing an
        // incomplete result to the next stage.
        if (!Boolean.TRUE.equals(pythonResult.get("success"))) {
            throw new DataPreprocessingException("数据清洗失败: " + pythonResult.get("error"));
        }
        DataCleaningResult result = DataCleaningResult.fromPythonResult(pythonResult);
        log.debug("数据清洗完成,保留样本: {}, 过滤样本: {}",
                result.getRetainedSamples(), result.getFilteredSamples());
        return result;
    } catch (DataPreprocessingException e) {
        // Already carries the stage prefix; rethrow unchanged to avoid duplicating it.
        log.error("数据清洗异常: {}", e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error("数据清洗异常: {}", e.getMessage(), e);
        throw new DataPreprocessingException("数据清洗失败: " + e.getMessage(), e);
    }
}
/**
 * Converts the cleaned data into the training format by running {@code data_formatting.py}.
 *
 * @param cleaningResult result of the cleaning stage; its cleaned data is sent to the script
 * @param config         supplies the format configuration and the training type
 * @return the parsed formatting result
 * @throws DataPreprocessingException if the script reports failure or cannot be executed
 */
private DataFormattingResult formatData(DataCleaningResult cleaningResult,
                                        PreprocessingConfig config) {
    log.debug("开始数据格式化");
    try {
        Map<String, Object> pythonResult = pythonIntegrationService.executePythonScript(
                "data_formatting.py",
                Map.of(
                        "cleaned_data", cleaningResult.getCleanedData(),
                        "format_config", config.getFormatConfig(),
                        "training_type", config.getTrainingType().name()
                )
        );
        // Stop on a reported failure rather than continuing with an incomplete result.
        if (!Boolean.TRUE.equals(pythonResult.get("success"))) {
            throw new DataPreprocessingException("数据格式化失败: " + pythonResult.get("error"));
        }
        DataFormattingResult result = DataFormattingResult.fromPythonResult(pythonResult);
        log.debug("数据格式化完成,训练样本: {}, 验证样本: {}",
                result.getTrainingSamples(), result.getValidationSamples());
        return result;
    } catch (DataPreprocessingException e) {
        // Already carries the stage prefix; rethrow unchanged to avoid duplicating it.
        log.error("数据格式化异常: {}", e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error("数据格式化异常: {}", e.getMessage(), e);
        throw new DataPreprocessingException("数据格式化失败: " + e.getMessage(), e);
    }
}
/**
 * Augments the formatted data by running {@code data_augmentation.py}.
 *
 * @param formattingResult result of the formatting stage; its formatted data is sent to the script
 * @param config           supplies the augmentation configuration and the domain type
 * @return the parsed augmentation result
 * @throws DataPreprocessingException if the script reports failure or cannot be executed
 */
private DataAugmentationResult augmentData(DataFormattingResult formattingResult,
                                           PreprocessingConfig config) {
    log.debug("开始数据增强,原始样本: {}", formattingResult.getTrainingSamples());
    try {
        Map<String, Object> pythonResult = pythonIntegrationService.executePythonScript(
                "data_augmentation.py",
                Map.of(
                        "formatted_data", formattingResult.getFormattedData(),
                        "augmentation_config", config.getAugmentationConfig(),
                        "domain_type", config.getDomainType().name()
                )
        );
        // Stop on a reported failure rather than continuing with an incomplete result.
        if (!Boolean.TRUE.equals(pythonResult.get("success"))) {
            throw new DataPreprocessingException("数据增强失败: " + pythonResult.get("error"));
        }
        DataAugmentationResult result = DataAugmentationResult.fromPythonResult(pythonResult);
        log.debug("数据增强完成,增强后样本: {}, 增强倍数: {}",
                result.getAugmentedSamples(), result.getAugmentationFactor());
        return result;
    } catch (DataPreprocessingException e) {
        // Already carries the stage prefix; rethrow unchanged to avoid duplicating it.
        log.error("数据增强异常: {}", e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error("数据增强异常: {}", e.getMessage(), e);
        throw new DataPreprocessingException("数据增强失败: " + e.getMessage(), e);
    }
}
/**
 * Runs the data quality check on the formatted data.
 *
 * @param formattingResult result whose formatted data is checked
 * @param config           preprocessing configuration passed through to the quality service
 * @return the quality check result
 * @throws DataPreprocessingException if the quality service throws
 */
private QualityCheckResult performQualityCheck(DataFormattingResult formattingResult,
                                               PreprocessingConfig config) {
    log.debug("开始数据质量检查");
    try {
        QualityCheckResult result = dataQualityService.checkDataQuality(
                formattingResult.getFormattedData(), config);
        // qualityPassed is a Boolean wrapper, so the Lombok accessor is
        // getQualityPassed(); isQualityPassed() is only generated for primitive boolean.
        log.debug("质量检查完成,总体评分: {}, 通过检查: {}",
                result.getOverallQualityScore(), result.getQualityPassed());
        return result;
    } catch (Exception e) {
        log.error("质量检查异常: {}", e.getMessage(), e);
        throw new DataPreprocessingException("质量检查失败: " + e.getMessage(), e);
    }
}
/**
 * Saves the preprocessed data as a JSONL file by running {@code data_saving.py}.
 *
 * The file is named {@code preprocessed_<datasetId>_<currentTimeMillis>.jsonl} and placed
 * under {@code data/preprocessed}.
 *
 * @param formattingResult result whose formatted data is written
 * @param dataset          dataset the output belongs to (its id is part of the file name)
 * @return the output path as a string
 * @throws DataPreprocessingException if the script reports failure or cannot be executed
 */
private String savePreprocessedData(DataFormattingResult formattingResult,
                                    TrainingDataset dataset) {
    log.debug("保存预处理后的数据");
    try {
        String outputFileName = String.format("preprocessed_%s_%s.jsonl",
                dataset.getDatasetId(), System.currentTimeMillis());
        Path outputPath = Paths.get("data/preprocessed", outputFileName);
        Map<String, Object> pythonResult = pythonIntegrationService.executePythonScript(
                "data_saving.py",
                Map.of(
                        "formatted_data", formattingResult.getFormattedData(),
                        "output_path", outputPath.toString(),
                        "file_format", "jsonl"
                )
        );
        if (!Boolean.TRUE.equals(pythonResult.get("success"))) {
            throw new DataPreprocessingException("数据保存失败: " + pythonResult.get("error"));
        }
        log.debug("数据保存成功: {}", outputPath);
        return outputPath.toString();
    } catch (DataPreprocessingException e) {
        // The script-failure exception above already has the final message; rethrow it
        // as-is so the prefix is not duplicated by the generic handler below.
        log.error("数据保存异常: {}", e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error("数据保存异常: {}", e.getMessage(), e);
        throw new DataPreprocessingException("数据保存失败: " + e.getMessage(), e);
    }
}
/**
 * Writes the preprocessing outcome back onto the dataset record and saves it.
 *
 * The dataset's file path is replaced by the preprocessed output path, its status becomes
 * READY, and sample count, quality score, statistics JSON and update time are refreshed.
 *
 * NOTE(review): {@code trainingDatasetRepository} is not among the fields or constructor
 * parameters declared by this class in the visible source - confirm it is provided.
 */
private void updateDatasetAfterPreprocessing(TrainingDataset dataset,
                                             DataFormattingResult formattingResult,
                                             QualityCheckResult qualityResult,
                                             String outputPath) {
    // Summary statistics stored as JSON on the dataset.
    String statisticsJson = DatasetStatistics.builder()
            .totalSamples(formattingResult.getTotalSamples())
            .trainingSamples(formattingResult.getTrainingSamples())
            .validationSamples(formattingResult.getValidationSamples())
            .averageLength(formattingResult.getAverageLength())
            .qualityMetrics(qualityResult.getQualityMetrics())
            .build()
            .toJson();

    dataset.setFilePath(outputPath);
    dataset.setStatus(TrainingDataset.DatasetStatus.READY);
    dataset.setSampleCount(formattingResult.getTotalSamples());
    dataset.setQualityScore(qualityResult.getOverallQualityScore());
    dataset.setStatistics(statisticsJson);
    dataset.setUpdateTime(LocalDateTime.now());

    trainingDatasetRepository.save(dataset);
    log.debug("数据集信息已更新: {}", dataset.getDatasetName());
}
/**
 * Preprocesses several datasets one after another with the same configuration.
 *
 * A failure of one dataset does not abort the batch: it is recorded as an error result
 * for that dataset and processing continues with the next one.
 *
 * @param datasets datasets to preprocess
 * @param config   preprocessing configuration applied to every dataset
 * @return per-dataset results keyed by dataset id, plus success and failure counts
 */
public BatchPreprocessingResult batchPreprocessDatasets(List<TrainingDataset> datasets,
                                                        PreprocessingConfig config) {
    log.info("开始批量预处理数据集,数量: {}", datasets.size());
    BatchPreprocessingResult batchResult = BatchPreprocessingResult.builder()
            .totalDatasets(datasets.size())
            .results(new HashMap<>())
            .startTime(LocalDateTime.now())
            .build();
    for (TrainingDataset dataset : datasets) {
        try {
            PreprocessingResult result = preprocessDataset(dataset, config);
            batchResult.getResults().put(dataset.getDatasetId(), result);
            // "success" is a Boolean wrapper exposed as getSuccess(); a null flag
            // counts as a failure.
            if (Boolean.TRUE.equals(result.getSuccess())) {
                batchResult.incrementSuccessCount();
            } else {
                batchResult.incrementFailedCount();
            }
        } catch (Exception e) {
            log.warn("数据集 {} 预处理失败: {}", dataset.getDatasetName(), e.getMessage());
            batchResult.incrementFailedCount();
            batchResult.getResults().put(dataset.getDatasetId(),
                    PreprocessingResult.error(dataset.getDatasetId(), e.getMessage()));
        }
    }
    batchResult.setEndTime(LocalDateTime.now());
    // Recomputes both counters from the collected per-dataset results.
    batchResult.calculateStatistics();
    log.info("批量预处理完成,成功: {}, 失败: {}",
            batchResult.getSuccessCount(), batchResult.getFailedCount());
    return batchResult;
}
}
/**
 * Outcome of preprocessing one dataset.
 */
@Data
@Builder
class PreprocessingResult {
    // Id of the dataset that was processed.
    private String datasetId;
    // When processing started / ended.
    private LocalDateTime startTime;
    private LocalDateTime endTime;
    // Elapsed processing time (set from a millisecond difference by the service).
    private Long processingTime;
    // Location of the preprocessed output.
    private String outputPath;
    // Overall quality score taken from the quality check.
    private BigDecimal qualityScore;
    // Whether the pipeline finished successfully.
    private Boolean success;
    // Failure description; only set by error(...).
    private String errorMessage;
    // Pipeline steps recorded along the way.
    private List<PreprocessingStep> steps;

    /**
     * Creates a failed result for the given dataset with start and end time set to "now"
     * and an empty step list.
     */
    public static PreprocessingResult error(String datasetId, String errorMessage) {
        return PreprocessingResult.builder()
                .success(false)
                .datasetId(datasetId)
                .errorMessage(errorMessage)
                .steps(new ArrayList<>())
                .startTime(LocalDateTime.now())
                .endTime(LocalDateTime.now())
                .build();
    }
}
/**
* One recorded step of the preprocessing pipeline.
*/
@Data
@Builder
class PreprocessingStep {
// Step identifier, e.g. "DATA_LOADING" or "QUALITY_CHECK".
private String stepName;
// Status of the step.
private StepStatus status;
// Step details; the pipeline stores the stage result's JSON here.
private String details;
// Step duration; NOTE(review): not populated by the pipeline code in this file, unit unspecified.
private Long duration;
// Error description for the step, if any.
private String errorMessage;
}
/**
 * Result of the data loading stage.
 */
@Data
@Builder
class DataLoadingResult {
    // Whether loading succeeded.
    private Boolean success;
    // Error description reported by the script or by the caller.
    private String errorMessage;
    // Number of samples loaded.
    private Long sampleCount;
    // Total size reported by the script (unit not specified in this file).
    private Long totalSize;
    // Raw data as a JSON string.
    private String rawData;
    // Validation problems reported by the script.
    private List<String> validationErrors;

    /**
     * Creates a failed result carrying only an error message and an empty validation list.
     */
    public static DataLoadingResult error(String errorMessage) {
        return DataLoadingResult.builder()
                .success(false)
                .errorMessage(errorMessage)
                .validationErrors(new ArrayList<>())
                .build();
    }

    /**
     * Builds a result from the map returned by the Python loading script.
     *
     * Missing numeric entries become {@code null} instead of raising a
     * NullPointerException, so a failed run that only reports {@code success} and
     * {@code error} still yields a result whose error message reaches the caller.
     *
     * @param pythonResult map with the keys success, error, sample_count, total_size,
     *                     raw_data and validation_errors
     * @return the parsed result
     */
    @SuppressWarnings("unchecked")
    public static DataLoadingResult fromPythonResult(Map<String, Object> pythonResult) {
        Object error = pythonResult.get("error");
        return DataLoadingResult.builder()
                .success(Boolean.TRUE.equals(pythonResult.get("success")))
                .errorMessage(error == null ? null : error.toString())
                .sampleCount(toLong(pythonResult.get("sample_count")))
                .totalSize(toLong(pythonResult.get("total_size")))
                .rawData((String) pythonResult.get("raw_data"))
                .validationErrors((List<String>) pythonResult.get("validation_errors"))
                .build();
    }

    /**
     * Converts a loosely typed numeric value to Long; {@code null} stays {@code null}.
     * Numbers are converted directly so a value delivered as Double (e.g. 12.0) works.
     */
    private static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.valueOf(value.toString().trim());
    }
}
/**
 * Result of the data cleaning stage.
 */
@Data
@Builder
class DataCleaningResult {
    // Whether cleaning succeeded.
    private Boolean success;
    // Number of samples kept after cleaning.
    private Long retainedSamples;
    // Number of samples removed by cleaning.
    private Long filteredSamples;
    // Log lines produced by the cleaning script.
    private List<String> cleaningLogs;
    // Cleaned data as a JSON string.
    private String cleanedData;

    /**
     * Builds a result from the map returned by the Python cleaning script.
     *
     * Missing counts become {@code null} instead of raising a NullPointerException;
     * callers should check {@code success} before using the data.
     *
     * @param pythonResult map with the keys success, retained_samples, filtered_samples,
     *                     cleaning_logs and cleaned_data
     * @return the parsed result
     */
    @SuppressWarnings("unchecked")
    public static DataCleaningResult fromPythonResult(Map<String, Object> pythonResult) {
        return DataCleaningResult.builder()
                .success(Boolean.TRUE.equals(pythonResult.get("success")))
                .retainedSamples(toLong(pythonResult.get("retained_samples")))
                .filteredSamples(toLong(pythonResult.get("filtered_samples")))
                .cleaningLogs((List<String>) pythonResult.get("cleaning_logs"))
                .cleanedData((String) pythonResult.get("cleaned_data"))
                .build();
    }

    /**
     * Converts a loosely typed numeric value to Long; {@code null} stays {@code null}.
     * Numbers are converted directly so a value delivered as Double (e.g. 12.0) works.
     */
    private static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.valueOf(value.toString().trim());
    }
}
/**
 * Result of the data formatting stage.
 */
@Data
@Builder
class DataFormattingResult {
    // Whether formatting succeeded.
    private Boolean success;
    // Total number of formatted samples.
    private Long totalSamples;
    // Number of samples in the training split.
    private Long trainingSamples;
    // Number of samples in the validation split.
    private Long validationSamples;
    // Average sample length (unit not specified in this file).
    private Double averageLength;
    // Formatted data as a JSON string.
    private String formattedData;

    /**
     * Builds a result from the map returned by the Python formatting script.
     *
     * Missing numeric entries become {@code null} instead of raising a
     * NullPointerException; callers should check {@code success} before using the data.
     *
     * @param pythonResult map with the keys success, total_samples, training_samples,
     *                     validation_samples, average_length and formatted_data
     * @return the parsed result
     */
    public static DataFormattingResult fromPythonResult(Map<String, Object> pythonResult) {
        Object averageLength = pythonResult.get("average_length");
        return DataFormattingResult.builder()
                .success(Boolean.TRUE.equals(pythonResult.get("success")))
                .totalSamples(toLong(pythonResult.get("total_samples")))
                .trainingSamples(toLong(pythonResult.get("training_samples")))
                .validationSamples(toLong(pythonResult.get("validation_samples")))
                .averageLength(averageLength == null ? null
                        : averageLength instanceof Number
                                ? Double.valueOf(((Number) averageLength).doubleValue())
                                : Double.valueOf(averageLength.toString().trim()))
                .formattedData((String) pythonResult.get("formatted_data"))
                .build();
    }

    /**
     * Converts a loosely typed numeric value to Long; {@code null} stays {@code null}.
     * Numbers are converted directly so a value delivered as Double (e.g. 12.0) works.
     */
    private static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.valueOf(value.toString().trim());
    }
}
/**
 * Result of the data augmentation stage.
 */
@Data
@Builder
class DataAugmentationResult {
    // Whether augmentation succeeded.
    private Boolean success;
    // Sample count before augmentation.
    private Long originalSamples;
    // Sample count after augmentation.
    private Long augmentedSamples;
    // Ratio reported by the script as "augmentation_factor".
    private Double augmentationFactor;
    // Augmented data in formatting-result form.
    // NOTE(review): fromPythonResult does not populate this field, so it is null unless
    // a caller sets it explicitly - confirm which script key should feed it.
    private DataFormattingResult formattedData;

    /**
     * Builds a result from the map returned by the Python augmentation script.
     *
     * Missing numeric entries become {@code null} instead of raising a
     * NullPointerException; callers should check {@code success} before using the result.
     *
     * @param pythonResult map with the keys success, original_samples, augmented_samples
     *                     and augmentation_factor
     * @return the parsed result (without formatted data)
     */
    public static DataAugmentationResult fromPythonResult(Map<String, Object> pythonResult) {
        Object factor = pythonResult.get("augmentation_factor");
        return DataAugmentationResult.builder()
                .success(Boolean.TRUE.equals(pythonResult.get("success")))
                .originalSamples(toLong(pythonResult.get("original_samples")))
                .augmentedSamples(toLong(pythonResult.get("augmented_samples")))
                .augmentationFactor(factor == null ? null
                        : factor instanceof Number
                                ? Double.valueOf(((Number) factor).doubleValue())
                                : Double.valueOf(factor.toString().trim()))
                .build();
    }

    /**
     * Converts a loosely typed numeric value to Long; {@code null} stays {@code null}.
     * Numbers are converted directly so a value delivered as Double (e.g. 12.0) works.
     */
    private static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.valueOf(value.toString().trim());
    }
}
/**
* Result of the data quality check.
*/
@Data
@Builder
class QualityCheckResult {
// Whether the data passed the quality check (Boolean wrapper, so the Lombok getter is getQualityPassed()).
private Boolean qualityPassed;
// Overall quality score.
private BigDecimal overallQualityScore;
// Individual quality metrics keyed by metric name.
private Map<String, BigDecimal> qualityMetrics;
// Quality issues that were found.
private List<QualityIssue> qualityIssues;
// Quality report text.
private String qualityReport;
}
/**
 * Aggregated outcome of preprocessing a batch of datasets.
 */
@Data
@Builder
class BatchPreprocessingResult {
    // Number of datasets submitted in the batch.
    private Integer totalDatasets;
    // Number of datasets processed successfully (null until first counted).
    private Integer successCount;
    // Number of datasets that failed (null until first counted).
    private Integer failedCount;
    // When the batch started / ended.
    private LocalDateTime startTime;
    private LocalDateTime endTime;
    // Per-dataset results keyed by dataset id.
    private Map<String, PreprocessingResult> results;

    /**
     * Adds one to the success counter, treating an unset counter as zero.
     */
    public void incrementSuccessCount() {
        this.successCount = (this.successCount == null ? 0 : this.successCount) + 1;
    }

    /**
     * Adds one to the failure counter, treating an unset counter as zero.
     */
    public void incrementFailedCount() {
        this.failedCount = (this.failedCount == null ? 0 : this.failedCount) + 1;
    }

    /**
     * Recomputes both counters from the collected results.
     *
     * A result counts as successful only when its success flag is TRUE; null results and
     * null flags count as failures instead of causing a NullPointerException. When
     * totalDatasets is unset, the number of collected results is used as the total.
     */
    public void calculateStatistics() {
        int succeeded = 0;
        if (results != null) {
            succeeded = (int) results.values().stream()
                    .filter(r -> r != null && Boolean.TRUE.equals(r.getSuccess()))
                    .count();
        }
        int total = totalDatasets != null
                ? totalDatasets
                : (results != null ? results.size() : 0);
        this.successCount = succeeded;
        // Never report a negative failure count if the total was set too low.
        this.failedCount = Math.max(0, total - succeeded);
    }
}
模型训练服务实现
package com.company.llmfinetuning.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
/**
* 模型训练服务
* 负责管理模型训练任务,包括参数配置、训练执行、进度监控和结果收集
*/
@Service
@Slf4j
public class ModelTrainingService {
private final PythonIntegrationService pythonIntegrationService;
private final TrainingJobRepository trainingJobRepository;
private final ModelRepository modelRepository;
private final TrainingMonitorService trainingMonitorService;
private final ResourceManagerService resourceManagerService;
/**
 * Creates the service with its collaborators supplied by the caller.
 *
 * @param pythonIntegrationService runs the Python training and control scripts
 * @param trainingJobRepository    persistence for training jobs
 * @param modelRepository          persistence for domain models
 * @param trainingMonitorService   starts and stops monitoring of a job
 * @param resourceManagerService   allocates and releases training resources
 */
public ModelTrainingService(PythonIntegrationService pythonIntegrationService,
                            TrainingJobRepository trainingJobRepository,
                            ModelRepository modelRepository,
                            TrainingMonitorService trainingMonitorService,
                            ResourceManagerService resourceManagerService) {
    this.resourceManagerService = resourceManagerService;
    this.trainingMonitorService = trainingMonitorService;
    this.modelRepository = modelRepository;
    this.trainingJobRepository = trainingJobRepository;
    this.pythonIntegrationService = pythonIntegrationService;
}
/**
 * Creates a training job and starts it asynchronously.
 *
 * Allocates training resources, persists the job record, marks the model as TRAINING and
 * then hands the job to the background runner.
 *
 * @param trainingRequest the training request
 * @return the persisted training job
 * @throws TrainingJobException if any step fails; the original failure is the cause
 */
public TrainingJob createTrainingJob(TrainingRequest trainingRequest) {
    log.info("创建训练任务,模型: {}, 数据集: {}",
            trainingRequest.getModelId(), trainingRequest.getDatasetId());
    try {
        // Step 1: make sure resources can be allocated before anything is persisted.
        ResourceAllocation allocation = resourceManagerService.allocateTrainingResources(
                trainingRequest.getTrainingConfig());
        if (!allocation.isSuccess()) {
            throw new TrainingResourceException("训练资源分配失败: " + allocation.getMessage());
        }

        // Step 2: build and persist the job record.
        TrainingJob persistedJob = trainingJobRepository.save(
                buildTrainingJob(trainingRequest, allocation));

        // Step 3: flag the model as being trained.
        updateModelStatus(trainingRequest.getModelId(), DomainModel.ModelStatus.TRAINING);

        // Step 4: run the training in the background.
        startTrainingAsync(persistedJob);

        log.info("训练任务创建成功,任务ID: {}", persistedJob.getJobId());
        return persistedJob;
    } catch (Exception e) {
        log.error("创建训练任务失败: {}", e.getMessage(), e);
        throw new TrainingJobException("创建训练任务失败: " + e.getMessage(), e);
    }
}
/**
 * Submits the training job to {@link CompletableFuture#runAsync(Runnable)}.
 * Any exception raised while executing the job is logged and routed to the failure handler.
 */
private void startTrainingAsync(TrainingJob trainingJob) {
    Runnable trainingTask = () -> {
        try {
            executeTraining(trainingJob);
        } catch (Exception e) {
            log.error("训练任务执行失败,任务ID: {}, 错误: {}",
                    trainingJob.getJobId(), e.getMessage(), e);
            handleTrainingFailure(trainingJob, e);
        }
    };
    CompletableFuture.runAsync(trainingTask);
}
/**
 * Executes one training job: marks it RUNNING, prepares the script parameters, starts
 * monitoring, runs {@code model_training.py} and dispatches the script result.
 *
 * @throws TrainingExecutionException if any step throws; the original failure is the cause
 */
private void executeTraining(TrainingJob trainingJob) {
    log.info("开始执行训练任务,任务ID: {}", trainingJob.getJobId());
    try {
        // Persist the RUNNING state and start time first.
        trainingJob.setStatus(TrainingJob.JobStatus.RUNNING);
        trainingJob.setStartTime(LocalDateTime.now());
        trainingJobRepository.save(trainingJob);

        Map<String, Object> scriptParams = prepareTrainingParameters(trainingJob);

        trainingMonitorService.startMonitoring(trainingJob.getJobId());

        Map<String, Object> scriptResult =
                pythonIntegrationService.executePythonScript("model_training.py", scriptParams);

        handleTrainingResult(trainingJob, scriptResult);
        log.info("训练任务执行完成,任务ID: {}", trainingJob.getJobId());
    } catch (Exception e) {
        log.error("训练执行异常,任务ID: {}, 错误: {}", trainingJob.getJobId(), e.getMessage(), e);
        throw new TrainingExecutionException("训练执行异常: " + e.getMessage(), e);
    }
}
/**
 * Assembles the parameter map handed to the Python training script.
 *
 * Contains the job/model/dataset identifiers, the base model, the training type, the
 * training configuration and hyperparameters (both as nested maps), the output, log and
 * checkpoint directories, and the GPU count and memory limit.
 */
private Map<String, Object> prepareTrainingParameters(TrainingJob trainingJob) {
    TrainingConfig trainingConfig = TrainingConfig.fromJson(trainingJob.getTrainingParameters());
    Hyperparameters hyperparameters = Hyperparameters.fromJson(trainingJob.getHyperparameters());

    Map<String, Object> scriptParams = new HashMap<>();

    // Identification
    scriptParams.put("job_id", trainingJob.getJobId());
    scriptParams.put("model_id", trainingJob.getModel().getModelId());
    scriptParams.put("dataset_id", trainingJob.getDataset().getDatasetId());
    scriptParams.put("base_model", trainingJob.getModel().getBaseModel());
    scriptParams.put("training_type", trainingJob.getTrainingType().name());

    // Training settings
    scriptParams.put("training_config", trainingConfig.toMap());
    scriptParams.put("hyperparameters", hyperparameters.toMap());

    // Filesystem locations
    scriptParams.put("output_dir", getOutputDirectory(trainingJob));
    scriptParams.put("log_dir", getLogDirectory(trainingJob));
    scriptParams.put("checkpoint_dir", getCheckpointDirectory(trainingJob));

    // Resources
    scriptParams.put("gpu_count", trainingConfig.getGpuCount());
    scriptParams.put("memory_limit", trainingConfig.getMemoryLimit());

    log.debug("训练参数准备完成,参数数量: {}", scriptParams.size());
    return scriptParams;
}
/**
 * Dispatches the script result: a result whose "success" entry is TRUE goes to the
 * success handler, anything else to the failure handler with the script's "error" value.
 */
private void handleTrainingResult(TrainingJob trainingJob, Map<String, Object> pythonResult) {
    if (!Boolean.TRUE.equals(pythonResult.get("success"))) {
        handleTrainingFailure(trainingJob,
                new TrainingExecutionException("Python训练脚本执行失败: " + pythonResult.get("error")));
        return;
    }
    handleTrainingSuccess(trainingJob, pythonResult);
}
/**
 * Records a successful training run.
 *
 * The COMPLETED state (with end time, duration and output path) is persisted before the
 * metrics are parsed, so a malformed metrics payload cannot leave the job stuck in
 * RUNNING. Monitoring is stopped and the training resources are released in all cases,
 * including when result processing throws.
 *
 * @param trainingJob  the job that finished
 * @param pythonResult result map of the training script (reads "model_path" and the
 *                     metric entries)
 */
private void handleTrainingSuccess(TrainingJob trainingJob, Map<String, Object> pythonResult) {
    log.info("训练任务成功完成,任务ID: {}", trainingJob.getJobId());
    try {
        trainingJob.setStatus(TrainingJob.JobStatus.COMPLETED);
        trainingJob.setEndTime(LocalDateTime.now());
        trainingJob.setProgress(100);
        // The start time is set when execution begins; guard anyway so a missing value
        // does not abort the completion bookkeeping.
        if (trainingJob.getStartTime() != null) {
            long duration = java.time.Duration.between(
                    trainingJob.getStartTime(), trainingJob.getEndTime()).getSeconds();
            trainingJob.setTrainingDuration(duration);
        }
        String modelPath = (String) pythonResult.get("model_path");
        trainingJob.setOutputPath(modelPath);
        // Persist the terminal state first; metrics are attached in a second save.
        trainingJobRepository.save(trainingJob);

        TrainingMetrics metrics = TrainingMetrics.fromPythonResult(pythonResult);
        trainingJob.setTrainingMetrics(metrics.toJson());
        trainingJobRepository.save(trainingJob);

        updateModelAfterTraining(trainingJob.getModel(), trainingJob, metrics);
        log.info("训练结果处理完成,模型路径: {}", modelPath);
    } catch (Exception e) {
        // Deliberately not rethrown: the training itself succeeded.
        log.error("处理训练成功结果异常: {}", e.getMessage(), e);
    } finally {
        // Cleanup steps are independent so one failing does not skip the other.
        try {
            trainingMonitorService.stopMonitoring(trainingJob.getJobId());
        } catch (Exception e) {
            log.warn("停止训练监控失败,任务ID: {}", trainingJob.getJobId(), e);
        }
        try {
            // The failure and cancel paths release resources; the success path must too.
            resourceManagerService.releaseTrainingResources(trainingJob.getJobId());
        } catch (Exception e) {
            log.warn("释放训练资源失败,任务ID: {}", trainingJob.getJobId(), e);
        }
    }
}
/**
 * Records a failed training run.
 *
 * Marks the job FAILED with the error message, resets the model to DRAFT, stops
 * monitoring and releases the training resources. Each step is attempted independently,
 * so a persistence error cannot prevent the monitor from stopping or the resources from
 * being released. This method never throws.
 *
 * @param trainingJob the job that failed
 * @param error       the failure that caused it
 */
private void handleTrainingFailure(TrainingJob trainingJob, Exception error) {
    log.error("训练任务失败,任务ID: {}, 错误: {}", trainingJob.getJobId(), error.getMessage());
    try {
        trainingJob.setStatus(TrainingJob.JobStatus.FAILED);
        trainingJob.setEndTime(LocalDateTime.now());
        trainingJob.setErrorMessage(error.getMessage());
        trainingJobRepository.save(trainingJob);
    } catch (Exception e) {
        log.error("处理训练失败异常: {}", e.getMessage(), e);
    }
    try {
        updateModelStatus(trainingJob.getModel().getModelId(), DomainModel.ModelStatus.DRAFT);
    } catch (Exception e) {
        log.error("处理训练失败异常: {}", e.getMessage(), e);
    }
    try {
        trainingMonitorService.stopMonitoring(trainingJob.getJobId());
    } catch (Exception e) {
        log.error("处理训练失败异常: {}", e.getMessage(), e);
    }
    try {
        resourceManagerService.releaseTrainingResources(trainingJob.getJobId());
    } catch (Exception e) {
        log.error("处理训练失败异常: {}", e.getMessage(), e);
    }
}
/**
 * Copies the training outcome onto the model and saves it: status TRAINED, model path,
 * training parameters, metrics JSON, last-updated time, and the job id / training time
 * inside the model configuration JSON.
 */
private void updateModelAfterTraining(DomainModel model, TrainingJob trainingJob, TrainingMetrics metrics) {
    // Direct model attributes
    model.setStatus(DomainModel.ModelStatus.TRAINED);
    model.setModelPath(trainingJob.getOutputPath());
    model.setTrainingParams(trainingJob.getTrainingParameters());
    model.setEvaluationMetrics(metrics.toJson());
    model.setLastUpdatedTime(LocalDateTime.now());

    // Model configuration is stored as JSON: parse, amend, write back.
    ModelConfig updatedConfig = ModelConfig.fromJson(model.getModelConfig());
    updatedConfig.setTrainingJobId(trainingJob.getJobId());
    updatedConfig.setLastTrainingTime(LocalDateTime.now());
    String updatedConfigJson = updatedConfig.toJson();
    model.setModelConfig(updatedConfigJson);

    modelRepository.save(model);
    log.debug("模型信息已更新: {}", model.getModelName());
}
/**
 * Returns a status snapshot of a training job.
 *
 * @param jobId business identifier of the job
 * @return status, progress, epochs, start time, duration, parsed metrics (null when the
 *         job has none) and error message
 * @throws ResourceNotFoundException if no job with this id exists
 */
public TrainingJobStatus getTrainingStatus(String jobId) {
    log.debug("获取训练任务状态,任务ID: {}", jobId);
    TrainingJob job = trainingJobRepository.findByJobId(jobId)
            .orElseThrow(() -> new ResourceNotFoundException("训练任务不存在: " + jobId));

    // Metrics are stored as JSON and only parsed when present.
    TrainingMetrics parsedMetrics = job.getTrainingMetrics() == null
            ? null
            : TrainingMetrics.fromJson(job.getTrainingMetrics());

    return TrainingJobStatus.builder()
            .jobId(jobId)
            .status(job.getStatus())
            .progress(job.getProgress())
            .currentEpoch(job.getCurrentEpoch())
            .totalEpochs(job.getTotalEpochs())
            .startTime(job.getStartTime())
            .trainingDuration(job.getTrainingDuration())
            .metrics(parsedMetrics)
            .errorMessage(job.getErrorMessage())
            .build();
}
/**
 * Cancels a running training job.
 *
 * Asks {@code training_control.py} to cancel the job; on success the job is marked
 * CANCELLED, the model is reset to DRAFT, monitoring is stopped and the training
 * resources are released.
 *
 * @param jobId business identifier of the job
 * @throws ResourceNotFoundException if no job with this id exists
 * @throws TrainingJobException if the job is not RUNNING, the script reports failure, or
 *                              any step throws
 */
public void cancelTrainingJob(String jobId) {
    log.info("取消训练任务,任务ID: {}", jobId);
    TrainingJob trainingJob = trainingJobRepository.findByJobId(jobId)
            .orElseThrow(() -> new ResourceNotFoundException("训练任务不存在: " + jobId));
    if (trainingJob.getStatus() != TrainingJob.JobStatus.RUNNING) {
        throw new TrainingJobException("只能取消运行中的训练任务");
    }
    try {
        Map<String, Object> pythonResult = pythonIntegrationService.executePythonScript(
                "training_control.py",
                Map.of("action", "cancel", "job_id", jobId)
        );
        if (!Boolean.TRUE.equals(pythonResult.get("success"))) {
            throw new TrainingJobException("取消训练任务失败: " + pythonResult.get("error"));
        }
        trainingJob.setStatus(TrainingJob.JobStatus.CANCELLED);
        trainingJob.setEndTime(LocalDateTime.now());
        trainingJobRepository.save(trainingJob);
        updateModelStatus(trainingJob.getModel().getModelId(), DomainModel.ModelStatus.DRAFT);
        // The success and failure paths stop monitoring for a finished job; do the same
        // for a cancelled one. A monitor error must not fail the cancellation itself.
        try {
            trainingMonitorService.stopMonitoring(jobId);
        } catch (Exception e) {
            log.warn("停止训练监控失败,任务ID: {}", jobId, e);
        }
        resourceManagerService.releaseTrainingResources(jobId);
        log.info("训练任务取消成功,任务ID: {}", jobId);
    } catch (TrainingJobException e) {
        // Already carries the final message; rethrow unchanged so the prefix is not
        // duplicated by the generic handler below.
        log.error("取消训练任务异常: {}", e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error("取消训练任务异常: {}", e.getMessage(), e);
        throw new TrainingJobException("取消训练任务失败: " + e.getMessage(), e);
    }
}
/**
 * Reads the log of a training job through the {@code log_reader.py} script.
 *
 * @param jobId business identifier of the job
 * @param lines number of lines to request; 100 is used when {@code null}
 * @return the log lines and the total line count reported by the script
 * @throws ResourceNotFoundException if no job with this id exists
 * @throws TrainingJobException if the script reports failure or cannot be executed
 */
@SuppressWarnings("unchecked")
public TrainingLogs getTrainingLogs(String jobId, Integer lines) {
    log.debug("获取训练任务日志,任务ID: {}, 行数: {}", jobId, lines);
    TrainingJob trainingJob = trainingJobRepository.findByJobId(jobId)
            .orElseThrow(() -> new ResourceNotFoundException("训练任务不存在: " + jobId));
    try {
        // The job's logPath may be unset (this service does not populate it when the
        // job is built); fall back to the log directory given to the training script.
        String logPath = trainingJob.getLogPath() != null
                ? trainingJob.getLogPath()
                : getLogDirectory(trainingJob);
        int requestedLines = lines != null ? lines : 100;

        // A HashMap is used because Map.of rejects null values.
        Map<String, Object> scriptArgs = new HashMap<>();
        scriptArgs.put("log_path", logPath);
        scriptArgs.put("lines", requestedLines);

        Map<String, Object> pythonResult =
                pythonIntegrationService.executePythonScript("log_reader.py", scriptArgs);
        if (!Boolean.TRUE.equals(pythonResult.get("success"))) {
            throw new TrainingJobException("获取训练日志失败: " + pythonResult.get("error"));
        }

        Object totalLines = pythonResult.get("total_lines");
        Long totalLineCount = null;
        if (totalLines instanceof Number) {
            totalLineCount = ((Number) totalLines).longValue();
        } else if (totalLines != null) {
            totalLineCount = Long.valueOf(totalLines.toString().trim());
        }
        return TrainingLogs.builder()
                .jobId(jobId)
                .logs((List<String>) pythonResult.get("logs"))
                .totalLines(totalLineCount)
                .build();
    } catch (TrainingJobException e) {
        // Already carries the final message; rethrow unchanged so the prefix is not
        // duplicated by the generic handler below.
        log.error("获取训练日志异常: {}", e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error("获取训练日志异常: {}", e.getMessage(), e);
        throw new TrainingJobException("获取训练日志失败: " + e.getMessage(), e);
    }
}
/**
 * Builds a new, not yet persisted training job in PENDING state from the request.
 *
 * NOTE(review): {@code trainingDatasetRepository} is not among the fields or constructor
 * parameters declared by this class in the visible source - confirm it is provided.
 *
 * @throws ResourceNotFoundException if the requested model or dataset does not exist
 */
private TrainingJob buildTrainingJob(TrainingRequest request, ResourceAllocation allocation) {
    String newJobId = generateJobId();

    DomainModel targetModel = modelRepository.findByModelId(request.getModelId())
            .orElseThrow(() -> new ResourceNotFoundException("模型不存在: " + request.getModelId()));

    TrainingDataset sourceDataset = trainingDatasetRepository.findByDatasetId(request.getDatasetId())
            .orElseThrow(() -> new ResourceNotFoundException("数据集不存在: " + request.getDatasetId()));

    return TrainingJob.builder()
            .jobId(newJobId)
            .model(targetModel)
            .dataset(sourceDataset)
            .jobName(request.getJobName())
            .trainingType(request.getTrainingType())
            .trainingParameters(request.getTrainingConfig().toJson())
            .hyperparameters(request.getHyperparameters().toJson())
            .status(TrainingJob.JobStatus.PENDING)
            .progress(0)
            .currentEpoch(0)
            .totalEpochs(request.getTrainingConfig().getEpochs())
            .createdBy(request.getCreatedBy())
            .createdTime(LocalDateTime.now())
            .build();
}
/**
 * Loads the model by its business id, sets the given status and the last-updated time,
 * and saves it.
 *
 * @throws ResourceNotFoundException if no model with this id exists
 */
private void updateModelStatus(String modelId, DomainModel.ModelStatus status) {
    DomainModel target = modelRepository
            .findByModelId(modelId)
            .orElseThrow(() -> new ResourceNotFoundException("模型不存在: " + modelId));

    target.setStatus(status);
    target.setLastUpdatedTime(LocalDateTime.now());

    modelRepository.save(target);
}
/**
 * Generates a job id of the form {@code JOB_<currentTimeMillis>_<first 8 chars of a random UUID>}.
 */
private String generateJobId() {
    long createdAtMillis = System.currentTimeMillis();
    String randomSuffix = UUID.randomUUID().toString().substring(0, 8);
    return new StringBuilder("JOB_")
            .append(createdAtMillis)
            .append('_')
            .append(randomSuffix)
            .toString();
}
/**
 * Returns the model output directory: {@code models/<modelId>/<jobId>}.
 */
private String getOutputDirectory(TrainingJob trainingJob) {
    Path outputDir = Paths.get(
            "models",
            trainingJob.getModel().getModelId(),
            trainingJob.getJobId());
    return outputDir.toString();
}
/**
 * Returns the training log directory: {@code logs/training/<jobId>}.
 */
private String getLogDirectory(TrainingJob trainingJob) {
    Path logDir = Paths.get("logs", "training", trainingJob.getJobId());
    return logDir.toString();
}
/**
 * Returns the checkpoint directory: {@code checkpoints/<jobId>}.
 */
private String getCheckpointDirectory(TrainingJob trainingJob) {
    Path checkpointDir = Paths.get("checkpoints", trainingJob.getJobId());
    return checkpointDir.toString();
}
}
/**
* Request to create a training job.
*/
@Data
@Builder
class TrainingRequest {
// Business id of the model to train.
private String modelId;
// Business id of the dataset to train on.
private String datasetId;
// Display name of the job.
private String jobName;
// Fine-tuning strategy to use.
private TrainingJob.TrainingType trainingType;
// Training configuration; stored on the job as JSON.
private TrainingConfig trainingConfig;
// Hyperparameters; stored on the job as JSON.
private Hyperparameters hyperparameters;
// User who created the request.
private String createdBy;
}
/**
 * Training configuration for a fine-tuning job.
 */
@Data
@Builder
class TrainingConfig {
    private Integer epochs;
    private Integer batchSize;
    private Double learningRate;
    private Integer warmupSteps;
    private Integer maxSeqLength;
    private Integer gradientAccumulationSteps;
    private Integer saveSteps;
    private Integer evalSteps;
    private Integer loggingSteps;
    private Integer maxGradNorm;
    // Resource settings; not part of toMap().
    private Integer gpuCount;
    private Long memoryLimit;

    /**
     * Serializes this configuration to JSON via JsonUtils.
     */
    public String toJson() {
        String json = JsonUtils.toJson(this);
        return json;
    }

    /**
     * Parses a configuration from JSON via JsonUtils.
     */
    public static TrainingConfig fromJson(String json) {
        TrainingConfig parsed = JsonUtils.fromJson(json, TrainingConfig.class);
        return parsed;
    }

    /**
     * Returns the training settings as a map with snake_case keys.
     * gpuCount and memoryLimit are intentionally left out of this map.
     */
    public Map<String, Object> toMap() {
        Map<String, Object> settings = new HashMap<>();
        // Core schedule
        settings.put("epochs", epochs);
        settings.put("batch_size", batchSize);
        settings.put("learning_rate", learningRate);
        settings.put("warmup_steps", warmupSteps);
        // Sequence and gradient handling
        settings.put("max_seq_length", maxSeqLength);
        settings.put("gradient_accumulation_steps", gradientAccumulationSteps);
        settings.put("max_grad_norm", maxGradNorm);
        // Periodic actions
        settings.put("save_steps", saveSteps);
        settings.put("eval_steps", evalSteps);
        settings.put("logging_steps", loggingSteps);
        return settings;
    }
}
/**
 * Optimizer-level hyperparameters for a fine-tuning job.
 */
@Data
@Builder
class Hyperparameters {
    private String optimizer;
    private String scheduler;
    private Double weightDecay;
    private Double adamEpsilon;
    private Double adamBeta1;
    private Double adamBeta2;
    private Integer maxSteps;
    private Integer seed;
    private Boolean fp16;
    private Boolean bf16;

    /**
     * Serializes these hyperparameters to JSON via JsonUtils.
     */
    public String toJson() {
        String json = JsonUtils.toJson(this);
        return json;
    }

    /**
     * Parses hyperparameters from JSON via JsonUtils.
     */
    public static Hyperparameters fromJson(String json) {
        Hyperparameters parsed = JsonUtils.fromJson(json, Hyperparameters.class);
        return parsed;
    }

    /**
     * Returns all hyperparameters as a map with snake_case keys.
     */
    public Map<String, Object> toMap() {
        Map<String, Object> values = new HashMap<>();
        // Optimizer and schedule
        values.put("optimizer", optimizer);
        values.put("scheduler", scheduler);
        values.put("weight_decay", weightDecay);
        // Adam settings
        values.put("adam_epsilon", adamEpsilon);
        values.put("adam_beta1", adamBeta1);
        values.put("adam_beta2", adamBeta2);
        // Run control
        values.put("max_steps", maxSteps);
        values.put("seed", seed);
        // Precision flags
        values.put("fp16", fp16);
        values.put("bf16", bf16);
        return values;
    }
}
/**
 * Metrics collected from a training run.
 */
@Data
@Builder
class TrainingMetrics {
    private Double finalLoss;
    private Double bestLoss;
    private Double learningRate;
    private Integer totalSteps;
    private Integer totalSamples;
    // Loss per epoch, keyed by epoch number.
    private Map<Integer, Double> epochLosses;
    // Loss per step, keyed by step number.
    private Map<Integer, Double> stepLosses;
    // Evaluation scores keyed by metric name.
    private Map<String, Double> evaluationScores;
    // Hardware metrics as reported by the script.
    private Map<String, Object> hardwareMetrics;

    /**
     * Serializes these metrics to JSON via JsonUtils.
     */
    public String toJson() {
        return JsonUtils.toJson(this);
    }

    /**
     * Parses metrics from JSON via JsonUtils.
     */
    public static TrainingMetrics fromJson(String json) {
        return JsonUtils.fromJson(json, TrainingMetrics.class);
    }

    /**
     * Builds metrics from the result map of the Python training script.
     *
     * Missing entries become {@code null} instead of raising a NullPointerException.
     * The loss and score maps are copied with converted keys and values, because a
     * loosely typed result map may carry string keys and mixed numeric value types that
     * an unchecked cast would only expose later as a ClassCastException.
     *
     * @param pythonResult map with the keys final_loss, best_loss, learning_rate,
     *                     total_steps, total_samples, epoch_losses, step_losses,
     *                     evaluation_scores and hardware_metrics
     * @return the parsed metrics
     */
    @SuppressWarnings("unchecked")
    public static TrainingMetrics fromPythonResult(Map<String, Object> pythonResult) {
        return TrainingMetrics.builder()
                .finalLoss(toDouble(pythonResult.get("final_loss")))
                .bestLoss(toDouble(pythonResult.get("best_loss")))
                .learningRate(toDouble(pythonResult.get("learning_rate")))
                .totalSteps(toInteger(pythonResult.get("total_steps")))
                .totalSamples(toInteger(pythonResult.get("total_samples")))
                .epochLosses(toIndexedLosses(pythonResult.get("epoch_losses")))
                .stepLosses(toIndexedLosses(pythonResult.get("step_losses")))
                .evaluationScores(toScores(pythonResult.get("evaluation_scores")))
                .hardwareMetrics((Map<String, Object>) pythonResult.get("hardware_metrics"))
                .build();
    }

    /** Converts a loosely typed numeric value to Double; null stays null. */
    private static Double toDouble(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        return Double.valueOf(value.toString().trim());
    }

    /** Converts a loosely typed numeric value to Integer; null stays null. */
    private static Integer toInteger(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.valueOf(value.toString().trim());
    }

    /** Copies an index-to-loss map, converting keys to Integer and values to Double. */
    private static Map<Integer, Double> toIndexedLosses(Object raw) {
        if (!(raw instanceof Map)) {
            return null;
        }
        Map<Integer, Double> losses = new java.util.LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) raw).entrySet()) {
            if (entry.getKey() == null) {
                continue;
            }
            losses.put(toInteger(entry.getKey()), toDouble(entry.getValue()));
        }
        return losses;
    }

    /** Copies a name-to-score map, converting values to Double. */
    private static Map<String, Double> toScores(Object raw) {
        if (!(raw instanceof Map)) {
            return null;
        }
        Map<String, Double> scores = new java.util.LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) raw).entrySet()) {
            if (entry.getKey() == null) {
                continue;
            }
            scores.put(entry.getKey().toString(), toDouble(entry.getValue()));
        }
        return scores;
    }
}
/**
 * Training job status data class.
 *
 * <p>Plain value holder; accessors and the builder are generated by Lombok.
 */
@Data
@Builder
class TrainingJobStatus {
private String jobId;
private TrainingJob.JobStatus status;
// NOTE(review): presumably a percentage (0-100) - confirm against the producer.
private Integer progress;
private Integer currentEpoch;
private Integer totalEpochs;
private LocalDateTime startTime;
// NOTE(review): unit is not visible in this class (seconds?) - confirm.
private Long trainingDuration;
private TrainingMetrics metrics;
private String errorMessage;
}
/**
 * Training log data class.
 *
 * <p>Plain value holder; accessors and the builder are generated by Lombok.
 */
@Data
@Builder
class TrainingLogs {
private String jobId;
private List<String> logs;
// NOTE(review): may differ from logs.size() if logs is a page/tail - confirm with the producer.
private Long totalLines;
}
模型评估服务实现
package com.company.llmfinetuning.service;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* 模型评估服务
* 负责对微调后的模型进行自动化和人工评估,生成评估报告
*/
@Service
@Slf4j
public class ModelEvaluationService {
private final PythonIntegrationService pythonIntegrationService;
private final ModelEvaluationRepository evaluationRepository;
private final ModelRepository modelRepository;
private final TrainingDatasetRepository datasetRepository;
/**
 * Creates the service with its collaborators (constructor injection).
 *
 * @param pythonIntegrationService runs the Python evaluation scripts
 * @param evaluationRepository stores evaluation records
 * @param modelRepository looks up and stores models
 * @param datasetRepository looks up datasets
 */
public ModelEvaluationService(PythonIntegrationService pythonIntegrationService,
ModelEvaluationRepository evaluationRepository,
ModelRepository modelRepository,
TrainingDatasetRepository datasetRepository) {
this.pythonIntegrationService = pythonIntegrationService;
this.evaluationRepository = evaluationRepository;
this.modelRepository = modelRepository;
this.datasetRepository = datasetRepository;
}
/**
 * Runs an automatic evaluation of a model against a dataset.
 *
 * <p>Steps: create the evaluation record, run the Python evaluation, store
 * the result on the record, update the model's evaluation metrics, then
 * record how long the whole run took.
 *
 * @param evaluationRequest model id, dataset id and requesting user
 * @return the completed evaluation record
 * @throws ModelEvaluationException if any step fails (the original exception
 *         is attached as the cause)
 */
public ModelEvaluation executeAutomaticEvaluation(EvaluationRequest evaluationRequest) {
    log.info("开始自动化模型评估,模型: {}, 数据集: {}",
            evaluationRequest.getModelId(), evaluationRequest.getDatasetId());
    long startTime = System.currentTimeMillis();
    try {
        // 1. Create the evaluation record.
        ModelEvaluation evaluation = createEvaluationRecord(evaluationRequest,
                ModelEvaluation.EvaluationType.AUTOMATIC);
        // 2. Run the automatic evaluation.
        AutomaticEvaluationResult autoResult = performAutomaticEvaluation(evaluationRequest);
        // 3. Store the result on the record.
        updateEvaluationWithAutomaticResult(evaluation, autoResult);
        // 4. Update the model's evaluation metrics.
        updateModelEvaluationMetrics(evaluationRequest.getModelId(), autoResult);
        long processingTime = System.currentTimeMillis() - startTime;
        // Duration is stored in whole seconds.
        evaluation.setEvaluationDuration(processingTime / 1000);
        // The record was last saved in step 3, before the duration was known;
        // save again so the duration is actually persisted.
        evaluationRepository.save(evaluation);
        log.info("自动化评估完成,评估ID: {}, 总体评分: {}",
                evaluation.getEvaluationId(), evaluation.getOverallScore());
        return evaluation;
    } catch (Exception e) {
        log.error("自动化评估失败: {}", e.getMessage(), e);
        throw new ModelEvaluationException("自动化评估失败: " + e.getMessage(), e);
    }
}
/**
 * Runs a manual evaluation of a model.
 *
 * @param request model id, dataset id, evaluator and sampling options
 * @return the completed evaluation record
 * @throws ModelEvaluationException if any step fails (the original exception
 *         is attached as the cause)
 */
public ModelEvaluation executeManualEvaluation(ManualEvaluationRequest request) {
    log.info("开始人工模型评估,模型: {}, 评估人: {}",
            request.getModelId(), request.getEvaluator());
    try {
        // Step 1: the record is created from a plain EvaluationRequest
        // carrying the same identifiers.
        EvaluationRequest recordRequest = EvaluationRequest.builder()
                .modelId(request.getModelId())
                .datasetId(request.getDatasetId())
                .evaluatedBy(request.getEvaluator())
                .build();
        ModelEvaluation evaluationRecord = createEvaluationRecord(
                recordRequest, ModelEvaluation.EvaluationType.MANUAL);

        // Step 2: obtain the manual evaluation outcome.
        ManualEvaluationResult outcome = performManualEvaluation(request);

        // Step 3: store the outcome on the record.
        updateEvaluationWithManualResult(evaluationRecord, outcome);

        log.info("人工评估完成,评估ID: {}, 总体评分: {}",
                evaluationRecord.getEvaluationId(), evaluationRecord.getOverallScore());
        return evaluationRecord;
    } catch (Exception e) {
        log.error("人工评估失败: {}", e.getMessage(), e);
        throw new ModelEvaluationException("人工评估失败: " + e.getMessage(), e);
    }
}
/**
 * Evaluates a baseline model and a list of comparison models on the same
 * dataset and derives comparative metrics.
 *
 * <p>A failure of the baseline evaluation aborts the whole comparison. A
 * failure of a single comparison model is logged, recorded in
 * {@code failedModels}, and the remaining models are still evaluated.
 *
 * @param request baseline model id, comparison model ids, dataset id and
 *        requesting user; a null comparison list is treated as empty
 * @return the comparison result
 * @throws ModelEvaluationException if the comparison as a whole fails
 */
public ComparativeEvaluationResult executeComparativeEvaluation(ComparativeEvaluationRequest request) {
    // Treat a missing list as "nothing to compare" instead of failing with an NPE.
    List<String> comparisonModelIds = request.getComparisonModelIds() != null
            ? request.getComparisonModelIds()
            : new ArrayList<>();
    log.info("开始对比评估,基准模型: {}, 对比模型数量: {}",
            request.getBaselineModelId(), comparisonModelIds.size());
    try {
        ComparativeEvaluationResult result = ComparativeEvaluationResult.builder()
                .baselineModelId(request.getBaselineModelId())
                .comparisonResults(new HashMap<>())
                // Must be initialised here: the per-model catch block below adds to it.
                .failedModels(new ArrayList<>())
                .startTime(LocalDateTime.now())
                .build();
        // Evaluate the baseline model.
        EvaluationRequest baselineRequest = EvaluationRequest.builder()
                .modelId(request.getBaselineModelId())
                .datasetId(request.getDatasetId())
                .evaluatedBy(request.getEvaluatedBy())
                .build();
        ModelEvaluation baselineEvaluation = executeAutomaticEvaluation(baselineRequest);
        result.setBaselineEvaluation(baselineEvaluation);
        // Evaluate each comparison model; one failure must not stop the others.
        for (String modelId : comparisonModelIds) {
            try {
                EvaluationRequest comparisonRequest = EvaluationRequest.builder()
                        .modelId(modelId)
                        .datasetId(request.getDatasetId())
                        .evaluatedBy(request.getEvaluatedBy())
                        .build();
                ModelEvaluation comparisonEvaluation = executeAutomaticEvaluation(comparisonRequest);
                result.getComparisonResults().put(modelId, comparisonEvaluation);
            } catch (Exception e) {
                log.warn("对比模型 {} 评估失败: {}", modelId, e.getMessage());
                result.getFailedModels().add(modelId);
            }
        }
        result.setEndTime(LocalDateTime.now());
        result.calculateComparativeMetrics();
        log.info("对比评估完成,成功对比模型: {}", result.getComparisonResults().size());
        return result;
    } catch (Exception e) {
        log.error("对比评估失败: {}", e.getMessage(), e);
        throw new ModelEvaluationException("对比评估失败: " + e.getMessage(), e);
    }
}
/**
 * Creates and saves a new evaluation record in RUNNING state.
 *
 * @param request supplies the model id, dataset id and requesting user
 * @param evaluationType type stored on the record
 * @return the value returned by the repository's {@code save}
 * @throws ResourceNotFoundException if the model or the dataset is not found
 */
private ModelEvaluation createEvaluationRecord(EvaluationRequest request,
        ModelEvaluation.EvaluationType evaluationType) {
    String newEvaluationId = generateEvaluationId();

    // Resolve the referenced entities; the model is looked up first.
    DomainModel model = modelRepository.findByModelId(request.getModelId())
            .orElseThrow(() -> new ResourceNotFoundException("模型不存在: " + request.getModelId()));
    TrainingDataset dataset = datasetRepository.findByDatasetId(request.getDatasetId())
            .orElseThrow(() -> new ResourceNotFoundException("数据集不存在: " + request.getDatasetId()));

    ModelEvaluation pending = ModelEvaluation.builder()
            .evaluationId(newEvaluationId)
            .model(model)
            .dataset(dataset)
            .evaluationType(evaluationType)
            .status(ModelEvaluation.EvaluationStatus.RUNNING)
            .startTime(LocalDateTime.now())
            .evaluatedBy(request.getEvaluatedBy())
            .build();
    return evaluationRepository.save(pending);
}
/**
 * Runs the Python evaluation script and converts its result.
 *
 * @param request identifies the model and dataset to evaluate
 * @return the parsed evaluation result
 * @throws ModelEvaluationException if the script returns no result, reports
 *         failure, or any other error occurs; a failure reported by the
 *         script is thrown as-is rather than wrapped a second time
 */
private AutomaticEvaluationResult performAutomaticEvaluation(EvaluationRequest request) {
    log.debug("执行自动化评估,模型: {}", request.getModelId());
    try {
        // Prepare the evaluation parameters.
        Map<String, Object> evalParams = prepareEvaluationParameters(request);
        // Invoke the Python evaluation script.
        Map<String, Object> pythonResult = pythonIntegrationService.executePythonScript(
                "model_evaluation.py",
                evalParams
        );
        if (pythonResult == null) {
            throw new ModelEvaluationException("自动化评估执行失败: " + "script returned no result");
        }
        if (!Boolean.TRUE.equals(pythonResult.get("success"))) {
            throw new ModelEvaluationException("自动化评估执行失败: " + pythonResult.get("error"));
        }
        AutomaticEvaluationResult result = AutomaticEvaluationResult.fromPythonResult(pythonResult);
        log.debug("自动化评估完成,评分: {}", result.getOverallScore());
        return result;
    } catch (ModelEvaluationException e) {
        // Already a domain exception with a meaningful message: log once and
        // rethrow unchanged instead of nesting it inside another one.
        log.error("自动化评估执行异常: {}", e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error("自动化评估执行异常: {}", e.getMessage(), e);
        throw new ModelEvaluationException("自动化评估执行异常: " + e.getMessage(), e);
    }
}
/**
 * Produces a manual evaluation result for the request.
 *
 * <p>Samples are generated first; the scoring itself is currently simulated
 * (see {@code simulateManualEvaluation}) rather than collected from a person.
 *
 * @param request manual evaluation request
 * @return the (simulated) manual evaluation result
 * @throws ModelEvaluationException if sample generation or scoring fails
 */
private ManualEvaluationResult performManualEvaluation(ManualEvaluationRequest request) {
    log.debug("执行人工评估,模型: {}", request.getModelId());
    try {
        List<EvaluationSample> samples = generateEvaluationSamples(request);

        // A real evaluation UI / human input could be integrated here;
        // for now the scores are simulated.
        String evaluator = request.getEvaluator();
        ManualEvaluationResult simulated = simulateManualEvaluation(samples, evaluator);

        log.debug("人工评估完成,评估样本数: {}", samples.size());
        return simulated;
    } catch (Exception e) {
        log.error("人工评估执行异常: {}", e.getMessage(), e);
        throw new ModelEvaluationException("人工评估执行异常: " + e.getMessage(), e);
    }
}
/**
 * Copies an automatic evaluation result onto the record, marks the record
 * COMPLETED and saves it.
 *
 * @param evaluation record to update (mutated in place)
 * @param result automatic evaluation result to copy from
 */
private void updateEvaluationWithAutomaticResult(ModelEvaluation evaluation,
        AutomaticEvaluationResult result) {
    evaluation.setStatus(ModelEvaluation.EvaluationStatus.COMPLETED);

    LocalDateTime finishedAt = LocalDateTime.now();
    evaluation.setEndTime(finishedAt);

    BigDecimal score = BigDecimal.valueOf(result.getOverallScore());
    evaluation.setOverallScore(score);

    String metricsJson = result.getMetricsJson();
    evaluation.setMetrics(metricsJson);

    String report = result.getEvaluationReport();
    evaluation.setEvaluationReport(report);

    evaluationRepository.save(evaluation);
}
/**
 * Copies a manual evaluation result onto the record, marks the record
 * COMPLETED and saves it.
 *
 * @param evaluation record to update (mutated in place)
 * @param result manual evaluation result to copy from; this method relies on
 *        it exposing {@code getMetricsJson()}
 */
private void updateEvaluationWithManualResult(ModelEvaluation evaluation,
        ManualEvaluationResult result) {
    evaluation.setStatus(ModelEvaluation.EvaluationStatus.COMPLETED);

    LocalDateTime finishedAt = LocalDateTime.now();
    evaluation.setEndTime(finishedAt);

    BigDecimal score = BigDecimal.valueOf(result.getOverallScore());
    evaluation.setOverallScore(score);

    String metricsJson = result.getMetricsJson();
    evaluation.setMetrics(metricsJson);

    String report = result.getEvaluationReport();
    evaluation.setEvaluationReport(report);

    evaluationRepository.save(evaluation);
}
/**
 * Folds the metrics of an automatic evaluation into the metrics stored on
 * the model and saves the model.
 *
 * @param modelId id of the model to update
 * @param result automatic evaluation result supplying the new metrics
 * @throws ResourceNotFoundException if the model is not found
 */
private void updateModelEvaluationMetrics(String modelId, AutomaticEvaluationResult result) {
    DomainModel model = modelRepository.findByModelId(modelId)
            .orElseThrow(() -> new ResourceNotFoundException("模型不存在: " + modelId));

    // Metrics already stored on the model (null when none are stored).
    ModelEvaluationMetrics stored = ModelEvaluationMetrics.fromJson(model.getEvaluationMetrics());
    ModelEvaluationMetrics fresh = ModelEvaluationMetrics.fromAutomaticResult(result);

    ModelEvaluationMetrics combined;
    if (stored == null) {
        combined = fresh;
    } else {
        combined = stored.merge(fresh);
    }

    model.setEvaluationMetrics(combined.toJson());
    model.setLastUpdatedTime(LocalDateTime.now());
    modelRepository.save(model);
    log.debug("模型评估指标已更新: {}", modelId);
}
/**
 * Builds the parameter map passed to the Python evaluation script.
 *
 * @param request identifies the model and dataset
 * @return a new mutable parameter map
 * @throws ResourceNotFoundException if the model or the dataset is not found
 */
private Map<String, Object> prepareEvaluationParameters(EvaluationRequest request) {
    Map<String, Object> scriptParams = new HashMap<>();

    DomainModel model = modelRepository.findByModelId(request.getModelId())
            .orElseThrow(() -> new ResourceNotFoundException("模型不存在: " + request.getModelId()));
    TrainingDataset dataset = datasetRepository.findByDatasetId(request.getDatasetId())
            .orElseThrow(() -> new ResourceNotFoundException("数据集不存在: " + request.getDatasetId()));

    // Model and dataset locations.
    scriptParams.put("model_path", model.getModelPath());
    scriptParams.put("model_name", model.getModelName());
    scriptParams.put("base_model", model.getBaseModel());
    scriptParams.put("dataset_path", dataset.getFilePath());

    // NOTE(review): this is a freshly generated id, not the id of the
    // evaluation record created by the caller - confirm that is intended.
    scriptParams.put("evaluation_id", generateEvaluationId());
    scriptParams.put("domain_type", model.getDomain().name());

    // Evaluation configuration (defaults).
    EvaluationConfig defaults = EvaluationConfig.defaultConfig();
    scriptParams.put("evaluation_config", defaults.toMap());
    return scriptParams;
}
/**
 * Asks the Python sample-generation script for evaluation samples.
 *
 * @param request supplies model id, dataset id and the desired sample count
 * @return the samples returned by the script; an empty list when the script
 *         succeeds without a {@code samples} entry
 * @throws ModelEvaluationException if the script returns no result, reports
 *         failure, or any other error occurs; a failure reported by the
 *         script is thrown as-is rather than wrapped a second time
 */
@SuppressWarnings("unchecked")
private List<EvaluationSample> generateEvaluationSamples(ManualEvaluationRequest request) {
    try {
        // HashMap instead of Map.of: Map.of rejects null values, so an unset
        // sample count or id would fail with a bare NullPointerException.
        Map<String, Object> scriptParams = new HashMap<>();
        scriptParams.put("model_id", request.getModelId());
        scriptParams.put("dataset_id", request.getDatasetId());
        scriptParams.put("sample_count", request.getSampleCount());
        scriptParams.put("evaluation_type", "MANUAL");

        Map<String, Object> pythonResult = pythonIntegrationService.executePythonScript(
                "sample_generation.py",
                scriptParams
        );
        if (pythonResult == null) {
            throw new ModelEvaluationException("生成评估样本失败: " + "script returned no result");
        }
        if (!Boolean.TRUE.equals(pythonResult.get("success"))) {
            throw new ModelEvaluationException("生成评估样本失败: " + pythonResult.get("error"));
        }
        // NOTE(review): unchecked cast - the element type is whatever the
        // integration service deserialized; confirm it really yields
        // EvaluationSample instances.
        List<EvaluationSample> samples = (List<EvaluationSample>) pythonResult.get("samples");
        // Callers read samples.size(), so never hand back null.
        return samples != null ? samples : new ArrayList<>();
    } catch (ModelEvaluationException e) {
        // Already a domain exception: log once and rethrow unchanged.
        log.error("生成评估样本异常: {}", e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error("生成评估样本异常: {}", e.getMessage(), e);
        throw new ModelEvaluationException("生成评估样本失败: " + e.getMessage(), e);
    }
}
/**
 * Simulates a manual evaluation (simplified implementation).
 *
 * <p>A real system would collect scores through an evaluation UI. Here the
 * category scores are fixed constants and the overall score is their mean;
 * the samples only contribute their count to the report.
 *
 * @param samples samples that would have been evaluated
 * @param evaluator name recorded in the report
 * @return result carrying the fixed scores and a JSON report
 */
private ManualEvaluationResult simulateManualEvaluation(List<EvaluationSample> samples, String evaluator) {
    // Fixed placeholder scores per category.
    Map<String, Double> scoresByCategory = new HashMap<>();
    scoresByCategory.put("relevance", 4.2);
    scoresByCategory.put("accuracy", 4.5);
    scoresByCategory.put("coherence", 4.0);
    scoresByCategory.put("fluency", 4.3);

    // Overall score is the arithmetic mean of the category scores.
    double meanScore = scoresByCategory.values().stream()
            .mapToDouble(Double::doubleValue)
            .average()
            .orElse(0.0);

    ManualEvaluationReport simulatedReport = ManualEvaluationReport.builder()
            .evaluator(evaluator)
            .evaluationDate(LocalDateTime.now())
            .totalSamples(samples.size())
            .categoryScores(scoresByCategory)
            .sampleEvaluations(new ArrayList<>())
            .overallComments("模型在法律领域表现良好,但在某些复杂案例上仍有改进空间")
            .build();
    String reportJson = simulatedReport.toJson();

    return ManualEvaluationResult.builder()
            .overallScore(meanScore)
            .categoryScores(scoresByCategory)
            .evaluationReport(reportJson)
            .build();
}
/**
 * Loads an evaluation record and presents it as a report.
 *
 * @param evaluationId id of the evaluation record
 * @return report assembled from the record, its model and its dataset
 * @throws ResourceNotFoundException if no record has this id
 */
public EvaluationReport getEvaluationReport(String evaluationId) {
    log.debug("获取评估报告,评估ID: {}", evaluationId);

    ModelEvaluation stored = evaluationRepository.findByEvaluationId(evaluationId)
            .orElseThrow(() -> new ResourceNotFoundException("评估记录不存在: " + evaluationId));

    DomainModel model = stored.getModel();
    String modelId = model.getModelId();
    String modelName = model.getModelName();
    String datasetName = stored.getDataset().getDatasetName();

    // The stored metrics JSON is parsed back into its typed form.
    ModelEvaluationMetrics parsedMetrics = ModelEvaluationMetrics.fromJson(stored.getMetrics());

    return EvaluationReport.builder()
            .evaluationId(evaluationId)
            .modelId(modelId)
            .modelName(modelName)
            .datasetName(datasetName)
            .evaluationType(stored.getEvaluationType())
            .overallScore(stored.getOverallScore())
            .metrics(parsedMetrics)
            .reportContent(stored.getEvaluationReport())
            .startTime(stored.getStartTime())
            .endTime(stored.getEndTime())
            .evaluationDuration(stored.getEvaluationDuration())
            .evaluatedBy(stored.getEvaluatedBy())
            .build();
}
/**
 * Generates an evaluation id of the form
 * {@code EVAL_<epoch millis>_<first 8 characters of a random UUID>}.
 *
 * @return the new id
 */
private String generateEvaluationId() {
    long timestamp = System.currentTimeMillis();
    String randomSuffix = UUID.randomUUID().toString().substring(0, 8);
    return "EVAL_" + timestamp + "_" + randomSuffix;
}
}
/**
 * Evaluation request data class.
 *
 * <p>Plain value holder; accessors and the builder are generated by Lombok.
 */
@Data
@Builder
class EvaluationRequest {
// Id of the model to evaluate.
private String modelId;
// Id of the dataset to evaluate against.
private String datasetId;
// Who requested / performed the evaluation.
private String evaluatedBy;
}
/**
 * Manual evaluation request data class.
 *
 * <p>Plain value holder; accessors and the builder are generated by Lombok.
 */
@Data
@Builder
class ManualEvaluationRequest {
private String modelId;
private String datasetId;
// Name of the person performing the evaluation.
private String evaluator;
// Number of samples requested from the sample-generation script.
private Integer sampleCount;
// NOTE(review): not read anywhere in the visible service code - confirm usage.
private Map<String, Object> evaluationCriteria;
}
/**
 * Comparative evaluation request data class.
 *
 * <p>Plain value holder; accessors and the builder are generated by Lombok.
 */
@Data
@Builder
class ComparativeEvaluationRequest {
// Model every other model is compared against.
private String baselineModelId;
// Models to compare with the baseline.
private List<String> comparisonModelIds;
// Dataset used for the baseline and for every comparison model.
private String datasetId;
private String evaluatedBy;
// NOTE(review): not read anywhere in the visible service code - confirm usage.
private String comparisonCriteria;
}
/**
 * Automatic evaluation result data class.
 *
 * <p>Built from the result map returned by the Python evaluation script.
 */
@Data
@Builder
class AutomaticEvaluationResult {

    private Double overallScore;
    private Map<String, Double> metricScores;
    private String metricsJson;
    private String evaluationReport;
    private List<String> strengths;
    private List<String> weaknesses;
    private Map<String, Object> detailedResults;

    /**
     * Converts the Python script's result map into a typed result.
     *
     * <p>{@code overall_score} is mandatory and may be any {@link Number} or
     * a numeric string. All other entries are optional and are copied as
     * returned by the script (absent entries become {@code null}).
     *
     * @param pythonResult result map with snake_case keys
     * @return the populated result
     * @throws IllegalArgumentException if {@code pythonResult} is null or has
     *         no {@code overall_score}
     * @throws NumberFormatException if {@code overall_score} is not numeric
     */
    @SuppressWarnings("unchecked")
    public static AutomaticEvaluationResult fromPythonResult(Map<String, Object> pythonResult) {
        if (pythonResult == null) {
            throw new IllegalArgumentException("pythonResult must not be null");
        }
        Object rawScore = pythonResult.get("overall_score");
        if (rawScore == null) {
            // Fail with a message that names the missing key instead of a bare NPE.
            throw new IllegalArgumentException("Python evaluation result is missing 'overall_score'");
        }
        Double overall;
        if (rawScore instanceof Number) {
            overall = ((Number) rawScore).doubleValue();
        } else {
            overall = Double.valueOf(rawScore.toString().trim());
        }
        // The casts below are unchecked: the element types are whatever the
        // integration layer deserialized.
        return AutomaticEvaluationResult.builder()
                .overallScore(overall)
                .metricScores((Map<String, Double>) pythonResult.get("metric_scores"))
                .metricsJson((String) pythonResult.get("metrics_json"))
                .evaluationReport((String) pythonResult.get("evaluation_report"))
                .strengths((List<String>) pythonResult.get("strengths"))
                .weaknesses((List<String>) pythonResult.get("weaknesses"))
                .detailedResults((Map<String, Object>) pythonResult.get("detailed_results"))
                .build();
    }
}
/**
 * Manual evaluation result data class.
 */
@Data
@Builder
class ManualEvaluationResult {

    private Double overallScore;
    private Map<String, Double> categoryScores;
    private String evaluationReport;
    private List<SampleEvaluation> sampleEvaluations;

    /**
     * Returns the category scores as JSON.
     *
     * <p>The evaluation service stores this value as the metrics of a manual
     * evaluation record; without this accessor that call does not compile.
     *
     * @return JSON produced by {@code JsonUtils.toJson} for the category
     *         scores, or {@code null} when no category scores are set
     */
    public String getMetricsJson() {
        if (categoryScores == null) {
            return null;
        }
        return JsonUtils.toJson(categoryScores);
    }
}
/**
* 对比评估结果数据类
*/
@Data
@Builder
class ComparativeEvaluationResult {
private String baselineModelId;
private ModelEvaluation baselineEvaluation;
private Map<String, ModelEvaluation> comparisonResults;
private List<String> failedModels;
private LocalDateTime startTime;
private LocalDateTime endTime;
private Map<String, Object> comparativeMetrics;
public void calculateComparativeMetrics() {
comparativeMetrics = new HashMap<>();
if (baselineEvaluation != null && comparisonResults != null) {
double baselineScore = baselineEvaluation.getOverallScore().doubleValue();
Map<String, Double> improvementScores = new HashMap<>();
for (Map.Entry<String, ModelEvaluation> entry : comparisonResults.entrySet()) {
double comparisonScore = entry.getValue().getOverallScore().doubleValue();
double improvement = comparisonScore - baselineScore;
improvementScores.put(entry.getKey(), improvement);
}
comparativeMetrics.put("baseline_score", baselineScore);
comparativeMetrics.put("improvement_scores", improvementScores);
comparativeMetrics.put("best_model", findBestModel(improvementScores));
}
}
private String findBestModel(Map<String, Double> improvementScores) {
return improvementScores.entrySet().stream()
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElse(null);
}
}
/**
 * Evaluation report data class.
 *
 * <p>Read-model assembled by the evaluation service from an evaluation
 * record, its model and its dataset. Accessors and the builder are generated
 * by Lombok.
 */
@Data
@Builder
class EvaluationReport {
private String evaluationId;
private String modelId;
private String modelName;
private String datasetName;
private ModelEvaluation.EvaluationType evaluationType;
private BigDecimal overallScore;
// Typed form of the metrics JSON stored on the evaluation record.
private ModelEvaluationMetrics metrics;
private String reportContent;
private LocalDateTime startTime;
private LocalDateTime endTime;
// The automatic evaluation path stores elapsed milliseconds / 1000 here.
private Long evaluationDuration;
private String evaluatedBy;
}
/**
 * Model evaluation metrics data class.
 */
@Data
@Builder
class ModelEvaluationMetrics {

    private Double accuracy;
    private Double precision;
    private Double recall;
    private Double f1Score;
    private Double perplexity;
    private Double bleuScore;
    private Double rougeScore;
    private Map<String, Double> domainSpecificMetrics;
    private LocalDateTime evaluationTime;

    /**
     * @return this object serialized by {@code JsonUtils.toJson}
     */
    public String toJson() {
        return JsonUtils.toJson(this);
    }

    /**
     * @param json JSON text; may be null or blank
     * @return the deserialized metrics, or {@code null} for null/blank input
     */
    public static ModelEvaluationMetrics fromJson(String json) {
        if (json == null || json.trim().isEmpty()) {
            return null;
        }
        return JsonUtils.fromJson(json, ModelEvaluationMetrics.class);
    }

    /**
     * Builds metrics from an automatic evaluation result.
     *
     * <p>The well-known metrics are read from the result's metric scores by
     * their snake_case names; the full score map is also kept as
     * {@code domainSpecificMetrics}. A missing score map yields metrics with
     * all scores unset, and scores that arrive as other {@link Number} types
     * or numeric strings are converted to {@code Double}.
     *
     * @param result automatic evaluation result
     * @return metrics stamped with the current time
     * @throws NumberFormatException if a score value is not numeric
     */
    public static ModelEvaluationMetrics fromAutomaticResult(AutomaticEvaluationResult result) {
        Map<String, Double> scores = normalizeScores(result.getMetricScores());
        return ModelEvaluationMetrics.builder()
                .accuracy(scores.get("accuracy"))
                .precision(scores.get("precision"))
                .recall(scores.get("recall"))
                .f1Score(scores.get("f1_score"))
                .perplexity(scores.get("perplexity"))
                .bleuScore(scores.get("bleu_score"))
                .rougeScore(scores.get("rouge_score"))
                .domainSpecificMetrics(scores)
                .evaluationTime(LocalDateTime.now())
                .build();
    }

    /**
     * Merges newer metrics into these metrics.
     *
     * <p>For every metric the value from {@code other} wins when it is set;
     * otherwise the value of this object is kept. Domain-specific metrics
     * are merged key by key with the same rule. Neither input is modified.
     *
     * @param other newer metrics; {@code null} leaves these metrics unchanged
     * @return the merged metrics ({@code this} when {@code other} is null)
     */
    public ModelEvaluationMetrics merge(ModelEvaluationMetrics other) {
        if (other == null) {
            return this;
        }
        Map<String, Double> mergedDomain = null;
        if (domainSpecificMetrics != null || other.domainSpecificMetrics != null) {
            mergedDomain = new HashMap<>();
            if (domainSpecificMetrics != null) {
                mergedDomain.putAll(domainSpecificMetrics);
            }
            if (other.domainSpecificMetrics != null) {
                mergedDomain.putAll(other.domainSpecificMetrics);
            }
        }
        return ModelEvaluationMetrics.builder()
                .accuracy(prefer(other.accuracy, accuracy))
                .precision(prefer(other.precision, precision))
                .recall(prefer(other.recall, recall))
                .f1Score(prefer(other.f1Score, f1Score))
                .perplexity(prefer(other.perplexity, perplexity))
                .bleuScore(prefer(other.bleuScore, bleuScore))
                .rougeScore(prefer(other.rougeScore, rougeScore))
                .domainSpecificMetrics(mergedDomain)
                .evaluationTime(prefer(other.evaluationTime, evaluationTime))
                .build();
    }

    /** Returns the newer value when it is set, otherwise the older one. */
    private static <T> T prefer(T newer, T older) {
        return newer != null ? newer : older;
    }

    /** Copies a raw score map into a map whose values really are Doubles. */
    private static Map<String, Double> normalizeScores(Map<String, ?> raw) {
        Map<String, Double> normalized = new HashMap<>();
        if (raw == null) {
            return normalized;
        }
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) raw).entrySet()) {
            Object value = entry.getValue();
            Double score;
            if (value == null) {
                score = null;
            } else if (value instanceof Number) {
                score = ((Number) value).doubleValue();
            } else {
                score = Double.valueOf(value.toString().trim());
            }
            normalized.put(String.valueOf(entry.getKey()), score);
        }
        return normalized;
    }
}
测试用例
package com.company.llmfinetuning.service;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
/**
 * Unit tests for the data preprocessing service.
 *
 * <p>All collaborators are Mockito mocks; the Python scripts are simulated by
 * stubbing {@code executePythonScript} per script name.
 */
@ExtendWith(MockitoExtension.class)
class DataPreprocessingServiceTest {

    @Mock
    private PythonIntegrationService pythonIntegrationService;
    @Mock
    private FileStorageService fileStorageService;
    @Mock
    private DataQualityService dataQualityService;
    @Mock
    private TrainingDatasetRepository trainingDatasetRepository;
    @InjectMocks
    private DataPreprocessingService dataPreprocessingService;

    private TrainingDataset testDataset;
    private PreprocessingConfig testConfig;

    @BeforeEach
    void setUp() {
        testDataset = TrainingDataset.builder()
                .datasetId("DS-001")
                .datasetName("法律合同数据集")
                .domain(DomainModel.DomainType.LEGAL)
                .filePath("/data/raw/legal_contracts.jsonl")
                .fileFormat("JSONL")
                .totalSize(1024L * 1024L) // 1MB
                .sampleCount(1000L)
                .status(TrainingDataset.DatasetStatus.COLLECTING)
                .build();
        testConfig = PreprocessingConfig.builder()
                .domainType(DomainModel.DomainType.LEGAL)
                .trainingType(TrainingJob.TrainingType.LORA)
                .dataAugmentationEnabled(true)
                .build();
    }

    @Test
    @DisplayName("数据集预处理 - 成功案例")
    void testPreprocessDataset_Success() {
        // Simulated results of the five Python pipeline scripts.
        Map<String, Object> loadingResult = Map.of(
                "success", true,
                "sample_count", 1000L,
                "total_size", 1024L * 1024L,
                "raw_data", "mock_raw_data"
        );
        Map<String, Object> cleaningResult = Map.of(
                "success", true,
                "retained_samples", 950L,
                "filtered_samples", 50L,
                "cleaned_data", "mock_cleaned_data"
        );
        Map<String, Object> formattingResult = Map.of(
                "success", true,
                "total_samples", 950L,
                "training_samples", 760L,
                "validation_samples", 190L,
                "average_length", 256.5,
                "formatted_data", "mock_formatted_data"
        );
        Map<String, Object> augmentationResult = Map.of(
                "success", true,
                "original_samples", 760L,
                "augmented_samples", 1520L,
                "augmentation_factor", 2.0
        );
        Map<String, Object> savingResult = Map.of(
                "success", true,
                "output_path", "/data/preprocessed/ds_001_processed.jsonl"
        );
        when(pythonIntegrationService.executePythonScript(eq("data_loading.py"), anyMap()))
                .thenReturn(loadingResult);
        when(pythonIntegrationService.executePythonScript(eq("data_cleaning.py"), anyMap()))
                .thenReturn(cleaningResult);
        when(pythonIntegrationService.executePythonScript(eq("data_formatting.py"), anyMap()))
                .thenReturn(formattingResult);
        when(pythonIntegrationService.executePythonScript(eq("data_augmentation.py"), anyMap()))
                .thenReturn(augmentationResult);
        when(pythonIntegrationService.executePythonScript(eq("data_saving.py"), anyMap()))
                .thenReturn(savingResult);
        QualityCheckResult qualityResult = QualityCheckResult.builder()
                .qualityPassed(true)
                .overallQualityScore(new BigDecimal("4.5"))
                .build();
        when(dataQualityService.checkDataQuality(any(), any())).thenReturn(qualityResult);

        // Exercise.
        PreprocessingResult result = dataPreprocessingService.preprocessDataset(testDataset, testConfig);

        // Verify the result.
        assertNotNull(result);
        assertTrue(result.isSuccess());
        assertEquals("/data/preprocessed/ds_001_processed.jsonl", result.getOutputPath());
        assertEquals(new BigDecimal("4.5"), result.getQualityScore());
        // With every step mocked the pipeline can finish within the timer's
        // resolution, so an elapsed time of 0 is legitimate; only a negative
        // value would indicate a bug.
        assertTrue(result.getProcessingTime() >= 0);
        assertEquals(5, result.getSteps().size()); // five processing steps

        // Verify collaborator calls.
        verify(pythonIntegrationService, times(5)).executePythonScript(anyString(), anyMap());
        verify(dataQualityService).checkDataQuality(any(), any());
        verify(trainingDatasetRepository).save(testDataset);
    }

    @Test
    @DisplayName("数据集预处理 - 数据加载失败")
    void testPreprocessDataset_DataLoadingFailure() {
        // Simulate a failing data-loading step.
        Map<String, Object> loadingResult = Map.of(
                "success", false,
                "error", "文件格式不支持"
        );
        when(pythonIntegrationService.executePythonScript(eq("data_loading.py"), anyMap()))
                .thenReturn(loadingResult);

        // Exercise and expect the domain exception.
        assertThrows(DataPreprocessingException.class, () -> {
            dataPreprocessingService.preprocessDataset(testDataset, testConfig);
        });

        // No later step may have run.
        verify(pythonIntegrationService, times(1)).executePythonScript(anyString(), anyMap());
        verify(dataQualityService, never()).checkDataQuality(any(), any());
    }

    @Test
    @DisplayName("批量数据集预处理 - 部分成功")
    void testBatchPreprocessDatasets_PartialSuccess() {
        // NOTE(review): despite its name this test only covers a batch in
        // which the single dataset succeeds; no failing dataset is included.
        List<TrainingDataset> datasets = List.of(testDataset);
        // Every script call returns the same successful result.
        Map<String, Object> successfulResult = Map.of(
                "success", true,
                "sample_count", 1000L,
                "total_size", 1024L * 1024L
        );
        when(pythonIntegrationService.executePythonScript(anyString(), anyMap()))
                .thenReturn(successfulResult);
        QualityCheckResult qualityResult = QualityCheckResult.builder()
                .qualityPassed(true)
                .overallQualityScore(new BigDecimal("4.5"))
                .build();
        when(dataQualityService.checkDataQuality(any(), any())).thenReturn(qualityResult);

        // Exercise.
        BatchPreprocessingResult batchResult = dataPreprocessingService.batchPreprocessDatasets(
                datasets, testConfig);

        // Verify the result.
        assertNotNull(batchResult);
        assertEquals(1, batchResult.getTotalDatasets());
        assertEquals(1, batchResult.getSuccessCount());
        assertEquals(0, batchResult.getFailedCount());
        assertNotNull(batchResult.getResults().get(testDataset.getDatasetId()));
    }
}
/**
 * Unit tests for the model training service.
 *
 * <p>All collaborators are Mockito mocks injected into the service.
 */
@ExtendWith(MockitoExtension.class)
class ModelTrainingServiceTest {

    @Mock
    private PythonIntegrationService pythonIntegrationService;
    @Mock
    private TrainingJobRepository trainingJobRepository;
    @Mock
    private ModelRepository modelRepository;
    // Stubbed and verified by testCreateTrainingJob_Success; without this
    // declaration the class does not compile.
    @Mock
    private TrainingDatasetRepository trainingDatasetRepository;
    @Mock
    private TrainingMonitorService trainingMonitorService;
    @Mock
    private ResourceManagerService resourceManagerService;
    @InjectMocks
    private ModelTrainingService modelTrainingService;

    private DomainModel testModel;
    private TrainingDataset testDataset;
    private TrainingRequest testRequest;

    @BeforeEach
    void setUp() {
        testModel = DomainModel.builder()
                .modelId("MODEL-001")
                .modelName("法律合同审查模型")
                .domain(DomainModel.DomainType.LEGAL)
                .baseModel("chatglm3-6b")
                .version("1.0")
                .status(DomainModel.ModelStatus.DRAFT)
                .build();
        testDataset = TrainingDataset.builder()
                .datasetId("DS-001")
                .datasetName("法律合同数据集")
                .status(TrainingDataset.DatasetStatus.READY)
                .build();
        testRequest = TrainingRequest.builder()
                .modelId("MODEL-001")
                .datasetId("DS-001")
                .jobName("法律模型微调训练")
                .trainingType(TrainingJob.TrainingType.LORA)
                .trainingConfig(TrainingConfig.builder()
                        .epochs(10)
                        .batchSize(4)
                        .learningRate(1e-4)
                        .build())
                .hyperparameters(Hyperparameters.builder()
                        .optimizer("adamw")
                        .scheduler("cosine")
                        .build())
                .createdBy("test-user")
                .build();
    }

    @Test
    @DisplayName("创建训练任务 - 成功案例")
    void testCreateTrainingJob_Success() {
        // Simulate a successful resource allocation.
        ResourceAllocation resourceAllocation = ResourceAllocation.builder()
                .success(true)
                .gpuCount(4)
                .memoryGB(32)
                .build();
        when(resourceManagerService.allocateTrainingResources(any())).thenReturn(resourceAllocation);
        when(modelRepository.findByModelId("MODEL-001")).thenReturn(java.util.Optional.of(testModel));
        when(trainingDatasetRepository.findByDatasetId("DS-001")).thenReturn(java.util.Optional.of(testDataset));
        // save(...) echoes its argument back.
        when(trainingJobRepository.save(any(TrainingJob.class))).thenAnswer(invocation -> invocation.getArgument(0));

        // Exercise.
        TrainingJob trainingJob = modelTrainingService.createTrainingJob(testRequest);

        // Verify the result.
        assertNotNull(trainingJob);
        assertEquals("MODEL-001", trainingJob.getModel().getModelId());
        assertEquals("DS-001", trainingJob.getDataset().getDatasetId());
        assertEquals(TrainingJob.JobStatus.PENDING, trainingJob.getStatus());
        assertEquals(0, trainingJob.getProgress());

        // Verify collaborator calls.
        verify(resourceManagerService).allocateTrainingResources(any());
        verify(modelRepository).findByModelId("MODEL-001");
        verify(trainingDatasetRepository).findByDatasetId("DS-001");
        verify(trainingJobRepository).save(any(TrainingJob.class));
    }

    @Test
    @DisplayName("创建训练任务 - 资源分配失败")
    void testCreateTrainingJob_ResourceAllocationFailure() {
        // Simulate a failed resource allocation.
        ResourceAllocation resourceAllocation = ResourceAllocation.builder()
                .success(false)
                .message("GPU资源不足")
                .build();
        when(resourceManagerService.allocateTrainingResources(any())).thenReturn(resourceAllocation);

        // Exercise and expect the resource exception.
        assertThrows(TrainingResourceException.class, () -> {
            modelTrainingService.createTrainingJob(testRequest);
        });

        // No training job may have been created.
        verify(trainingJobRepository, never()).save(any(TrainingJob.class));
    }

    @Test
    @DisplayName("获取训练任务状态 - 成功案例")
    void testGetTrainingStatus_Success() {
        // A job half-way through its epochs.
        TrainingJob trainingJob = TrainingJob.builder()
                .jobId("JOB-001")
                .model(testModel)
                .dataset(testDataset)
                .status(TrainingJob.JobStatus.RUNNING)
                .progress(50)
                .currentEpoch(5)
                .totalEpochs(10)
                .startTime(LocalDateTime.now().minusHours(1))
                .trainingDuration(3600L)
                .trainingMetrics("{\"final_loss\": 0.1, \"best_loss\": 0.05}")
                .build();
        when(trainingJobRepository.findByJobId("JOB-001")).thenReturn(java.util.Optional.of(trainingJob));

        // Exercise.
        TrainingJobStatus status = modelTrainingService.getTrainingStatus("JOB-001");

        // Verify the result.
        assertNotNull(status);
        assertEquals("JOB-001", status.getJobId());
        assertEquals(TrainingJob.JobStatus.RUNNING, status.getStatus());
        assertEquals(50, status.getProgress());
        assertEquals(5, status.getCurrentEpoch());
        assertEquals(10, status.getTotalEpochs());
        assertNotNull(status.getMetrics());
    }
}
/**
* 模型评估服务测试类
*/
@ExtendWith(MockitoExtension.class)
class ModelEvaluationServiceTest {
@Mock
private PythonIntegrationService pythonIntegrationService;
@Mock
private ModelEvaluationRepository evaluationRepository;
@Mock
private ModelRepository modelRepository;
@Mock
private TrainingDatasetRepository datasetRepository;
@InjectMocks
private ModelEvaluationService modelEvaluationService;
private DomainModel testModel;
private TrainingDataset testDataset;
@BeforeEach
void setUp() {
testModel = DomainModel.builder()
.modelId("MODEL-001")
.modelName("法律合同审查模型")
.modelPath("/models/model_001")
.baseModel("chatglm3-6b")
.build();
testDataset = TrainingDataset.builder()
.datasetId("DS-001")
.datasetName("法律评估数据集")
.filePath("/data/evaluation/legal_eval.jsonl")
.build();
}
@Test
@DisplayName("自动化模型评估 - 成功案例")
void testExecuteAutomaticEvaluation_Success() {
// 准备测试数据
EvaluationRequest request = EvaluationRequest.builder()
.modelId("MODEL-001")
.datasetId("DS-001")
.evaluatedBy("system")
.build();
// 模拟Python评估结果
Map<String, Object> pythonResult = Map.of(
"success", true,
"overall_score", 85.5,
"metric_scores", Map.of(
"accuracy", 0.87,
"precision", 0.85,
"recall", 0.82,
"f1_score", 0.835
),
"evaluation_report", "自动化评估报告内容"
);
when(modelRepository.findByModelId("MODEL-001")).thenReturn(java.util.Optional.of(testModel));
when(datasetRepository.findByDatasetId("DS-001")).thenReturn(java.util.Optional.of(testDataset));
when(pythonIntegrationService.executePythonScript(eq("model_evaluation.py"), anyMap()))
.thenReturn(pythonResult);
when(evaluationRepository.save(any(ModelEvaluation.class))).thenAnswer(invocation -> invocation.getArgument(0));
// 执行测试
ModelEvaluation evaluation = modelEvaluationService.executeAutomaticEvaluation(request);
// 验证结果
assertNotNull(evaluation);
assertEquals("MODEL-001", evaluation.getModel().getModelId());
assertEquals("DS-001", evaluation.getDataset().getDatasetId());
assertEquals(ModelEvaluation.EvaluationType.AUTOMATIC, evaluation.getEvaluationType());
assertEquals(ModelEvaluation.EvaluationStatus.COMPLETED, evaluation.getStatus());
assertEquals(new BigDecimal("85.5"), evaluation.getOverallScore());
// 验证依赖调用
verify(pythonIntegrationService).executePythonScript(eq("model_evaluation.py"), anyMap());
verify(evaluationRepository, atLeastOnce()).save(any(ModelEvaluation.class));
verify(modelRepository).save(testModel);
}
@Test
@DisplayName("对比模型评估 - 成功案例")
void testExecuteComparativeEvaluation_Success() {
// 准备测试数据
ComparativeEvaluationRequest request = ComparativeEvaluationRequest.builder()
.baselineModelId("MODEL-BASE")
.comparisonModelIds(List.of("MODEL-001", "MODEL-002"))
.datasetId("DS-001")
.evaluatedBy("system")
.build();
// 模拟基准模型评估
DomainModel baselineModel = DomainModel.builder()
.modelId("MODEL-BASE")
.modelName("基准模型")
.modelPath("/models/model_base")
.build();
when(modelRepository.findByModelId("MODEL-BASE")).thenReturn(java.util.Optional.of(baselineModel));
when(modelRepository.findByModelId("MODEL-001")).thenReturn(java.util.Optional.of(testModel));
when(datasetRepository.findByDatasetId("DS-001")).thenReturn(java.util.Optional.of(testDataset));
// 模拟Python评估结果
Map<String, Object> pythonResult = Map.of(
"success", true,
"overall_score", 80.0,
"metric_scores", Map.of("accuracy", 0.8),
"evaluation_report", "评估报告"
);
when(pythonIntegrationService.executePythonScript(eq("model_evaluation.py"), anyMap()))
.thenReturn(pythonResult);
when(evaluationRepository.save(any(ModelEvaluation.class))).thenAnswer(invocation -> invocation.getArgument(0));
// 执行测试
ComparativeEvaluationResult result = modelEvaluationService.executeComparativeEvaluation(request);
// 验证结果
assertNotNull(result);
assertEquals("MODEL-BASE", result.getBaselineModelId());
assertEquals(2, result.getComparisonResults().size()); // 两个对比模型
assertNotNull(result.getComparativeMetrics());
// 验证评估次数
verify(pythonIntegrationService, times(3)).executePythonScript(eq("model_evaluation.py"), anyMap());
}
}
/**
* 集成测试 - 完整微调流程
*/
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class LLMFinetuningIntegrationTest {
@Autowired
private DataPreprocessingService dataPreprocessingService;
@Autowired
private ModelTrainingService modelTrainingService;
@Autowired
private ModelEvaluationService modelEvaluationService;
@Autowired
private ModelRepository modelRepository;
@Autowired
private TrainingDatasetRepository datasetRepository;
@Autowired
private TrainingJobRepository trainingJobRepository;
@Autowired
private TestDataBuilder testDataBuilder;
private DomainModel testModel;
private TrainingDataset testDataset;
/**
 * Creates the model and dataset fixtures through the test data builder and
 * saves both through their repositories before the tests run.
 */
@BeforeAll
void setUp() {
// Initialise the test data.
testModel = testDataBuilder.createTestModel();
testDataset = testDataBuilder.createTestDataset();
modelRepository.save(testModel);
datasetRepository.save(testDataset);
}
@Test
@DisplayName("完整微调流程集成测试")
void testCompleteFinetuningWorkflow() {
// 1. 数据预处理
PreprocessingConfig preprocessingConfig = PreprocessingConfig.builder()
.domainType(DomainModel.DomainType.LEGAL)
.trainingType(TrainingJob.TrainingType.LORA)
.dataAugmentationEnabled(true)
.build();
PreprocessingResult preprocessingResult = dataPreprocessingService.preprocessDataset(
testDataset, preprocessingConfig);
assertTrue(preprocessingResult.isSuccess(), "数据预处理应该成功");
assertNotNull(preprocessingResult.getOutputPath(), "预处理输出路径不应为空");
// 2. 创建训练任务
TrainingRequest trainingRequest = TrainingRequest.builder()
.modelId(testModel.getModelId())
.datasetId(testDataset.getDatasetId())
.jobName("集成测试训练任务")
.trainingType(TrainingJob.TrainingType.LORA)
.trainingConfig(TrainingConfig.builder()
.epochs(3) // 简化测试,使用较少的轮次
.batchSize(2)
.learningRate(1e-4)
.build())
.hyperparameters(Hyperparameters.builder()
.optimizer("adamw")
.scheduler("linear")
.build())
.createdBy("integration-test")
.build();
TrainingJob trainingJob = modelTrainingService.createTrainingJob(trainingRequest);
assertNotNull(trainingJob, "训练任务不应为空");
assertEquals(testModel.getModelId(), trainingJob.getModel().getModelId());
// 3. 检查训练任务状态
TrainingJobStatus trainingStatus = modelTrainingService.getTrainingStatus(trainingJob.getJobId());
assertNotNull(trainingStatus, "训练状态不应为空");
// 4. 模型评估(在实际系统中,这里应该等待训练完成)
EvaluationRequest evaluationRequest = EvaluationRequest.builder()
.modelId(testModel.getModelId())
.datasetId(testDataset.getDatasetId())
.evaluatedBy("system")
.build();
// 注意:在实际集成测试中,需要等待训练完成后再进行评估
// 这里简化处理,直接测试评估流程
try {
ModelEvaluation evaluation = modelEvaluationService.executeAutomaticEvaluation(evaluationRequest);
assertNotNull(evaluation, "模型评估不应为空");
assertNotNull(evaluation.getOverallScore(), "评估分数不应为空");
} catch (Exception e) {
log.warn("模型评估跳过(训练未完成): {}", e.getMessage());
}
// 5. 验证数据一致性
DomainModel updatedModel = modelRepository.findByModelId(testModel.getModelId())
.orElseThrow();
assertNotNull(updatedModel.getLastUpdatedTime(), "模型最后更新时间不应为空");
TrainingDataset updatedDataset = datasetRepository.findByDatasetId(testDataset.getDatasetId())
.orElseThrow();
assertEquals(TrainingDataset.DatasetStatus.READY, updatedDataset.getStatus(), "数据集状态应该为就绪");
log.info("完整微调流程集成测试完成");
}
@Test
@DisplayName("性能测试 - 批量数据处理")
void testBatchDataProcessingPerformance() {
// 创建多个测试数据集
List<TrainingDataset> datasets = testDataBuilder.createTestDatasets(5);
datasetRepository.saveAll(datasets);
PreprocessingConfig config = PreprocessingConfig.builder()
.domainType(DomainModel.DomainType.LEGAL)
.trainingType(TrainingJob.TrainingType.LORA)
.dataAugmentationEnabled(false) // 关闭数据增强以加快测试
.build();
long startTime = System.currentTimeMillis();
BatchPreprocessingResult batchResult = dataPreprocessingService.batchPreprocessDatasets(
datasets, config);
long totalTime = System.currentTimeMillis() - startTime;
double averageTime = (double) totalTime / datasets.size();
log.info("批量数据处理性能结果:");
log.info("数据集数量: {}", datasets.size());
log.info("总处理时间: {}ms", totalTime);
log.info("平均处理时间: {:.2f}ms", averageTime);
log.info("成功率: {}/{}", batchResult.getSuccessCount(), batchResult.getTotalDatasets());
// 性能断言
assertTrue(averageTime < 5000, "平均处理时间应小于5秒");
assertTrue(batchResult.getSuccessCount() >= 4, "成功率应大于80%");
}
}
/**
 * Performance benchmark for submitting several training jobs concurrently.
 *
 * <p>NOTE(review): {@code log} is referenced below but is not declared in this
 * class; presumably a logger is meant to be supplied (e.g. via an annotation
 * or a static field) — confirm and add the declaration.
 */
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ModelTrainingPerformanceTest {

    @Autowired
    private ModelTrainingService modelTrainingService;

    @Autowired
    private ModelEvaluationService modelEvaluationService;

    // Previously used without being declared in this class; injected here so
    // the test can build its training requests.
    @Autowired
    private TestDataBuilder testDataBuilder;

    /**
     * Submits {@code concurrentJobs} training requests in parallel and asserts
     * on the average submission time and on the share of successful submissions.
     */
    @Test
    @DisplayName("大规模模型训练性能测试")
    void testLargeScaleTrainingPerformance() {
        int concurrentJobs = 3;
        List<TrainingRequest> trainingRequests = testDataBuilder.createTrainingRequests(concurrentJobs);

        long startTime = System.currentTimeMillis();

        // Submit every request asynchronously; a failed submission is logged
        // and mapped to null so it can be filtered out below.
        List<CompletableFuture<TrainingJob>> futures = trainingRequests.stream()
                .map(request -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return modelTrainingService.createTrainingJob(request);
                    } catch (Exception e) {
                        log.warn("训练任务创建失败: {}", e.getMessage());
                        return null;
                    }
                }))
                .collect(Collectors.toList());

        // Wait for all submissions to finish and keep only the successful ones.
        List<TrainingJob> trainingJobs = futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        long submissionTime = System.currentTimeMillis() - startTime;
        double averageSubmissionTime = (double) submissionTime / trainingRequests.size();

        log.info("大规模训练性能结果:");
        log.info("并发任务数: {}", concurrentJobs);
        log.info("总提交时间: {}ms", submissionTime);
        // The logger only substitutes the bare "{}" anchor; a format spec such
        // as "{:.2f}" is printed literally and the argument is dropped, so the
        // value is formatted up front.
        log.info("平均提交时间: {}ms", String.format("%.2f", averageSubmissionTime));
        log.info("成功提交: {}/{}", trainingJobs.size(), trainingRequests.size());

        // Performance assertions
        assertTrue(averageSubmissionTime < 1000, "平均提交时间应小于1秒");
        assertTrue(trainingJobs.size() >= concurrentJobs * 0.8, "提交成功率应大于80%");
    }
}
更多推荐



所有评论(0)