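Business pain points: a leading e-commerce platform handles more than 36 million customer-service tickets a year across 12 departments (after-sales, logistics, inquiries, complaints, and others). Manual classification suffers from:

  • Low efficiency: 100,000+ tickets per day require 200+ agents, and classification takes up 40% of total agent working hours
  • Low accuracy: complex tickets that span departments (e.g. "delayed delivery + damaged goods") are misclassified 25% of the time, which delays ticket routing (average handling time rises by 2 hours)
  • Inconsistent standards: training new agents is costly (about 1 month to master the classification rules), and staff turnover causes classification quality to fluctuate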

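Development environment and toolchain

Algorithm team:

  • Languages: Python 3.9 (feature engineering / model training), Scala 2.13 (Spark data preprocessing)
  • Text processing: jieba (Chinese word segmentation), sklearn (TF-IDF), gensim (keyword extraction)
  • Feature store: Feast 0.34 (entity: ticket ID; features: TF-IDF vector / keyword count / historical ticket frequency)
  • Model training: scikit-learn 1.2 (SVM with RBF kernel), XGBoost 2.0 (comparison model for class-imbalance handling)
  • Experiment tracking: MLflow 2.8 (parameters / metrics / models), Weights & Biases (visualization)
  • Version control: git@github.com:ecom/algorithm-ticket-classification.git

Business team:

  • Languages: Go 1.20 (high-performance API), Java 17 (customer-service system integration)
  • Service framework: FastAPI 0.104 (lightweight Python API), gRPC (cross-language feature service calls)
  • Service governance: Kong 3.4 (API gateway), Consul 1.16 (service discovery)
  • Monitoring: Prometheus 2.47 (metrics collection), Grafana 10.2 (dashboards)
  • Version control: git@github.com:ecom/business-ticket-classification.git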

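Data preparation and feature transformation

(1) Raw data structure

① Customer-service ticket system (ticket_core, Hive table)

② User behavior log (user_behavior, Kafka topic)

// Historical ticket record (used to derive the user's ticket-frequency feature; "售后部" is the after-sales department)
{ "user_id": "U789", "historical_tickets": 3, "last_department": "售后部" }

③ Department label library (department_labels, Excel)

(2) Data cleaning and feature engineering (owned by the algorithm team; focus: text → TF-IDF features)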

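Step 1: Data cleaning (Spark batch job)

  • Missing values: empty content (0.1% of tickets) is filled with "未知" ("unknown") and later flagged as low confidence
  • Outliers: content shorter than 5 characters (e.g. "查物流", "check logistics") is treated as invalid and completed from the user's ticket history (e.g. if the user's last 3 tickets were all logistics issues, the ticket is assigned to the logistics department)
  • Deduplication: duplicates are dropped by the hash of user_id + content (duplicates make up 2% of tickets; removing them helps avoid overfitting)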
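The three cleaning rules above can be sketched in plain Python. This is only an illustration of the logic, not the team's Spark job: the `recent_departments` lookup, the field names `low_confidence` and `fallback_department`, and the choice of SHA-256 are assumptions made for the example.

```python
import hashlib

UNKNOWN = "未知"  # filler for empty content ("unknown")
MIN_LEN = 5       # content shorter than this is treated as invalid


def content_key(user_id: str, content: str) -> str:
    """Hash of user_id + content, used as the deduplication key."""
    return hashlib.sha256(f"{user_id}\x1f{content}".encode("utf-8")).hexdigest()


def clean_tickets(tickets, recent_departments):
    """Fill empty content, resolve short content from history, drop duplicates.

    tickets: list of dicts with "user_id" and "content".
    recent_departments: hypothetical lookup user_id -> departments of the
    user's recent tickets, newest last.
    """
    seen, cleaned = set(), []
    for ticket in tickets:
        row = dict(ticket)
        content = (row.get("content") or "").strip()
        row["low_confidence"] = False
        row["fallback_department"] = None
        if not content:
            # Rule 1: empty content is filled and flagged as low confidence
            content = UNKNOWN
            row["low_confidence"] = True
        elif len(content) < MIN_LEN:
            # Rule 2: short content falls back to the user's last 3 tickets
            history = recent_departments.get(row["user_id"], [])[-3:]
            if len(history) == 3 and len(set(history)) == 1:
                row["fallback_department"] = history[0]
        row["content"] = content
        # Rule 3: keep only the first ticket per (user_id, content) hash
        key = content_key(row["user_id"], content)
        if key in seen:
            continue
        seen.add(key)
        cleaned.append(row)
    return cleaned


tickets = [
    {"user_id": "U1", "content": "查物流"},
    {"user_id": "U1", "content": "查物流"},  # exact duplicate
    {"user_id": "U2", "content": ""},
    {"user_id": "U3", "content": "商品破损,申请换货"},
]
history = {"U1": ["DEP02", "DEP02", "DEP02"]}
cleaned = clean_tickets(tickets, history)
```

Here the duplicate from U1 is dropped, the short ticket inherits DEP02 from the user's history, and the empty ticket is kept but flagged.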

Step 2: Feature engineering (core: jieba segmentation + TF-IDF + statistical features)
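To make the TF-IDF step concrete, here is a minimal pure-Python sketch. It assumes the text has already been segmented into tokens (the job jieba does in this project) and uses the smoothed IDF and L2 normalization that scikit-learn's `TfidfVectorizer` applies by default. The sample documents are made up for illustration.

```python
import math
from collections import Counter


def tfidf(docs):
    """docs: list of token lists. Returns one {token: weight} dict per document."""
    n = len(docs)
    # Document frequency: in how many documents each token appears
    df = Counter(tok for doc in docs for tok in set(doc))
    # Smoothed IDF: ln((1 + n) / (1 + df)) + 1
    idf = {tok: math.log((1 + n) / (1 + d)) + 1.0 for tok, d in df.items()}
    vectors = []
    for doc in docs:
        tf = Counter(doc)
        raw = {tok: count * idf[tok] for tok, count in tf.items()}
        # L2-normalize so that long and short tickets are comparable
        norm = math.sqrt(sum(w * w for w in raw.values())) or 1.0
        vectors.append({tok: w / norm for tok, w in raw.items()})
    return vectors


docs = [
    ["物流", "签收", "未收到"],  # logistics, signed for, not received
    ["物流", "延迟"],            # logistics, delayed
    ["商品", "破损", "换货"],    # goods, damaged, exchange
]
vectors = tfidf(docs)
```

In the first document, "物流" appears in two of the three tickets and therefore gets a lower weight than "签收", which appears in only one.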

Step 3: Feature transformation comparison table (raw data → processed features)

(3) Example of the processed feature matrix (algorithm team output)
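The statistical columns of such a matrix (word_count, stopword_ratio, keyword_match_count, submit_hour_risk) could be derived roughly as follows. The stopword list, the keyword list, and the definition of night-time as 22:00 to 06:00 are assumptions for this sketch; the source only states that a 10:30 submission counts as non-night (0).

```python
from datetime import datetime

STOPWORDS = {"的", "了", "但"}             # hypothetical stopword list
DEPT_KEYWORDS = {"物流", "签收", "快递员"}  # hypothetical department keywords


def statistical_features(tokens, submit_time):
    """Compute per-ticket statistical features from tokens and an ISO timestamp."""
    hour = datetime.fromisoformat(submit_time).hour
    word_count = len(tokens)
    stop_count = sum(1 for t in tokens if t in STOPWORDS)
    return {
        "word_count": word_count,
        "stopword_ratio": stop_count / word_count if word_count else 0.0,
        "keyword_match_count": sum(1 for t in tokens if t in DEPT_KEYWORDS),
        # 1 = submitted at night (assumed 22:00-06:00), 0 = otherwise
        "submit_hour_risk": 1 if hour >= 22 or hour < 6 else 0,
    }


tokens = ["物流", "显示", "已", "签收", "但", "未收到"]
features = statistical_features(tokens, "2023-10-02T10:30:00")
night_features = statistical_features(tokens, "2023-10-02T23:10:00")
```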

Code structure

Algorithm team repository (algorithm-ticket-classification)

algorithm-ticket-classification/
├── data_processing/                # data preprocessing (Spark/Scala)
│   ├── src/main/scala/com/ecom/
│   │   ├── DataCleaning.scala      # missing values / outliers / deduplication
│   │   └── DataDeduplication.scala  # deduplication by hash of user_id + content
│   └── build.sbt                    # Spark dependencies (spark-core, spark-sql)
├── feature_engineering/            # feature engineering (Python, split by function)
│   ├── text_processing.py          # jieba segmentation + stopword filtering + keyword extraction
│   ├── tfidf_vectorizer.py         # TF-IDF matrix construction (sklearn TfidfVectorizer)
│   ├── statistical_features.py     # statistical features (word count / frequency / binning)
│   └── requirements.txt             # dependencies: jieba, sklearn, pandas
├── model_training/                 # model training (SVM and XGBoost submodules)
│   ├── svm/                        # SVM with RBF kernel
│   │   ├── train_svm.py             # SVM training (RBF kernel, class_weight for imbalance)
│   │   ├── evaluate_svm.py          # evaluation (confusion matrix / F1-score / PSI)
│   │   └── svm_params.yaml          # tuning record (C=10, gamma=0.1)
│   ├── xgboost/                    # XGBoost comparison model (class-imbalance handling)
│   │   ├── train_xgb.py             # XGBoost training (scale_pos_weight for imbalance)
│   │   └── xgb_params.yaml          # tuning record (max_depth=6, learning_rate=0.1)
│   └── requirements.txt             # dependencies: sklearn, xgboost, shap
├── feature_store/                  # Feast feature store
│   ├── feature_repo/                # Feast feature repository (the core part)
│   │   ├── __init__.py
│   │   ├── features.py              # feature definitions (entity, feature view, online/offline features)
│   │   └── feature_store.yaml       # Feast configuration (online store: Redis, offline store: Parquet)
│   └── deploy_feast.sh              # script that deploys the Feast service to K8s
├── mlflow_tracking/                # MLflow experiment tracking
│   ├── run_svm_experiment.py        # logs SVM experiments (parameters / metrics / model)
│   └── run_xgb_experiment.py         # logs XGBoost experiments
└── README.md                       # algorithm docs (feature dictionary / model input and output)

(1) Algorithm team: feature store (Feast)

feature_store/feature_repo/features.py (defines the entity, the feature view, and the offline source; the business team reads features through these definitions)

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64, String

# 1. Entity: ticket ID
ticket_entity = Entity(
    name="ticket_id",
    join_keys=["ticket_id"],
    value_type=ValueType.STRING,
    description="Unique ticket identifier",
)

# 2. Offline source (Parquet file in the data lake, used for model training)
offline_source = FileSource(
    path="s3://ecom-data-lake/processed/ticket_features.parquet",  # feature matrix produced by the algorithm team
    timestamp_field="submit_time",  # event timestamp (enables point-in-time queries)
    created_timestamp_column="created_at",
)

# 3. Online features (served from Redis for low-latency real-time prediction).
# Feast has no RedisSource class: the Redis online store is configured in
# feature_store.yaml and populated with `feast materialize`.

# 4. Feature view (ties together the entity, the schema, and the data source)
ticket_features_view = FeatureView(
    name="ticket_features",
    entities=[ticket_entity],
    ttl=timedelta(days=7),  # features are kept for 7 days
    schema=[  # must match the feature matrix produced by the algorithm team
        Field(name="word_count", dtype=Int64),
        Field(name="stopword_ratio", dtype=Float32),
        Field(name="keyword_match_count", dtype=Int64),
        Field(name="user_ticket_freq_1m", dtype=Int64),
        Field(name="submit_hour_risk", dtype=Int64),
        Field(name="tfidf_features", dtype=String),  # sparse vector stored in serialized form (e.g. JSON)
    ],
    source=offline_source,
    online=True,  # make this view available in the online store
)
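Because tfidf_features is stored as a String, the sparse TF-IDF vector has to be serialized before it is written and parsed again when read. One possible encoding is a JSON object that maps column index to weight. The helper names below are hypothetical and not part of the repository described here.

```python
import json


def serialize_sparse(vector, ndigits=6):
    """{column_index: weight} -> compact JSON string, zero weights dropped."""
    items = {
        str(i): round(w, ndigits)
        for i, w in sorted(vector.items())
        if w != 0.0
    }
    return json.dumps(items, separators=(",", ":"))


def deserialize_sparse(payload):
    """Inverse of serialize_sparse: JSON string -> {column_index: weight}."""
    return {int(i): float(w) for i, w in json.loads(payload).items()}


vector = {12: 0.5, 3: 0.25, 40: 0.0}
payload = serialize_sparse(vector)
restored = deserialize_sparse(payload)
```

Storing only the non-zero entries keeps the payload small, which matters because a TF-IDF vocabulary typically has thousands of columns while a single ticket uses only a handful.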

feature_store/feature_repo/feature_store.yaml (Feast configuration: online and offline stores)

project: ecom_ticket_classification  # project name
registry: data/registry.db  # registry path (stores feature metadata)
provider: local  # local provider for development; production runs on K8s
online_store:
  type: redis
  connection_string: "feast-redis-master:6379"  # Redis online store address (host:port)
offline_store:
  type: file  # use bigquery/snowflake in production
  path: "data/offline_store"  # offline store path (Parquet files)

feature_store/deploy_feast.sh (deploys the Feast service to K8s for the business team to call)

#!/bin/bash
# 1. Build the Feast Docker image (includes feature_repo)
docker build -t ecom/feast-feature-server:latest -f Dockerfile.feast .

# 2. Push the image to the registry
docker push ecom/feast-feature-server:latest

# 3. Deploy to K8s (StatefulSet + Service for a stable network identity)
kubectl apply -f k8s/feast-statefulset.yaml
kubectl apply -f k8s/feast-service.yaml  # exposes gRPC port 6566

(2) Algorithm team: SVM-RBF model training (model_training/svm/train_svm.py)

import logging

import joblib
import mlflow
import mlflow.sklearn
import numpy as np
from scipy.sparse import hstack
from sklearn.metrics import classification_report, f1_score
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.svm import SVC

from feature_engineering.tfidf_vectorizer import load_tfidf_matrix  # loads the TF-IDF features
from feature_engineering.statistical_features import load_statistical_features  # loads the statistical features

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def load_features_and_labels(feature_path: str, label_path: str) -> tuple:
    """Load the feature matrix (TF-IDF + statistical features) and the labels (department IDs)."""
    # TF-IDF sparse matrix plus the ticket IDs of its rows
    tfidf_matrix, ticket_ids = load_tfidf_matrix(feature_path)

    # Statistical features (DataFrame with word_count, stopword_ratio, ...);
    # the file also holds ticket_id and the department label
    stat_features = load_statistical_features(label_path)

    # Merge the sparse TF-IDF matrix with the dense statistical features
    # (both are assumed to list the tickets in the same order)
    X = hstack(
        [tfidf_matrix, stat_features.drop(columns=["ticket_id", "department"]).values],
        format="csr",
    )
    y = stat_features["department"]  # labels: 12 department IDs (DEP01-DEP12)
    return X, y, ticket_ids

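def handle_class_imbalance(X: np.ndarray, y: np.ndarray) -> str:
    """Handle class imbalance: SVM uses class_weight='balanced'; XGBoost is the comparison model."""
    # Option 1: SVC supports class_weight natively (weights are adjusted
    # automatically, so minority classes weigh more)
    svm_class_weight = "balanced"
    # Option 2: XGBoost (comparison only). scale_pos_weight targets binary tasks;
    # multi-class training needs sample_weight instead.
    # Shown for reference; the XGBoost model is trained separately.
    return svm_class_weight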

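def train_svm_rbf(X_train: np.ndarray, y_train: np.ndarray, class_weight: str) -> SVC:
    """Train the SVM with an RBF kernel (focus: RBF parameter tuning)."""
    # Initialize the SVM (RBF kernel, class_weight handles the imbalance)
    svm = SVC(
        kernel="rbf",  # radial basis function kernel (implicit high-dimensional mapping)
        C=10,  # penalty coefficient (tuned value; larger C penalizes misclassification more)
        gamma=0.1,  # kernel coefficient (tuned value; smaller gamma gives a smoother decision boundary)
        class_weight=class_weight,  # balance class weights (minority departments weigh more)
        probability=True,  # output probabilities (used by the business side as confidence)
        random_state=42,
        verbose=True,  # print the training log
    )
    # Grid search (optional; the pre-tuned values are used here)
    # param_grid = {"C": [1, 10, 100], "gamma": [0.01, 0.1, 1]}
    # grid_search = GridSearchCV(svm, param_grid, cv=5, scoring="f1_macro")
    # grid_search.fit(X_train, y_train)
    # svm = grid_search.best_estimator_

    # Train the model
    svm.fit(X_train, y_train)
    logger.info(f"SVM-RBF training finished, parameters: C={svm.C}, gamma={svm.gamma}")
    return svm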

def evaluate_model(model: SVC, X_test: np.ndarray, y_test: np.ndarray) -> dict:
    """Evaluate the model (multi-class F1-score and classification report)."""
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred, output_dict=True)
    f1_macro = f1_score(y_test, y_pred, average="macro")  # macro F1 (gives minority classes equal weight)
    logger.info(f"Test-set F1-macro: {f1_macro:.4f}, classification report:\n{report}")
    return {"f1_macro": f1_macro, "classification_report": report}

if __name__ == "__main__":
    # Load the features (TF-IDF + statistical features produced by the algorithm team)
    X, y, _ = load_features_and_labels(
        feature_path="data/processed/tfidf_matrix.npz",
        label_path="data/processed/stat_features.csv",
    )
    # Train/test split (stratified to preserve the department distribution)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Handle class imbalance (SVM uses class_weight)
    class_weight = handle_class_imbalance(X_train, y_train)

    # Train the SVM-RBF model
    svm_model = train_svm_rbf(X_train, y_train, class_weight)

    # Evaluate the model
    eval_metrics = evaluate_model(svm_model, X_test, y_test)

    # Save the model (loaded later by the business team)
    joblib.dump(svm_model, "model/svm_rbf_ticket_classifier.pkl")

    # Log the experiment to MLflow
    with mlflow.start_run(run_name="svm_rbf_ticket_classification"):
        mlflow.log_param("kernel", "rbf")
        mlflow.log_param("C", 10)
        mlflow.log_param("gamma", 0.1)
        mlflow.log_metric("f1_macro", eval_metrics["f1_macro"])
        mlflow.sklearn.log_model(svm_model, "svm_rbf_model")
    logger.info("Model training finished; saved to model/svm_rbf_ticket_classifier.pkl")
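For reference, `class_weight="balanced"` in scikit-learn assigns each class the weight n_samples / (n_classes * count_of_that_class). The pure-Python sketch below reproduces that formula on a made-up label distribution, which shows why rare departments end up with larger weights.

```python
from collections import Counter


def balanced_class_weights(labels):
    """Weight per class: n_samples / (n_classes * class_count)."""
    counts = Counter(labels)
    n_samples, n_classes = len(labels), len(counts)
    return {cls: n_samples / (n_classes * k) for cls, k in counts.items()}


# Hypothetical label distribution: DEP03 is the rare department
labels = ["DEP01"] * 6 + ["DEP02"] * 3 + ["DEP03"] * 1
weights = balanced_class_weights(labels)
```

With 10 samples and 3 classes, the single DEP03 ticket is weighted 10/3, six times more than a DEP01 ticket (10/18), so misclassifying it costs the SVM proportionally more.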
		

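Business team repository (business-ticket-classification)

business-ticket-classification/
├── api_gateway/                    # Kong API gateway configuration
│   ├── kong.yml                    # routing rules (/classify is forwarded to the classification service)
│   └── auth_plugin.lua             # JWT authentication plugin
├── ticket_classification_service/  # ticket classification microservice (FastAPI)
│   ├── main.py                     # service entry point (loads the model and the Feast client)
│   ├── predictor.py                # core prediction logic (SVM model + feature service)
│   ├── schemas.py                  # request/response models (Pydantic)
│   └── requirements.txt             # dependencies: fastapi, uvicorn, feast, joblib
├── monitoring/                     # monitoring configuration
│   ├── prometheus_rules.yml         # alert rules (F1-score < 0.9 triggers retraining)
│   └── grafana_dashboards/          # dashboards (API latency, classification accuracy, PSI)
└── deployment/                     # K8s deployment configuration
    ├── svm-model-deployment.yaml   # Deployment for the SVM model service
    └── feast-service-deployment.yaml # Deployment for the Feast feature service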

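Ticket classification service (calls the algorithm team's feature service and model; predictor.py)

import logging

import joblib
import pandas as pd
from feast import FeatureStore  # client for the algorithm team's Feast feature store
from pydantic import BaseModel  # request/response models

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)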

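# 1. Request and response models
class TicketRequest(BaseModel):
    ticket_id: str  # ticket ID (generated by the business system, unique)
    content: str  # ticket content (unstructured text)
    user_id: str  # user ID (used to fetch historical ticket features)
    submit_time: str  # submission time (ISO format)

class ClassificationResponse(BaseModel):
    ticket_id: str
    predicted_department: str  # predicted department ID (e.g. DEP01)
    confidence: float  # prediction confidence (0-1)
    key_keywords: list[str]  # matched keywords (e.g. ["破损", "换货"]: damaged, exchange)
    timestamp: str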

class TicketClassifier:
    def __init__(self, model_path: str, feast_repo_path: str):
        """Load the algorithm team's SVM model and create the Feast feature store client."""
        # Load the SVM model (joblib file produced by the algorithm team)
        self.model = joblib.load(model_path)
        # FeatureStore reads feature_store.yaml under repo_path; the registry and
        # online store locations are configured there rather than passed as arguments
        self.feature_store = FeatureStore(repo_path=feast_repo_path)
        logger.info("Ticket classifier initialized: model loaded, Feast client created")

    def extract_keywords(self, content: str) -> list[str]:
        """jieba segmentation + keyword extraction (reuses the algorithm team's feature engineering logic)."""
        import jieba
        from feature_engineering.text_processing import load_stopwords  # stopword list provided by the algorithm team
        stopwords = load_stopwords()
        words = jieba.cut(content)  # jieba segmentation (precise mode)
        keywords = [word for word in words if word not in stopwords and len(word) > 1]
        return keywords[:5]  # first 5 keywords (shown on the business side)
	
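    def get_realtime_features(self, ticket_id: str, user_id: str) -> pd.DataFrame:
        """Fetch online (real-time) features from the algorithm team's Feast feature store."""
        feature_names = [
            "word_count", "stopword_ratio", "keyword_match_count",
            "user_ticket_freq_1m", "submit_hour_risk",
        ]
        # Feast feature references use the "<feature_view>:<feature>" form
        feature_refs = [f"ticket_features:{name}" for name in feature_names]
        # The entity is ticket_id only; Feast supports multiple entities,
        # so user_id could be added as a second one
        feature_vector = self.feature_store.get_online_features(
            entity_rows=[{"ticket_id": ticket_id}],  # entity row (ticket ID)
            features=feature_refs,  # feature references
        ).to_dict()  # {name: [value]}, entity key included
        # Keep only the feature columns, in a fixed order that must match training
        return pd.DataFrame(feature_vector)[feature_names]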

    def predict(self, request: TicketRequest) -> ClassificationResponse:
        """Core prediction logic: combine real-time features with the model prediction."""
        try:
            # 1. Extract keywords from the ticket content (jieba + stopword filtering)
            key_keywords = self.extract_keywords(request.content)

            # 2. Fetch the online features maintained by the algorithm team
            realtime_features = self.get_realtime_features(
                ticket_id=request.ticket_id,
                user_id=request.user_id,
            )
            # Feast returns None values for entities missing from the online store
            if realtime_features.empty or realtime_features.isnull().values.any():
                raise ValueError(f"No features found in Feast for ticket_id={request.ticket_id}")

            # 3. Build the TF-IDF features (computed at request time from the new text)
            from feature_engineering.tfidf_vectorizer import vectorize_text  # vectorizer provided by the algorithm team
            tfidf_vector = vectorize_text([request.content])  # list of texts in, sparse matrix out

            # 4. Merge the features (TF-IDF vector + real-time statistical features)
            from scipy.sparse import hstack
            X = hstack([tfidf_vector, realtime_features.to_numpy(dtype=float)], format="csr")

            # 5. Predict department and confidence. predict() returns a label, not an
            # index, so both are read from predict_proba via classes_.
            proba = self.model.predict_proba(X)[0]
            best_idx = proba.argmax()
            predicted_department = str(self.model.classes_[best_idx])  # e.g. "DEP02"
            confidence = float(proba[best_idx])  # probability of the top class

            # 6. Build the response
            return ClassificationResponse(
                ticket_id=request.ticket_id,
                predicted_department=predicted_department,
                confidence=round(confidence, 4),
                key_keywords=key_keywords,
                timestamp=pd.Timestamp.now().isoformat(),
            )
        except Exception as e:
            logger.error(f"Prediction failed: ticket_id={request.ticket_id}, error={str(e)}", exc_info=True)
            raise
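A note on reading the prediction: in scikit-learn, `predict` returns class labels (such as "DEP02") rather than positions, and the columns of `predict_proba` follow the order of `classes_`. The label and its confidence can therefore both be taken from the probability row. A minimal sketch with made-up values:

```python
def pick_department(classes, probabilities):
    """Return (label, confidence) for the most probable class."""
    best_idx = max(range(len(probabilities)), key=probabilities.__getitem__)
    return classes[best_idx], probabilities[best_idx]


classes = ["DEP01", "DEP02", "DEP03"]  # stands in for model.classes_
probabilities = [0.03, 0.95, 0.02]     # stands in for one row of predict_proba
department, confidence = pick_department(classes, probabilities)
```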

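After deployment

Step 1: Real-time classification on ticket submission

The customer-service system (e.g. Zendesk) calls the business team's API gateway through a webhook: POST https://api.ecom.com/ticket/classify
Request body (TicketRequest); the content reads "Order 20231001: tracking shows the parcel as signed for, but it has not arrived, and the courier's phone cannot be reached":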
{  
  "ticket_id": "TK20231001",  
  "content": "订单20231001,物流显示已签收但未收到,联系快递员电话不通",  
  "user_id": "U123",  
  "submit_time": "2023-10-02T10:30:00"  
}

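Step 2: API processing and response

The classification service calls TicketClassifier.predict(), which works as follows:

  1. jieba segmentation and stopword filtering extract the keywords (e.g. "物流" logistics, "签收" signed for, "未收到" not received)
  2. The Feast service returns the real-time features (e.g. tickets by this user in the last month = 2; submission time 10:30 → not night-time = 0)
  3. The TF-IDF features are built (from the weights of terms such as "物流" and "快递员", courier)
  4. The SVM-RBF model predicts the department (logistics, DEP02) with a confidence of 0.95

Response body (ClassificationResponse):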

{  
  "ticket_id": "TK20231001",  
  "predicted_department": "DEP02",  
  "confidence": 0.95,  
  "key_keywords": ["物流", "签收", "未收到", "快递员"],  
  "timestamp": "2023-10-02T10:30:05"  
}

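Step 3: Automatic ticket assignment and monitoring

The customer-service system assigns the ticket to the queue of the predicted_department (e.g. logistics) and shows the confidence on the ticket board; tickets with confidence < 0.7 are marked yellow and require manual review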
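The routing rule can be sketched as a small function over the response body. The 0.7 threshold and the yellow marker come from the text above; the output field names `queue`, `flag`, and `needs_manual_review` are assumptions for illustration.

```python
REVIEW_THRESHOLD = 0.7  # below this, the ticket is marked for manual review


def route_ticket(response):
    """Map a ClassificationResponse-like dict to a queue assignment."""
    needs_review = response["confidence"] < REVIEW_THRESHOLD
    return {
        "ticket_id": response["ticket_id"],
        "queue": response["predicted_department"],
        "flag": "yellow" if needs_review else None,
        "needs_manual_review": needs_review,
    }


confident = route_ticket(
    {"ticket_id": "TK20231001", "predicted_department": "DEP02", "confidence": 0.95}
)
uncertain = route_ticket(
    {"ticket_id": "TK20231002", "predicted_department": "DEP01", "confidence": 0.62}
)
```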

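The monitoring system (Grafana) shows classification accuracy, PSI (population stability index, a measure of feature distribution drift), and API error rate in real time. If PSI > 0.25 (the data distribution has shifted), model retraining is triggered automatically (Airflow schedules the algorithm team's retraining pipeline)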
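PSI compares the binned distribution of a feature at training time with its distribution in production: PSI = Σ (a_i − e_i) · ln(a_i / e_i), where e_i and a_i are the expected and actual proportions in bin i. The sketch below is a plain-Python version; the bin edges, the sample values, and the epsilon used for empty bins are assumptions for the example.

```python
import math


def psi(expected, actual, edges, eps=1e-6):
    """Population stability index of `actual` against `expected`.

    edges: ascending bin boundaries; values below edges[0] fall in bin 0,
    values at or above edges[-1] fall in the last bin.
    """
    def proportions(values):
        counts = [0] * (len(edges) + 1)
        for v in values:
            counts[sum(1 for e in edges if v >= e)] += 1
        total = len(values) or 1
        # eps avoids log(0) and division by zero for empty bins
        return [max(c / total, eps) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


baseline = [5, 8, 12, 15, 20, 25, 30, 35, 40, 50]  # e.g. word_count at training time
shifted = [v + 30 for v in baseline]               # production tickets got longer
edges = [10, 20, 30, 40]

psi_same = psi(baseline, list(baseline), edges)
psi_shifted = psi(baseline, shifted, edges)
should_retrain = psi_shifted > 0.25  # threshold from the text above
```

An identical distribution gives a PSI of 0, while the shifted sample moves most of the mass into the top bin and lands far above the 0.25 retraining threshold.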
