一、数据准备

企业级大模型训练常见的数据集

①、数据收集:收集公司内部客服对话记录、产品手册、常见问题解答(FAQ)等

//智能客服系统场景
{"id": 1, "question": "如何重置密码?", "answer": "您可以在登录页面点击忘记密码,然后按照提示操作。", "category": "账户管理"}
{"id": 2, "question": "我的订单还没有发货?", "answer": "请提供订单号,我为您查询。", "category": "订单查询"}
//金融风控系统
transaction_id,user_id,amount,location,time,is_fraud
1001,12345,1500.00,"New York","2023-01-01 10:00:00",0
1002,12346,5000.00,"Tokyo","2023-01-01 11:00:00",1
//医疗诊断辅助系统
{
  "patient_id": "P001",
  "symptoms": ["发热", "咳嗽", "乏力"],
  "examination_results": {"temperature": 38.5, "blood_pressure": "120/80"},
  "diagnosis": "流感",
  "treatment_plan": "休息,多喝水,服用抗病毒药物"
}
//法律文档分析
{
  "clause": "本合同自双方签字盖章之日起生效,有效期三年。",
  "explanation": "该条款规定了合同的生效时间和有效期。",
  "risk_level": "低"
}
//多语言翻译系统
This is a product description.	这是产品描述。
The device has a battery life of 24 hours.	该设备电池续航为24小时。

②、数据清洗:去除敏感信息、无关信息、进行脱敏处理

③、数据标注:监督学习,需要将问题与对应的答案进行配对;也可以使用无监督、半监督方法

④、数据格式:训练集、验证集、测试集,问题和答案对应,有时需要构造负样本(错误答案)用于训练

⑤、数据预处理:根据所选模型的输入进行预处理,如分词、截断、添加特殊标记

import pandas as pd
import re
from transformers import AutoTokenizer

class DataPreprocessor:
	def __init__(self,config):
		self.tokenizer = AutoTokenizer.from_pretrained(config.model_name)
		self.max_length = config.max_length

	def clean_text(self,text):
		"""文本清洗"""
		#移除特殊字符
		text = re.sub(r'[^\w\s.,?!]', '', text)
		#标准化空白字符
		text = re.sub(r'\s+', ' ', text)
		return text.strip()

	def tokenize_dataset(self,dataset):
		"""分词处理"""
		return self.tokenizer(
			dataset['text'],
			truncation=True,
			padding='max_length',
			max_length=self.max_length,
			return_tensors='pt'
		)

	def create_training_splits(self,data,ratio=(0.7,0.15,0.15)):
		"""划分数据集"""
		train_size = int(len(data) * ratios[0])
		val_size = int(len(data) * ratios[1])

		train_data = data[:train_size]
		val_data = data[train_size:train_size+val_size]
		test_data = data[train_size+val_size]

		return train_data,val_data,test_data

	

二、创建模型

①、选择需训练模型:比如问答任务选择BERT、RoBERTa等;需要生成答案选择GPT、T5等

业务—选择—>预训练模型

  • 任务类型
    文本分类、情感分析、命名实体识别等理解任务:BERT、RoBERTa、DistilBERT等
    文本生成、对话系统、创意协作等生成任务:GPT-2、GPT-3、BART、T5
    序列到序列任务(翻译、摘要):T5、BART、mT5(多语言T5)等
  • 语言:单语言任务选择对应语言的模型,多语言任务选择多语言模型
  • 领域:如果有特定领域(如医疗、法律),优先选择领域内预训练的模型
  • 资源限制:如果计算资源有限,选择轻量级模型如DistilBERT、TinyBERT等
  • 性能要求:如果追求最高性能,可以选择大型模型如RoBERTa-large、GPT-3等,但需权衡计算成本

