【无标题】金融数据挖掘及其应用综合实训平台
这是一款集成股票市场数据展示、技术指标深度分析、多算法机器学习预测、量化投资策略回测及 AI 智能分析的综合性金融数据分析平台。平台以 "数据输入→数据处理→模型分析→结果展示" 为核心逻辑链条,构建从原始金融数据到实战级投资洞察的完整流水线,兼顾数据处理的高效性、模型预测的准确性及界面操作的易用性,适用于金融实训教学、量化策略研究及市场数据分析等多场景。
这是一款集成股票市场数据展示、技术指标深度分析、多算法机器学习预测、量化投资策略回测及 AI 智能分析的综合性金融数据分析平台。平台以 "数据输入→数据处理→模型分析→结果展示" 为核心逻辑链条,构建从原始金融数据到实战级投资洞察的完整流水线,兼顾数据处理的高效性、模型预测的准确性及界面操作的易用性,适用于金融实训教学、量化策略研究及市场数据分析等多场景。
一、项目架构设计
1.1 分析流程精细化设计
平台采用模块化分层架构,各环节紧密衔接且独立可扩展,具体流程如下:
- 数据接入层:支持多格式(CSV/Excel)、多年度金融数据批量接入,涵盖交易数据、财务数据、指数数据等 11 类核心数据源;
- 数据预处理层:执行格式标准化、异常值过滤、缺失值填充、日期统一转换等操作,为后续分析提供高质量数据基础;
- 特征工程层:计算技术指标、财务比率、市场情绪等多维度特征,构建适配机器学习模型的特征体系;
- 模型分析层:集成多种预测算法与量化策略,实现股票涨跌预测、行业评级、投资组合优化等核心功能;
- 可视化展示层:通过交互式图表、结构化表格、自定义仪表盘等形式,直观呈现分析结果与投资建议。
1.2 技术栈选型与优势
| 技术方向 | 核心技术 | 选型优势 | 应用场景 |
|---|---|---|---|
| 前端 / 交互 | Streamlit | 开发效率高,支持快速构建交互式 Web 应用,原生适配数据科学工作流 | 界面布局、参数配置、结果展示 |
| 数据处理 | Pandas、NumPy | 高效处理结构化数据,支持批量运算与复杂数据清洗逻辑 | 数据加载、格式转换、特征计算 |
| 可视化 | Plotly | 生成交互式图表,支持缩放、悬停详情、多维度展示,适配金融数据特性 | 行情走势、指标分析、策略回测结果可视化 |
| 机器学习 | Scikit-learn | 提供丰富的经典算法库,API 简洁统一,支持模型训练与评估全流程 | 股票涨跌预测、公司综合评级、特征重要性分析 |
| 数据源 | CSV/Excel 本地文件 | 数据存储灵活,便于增量更新与版本管理,适配实训场景数据需求 | 交易数据、财务数据、行业分类数据存储 |
二、核心模块精细化详解
2.1 数据加载模块
2.1.1 设计理念
采用 "分年度读取 + 合并标准化" 策略,适配金融数据按年度更新的行业特性,同时通过进度监控、异常捕获、格式校验等机制,确保数据加载的稳定性与可靠性。
2.1.2 核心功能
- 多源数据加载:支持复权交易数据(2023-2025 年)、常规交易数据(2024-2025 年)、行业分类数据、股票基本信息、上市公司详情、沪深 300 指数数据等 11 类文件加载;
- 数据标准化处理:
- 股票代码:去重、去除空格、统一字符串格式;
- 交易日期:转换为
YYYY-MM-DD标准格式,支持后续区间筛选与时间序列分析; - 数值类型:统一字段数据类型,避免计算过程中类型错误;
- 加载状态监控:通过进度条实时展示加载进度(0%-100%),配合状态文本提示当前加载模块,加载完成后自动显示数据统计概览;
- 异常处理机制:
- 文件缺失时,明确提示需补充的文件清单及路径要求;
- 数据格式错误时,输出详细错误日志,便于问题定位;
- 网络中断或内存不足时,支持断点续载与数据分片加载。
2.1.3 核心代码示例
python
运行
@st.cache_data(ttl=3600, show_spinner=False)
def load_all_data():
data_dict = {}
progress_bar = st.progress(0, text="正在加载数据...")
status_text = st.empty()
# 分年度加载复权交易数据
status_text.text("正在加载复权交易数据...")
try:
adj_2023 = pd.read_csv('复权交易数据2023.csv')
adj_2024 = pd.read_csv('复权交易数据2024.csv')
adj_2025 = pd.read_csv('复权交易数据2025.csv')
# 统一数据格式
for df in [adj_2023, adj_2024, adj_2025]:
df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d') # 日期标准化
df['ts_code'] = df['ts_code'].astype(str).str.strip() # 股票代码去重去空格
adj_trade_data = pd.concat([adj_2023, adj_2024, adj_2025], ignore_index=True)
adj_trade_data['year'] = adj_trade_data['trade_date'].dt.year
adj_trade_data['month'] = adj_trade_data['trade_date'].dt.month
data_dict['adj_trade_data'] = adj_trade_data
progress_bar.progress(60, text="复权交易数据加载完成")
except Exception as e:
st.warning(f"复权交易数据加载异常: {str(e)}")
data_dict['adj_trade_data'] = pd.DataFrame()
# 其他数据加载逻辑...
return data_dict
2.2 龙虎榜计算模块
2.2.1 设计理念
针对金融数据中可能存在的缺失值、停牌数据、异常波动等问题,采用 "双方法验证 + 异常过滤 + 结果融合" 策略,确保龙虎榜数据的准确性与参考价值。
2.2.2 核心功能
- 双方法涨跌幅计算:
- 方法 A(直接计算法):基于区间首尾收盘价计算,公式为
(期末收盘价-期初收盘价)/期初收盘价×100%; - 方法 B(复利累乘法):基于每日涨跌幅累乘计算,公式为
(∏(1+每日涨跌幅/100)-1)×100%,仅纳入 - 20%~20% 区间的合理涨跌幅数据;
- 方法 A(直接计算法):基于区间首尾收盘价计算,公式为
- 结果融合策略:
- 若两种方法结果差异小于 50%,采用更精确的复利累乘法结果;
- 若差异大于 50%,取两种结果中的较小值作为保守估计,避免异常数据影响;
- 数据清洗机制:
- 过滤价格为 0 或负数的异常数据;
- 剔除区间内交易日数少于 5 天的股票(避免数据量不足导致的偏差);
- 去重处理同一股票的重复记录;
- 结果输出:生成累计涨幅 Top20 与累计跌幅 Top20 榜单,包含股票代码、股票简称、涨跌幅等关键信息,并支持数据导出与趋势可视化。
2.2.3 核心代码示例
python
运行
def calculate_dragon_tiger(start_date_int, end_date_int, trade_data, stock_info_path):
"""
核心设计思想:多方法验证、异常过滤、结果融合
参数:
start_date_int: 开始日期(整数格式YYYYMMDD)
end_date_int: 结束日期(整数格式YYYYMMDD)
trade_data: 交易数据集(DataFrame)
stock_info_path: 股票基本信息文件路径
返回:
龙虎榜数据(DataFrame)包含股票代码、名称、涨跌幅等字段
"""
# 数据筛选:指定日期区间
mask = (trade_data['trade_date'] >= start_date_int) & (trade_data['trade_date'] <= end_date_int)
filtered_data = trade_data[mask].copy()
# 按股票代码分组计算
dragon_tiger_data = []
for ts_code, group in filtered_data.groupby('ts_code'):
if len(group) < 5: # 过滤交易日数不足5天的股票
continue
# 方法A:首尾收盘价直接计算
first_close = group.iloc[0]["close"]
last_close = group.iloc[-1]["close"]
if first_close <= 0: # 过滤异常价格
continue
pct_from_price = (last_close - first_close) / first_close * 100
# 方法B:日涨跌幅累乘计算
pct_from_daily = None
if 'pct_chg' in group.columns:
valid_pct = group['pct_chg'].dropna()
valid_pct = valid_pct[(valid_pct >= -20) & (valid_pct <= 20)] # 过滤极端涨跌幅
if len(valid_pct) > 0:
cumulative = valid_pct.apply(lambda x: 1 + x/100).prod()
pct_from_daily = (cumulative - 1) * 100
# 结果融合
if pct_from_daily is not None:
if abs(pct_from_price - pct_from_daily) < 50:
final_pct = pct_from_daily
else:
final_pct = min(pct_from_price, pct_from_daily)
else:
final_pct = pct_from_price
# 获取股票名称
stock_name = get_stock_name(ts_code, stock_info_path)
dragon_tiger_data.append({
'ts_code': ts_code,
'stock_name': stock_name,
'pct_change': final_pct,
'trade_days': len(group)
})
return pd.DataFrame(dragon_tiger_data)
2.3 技术指标计算模块
2.3.1 设计理念
将经典金融技术分析理论转化为标准化算法,覆盖趋势类、动量类、超买超卖类等多维度指标,同时处理边界情况(如数据量不足、NaN 值),确保指标计算的稳定性与实用性,既支持可视化展示,也可作为机器学习模型的特征输入。
2.3.2 核心指标详解
| 指标类型 | 具体指标 | 计算逻辑 | 应用场景 |
|---|---|---|---|
| 趋势类指标 | MA5/MA10/MA20/MA60 | 收盘价的 5/10/20/60 日滚动平均 | 判断价格中长期趋势 |
| 动量类指标 | MACD(DIF/DEA/MACD 柱) | DIF=EMA12-EMA26;DEA=DIF 的 9 日 EMA;MACD 柱 = 2×(DIF-DEA) | 捕捉趋势转折与动量变化 |
| 超买超卖类指标 | KDJ | RSV=(收盘价 - 9 日最低价)/(9 日最高价 - 9 日最低价)×100;K=RSV 的 2 日 EMA;D=K 的 2 日 EMA;J=3K-2D | 判断价格超买超卖状态 |
| 相对强弱类指标 | RSI | 14 日上涨幅度均值与下跌幅度均值的比值,公式为 100-(100/(1+RS)) | 衡量市场多空力量对比 |
| 能量类指标 | OBV | 基于收盘价涨跌方向累计成交量,上涨时加成交量,下跌时减成交量 | 验证价格趋势的有效性 |
| 波动率指标 | ATR | 真实波幅(最高价 - 最低价、最高价 - 前收盘价、前收盘价 - 最低价的最大值)的 14 日滚动平均 | 衡量价格波动幅度 |
| 价格位置指标 | 布林带 | 中轨 = 20 日 MA;上轨 = 中轨 + 2×20 日标准差;下轨 = 中轨 - 2×20 日标准差 | 判断价格运行区间与突破信号 |
2.3.3 核心代码示例
python
运行
def calculate_technical_indicators(df):
"""
技术指标计算工厂函数,支持多维度指标批量计算
参数:df为包含trade_date、close、high、low、vol字段的DataFrame
返回:添加技术指标后的DataFrame
"""
df = df.sort_values('trade_date').copy() # 确保按日期排序
df = df.reset_index(drop=True)
# 1. 移动平均线(MA)- 趋势类指标
df['MA5'] = df['close'].rolling(window=5, min_periods=1).mean()
df['MA10'] = df['close'].rolling(window=10, min_periods=1).mean()
df['MA20'] = df['close'].rolling(window=20, min_periods=1).mean()
df['MA60'] = df['close'].rolling(window=60, min_periods=1).mean()
# 2. MACD - 动量类指标
df['EMA12'] = df['close'].ewm(span=12, adjust=False).mean()
df['EMA26'] = df['close'].ewm(span=26, adjust=False).mean()
df['DIF'] = df['EMA12'] - df['EMA26']
df['DEA'] = df['DIF'].ewm(span=9, adjust=False).mean()
df['MACD'] = (df['DIF'] - df['DEA']) * 2
df['MACD_Hist'] = df['MACD'] # MACD柱
# 3. KDJ - 超买超卖类指标
df['Low_9'] = df['low'].rolling(window=9, min_periods=1).min()
df['High_9'] = df['high'].rolling(window=9, min_periods=1).max()
df['RSV'] = (df['close'] - df['Low_9']) / (df['High_9'] - df['Low_9'] + 1e-8) * 100 # 避免除数为0
df['K'] = df['RSV'].ewm(com=2, min_periods=1).mean()
df['D'] = df['K'].ewm(com=2, min_periods=1).mean()
df['J'] = 3 * df['K'] - 2 * df['D']
# 4. RSI - 相对强弱类指标
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14, min_periods=1).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14, min_periods=1).mean()
rs = gain / (loss + 1e-8) # 避免除数为0
df['RSI'] = 100 - (100 / (1 + rs))
# 5. OBV - 能量类指标
df['OBV'] = (np.sign(df['close'].diff()) * df['vol']).fillna(0).cumsum()
# 6. ATR - 波动率指标
high_low = df['high'] - df['low']
high_close = np.abs(df['high'] - df['close'].shift())
low_close = np.abs(df['low'] - df['close'].shift())
tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
df['ATR'] = tr.rolling(window=14, min_periods=1).mean()
# 7. 布林带 - 价格位置指标
df['BB_Middle'] = df['close'].rolling(window=20, min_periods=1).mean()
bb_std = df['close'].rolling(window=20, min_periods=1).std()
df['BB_Upper'] = df['BB_Middle'] + 2 * bb_std
df['BB_Lower'] = df['BB_Middle'] - 2 * bb_std
# 处理NaN值
df = df.fillna(method='ffill').fillna(method='bfill')
return df
2.4 机器学习模型模块
2.4.1 设计理念
采用 "统一接口 + 多模型适配" 架构,支持多种经典算法快速切换与对比,通过科学的数据划分、标准化处理、多维度评估,确保模型预测的可靠性,同时降低用户使用门槛,适配不同场景下的预测需求。
2.4.2 核心功能
- 多模型支持:集成逻辑回归、随机森林、支持向量机、神经网络、梯度提升树、AdaBoost 等 6 种算法,覆盖线性与非线性场景;
- 数据划分策略:采用 6:2:2 比例划分训练集、验证集、测试集,贴合金融时间序列数据特性,避免数据泄露;
- 特征处理:支持基础特征、技术指标特征、高级特征等多维度特征集选择,自动筛选有效特征,剔除冗余信息;
- 标准化处理:通过 StandardScaler 对特征进行标准化(均值为 0,方差为 1),提升模型收敛速度与预测精度;
- 多维度评估:输出准确率、精确率、召回率、F1 分数、AUC 值等核心指标,全面评估模型性能;
- 预测结果输出:生成股票涨跌预测信号(上涨 / 下跌 / 横盘)及预测概率,为量化策略提供决策依据。
2.4.3 核心代码示例
python
运行
def train_prediction_model(X, y, model_type='逻辑回归', feature_selection='all'):
"""
模型训练统一接口,支持多模型切换与特征集选择
参数:
X: 特征矩阵(DataFrame)
y: 目标变量(Series),1=上涨,-1=下跌,0=横盘
model_type: 模型类型,可选值为['逻辑回归', '随机森林', '支持向量机', '神经网络', '梯度提升树', 'AdaBoost']
feature_selection: 特征选择策略,可选值为['basic', 'technical', 'advanced', 'all']
返回:
模型训练结果字典,包含模型、scaler、评估指标、预测结果等
"""
# 特征选择
feature_mapping = {
'basic': ['MA5', 'MA10', 'MA20', 'RSI', 'Volume_Ratio', 'Momentum'],
'technical': ['MA5', 'MA10', 'MA20', 'MA60', 'MACD', 'RSI', 'K', 'D', 'J', 'OBV'],
'advanced': ['MA5', 'MA10', 'MA20', 'MA60', 'MACD', 'RSI', 'K', 'D', 'J', 'OBV', 'Volume_Ratio', 'Momentum', 'Volatility', 'ATR', 'Price_Position'],
'all': ['MA5', 'MA10', 'MA20', 'MA60', 'MACD', 'RSI', 'K', 'D', 'J', 'OBV', 'Volume_Ratio', 'Momentum', 'Volatility', 'ATR', 'Price_Position']
}
selected_features = feature_mapping[feature_selection]
X_selected = X[selected_features].copy()
# 数据划分:60%训练集,20%验证集,20%测试集
train_size = int(0.6 * len(X_selected))
val_size = int(0.2 * len(X_selected))
X_train = X_selected.iloc[:train_size]
X_val = X_selected.iloc[train_size:train_size+val_size]
X_test = X_selected.iloc[train_size+val_size:]
y_train = y.iloc[:train_size]
y_val = y.iloc[train_size:train_size+val_size]
y_test = y.iloc[train_size+val_size:]
# 标准化处理
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)
# 模型字典:定义各模型默认参数
model_dict = {
'逻辑回归': LogisticRegression(max_iter=1000, C=1.0, random_state=42),
'随机森林': RandomForestClassifier(n_estimators=100, max_depth=None, random_state=42),
'支持向量机': SVC(kernel='rbf', C=1.0, probability=True, random_state=42),
'神经网络': MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=1000, random_state=42),
'梯度提升树': GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, random_state=42),
'AdaBoost': AdaBoostClassifier(n_estimators=50, random_state=42)
}
# 模型选择与训练
model = model_dict.get(model_type, model_dict['逻辑回归'])
model.fit(X_train_scaled, y_train)
# 模型评估
def evaluate_model(model, X, y, set_name):
y_pred = model.predict(X)
y_proba = model.predict_proba(X) if hasattr(model, 'predict_proba') else None
accuracy = accuracy_score(y, y_pred)
precision, recall, f1, _ = precision_recall_fscore_support(y, y_pred, average='weighted')
auc_score = roc_auc_score(y, y_proba, multi_class='ovr') if y_proba is not None and len(np.unique(y)) > 1 else None
return {
'set_name': set_name,
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'auc': auc_score,
'y_pred': y_pred,
'y_proba': y_proba
}
train_metrics = evaluate_model(model, X_train_scaled, y_train, 'train')
val_metrics = evaluate_model(model, X_val_scaled, y_val, 'val')
test_metrics = evaluate_model(model, X_test_scaled, y_test, 'test')
# 返回结果
return {
'model': model,
'scaler': scaler,
'selected_features': selected_features,
'train_metrics': train_metrics,
'val_metrics': val_metrics,
'test_metrics': test_metrics,
'X_test': X_test,
'y_test': y_test
}
2.5 综合评价模块
2.5.1 设计理念
基于主成分分析(PCA)实现多维度财务指标的降维与融合,将营业收入、净利润、净资产收益率等多个分散指标压缩为单一综合得分,客观反映公司综合实力,为股票排序与投资决策提供量化依据。
2.5.2 核心流程
- 数据清洗:
- 过滤财务指标为负值或 0 的异常数据(如净利润为负可能表示公司经营异常);
- 剔除缺失值占比超过 30% 的样本,剩余缺失值采用行业均值填充;
- 标准化处理:通过 StandardScaler 消除不同指标的量纲差异(如营业收入与净资产收益率的单位不同);
- 主成分分析:
- 提取主成分,保留累计方差贡献率达 95% 的成分,在降维的同时最大限度保留原始信息;
- 计算各主成分的方差贡献率,作为加权系数;
- 综合得分计算:以各主成分的方差贡献率为权重,对标准化后的主成分得分进行加权求和,得到公司综合得分;
- 结果输出:按综合得分排序,生成行业内公司排名榜单,包含股票代码、名称、核心财务指标及综合得分。
2.5.3 核心代码示例
python
运行
def calculate_comprehensive_score(data, year):
"""
基于主成分分析的公司综合评价函数
参数:
data: 财务数据集(DataFrame),包含股票代码、年度、营业收入、净利润、净资产收益率等字段
year: 评价年度(int)
返回:
综合评价结果(DataFrame),包含股票代码、名称、综合得分及排名
"""
# 筛选指定年度数据
year_data = data[data['年度'] == year].copy()
if len(year_data) < 10:
raise ValueError(f"年度{year}的有效数据样本不足10个,无法进行综合评价")
# 数据清洗
# 1. 选择核心财务指标
financial_indicators = ['营业收入', '净利润', '净资产收益率', '资产负债率', '毛利率', '净利率']
data_x = year_data[financial_indicators].copy()
# 2. 过滤负值和0
for col in financial_indicators:
data_x = data_x[data_x[col] > 0]
# 3. 处理缺失值
data_x = data_x.fillna(data_x.mean())
# 4. 关联股票代码和名称
data_x['股票代码'] = year_data.loc[data_x.index, '股票代码']
data_x['股票名称'] = year_data.loc[data_x.index, '股票名称']
# 标准化处理
scaler = StandardScaler()
X_scaled = scaler.fit_transform(data_x[financial_indicators])
# 主成分分析
pca = PCA(n_components=0.95) # 保留95%的信息
X_pca = pca.fit_transform(X_scaled)
explained_variance_ratio = pca.explained_variance_ratio_ # 各主成分方差贡献率
# 计算综合得分:加权求和(权重为方差贡献率)
comprehensive_score = np.dot(X_pca, explained_variance_ratio)
# 构建结果DataFrame
result = pd.DataFrame({
'股票代码': data_x['股票代码'],
'股票名称': data_x['股票名称'],
'综合得分': comprehensive_score
})
# 添加排名
result['排名'] = result['综合得分'].rank(ascending=False, method='min').astype(int)
# 合并原始财务指标
result = pd.merge(result, data_x[['股票代码'] + financial_indicators], on='股票代码', how='left')
return result.sort_values('综合得分', ascending=False)
2.6 收益率计算模块
2.6.1 设计理念
针对不同数据源中股票代码格式不一致的问题,采用 "渐进式匹配" 策略,提高数据查找成功率,同时通过多重有效性校验,确保收益率计算的准确性。
2.6.2 核心功能
- 股票代码多格式匹配:
- 精确匹配:直接匹配完整股票代码(如 601919.SH);
- 纯数字匹配:提取代码中的数字部分进行匹配(如 601919);
- 后缀补充匹配:为纯数字代码添加常见交易所后缀(.SH/.SZ)后匹配;
- 数据有效性校验:
- 时间区间校验:确保指定的开始日期和结束日期在数据覆盖范围内;
- 交易日数校验:确保区间内有效交易日数不少于 2 天(避免无法计算收益率);
- 价格有效性校验:过滤收盘价为 0 或负数的异常数据;
- 收益率计算:采用区间收益率公式
(期末收盘价-期初收盘价)/期初收盘价×100%,计算指定时间区间内的累计收益率。
2.6.3 核心代码示例
python
运行
def calculate_stock_return(rdata, rank_data, start_date, end_date, trade_data):
"""
股票收益率计算函数,支持多格式股票代码匹配与数据有效性校验
参数:
rdata: 综合评价结果(DataFrame)
rank_data: 排名数量(int)
start_date: 开始日期(datetime)
end_date: 结束日期(datetime)
trade_data: 交易数据集(DataFrame)
返回:
收益率计算结果(DataFrame),包含股票代码、名称、综合得分、持有期收益率等
"""
# 数据格式统一化
trade_data_clean = trade_data.copy()
trade_data_clean['trade_date'] = pd.to_datetime(trade_data_clean['trade_date'])
trade_data_clean['ts_code'] = trade_data_clean['ts_code'].astype(str).str.strip().str.upper()
start_date_str = start_date.strftime('%Y-%m-%d')
end_date_str = end_date.strftime('%Y-%m-%d')
# 筛选排名前N的股票
top_stocks = rank_data.head(rank_data).copy()
return_results = []
for _, stock in top_stocks.iterrows():
stock_code = str(stock['股票代码']).strip().upper()
stock_name = stock['股票名称']
comprehensive_score = stock['综合得分']
# 股票代码多格式匹配策略
search_codes = []
if '.' in stock_code:
# 带后缀的代码:保留原格式和纯数字格式
search_codes.append(stock_code)
search_codes.append(stock_code.split('.')[0])
else:
# 纯数字代码:保留原格式和添加后缀格式
search_codes.append(stock_code)
search_codes.append(f"{stock_code}.SH")
search_codes.append(f"{stock_code}.SZ")
# 查找匹配的交易数据
stk_data = None
for code in search_codes:
temp_data = trade_data_clean[
(trade_data_clean['ts_code'].str.contains(code, na=False)) &
(trade_data_clean['trade_date'] >= start_date) &
(trade_data_clean['trade_date'] <= end_date)
]
if not temp_data.empty:
stk_data = temp_data.sort_values('trade_date')
break
# 数据有效性校验
if stk_data is None or len(stk_data) < 2:
return_results.append({
'股票代码': stock_code,
'股票名称': stock_name,
'综合得分': comprehensive_score,
'持有期收益率(%)': 0,
'备注': '有效交易数据不足'
})
continue
# 计算收益率
start_price = stk_data.iloc[0]['close']
end_price = stk_data.iloc[-1]['close']
if start_price <= 0:
return_results.append({
'股票代码': stock_code,
'股票名称': stock_name,
'综合得分': comprehensive_score,
'持有期收益率(%)': 0,
'备注': '期初价格异常'
})
continue
holding_period_return = (end_price - start_price) / start_price * 100
return_results.append({
'股票代码': stock_code,
'股票名称': stock_name,
'综合得分': comprehensive_score,
'持有期收益率(%)': round(holding_period_return, 2),
'交易天数': len(stk_data),
'期初价格': round(start_price, 2),
'期末价格': round(end_price, 2),
'备注': '正常'
})
return pd.DataFrame(return_results)
2.7 Streamlit 界面设计模块
2.7.1 设计理念
采用 "分层组织 + 交互式布局",以侧边栏为导航核心,主界面按功能模块动态加载内容,通过标签页、折叠面板、多列布局等组件优化信息展示,既保证功能的丰富性,又避免界面杂乱,提升用户操作体验。
2.7.2 核心布局
- 侧边栏:
- 模块导航:支持 "市场总览" 与各行业模块快速切换,采用按钮式布局,选中状态高亮显示;
- 系统信息:展示行业数量、交易记录条数、股票总数等核心数据统计;
- 功能按钮:包含数据刷新、帮助文档展开等功能;
- 系统状态:显示数据加载状态(正常 / 异常)与版本信息;
- 主界面:
- 标题与样式:自定义 CSS 美化界面,包含渐变标题、卡片式布局、按钮美化等;
- 市场总览页面:分为 "市场指数分析" 和 "行业统计分析" 两个标签页,包含指数走势图、龙虎榜统计、行业规模排名等内容;
- 行业分析页面:分为 "行业指数交易数据"、"上市公司信息"、"股票交易数据"、"财务数据" 四个标签页,支持技术指标可视化、模型预测、策略回测等功能;
- 交互组件:包含日期选择器、股票筛选框、模型参数配置区、图表交互控件等。
2.7.3 核心代码示例
python
运行
def build_main_interface():
"""构建平台主界面"""
# 页面配置
st.set_page_config(
page_title="金融数据挖掘及其应用综合实训",
page_icon="📊",
layout="wide",
initial_sidebar_state="expanded"
)
# 自定义CSS样式
st.markdown("""
<style>
/* 全局样式 */
.main-header {
font-size: 2.5rem;
color: #1E3A8A;
text-align: center;
margin-bottom: 1rem;
font-weight: bold;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
padding: 10px 0;
}
.sub-header {
font-size: 1.8rem;
color: #3B82F6;
margin-top: 1.5rem;
margin-bottom: 1rem;
font-weight: 600;
border-left: 5px solid #3B82F6;
padding-left: 15px;
}
.card {
background-color: #F8FAFC;
padding: 1.5rem;
border-radius: 10px;
border-left: 5px solid #3B82F6;
margin-bottom: 1.5rem;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
transition: transform 0.3s ease;
}
.card:hover {
transform: translateY(-5px);
box-shadow: 0 6px 12px rgba(0,0,0,0.15);
}
/* 按钮美化 */
.stButton > button {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 8px !important;
padding: 10px 20px;
font-weight: 600;
transition: all 0.3s ease;
}
.stButton > button:hover {
transform: translateY(-2px);
box-shadow: 0 4px 12px rgba(102, 126, 234, 0.4);
}
</style>
""", unsafe_allow_html=True)
# 加载数据
if 'data' not in st.session_state:
with st.spinner('正在加载数据...'):
st.session_state.data = load_all_data()
data = st.session_state.data
# 侧边栏构建
with st.sidebar:
st.markdown('<div style="font-size: 1.5rem; color: #2e4057; font-weight: bold; margin-bottom: 1.5rem;">📋 选择分析模块</div>', unsafe_allow_html=True)
# 准备行业列表
industry_info = data.get('industry_info', pd.DataFrame())
if not industry_info.empty and '新版一级行业' in industry_info.columns:
industry_list = sorted(set(industry_info['新版一级行业'].dropna().values))
industry_list = [ind for ind in industry_list if ind and ind != 'nan']
module_options = ['📊 市场总览'] + [f"🏭 {ind}" for ind in industry_list]
else:
module_options = ['📊 市场总览', '🏭 国防军工', '🏭 电子', '🏭 医药生物',
'🏭 计算机', '🏭 银行', '🏭 房地产']
# 模块选择按钮
selected_module = st.session_state.get('selected_module', '📊 市场总览')
for option in module_options:
if st.button(
option,
key=f"btn_{option}",
use_container_width=True,
type="primary" if option == selected_module else "secondary"
):
selected_module = option
st.session_state.selected_module = option
st.rerun()
# 系统信息
st.markdown("---")
st.markdown("### ℹ️ 系统信息")
if not industry_info.empty and '新版一级行业' in industry_info.columns:
st.metric("行业数量", len(industry_list))
if 'trade_data' in data and not data['trade_data'].empty:
st.metric("交易记录", f"{len(data['trade_data']):,}")
if 'stock_basic' in data and not data['stock_basic'].empty:
st.metric("股票总数", len(data['stock_basic']))
# 数据刷新按钮
st.markdown("---")
if st.button("🔄 刷新数据", use_container_width=True):
with st.spinner("正在刷新数据..."):
st.session_state.data = load_all_data()
st.success("数据刷新完成!")
st.rerun()
# 主内容区构建
st.markdown('<h1 class="main-header">金融数据挖掘及其应用综合实训</h1>', unsafe_allow_html=True)
module = st.session_state.selected_module
if module.startswith("📊 "):
# 市场总览页面
display_market_overview(data)
elif module.startswith("🏭 "):
# 行业分析页面
industry_name = module[2:]
display_industry_analysis(data, industry_name)
2.8 量化策略回测模块
2.8.1 设计理念
将机器学习模型的预测信号转化为可执行的投资策略,模拟真实交易环境(包含交易成本、滑点),通过历史数据回测评估策略的盈利能力与风险水平,为实际投资提供参考。
2.8.2 核心功能
- 策略逻辑:基于模型预测信号执行全仓交易,预测上涨(信号 = 1)时全仓买入,预测下跌(信号 =-1)时全仓卖出,预测横盘(信号 = 0)时持仓不变;
- 交易成本模拟:
- 佣金率:默认 0.03%(可自定义),按交易金额的比例计算;
- 滑点:默认 0.05%(可自定义),模拟实际交易中买价高于预期、卖价低于预期的情况;
- 绩效指标计算:
- 收益率指标:总收益率、年化收益率、超额收益率(相对沪深 300 指数);
- 风险指标:夏普比率(衡量单位风险的超额收益)、最大回撤(衡量策略最坏表现)、胜率(盈利交易日占比);
- 结果可视化:
- 资产价值走势:展示策略执行过程中资产总值的变化趋势;
- 交易记录详情:包含每笔交易的日期、价格、股数、佣金等信息;
- 策略对比:与买入持有策略的收益率对比图表。
2.8.3 核心代码示例
python
运行
def backtest_strategy(prices, signals, initial_capital=1000000, commission_rate=0.0003, slippage=0.0005):
"""
量化策略回测引擎,基于预测信号执行全仓交易
参数:
prices: 股票价格序列(np.array),与signals长度一致
signals: 预测信号序列(np.array),1=上涨,-1=下跌,0=横盘
initial_capital: 初始资金(float),默认100万元
commission_rate: 佣金率(float),默认0.03%
slippage: 滑点率(float),默认0.05%
返回:
回测结果字典,包含绩效指标、交易记录、资产价值走势等
"""
# 初始化参数
cash = initial_capital # 现金余额
holdings = 0 # 持仓股数
portfolio_values = [] # 资产总值序列
trades = [] # 交易记录
daily_returns = [] # 日收益率序列
# 遍历价格与信号序列
for i in range(len(prices)):
signal = signals[i]
# 考虑滑点:实际交易价格 = 理论价格 × (1 ± 滑点率)
slippage_adjustment = 1 + np.random.uniform(-slippage, slippage)
current_price = prices[i] * slippage_adjustment
# 计算当前资产总值
current_portfolio_value = cash + holdings * current_price
portfolio_values.append(current_portfolio_value)
# 计算日收益率
if i > 0:
daily_return = (current_portfolio_value / portfolio_values[i-1] - 1) * 100
else:
daily_return = 0
daily_returns.append(daily_return)
# 执行交易(从第二个交易日开始,避免首日无历史数据)
if i == 0:
continue
# 买入信号:现金>0且无持仓
if signal == 1 and cash > 0 and holdings == 0:
max_shares = int(cash // (current_price * (1 + commission_rate))) # 最大可买股数(整数)
if max_shares > 0:
trade_value = max_shares * current_price # 交易金额
commission = trade_value * commission_rate # 佣金
cash -= (trade_value + commission) # 扣除交易金额与佣金
holdings += max_shares # 增加持仓
# 记录交易
trades.append({
'trade_date_index': i,
'trade_type': 'BUY',
'price': current_price,
'shares': max_shares,
'trade_value': trade_value,
'commission': commission,
'cash_after_trade': cash,
'holdings_after_trade': holdings
})
# 卖出信号:有持仓
elif signal == -1 and holdings > 0:
trade_value = holdings * current_price # 交易金额
commission = trade_value * commission_rate # 佣金
cash += (trade_value - commission) # 增加现金(扣除佣金)
# 记录交易
trades.append({
'trade_date_index': i,
'trade_type': 'SELL',
'price': current_price,
'shares': holdings,
'trade_value': trade_value,
'commission': commission,
'cash_after_trade': cash,
'holdings_after_trade': 0
})
holdings = 0 # 清空持仓
# 计算绩效指标
final_portfolio_value = portfolio_values[-1] if portfolio_values else initial_capital
total_return = (final_portfolio_value - initial_capital) / initial_capital * 100 # 总收益率
annual_return = total_return / (len(portfolio_values) / 252) if len(portfolio_values) > 0 else 0 # 年化收益率(假设年交易日252天)
# 风险指标
daily_returns_array = np.array(daily_returns)
volatility = daily_returns_array.std() * np.sqrt(252) # 年化波动率
sharpe_ratio = annual_return / (volatility + 1e-8) # 夏普比率(无风险利率假设为0)
max_drawdown = calculate_max_drawdown(portfolio_values) # 最大回撤
win_rate = len([r for r in daily_returns if r > 0]) / len(daily_returns) if daily_returns else 0 # 胜率
# 输出回测结果
return {
'initial_capital': initial_capital,
'final_portfolio_value': final_portfolio_value,
'total_return(%)': round(total_return, 2),
'annual_return(%)': round(annual_return, 2),
'volatility(%)': round(volatility, 2),
'sharpe_ratio': round(sharpe_ratio, 2),
'max_drawdown(%)': round(max_drawdown, 2),
'win_rate(%)': round(win_rate * 100, 2),
'trade_count': len(trades),
'trades': pd.DataFrame(trades),
'portfolio_values': portfolio_values,
'daily_returns': daily_returns
}
def calculate_max_drawdown(portfolio_values):
"""计算最大回撤"""
if len(portfolio_values) < 2:
return 0
peak = portfolio_values[0]
max_dd = 0
for value in portfolio_values:
if value > peak:
peak = value
drawdown = (peak - value) / peak * 100
if drawdown > max_dd:
max_dd = drawdown
return max_dd
2.9 AI 集成模块
2.9.1 设计理念
将大语言模型(LLM)与传统量化分析相结合,通过结构化信息输入与专业提示词设计,引导 AI 从基本面、技术面、行业地位等多维度进行深度分析,同时通过完善的错误处理机制,确保 AI 服务异常时不影响平台整体运行。
2.9.2 核心功能
- 结构化信息整理:从平台数据库中提取股票核心信息,包括股票名称、代码、当前股价、市盈率、市净率、核心财务指标、技术指标等,形成标准化输入;
- 专业提示词设计:明确要求 AI 以资深证券投资顾问的身份,从基本面分析、技术面分析、行业地位、风险提示、投资建议五个维度输出专业分析报告;
- API 调用与错误处理:
- 支持主流大语言模型 API(如 DeepSeek、GPT 等)调用;
- 处理网络超时、认证失败、服务限流、返回格式异常等多种错误场景;
- 结果展示:将 AI 生成的分析报告以结构化文本形式展示,支持复制、导出功能,同时提供报告要点提炼,方便快速阅读。
2.9.3 核心代码示例
python
运行
def ai_stock_analysis(stock_info, api_config):
"""
集成大语言模型进行股票深度分析
参数:
stock_info: 股票信息字典,包含名称、代码、价格、市盈率、财务指标、技术指标等
api_config: API配置字典,包含api_url、headers、model_name等
返回:
AI分析结果字典,包含完整报告、报告要点、生成状态
"""
# 构建结构化提示词
prompt = f"""作为资深证券投资顾问,基于以下股票的全面信息,提供专业、详细的分析报告:
【股票基本信息】
股票名称:{stock_info.get('name', '未知')}
股票代码:{stock_info.get('code', '未知')}
当前股价:{stock_info.get('price', '未知')}元
市盈率(PE):{stock_info.get('pe_ratio', '未知')}
市净率(PB):{stock_info.get('pb_ratio', '未知')}
【核心财务指标】
营业收入:{stock_info.get('revenue', '未知')}元
净利润:{stock_info.get('net_profit', '未知')}元
净资产收益率(ROE):{stock_info.get('roe', '未知')}%
净利率:{stock_info.get('net_margin', '未知')}%
资产负债率:{stock_info.get('debt_ratio', '未知')}%
【技术指标(最新交易日)】
移动平均线:MA5={stock_info.get('ma5', '未知')}元,MA20={stock_info.get('ma20', '未知')}元,MA60={stock_info.get('ma60', '未知')}元
MACD:DIF={stock_info.get('dif', '未知')},DEA={stock_info.get('dea', '未知')},MACD柱={stock_info.get('macd_hist', '未知')}
RSI:{stock_info.get('rsi', '未知')}
KDJ:K={stock_info.get('k', '未知')},D={stock_info.get('d', '未知')},J={stock_info.get('j', '未知')}
【分析要求】
1. 基本面分析:重点分析财务状况、盈利能力、成长性及财务风险;
2. 技术面分析:基于提供的技术指标,判断价格趋势、支撑阻力位及交易信号;
3. 行业地位:分析公司在行业内的竞争优势、市场份额及行业发展前景;
4. 风险提示:明确指出政策风险、行业风险、公司经营风险等潜在风险;
5. 投资建议:分别给出短期(1-3个月)、中期(3-6个月)、长期(6个月以上)的投资建议,包括买入、持有、卖出评级及目标价位。
要求:分析逻辑清晰,数据支撑充分,语言专业但不晦涩,报告结构完整,总字数控制在800-1000字。
"""
# API调用配置
api_url = api_config.get('api_url')
headers = api_config.get('headers', {})
model_name = api_config.get('model_name', 'deepseek-ai/DeepSeek-OCR')
try:
# 发送API请求
response = requests.post(
api_url,
headers=headers,
json={
"model": model_name,
"messages": [
{"role": "system", "content": "你是一名拥有10年以上经验的资深证券投资顾问,擅长股票基本面与技术面综合分析,提供专业、客观的投资建议"},
{"role": "user", "content": prompt}
],
"temperature": 0.7, # 控制输出随机性,0.7为适中值
"max_tokens": 1500 # 限制最大输出字数
},
timeout=30 # 超时时间30秒
)
# 响应状态码检查
response.raise_for_status()
result = response.json()
full_report = result.get('choices', [{}])[0].get('message', {}).get('content', '')
if not full_report:
return {
'status': 'failed',
'full_report': '',
'key_points': [],
'error_message': 'AI返回结果为空'
}
# 提炼报告要点
key_points = extract_report_key_points(full_report)
return {
'status': 'success',
'full_report': full_report,
'key_points': key_points,
'error_message': ''
}
except requests.exceptions.Timeout:
return {
'status': 'failed',
'full_report': '',
'key_points': [],
'error_message': 'API请求超时,请稍后重试'
}
except requests.exceptions.ConnectionError:
return {
'status': 'failed',
'full_report': '',
'key_points': [],
'error_message': '网络连接错误,请检查网络状态'
}
except requests.exceptions.HTTPError as e:
return {
'status': 'failed',
'full_report': '',
'key_points': [],
'error_message': f'API请求失败,状态码:{e.response.status_code},详情:{e.response.text}'
}
except Exception as e:
return {
'status': 'failed',
'full_report': '',
'key_points': [],
'error_message': f'未知错误:{str(e)}'
}
def extract_report_key_points(report):
"""提炼AI分析报告的核心要点"""
key_points = []
# 基于报告结构提取要点,可根据实际返回格式调整
if '基本面分析' in report:
key_points.append('【基本面要点】' + report.split('基本面分析')[1].split('技术面分析')[0].strip()[:100] + '...')
if '技术面分析' in report:
key_points.append('【技术面要点】' + report.split('技术面分析')[1].split('行业地位')[0].strip()[:100] + '...')
if '行业地位' in report:
key_points.append('【行业地位要点】' + report.split('行业地位')[1].split('风险提示')[0].strip()[:100] + '...')
if '风险提示' in report:
key_points.append('【风险提示】' + report.split('风险提示')[1].split('投资建议')[0].strip()[:100] + '...')
if '投资建议' in report:
key_points.append('【投资建议】' + report.split('投资建议')[1].strip()[:100] + '...')
return key_points
三、平台特色与优势
- 数据处理精细化:采用分年度加载、多格式匹配、异常值过滤等多重机制,确保数据质量与可靠性;
- 功能覆盖全面化:从数据加载、指标计算、模型预测到策略回测、AI 分析,覆盖金融数据挖掘全流程;
- 模型算法多样化:集成 6 种经典机器学习算法,支持多模型对比,适配不同场景下的预测需求;
- 界面交互友好化:采用分层布局、交互式图表、自定义样式,降低使用门槛,提升操作体验;
- 结果输出专业化:提供结构化报告、量化指标、可视化图表等多种输出形式,满足不同用户需求;
- 扩展能力强大:模块化设计便于新增数据源、指标或算法,支持功能横向扩展与纵向深化。
四、使用注意事项
- 数据准备:确保本地数据文件格式正确、路径规范,缺失关键文件可能导致部分功能无法使用;
- 参数配置:模型训练与策略回测时,建议根据数据量与场景需求合理调整参数(如预测天数、佣金率等);
- 结果解读:模型预测与策略回测结果仅供参考,不构成投资建议,实际投资需结合市场动态、政策环境等多因素综合判断;
- 性能优化:大数据量分析时(如全市场股票多年度数据),建议分批次处理或增加内存配置,避免加载延迟;
- 系统维护:定期刷新数据以保证数据时效性,发现异常时可通过错误日志定位问题,必要时重新加载数据。
五、版本迭代与扩展方向
- 数据源扩展:新增 API 接口对接实时行情数据,支持股票、基金、期货等多品类资产数据接入;
- 模型优化:引入深度学习模型(如 LSTM、Transformer),提升时间序列数据的预测精度;
- 策略丰富化:新增多因子策略、套利策略、风险管理策略等,支持复杂投资组合构建;
- 协作功能:添加用户权限管理、数据共享、报告协作编辑等功能,适配团队协作场景;
- 移动端适配:优化界面响应式设计,支持移动端访问与操作,提升使用便捷性。
金融数据挖掘及其应用综合实训 -视频
平台的技术指标深度分析功能有哪些具体的指标?
平台的可视化展示层支持哪些交互式图表?
平台的量化投资策略回测功能是如何实现的?
更多推荐



所有评论(0)