如何选择大模型

  • BERT(Bidirectional Encoder Representations from Transformers):用于理解上下文,适合NLU任务,如分类、命名实体识别等
  • RoBERTa(Robustly Optimized BERT Pretraining Approach):对BERT的改进,移除了下一句预测任务,使用更大的批次和更多的数据
  • DistilBERT:轻量级BERT,模型更小,速度更快,适合资源有限的环境
  • GPT(Generative Pre-trained Transformer):自回归模型,适合生成任务,如文本生成、对话生成等
  • BART(Bidirectional and Auto-Regressive Transformers):结合了BERT和GPT的特点,适用于生成和重构任务
  • T5(Text-to-Text Transfer Transformer):将所有的NLP任务都转换为文本到文本的格式,统一了处理方式
  • ELECTRA(Efficiently Learning an Encoder that Classifies Token Replacements Accurately):使用替换令牌检测任务,效率更高
  • 多语言模型:如mBERT(多语言BERT)、XLM-RoBERTa,适用于多语言任务
  • 领域特定预训练模型:如BioBERT(生物医学领域)、SciBERT(科学文献领域)、ClinicalBERT(临床领域)等
领域 模型名称 特点 适用任务
通用 BERT 双向编码,理解能力强 分类、NER、问答
通用 RoBERTa 优化版BERT,移除NSP任务 所有NLU任务
生成 GPT系列 自回归生成,创意能力强 文本生成、对话
多任务 T5 同一文本到文本框架 翻译、摘要、分类
多语言 XLM-R 100中语言支持 跨语言任务
生物医学 BioBERT 在PubMed上训练 医学NER、QA
科学 SciBERT 科学文献训练 科学文本理解
法律 LegalBERT 法律文本训练 合同分析
金融 FinBERT 金融新闻训练 情感分析、预测
代码 CodeBERT 代码数据训练 代码生成、理解
class ModelFactory:
	@staticmethod
	def create_model(model_type,config):
		"""根据需求创建模型"""
		models = {
			'classification':{
				'bert': 'bert-base-uncased',
                'roberta': 'roberta-base',
                'distilbert': 'distilbert-base-uncased'
			},
			'generation':{
				'gpt2': 'gpt2-medium',
                't5': 't5-base',
                'bart': 'facebook/bart-base'
			},
			'multimodal':{
				'clip': 'openai/clip-vit-base-patch32'
			}
		}

		# 考虑因素:计算资源、延迟要求、准确率需求
		if config.gpu_memory < 8 # GPU内存小于8G
			return models[model_type]['distilbert']
		elif config.latency_requirement < 100:# 延迟要求<100ms
			return models[model_type]['distilbert']
		else:
			return models[model_type]['bert']

②、模型架构(任务头+反向传播+):在预训练模型的基础上,添加任务特定的输出层。对于分类任务,可以添加一个全连接层;对于生成任务,则使用预训练模型的生成头

前向传播:用户评论“物流太慢了!”→ RoBERTa编码→全连接层→Softmax→输出[0.1, 0.2, 0.7](负面概率70%)

输入 → 前向传播(全连接层→激活函数→...)→ 输出层 → 任务头 → 预测结果
       ↑                ↓
       反向传播 ← 损失函数(对比真实标签)
       ↑                ↓
    优化器(学习率控制步长)← 梯度(来自反向传播)

全连接层(Fully Connected Layer):深度学习中,每个节点都与上一层的所有节点连接的层。
在NLP任务中,通常在预训练模型的输出之上添加一个或多个全连接层,将模型输出的高维向量映射到任务所需的维度

激活函数(Activation Function):常见的有ReLU、Sigmoid、Tanh。业务场景中,选激活函数影响模型性能,比如Sigmoid在二分类输出层常用,但可能有梯度消失,所以深层网络用ReLU

过拟合(Overfitting):模型在训练集表现好,测试集差,因为记住了噪声。nn.Dropout(p=0.5),或者L2正则化(weight_decay)

import torch
import torch.nn.as nn

class FullyConnectedLayerExplained:
	"""全连接层详解"""
	def __init__(self):
		#最简单的全连接层示例(输入768维,输出256维)
		self.fc_example = nn.Linear(in_features=768,out_features=256)

		#多层全连接网络示例
		self.mlp = nn.Sequential(
			nn.Linear(768,512),# 第一层:768 -> 512
			nn.ReLU(),# 激活函数,给神经网络引入非线性,解决梯度消失
			nn.Dropout(0.1), # 防止过拟合,解决方式:正则化、Dropout、早停
			
			nn.Linear(512,256),# 第二层:512 -> 256
			nn.ReLU(),
			nn.Dropout(0.1),

			nn.Linear(256,10) # 输出层(模型最后一层):256 -> 10 (假设10分类)
		)

任务头:
大模型预训练后,针对不同的下游任务添加的“专用输出层”(后面接不同的任务头来做具体任务,预训练模型通常不包括任务特定的输出层)

BERT预训练模型在预训练时使用MLM(掩码语言模型)和NSP(下一句预测)头,但在下游任务中我们需要根据任务替换这些头。常见的头有:

  • 分类头:用于文本分类,通常是1层全连接+Softmax(如情感分析),将[CLS]标记的输出向量映射到类别数量的维度,然后接softmax。BERT后面加一个线性层作为分类头。代码里可能是model.classifier = nn.Linear(hidden_size, num_classes)
  • 序列标注头:CRF层或全连接层(如NER)。用于命名实体识别等任务,通常是一个全连接层,将每个token的输出向量映射到标签数量维度
  • 生成头:Transformer解码器(如文本生成),用于生成任务,如GPT-2、BART、T5等,通常是一个线性层,将隐藏状态映射到词汇表大小的维度,然后通过softmax生成下一个token的概率
  • 问答头:用于问答任务,通常包括两个线性层,分别预测答案的可是和结束位置
# 同一预训练模型通过更换任务头,既可以做“用户意图分类”(3类),也可以做“商品属性抽取”(NER任务头)
from transformers import BertForSequenceClassification
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=3)# 3分类任务头
class PretrainedHeadsExplained:
    """预训练模型头部结构详解"""
    
    def explain_pretrained_heads(self):
        """解释不同预训练模型的头部结构"""
        
        print("预训练模型生成头(Pretrained Heads)详解:")
        
        print("\n1. BERT的预训练头:")
        print("   a) MLM头(Masked Language Modeling):")
        print("      - 目的:预测被掩码的token")
        print("      - 结构:Linear(768, 30522) + Softmax")
        print("      - 输出:词汇表概率分布")
        
        print("\n   b) NSP头(Next Sentence Prediction):")
        print("      - 目的:判断两个句子是否连续")
        print("      - 结构:Linear(768, 2) + Softmax")
        print("      - 输出:[是连续,不连续]")
        
        print("\n2. GPT的预训练头:")
        print("   - 目的:预测下一个token")
        print("   - 结构:Linear(768, 50257) + Softmax")
        print("   - 特点:自回归生成,每次生成一个token")
        
        print("\n3. T5的预训练头:")
        print("   - 目的:文本到文本转换")
        print("   - 结构:编码器-解码器 + LM头")
        print("   - 输出:任意文本序列")
        
        print("\n4. 为什么需要自定义头?")
        print("   - 预训练头针对预训练任务设计")
        print("   - 下游任务通常与预训练任务不同")
        print("   - 需要适配任务特定的输出格式")

根据业务添加输出层的步骤:

  1. 确定任务类型(分类、生成、序列标注)和输出形式(类别数、序列长度)
  2. 分局任务类型选择合适的头结构(如分类任务使用分类头)
  3. 在预训练模型的基础上构建模型,将预训练模型的输出作为自定义头的输入
  4. 训练时,可以先冻结预训练模型,只训练头部份,然后在微调整个模型,或者直接一起训练
# 生成头与分类头的区别
def compare_heads():
    """比较不同任务的头结构"""
    
    print("任务特定头结构对比:")
    
    print("\n1. 分类头(Classification Head):")
    print("   输入: [batch_size, hidden_size]")
    print("   结构: Linear(hidden_size, num_classes)")
    print("   输出: [batch_size, num_classes]")
    print("   示例: 情感分析(正面/负面/中性)")
    
    print("\n2. 序列标注头(Sequence Labeling Head):")
    print("   输入: [batch_size, seq_len, hidden_size]")
    print("   结构: Linear(hidden_size, num_tags) 每个token独立")
    print("   输出: [batch_size, seq_len, num_tags]")
    print("   示例: 命名实体识别(PER, ORG, LOC, O)")
    
    print("\n3. 生成头(Generation Head):")
    print("   输入: [batch_size, seq_len, hidden_size]")
    print("   结构: Linear(hidden_size, vocab_size)")
    print("   输出: [batch_size, seq_len, vocab_size]")
    print("   示例: 机器翻译、文本摘要")
    
    print("\n4. 问答头(Question Answering Head):")
    print("   输入: [batch_size, seq_len, hidden_size]")
    print("   结构: 两个Linear层(分别预测开始和结束位置)")
    print("   输出: [batch_size, seq_len] × 2")
    print("   示例: SQuAD问答任务")
# 自定义模型架构
import torch.nn as nn
from transformers import AutoModel,AutoConfig

class CustomEnterpriseModel(nn.Module):
	def __init__(self,config):
		super().__init__()

		#加载预训练模型
		self.base_model = AutoModel.from_pretrained(config.base_model)

		#添加企业特定层
		self.domain_adapter = nn.Linear(768,512)
		
		self.task_head = nn.MuduleDict({
			'classification'nn.Linear(512,config.num_classes):,
			'regression':nn.Linear(512,1),
			'generation':nn.Linear(512,config.vocab_size)
		})
		
		#企业知识注入层
		self.knowledge_fusion = KnowledgeFusionLayer(
			external_knowledge = config.knowledge_base
		)

	def forward(self,input_ids,attention_mask,task_type='classification'):
		#基础模型特征提取
		outputs = self.base_model(input_ids,attention_mask=attention_mask)
		sequence_output = outputs.last_hidden_state

		#领域适配
		adapted_features = self.domain_adapter(dequence_output[:,0,:])

		#知识融合
		enhanced_features = self.knowledge_fusion(adapted_features)
		
		#任务特定输出
		return self.task_head[task_type](enhanced_features)
#为不同任务添加输出层
from transformers import AutoModel,AutoConfig
import torch.nn as nn

class TaskSpecificModelFactory:
	"""任务特定模型工厂"""
	@staticmethod
	def create_model_for_task(model_name:str,task_type:str,**kwargs):
		"""
		根据任务创建定制化模型
        
        参数:
            model_name: 预训练模型名称
            task_type: 任务类型
            **kwargs: 任务特定参数	
		"""
		#加载预训练模型配置
		config = AutoConfig.from_pretrained(model_name)

		if task_type == "text_classification":
			return TaskSpecificModelFactory._create_classification_model(
				model_name,config,**kwargs
			)
		elif task_type == "named_entity_recognition":
			return TaskSpecificModelFactory._create_ner_model(
				model_name,config,**kwargs
			)
		elif task_type == "question_answering":
			return TaskSpecificModelFactory._create_qa_model(
				model_name,config,**kwargs
			)
		elif task_type == "text_generation":
			return TaskSpecificModelFactory._create_generation_model(
				model_name,config,**kwargs
			)
		else:
			raise ValueError(f"不主持的任务类型:{task_type}")
		
	@staticmethod
	def _create_classification_model(model_name, config, num_labels=2, dropout_prob=0.1):
		"""创建分类模型"""
		class ClassificationModel(nn.Module):
			super().__init__()
			#加载预训练模型(不包括头部)
			self.backbone = AutoModel.from_pretrained(model_name, add_pooling_layer=False)

			#获取隐藏层大小
			hidden_size = self.backbone.config.hidden_size

			#添加分类头
			self.dropout = nn.Dropput(dropout_prob)
			self.classifier = nn.Linear(hidden_size,num_labels)

			#初始化分类头权重
			self._init_weights(self.classifier)
			
		def _init_weights(self,module):
			"""初始化权重"""
			if isinstance(module,nn.Linear):
				module.weight.data.normal_(mean=0.0,std=0.02)
				if module.bias is not None:
					module.bias.data.zero_()
		def forward(self,input_ids,attention_mask=None,token_type_ids=None):
			#通过预训练模型获取特征
			outputs = self.backbone(
				input_ids,
				attention_mask=attention_mask,
				token_type_ids=token_type_ids
			)
			# 取[CLS]标记的隐藏状态(对于分类任务)
                # sequence_output形状: [batch_size, seq_len, hidden_size]
                sequence_output = outputs.last_hidden_state
                # 取第一个token([CLS])的表示
                cls_output = sequence_output[:, 0, :]
                
                # 应用dropout
                cls_output = self.dropout(cls_output)
                
                # 分类头
                logits = self.classifier(cls_output)
                
                return logits
		return cl

③、环境配置:安装必要的深度学习框架(如PyTorch、TensorFlow)和模型库(如Hugging Face Transformers)

三、训练模型(微调)

①、设置训练参数:包括学习率(Learning Rate, LR)、批次大小、训练轮数(epochs)等

学习率:优化器更新权重时的步长(每次调整的幅度)
太大,可能跳过最优解(震荡不收敛)太小,训练缓慢,容易陷入局部最优
预训练大模型微调时,学习率通常较小(如1e-5),避免破坏预训练知识。
从头训练小模型时,学习率可能设为1e-3
optimizer = torch.optim.Adam(model.parameters(),lr=1e-4) 学习率0.0001

②、设置优化器和损失函数(Loss Function):例如,使用Adam优化器和交叉熵损失函数等

损失函数:衡量模型预测值与真实值的差距(“误差大小”),指导优化方向

  • 分类:交叉熵损失(CrossEntropyLoss,等价于LogSoftmax+NLLLoss)
  • 回归:均方误差(MSELoss)、平均绝对误差(MAELoss)
  • 序列任务:CTC Loss(语音识别)、交叉熵(机器翻译)
#优化策略
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup

class OptimizedTraining:
	def __init__(self,model,config):
		self.model = model
		self.config = config

	def setup_optimization(self):
		"""配置优化器和学习率调度"""
		#参数分组,不同层使用不同学习率
		no_decay = ['bias','LayerNorm.weight']
		optimizer_grouped_parameters = [
			{
				'params': [p for n, p in self.model.named_parameters() 
                          if not any(nd in n for nd in no_decay)],
                'weight_decay': self.config.weight_decay,
                'lr': self.config.learning_rate
			},
			{
				'params': [p for n, p in self.model.named_parameters() 
                          if any(nd in n for nd in no_decay)],
                'weight_decay': 0.0,
                'lr': self.config.learning_rate * 0.1
			}
		]

		optimizer = AdamW(
			optimizer_grouped_parameters,
			lr=self.config.learning_rate,
			betas=(0.9,0.999)
		)
		
		#学习率调度
		scheduler = get_linear_schedule_with_warmup(
			optimizer,
			num_warmup_steps=self.config.warmup_steps,
			num_training_steps=self.config.total_steps
		)
		return optimizer,scheduler

	def gradient_checkpointing(self):
		"""梯度检查点节省内存"""
		if hasattr(self.model,'gradient_checkpointing_enable'):
			self.moel.gradient_checkpointing_enable()

③、训练循环:在每个epoch中,将训练数据输入模型、计算损失、反向传播更新参数。同时,使用验证集监控模型性能,防止过拟合(Overfitting)

过拟合:模型过度学习训练数据的噪声(死记硬背),导致在测试集上表现差(泛化能力差)
解决方法:

  • 数据增强(Data Augmentation):对文本同义词替换、图像旋转
  • 正则化(Regularization):L2正则(权重衰减)、Dropout(随机失活神经元) self.dropout = nn.Dropout(0.5) # 训练时50%神经元失活
  • 早停(Early Stopping):验证集指标不再提升时停止训练
  • 简化模型:减少层数/神经元数(降低复杂度)
#分布式训练配置
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader,DistributedSampler

class DistributedTrainer:
	def __init__(self,config):
		self.config = config
		self.setup_distributed()

	def setup_distributed(self):
		"""设置分布式训练环境"""
		torch.distributed.init_process_group(backend='nccl')
		self.local_rank = torch.distributed.get_rank()
		self.world_size = torch.distributed.get_world_size()
	
	def train(self,model,dataset,optimizer):
		"""分布式训练循环"""
		#数据并行采样
		sampler = DistributedSampler(
			dataset,
			num_replicas = self.world_size,
			rank = self.local_rank
		)
		dataloader = DataLoader(
			dataset,
			batch_size = self.config.batch_size,
			sampler = sampler,
			num_workers=4
		)

		#模型并行
		model = DDP(model.to(self.local_rank),device_ids=[self.local_rank])

		#混合精度训练
		scaler = torch.cuda.amp.GradScaler()

		for epoch in range(self.config.epochs):
			sampler.set_epoch(epoch)

			for batch in dataloader:
				with torch.cuda.amp.autocast():
					loss = model(**batch).loss

				scaler.scale(loss).backward()
				scaler.step(optimizer)
				scaler.update()
				optimizer.zero_grad()
import wandb
from torch.utils.tensorboard import SummaryWriter

class TrainingMonitor:
	def __init__(self,config):
		self.config = config
		self.setup_logging()

	def setup_logging(self):
		"""设置训练监控"""
		#WandB集成
		wandb.init(
			project=config.project_name,
			config=config.__dict__
		)
		
		#TensorBoard
		self.writer = SummaryWriter(log_dir=config.log_dir)

	def log_metrics(self,metrics,step):
		"""记录训练指标"""
		#WandB记录
		wandb.log(metircs,step=step)

		#TensorBorad记录
		for key,value in metrics.items():
			self.writer.add_scalar(key,value,step)

		#企业监控系统集成
		self._send_to_enterprise_monitoring(metrics)	

④、模型保存:保存训练好的模型权重

详细内容

四、评估模型

①、使用测试集评估模型性能。评估指标根据任务而定,例如对于分类任务使用准确率、精确率、召回率、F1分数;对于生成任务可以使用BLEU、ROUGE等

②、进行端到端的测试:模拟真实场景,检查模型输出是否符合预期

# 综合评估框架
from sklearn.metrics import precision_recall_fscore_support,accuracy_score
import numpy as np

class ModelEvaluator:
	def __init__(self,model,tokenizer):
		self.model = model
		self.tokenizer = tokenizer

	def evaluate(self,test_dataset):
		"""综合评估"""
		results = {}
		# 基础性能指标
        results.update(self._compute_basic_metrics(test_dataset))
        
        # 业务特定指标
        results.update(self._compute_business_metrics(test_dataset))
        
        # 偏差检测
        results.update(self._detect_bias(test_dataset))
        
        # 鲁棒性测试
        results.update(self._robustness_test(test_dataset))
        
        return results

	def _compute_basic_metrics(self,dataset):
		"""计算基础指标"""
		predictions,labels = self._get_predictions(dataset)
		metrics = {
			'accuracy': accuracy_score(labels, predictions),
            'precision': precision_recall_fscore_support(
                labels, predictions, average='weighted')[0],
            'recall': precision_recall_fscore_support(
                labels, predictions, average='weighted')[1],
            'f1': precision_recall_fscore_support(
                labels, predictions, average='weighted')[2]
		}

		return metrics

	def _detect_bias(self, dataset):
		"""检测模型偏差"""
		bias_metrics = {}

		#按性别、年龄等敏感属性分组评估
		for attribute in ['gender','age_group','region']
			subgroups = dataset.groupby(attribute)
			
			for name,group in subgroups:
				subgroup_metrics = self._compute_basic_metrics(group)
				bias_metrics[f'{attribute}_{name}_f1'] = subgroup_metrics['f1']
		return {'bias_analysis',bias_metrics}
class ABTesting:
	def __init__(self,model_a,model_b):
		self.model_a = model_a
		self.model_b = model_b

	def run_experiment(self,traffic_split=0.5,duration_days=7):
		"""运行A/B测试"""
		results = {
			'model_a':{'conversions':0,'total':0},
			'model_b':{'conversions':0,'total':0}
		}

		#模拟流量分配
		for request in self.simulate_traffic(duration_days):
			if np.random() > traffic_split:
				model = self.model_a
				key = 'model_a'
			else:
				model = self.model_b
				key = 'model_b'

			prediction = model.predict(request)
			results[key]['total'] += 1

			if self.is_success(prediction,request):
				results[key]['conversions'] += 1

		#统计显著性检验
		significance = self._calculate_singificance(results)

		return {
			'results': results,
            'significance': significance,
            'recommendation': self._make_recommendation(results, significance)
		}

前向传播是数据流动的过程,全连接层和激活函数是构建模型的基础,输出层和任务头决定任务类型,学习率和损失函数驱动训练,反向传播和优化器调整权重,而过拟合是需要避免的陷阱。

分类任务:情感分析或垃圾邮件检测,特点时输出离散

生成任务

回归任务

五、部署模型

大模型部署方式

①、模型导出:

②、选择部署平台:

③、构建API服务

④、考虑性能

# 生产环境部署:deployment.py
from fastapi import FastAPI, HTTPException
import torch
import onnxruntime
from prometheus_client import Counter, Histogram

class ModelDeployment:
    def __init__(self, config):
        self.app = FastAPI()
        self.config = config
        self.setup_monitoring()
        self.load_model()
        self.setup_routes()
        
    def load_model(self):
        """加载优化后的模型"""
        if self.config.quantization == 'int8':
            # 量化模型加载
            self.model = self._load_quantized_model()
        elif self.config.optimization == 'onnx':
            # ONNX运行时
            self.session = onnxruntime.InferenceSession(
                "model.onnx",
                providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
            )
        else:
            # 标准PyTorch模型
            self.model = torch.jit.load("model.pt")
            
    def setup_monitoring(self):
        """设置监控指标"""
        self.request_counter = Counter(
            'model_requests_total',
            'Total number of requests'
        )
        self.latency_histogram = Histogram(
            'model_latency_seconds',
            'Request latency in seconds'
        )
        self.error_counter = Counter(
            'model_errors_total',
            'Total number of errors'
        )
        
    def setup_routes(self):
        """设置API路由"""
        @self.app.post("/predict")
        async def predict(request: dict):
            with self.latency_histogram.time():
                try:
                    self.request_counter.inc()
                    
                    # 输入验证
                    validated_input = self.validate_input(request)
                    
                    # 推理
                    prediction = self.inference(validated_input)
                    
                    # 后处理
                    result = self.post_process(prediction)
                    
                    return result
                    
                except Exception as e:
                    self.error_counter.inc()
                    raise HTTPException(status_code=500, detail=str(e))
# 容器化部署:Dockerfile
FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04

# 安装依赖
RUN apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# 复制应用代码
COPY requirements.txt .
COPY . .

# 安装Python包
RUN pip3 install --no-cache-dir -r requirements.txt

# 下载模型文件
RUN python3 download_model.py

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python3 health_check.py

# 启动服务
EXPOSE 8000
CMD ["uvicorn", "deployment:app", "--host", "0.0.0.0", "--port", "8000"]
# Kubernetes部署配置kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-service
  template:
    metadata:
      labels:
        app: llm-service
    spec:
      containers:
      - name: llm-container
        image: your-registry/llm-service:latest
        resources:
          limits:
            nvidia.com/gpu: 1
            memory: "8Gi"
            cpu: "4"
          requests:
            memory: "4Gi"
            cpu: "2"
        env:
        - name: MODEL_PATH
          value: "/app/models"
        - name: MAX_BATCH_SIZE
          value: "16"
        ports:
        - containerPort: 8000
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: llm-service
spec:
  selector:
    app: llm-service
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

六、使用模型

①、客户端通过HTTP请求调用API,传入问题,获取答案

#客户端SDK client_sdk.py
import request
import json
from typing import Dict,Any
import backoff

class LLMClient:
	def __init__(self,base_url:str,api_key:str):
		self.base_url = base_url
		self.headers = {
			"Authorization":f"Bearer {api_key}",
			"Content_type":"application/json"
		}
	
	@backoff.on_exception(
		backoff.expo,
		requests.exceptions.RequestException,
		max_tries=3
	)
	def predict(self,text:str,**kwargs)->Dict[str,Any]:
		"""调用模型API"""
		payload = {
			"text":text,
			**kwargs
		}

		response = requests.post(
			f"{self.base_url}/predict",
			headers=self.headers,
			json=payload,
			timeout=30
		)

		response.raise_for_status()
		return response.json()

	def batch_predict(self,texts:list,batch_size:int=10):
		"""批量预测"""
		results = []
		for i in range(0,len(texts),batch_size):
			batch = texts[i:i_batch_size]
			batch_result = self.predict(batch)
			results.extend(batch_result)
		return results

②、集成到现有系统:例如将模型集成到公司网站、聊天机器人或移动应用中

# enterprise_integration.py
class CRMIntegration:
	def __init__(self,llm_client):
		self.llm_client = llm_client

	def analyze_customer_sentiment(self,customer_id):
		"""分析客户情绪"""
		conversations = self.get_customer_conversations(customer_id)

		#使用LLM分析情绪
		sentiment_results = []
		for conv in conversations:
			analysis = self.llm_client.predict(
				text=conv('content'),
				task="sentiment_analysis"
			)
			sentiment_results.append(analysis)

		#生成客户洞察报告
		insight_report = self.generate_insight_report(sentiment_results)

		#更新CRM系统
		self.update_crm_profile(customer_id,insight_report)
		return insight_report

	def automate_response(self,inquiry):
		"""自动生成回复"""
		#检索相关知识库
		relevant_info = self.retrieve_knowledge_base(inquiry)

		#生成个性化回复
		response = self.llm_client.predict(
			text_inquiry,
			context=relevant_info,
			task="response_generation"
		)

		#安全过滤
		filtered_response = self.safety_filter(response)
		return filtered_response
# 实时推理服务realtime_inference.py
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor

class RealtimeInterenceService:
	def __init__(self,model,max_workers=4):
		self.model = model
		self.executor = ThreadPoolExecutor(max_workers=max_workers)
		self.session = None

	async def process_stream(self,websocket):
		"""处理WebSocket流"""
		async for message in websocket:
			#异步处理推理请求
			result = await asyncio.get_event_loop().run_in_executor(
				self.executor,
				self._inference,
				message
			)		
			await websocket.send_json(result)

	def _inference(self,data):
		"""执行模型推理"""
		with torch.no_grad():
			inputs = self.preprocess(data)
			outputs = self.model(**inputs)
			return self.postprocess(outputs)

③、与Java集成

  • 模型部署为HTTP服务(使用Python的FaseAPI框架)
  • Java系统通过HTTP客户端调用模型服务

步骤一:使用FastAPI部署模型

假设有一个训练好的模型,并保存为model.pkl(或使用其他格式,如PyTorch的.pt或TensorFlow的SavedModel)
这里以PyTorch模型为例。

步骤二:创建FastAPI应用,包含安装了fastapi、uvicorn、torch等库

步骤三:在Java中调用API

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