这是一款集成股票市场数据展示、技术指标深度分析、多算法机器学习预测、量化投资策略回测及 AI 智能分析的综合性金融数据分析平台。平台以 "数据输入→数据处理→模型分析→结果展示" 为核心逻辑链条,构建从原始金融数据到实战级投资洞察的完整流水线,兼顾数据处理的高效性、模型预测的准确性及界面操作的易用性,适用于金融实训教学、量化策略研究及市场数据分析等多场景。

一、项目架构设计

1.1 分析流程精细化设计

平台采用模块化分层架构,各环节紧密衔接且独立可扩展,具体流程如下:

  1. 数据接入层:支持多格式(CSV/Excel)、多年度金融数据批量接入,涵盖交易数据、财务数据、指数数据等 11 类核心数据源;
  2. 数据预处理层:执行格式标准化、异常值过滤、缺失值填充、日期统一转换等操作,为后续分析提供高质量数据基础;
  3. 特征工程层:计算技术指标、财务比率、市场情绪等多维度特征,构建适配机器学习模型的特征体系;
  4. 模型分析层:集成多种预测算法与量化策略,实现股票涨跌预测、行业评级、投资组合优化等核心功能;
  5. 可视化展示层:通过交互式图表、结构化表格、自定义仪表盘等形式,直观呈现分析结果与投资建议。

1.2 技术栈选型与优势

技术方向 核心技术 选型优势 应用场景
前端 / 交互 Streamlit 开发效率高,支持快速构建交互式 Web 应用,原生适配数据科学工作流 界面布局、参数配置、结果展示
数据处理 Pandas、NumPy 高效处理结构化数据,支持批量运算与复杂数据清洗逻辑 数据加载、格式转换、特征计算
可视化 Plotly 生成交互式图表,支持缩放、悬停详情、多维度展示,适配金融数据特性 行情走势、指标分析、策略回测结果可视化
机器学习 Scikit-learn 提供丰富的经典算法库,API 简洁统一,支持模型训练与评估全流程 股票涨跌预测、公司综合评级、特征重要性分析
数据源 CSV/Excel 本地文件 数据存储灵活,便于增量更新与版本管理,适配实训场景数据需求 交易数据、财务数据、行业分类数据存储

二、核心模块精细化详解

2.1 数据加载模块

2.1.1 设计理念

采用 "分年度读取 + 合并标准化" 策略,适配金融数据按年度更新的行业特性,同时通过进度监控、异常捕获、格式校验等机制,确保数据加载的稳定性与可靠性。

2.1.2 核心功能
  1. 多源数据加载:支持复权交易数据(2023-2025 年)、常规交易数据(2024-2025 年)、行业分类数据、股票基本信息、上市公司详情、沪深 300 指数数据等 11 类文件加载;
  2. 数据标准化处理
    • 股票代码:去重、去除空格、统一字符串格式;
    • 交易日期:转换为YYYY-MM-DD标准格式,支持后续区间筛选与时间序列分析;
    • 数值类型:统一字段数据类型,避免计算过程中类型错误;
  3. 加载状态监控:通过进度条实时展示加载进度(0%-100%),配合状态文本提示当前加载模块,加载完成后自动显示数据统计概览;
  4. 异常处理机制
    • 文件缺失时,明确提示需补充的文件清单及路径要求;
    • 数据格式错误时,输出详细错误日志,便于问题定位;
    • 网络中断或内存不足时,支持断点续载与数据分片加载。
2.1.3 核心代码示例

python

运行

@st.cache_data(ttl=3600, show_spinner=False)
def load_all_data():
    data_dict = {}
    progress_bar = st.progress(0, text="正在加载数据...")
    status_text = st.empty()
    
    # 分年度加载复权交易数据
    status_text.text("正在加载复权交易数据...")
    try:
        adj_2023 = pd.read_csv('复权交易数据2023.csv')
        adj_2024 = pd.read_csv('复权交易数据2024.csv')
        adj_2025 = pd.read_csv('复权交易数据2025.csv')
        
        # 统一数据格式
        for df in [adj_2023, adj_2024, adj_2025]:
            df['trade_date'] = pd.to_datetime(df['trade_date'], format='%Y%m%d')  # 日期标准化
            df['ts_code'] = df['ts_code'].astype(str).str.strip()  # 股票代码去重去空格
        
        adj_trade_data = pd.concat([adj_2023, adj_2024, adj_2025], ignore_index=True)
        adj_trade_data['year'] = adj_trade_data['trade_date'].dt.year
        adj_trade_data['month'] = adj_trade_data['trade_date'].dt.month
        data_dict['adj_trade_data'] = adj_trade_data
        progress_bar.progress(60, text="复权交易数据加载完成")
    except Exception as e:
        st.warning(f"复权交易数据加载异常: {str(e)}")
        data_dict['adj_trade_data'] = pd.DataFrame()
    
    # 其他数据加载逻辑...
    return data_dict

2.2 龙虎榜计算模块

2.2.1 设计理念

针对金融数据中可能存在的缺失值、停牌数据、异常波动等问题,采用 "双方法验证 + 异常过滤 + 结果融合" 策略,确保龙虎榜数据的准确性与参考价值。

2.2.2 核心功能
  1. 双方法涨跌幅计算
    • 方法 A(直接计算法):基于区间首尾收盘价计算,公式为(期末收盘价-期初收盘价)/期初收盘价×100%
    • 方法 B(复利累乘法):基于每日涨跌幅累乘计算,公式为(∏(1+每日涨跌幅/100)-1)×100%,仅纳入 - 20%~20% 区间的合理涨跌幅数据;
  2. 结果融合策略
    • 若两种方法结果差异小于 50%,采用更精确的复利累乘法结果;
    • 若差异大于 50%,取两种结果中的较小值作为保守估计,避免异常数据影响;
  3. 数据清洗机制
    • 过滤价格为 0 或负数的异常数据;
    • 剔除区间内交易日数少于 5 天的股票(避免数据量不足导致的偏差);
    • 去重处理同一股票的重复记录;
  4. 结果输出:生成累计涨幅 Top20 与累计跌幅 Top20 榜单,包含股票代码、股票简称、涨跌幅等关键信息,并支持数据导出与趋势可视化。
2.2.3 核心代码示例

python

运行

def calculate_dragon_tiger(start_date_int, end_date_int, trade_data, stock_info_path):
    """
    核心设计思想:多方法验证、异常过滤、结果融合
    参数:
        start_date_int: 开始日期(整数格式YYYYMMDD)
        end_date_int: 结束日期(整数格式YYYYMMDD)
        trade_data: 交易数据集(DataFrame)
        stock_info_path: 股票基本信息文件路径
    返回:
        龙虎榜数据(DataFrame)包含股票代码、名称、涨跌幅等字段
    """
    # 数据筛选:指定日期区间
    mask = (trade_data['trade_date'] >= start_date_int) & (trade_data['trade_date'] <= end_date_int)
    filtered_data = trade_data[mask].copy()
    
    # 按股票代码分组计算
    dragon_tiger_data = []
    for ts_code, group in filtered_data.groupby('ts_code'):
        if len(group) < 5:  # 过滤交易日数不足5天的股票
            continue
        
        # 方法A:首尾收盘价直接计算
        first_close = group.iloc[0]["close"]
        last_close = group.iloc[-1]["close"]
        if first_close <= 0:  # 过滤异常价格
            continue
        pct_from_price = (last_close - first_close) / first_close * 100
        
        # 方法B:日涨跌幅累乘计算
        pct_from_daily = None
        if 'pct_chg' in group.columns:
            valid_pct = group['pct_chg'].dropna()
            valid_pct = valid_pct[(valid_pct >= -20) & (valid_pct <= 20)]  # 过滤极端涨跌幅
            if len(valid_pct) > 0:
                cumulative = valid_pct.apply(lambda x: 1 + x/100).prod()
                pct_from_daily = (cumulative - 1) * 100
        
        # 结果融合
        if pct_from_daily is not None:
            if abs(pct_from_price - pct_from_daily) < 50:
                final_pct = pct_from_daily
            else:
                final_pct = min(pct_from_price, pct_from_daily)
        else:
            final_pct = pct_from_price
        
        # 获取股票名称
        stock_name = get_stock_name(ts_code, stock_info_path)
        dragon_tiger_data.append({
            'ts_code': ts_code,
            'stock_name': stock_name,
            'pct_change': final_pct,
            'trade_days': len(group)
        })
    
    return pd.DataFrame(dragon_tiger_data)

2.3 技术指标计算模块

2.3.1 设计理念

将经典金融技术分析理论转化为标准化算法,覆盖趋势类、动量类、超买超卖类等多维度指标,同时处理边界情况(如数据量不足、NaN 值),确保指标计算的稳定性与实用性,既支持可视化展示,也可作为机器学习模型的特征输入。

2.3.2 核心指标详解
指标类型 具体指标 计算逻辑 应用场景
趋势类指标 MA5/MA10/MA20/MA60 收盘价的 5/10/20/60 日滚动平均 判断价格中长期趋势
动量类指标 MACD(DIF/DEA/MACD 柱) DIF=EMA12-EMA26;DEA=DIF 的 9 日 EMA;MACD 柱 = 2×(DIF-DEA) 捕捉趋势转折与动量变化
超买超卖类指标 KDJ RSV=(收盘价 - 9 日最低价)/(9 日最高价 - 9 日最低价)×100;K=RSV 的 2 日 EMA;D=K 的 2 日 EMA;J=3K-2D 判断价格超买超卖状态
相对强弱类指标 RSI 14 日上涨幅度均值与下跌幅度均值的比值,公式为 100-(100/(1+RS)) 衡量市场多空力量对比
能量类指标 OBV 基于收盘价涨跌方向累计成交量,上涨时加成交量,下跌时减成交量 验证价格趋势的有效性
波动率指标 ATR 真实波幅(最高价 - 最低价、最高价 - 前收盘价、前收盘价 - 最低价的最大值)的 14 日滚动平均 衡量价格波动幅度
价格位置指标 布林带 中轨 = 20 日 MA;上轨 = 中轨 + 2×20 日标准差;下轨 = 中轨 - 2×20 日标准差 判断价格运行区间与突破信号
2.3.3 核心代码示例

python

运行

def calculate_technical_indicators(df):
    """
    技术指标计算工厂函数,支持多维度指标批量计算
    参数:df为包含trade_date、close、high、low、vol字段的DataFrame
    返回:添加技术指标后的DataFrame
    """
    df = df.sort_values('trade_date').copy()  # 确保按日期排序
    df = df.reset_index(drop=True)
    
    # 1. 移动平均线(MA)- 趋势类指标
    df['MA5'] = df['close'].rolling(window=5, min_periods=1).mean()
    df['MA10'] = df['close'].rolling(window=10, min_periods=1).mean()
    df['MA20'] = df['close'].rolling(window=20, min_periods=1).mean()
    df['MA60'] = df['close'].rolling(window=60, min_periods=1).mean()
    
    # 2. MACD - 动量类指标
    df['EMA12'] = df['close'].ewm(span=12, adjust=False).mean()
    df['EMA26'] = df['close'].ewm(span=26, adjust=False).mean()
    df['DIF'] = df['EMA12'] - df['EMA26']
    df['DEA'] = df['DIF'].ewm(span=9, adjust=False).mean()
    df['MACD'] = (df['DIF'] - df['DEA']) * 2
    df['MACD_Hist'] = df['MACD']  # MACD柱
    
    # 3. KDJ - 超买超卖类指标
    df['Low_9'] = df['low'].rolling(window=9, min_periods=1).min()
    df['High_9'] = df['high'].rolling(window=9, min_periods=1).max()
    df['RSV'] = (df['close'] - df['Low_9']) / (df['High_9'] - df['Low_9'] + 1e-8) * 100  # 避免除数为0
    df['K'] = df['RSV'].ewm(com=2, min_periods=1).mean()
    df['D'] = df['K'].ewm(com=2, min_periods=1).mean()
    df['J'] = 3 * df['K'] - 2 * df['D']
    
    # 4. RSI - 相对强弱类指标
    delta = df['close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14, min_periods=1).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14, min_periods=1).mean()
    rs = gain / (loss + 1e-8)  # 避免除数为0
    df['RSI'] = 100 - (100 / (1 + rs))
    
    # 5. OBV - 能量类指标
    df['OBV'] = (np.sign(df['close'].diff()) * df['vol']).fillna(0).cumsum()
    
    # 6. ATR - 波动率指标
    high_low = df['high'] - df['low']
    high_close = np.abs(df['high'] - df['close'].shift())
    low_close = np.abs(df['low'] - df['close'].shift())
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    df['ATR'] = tr.rolling(window=14, min_periods=1).mean()
    
    # 7. 布林带 - 价格位置指标
    df['BB_Middle'] = df['close'].rolling(window=20, min_periods=1).mean()
    bb_std = df['close'].rolling(window=20, min_periods=1).std()
    df['BB_Upper'] = df['BB_Middle'] + 2 * bb_std
    df['BB_Lower'] = df['BB_Middle'] - 2 * bb_std
    
    # 处理NaN值
    df = df.fillna(method='ffill').fillna(method='bfill')
    return df

2.4 机器学习模型模块

2.4.1 设计理念

采用 "统一接口 + 多模型适配" 架构,支持多种经典算法快速切换与对比,通过科学的数据划分、标准化处理、多维度评估,确保模型预测的可靠性,同时降低用户使用门槛,适配不同场景下的预测需求。

2.4.2 核心功能
  1. 多模型支持:集成逻辑回归、随机森林、支持向量机、神经网络、梯度提升树、AdaBoost 等 6 种算法,覆盖线性与非线性场景;
  2. 数据划分策略:采用 6:2:2 比例划分训练集、验证集、测试集,贴合金融时间序列数据特性,避免数据泄露;
  3. 特征处理:支持基础特征、技术指标特征、高级特征等多维度特征集选择,自动筛选有效特征,剔除冗余信息;
  4. 标准化处理:通过 StandardScaler 对特征进行标准化(均值为 0,方差为 1),提升模型收敛速度与预测精度;
  5. 多维度评估:输出准确率、精确率、召回率、F1 分数、AUC 值等核心指标,全面评估模型性能;
  6. 预测结果输出:生成股票涨跌预测信号(上涨 / 下跌 / 横盘)及预测概率,为量化策略提供决策依据。
2.4.3 核心代码示例

python

运行

def train_prediction_model(X, y, model_type='逻辑回归', feature_selection='all'):
    """
    模型训练统一接口,支持多模型切换与特征集选择
    参数:
        X: 特征矩阵(DataFrame)
        y: 目标变量(Series),1=上涨,-1=下跌,0=横盘
        model_type: 模型类型,可选值为['逻辑回归', '随机森林', '支持向量机', '神经网络', '梯度提升树', 'AdaBoost']
        feature_selection: 特征选择策略,可选值为['basic', 'technical', 'advanced', 'all']
    返回:
        模型训练结果字典,包含模型、scaler、评估指标、预测结果等
    """
    # 特征选择
    feature_mapping = {
        'basic': ['MA5', 'MA10', 'MA20', 'RSI', 'Volume_Ratio', 'Momentum'],
        'technical': ['MA5', 'MA10', 'MA20', 'MA60', 'MACD', 'RSI', 'K', 'D', 'J', 'OBV'],
        'advanced': ['MA5', 'MA10', 'MA20', 'MA60', 'MACD', 'RSI', 'K', 'D', 'J', 'OBV', 'Volume_Ratio', 'Momentum', 'Volatility', 'ATR', 'Price_Position'],
        'all': ['MA5', 'MA10', 'MA20', 'MA60', 'MACD', 'RSI', 'K', 'D', 'J', 'OBV', 'Volume_Ratio', 'Momentum', 'Volatility', 'ATR', 'Price_Position']
    }
    selected_features = feature_mapping[feature_selection]
    X_selected = X[selected_features].copy()
    
    # 数据划分:60%训练集,20%验证集,20%测试集
    train_size = int(0.6 * len(X_selected))
    val_size = int(0.2 * len(X_selected))
    X_train = X_selected.iloc[:train_size]
    X_val = X_selected.iloc[train_size:train_size+val_size]
    X_test = X_selected.iloc[train_size+val_size:]
    y_train = y.iloc[:train_size]
    y_val = y.iloc[train_size:train_size+val_size]
    y_test = y.iloc[train_size+val_size:]
    
    # 标准化处理
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)
    X_test_scaled = scaler.transform(X_test)
    
    # 模型字典:定义各模型默认参数
    model_dict = {
        '逻辑回归': LogisticRegression(max_iter=1000, C=1.0, random_state=42),
        '随机森林': RandomForestClassifier(n_estimators=100, max_depth=None, random_state=42),
        '支持向量机': SVC(kernel='rbf', C=1.0, probability=True, random_state=42),
        '神经网络': MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=1000, random_state=42),
        '梯度提升树': GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, random_state=42),
        'AdaBoost': AdaBoostClassifier(n_estimators=50, random_state=42)
    }
    
    # 模型选择与训练
    model = model_dict.get(model_type, model_dict['逻辑回归'])
    model.fit(X_train_scaled, y_train)
    
    # 模型评估
    def evaluate_model(model, X, y, set_name):
        y_pred = model.predict(X)
        y_proba = model.predict_proba(X) if hasattr(model, 'predict_proba') else None
        accuracy = accuracy_score(y, y_pred)
        precision, recall, f1, _ = precision_recall_fscore_support(y, y_pred, average='weighted')
        auc_score = roc_auc_score(y, y_proba, multi_class='ovr') if y_proba is not None and len(np.unique(y)) > 1 else None
        return {
            'set_name': set_name,
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'auc': auc_score,
            'y_pred': y_pred,
            'y_proba': y_proba
        }
    
    train_metrics = evaluate_model(model, X_train_scaled, y_train, 'train')
    val_metrics = evaluate_model(model, X_val_scaled, y_val, 'val')
    test_metrics = evaluate_model(model, X_test_scaled, y_test, 'test')
    
    # 返回结果
    return {
        'model': model,
        'scaler': scaler,
        'selected_features': selected_features,
        'train_metrics': train_metrics,
        'val_metrics': val_metrics,
        'test_metrics': test_metrics,
        'X_test': X_test,
        'y_test': y_test
    }

2.5 综合评价模块

2.5.1 设计理念

基于主成分分析(PCA)实现多维度财务指标的降维与融合,将营业收入、净利润、净资产收益率等多个分散指标压缩为单一综合得分,客观反映公司综合实力,为股票排序与投资决策提供量化依据。

2.5.2 核心流程
  1. 数据清洗
    • 过滤财务指标为负值或 0 的异常数据(如净利润为负可能表示公司经营异常);
    • 剔除缺失值占比超过 30% 的样本,剩余缺失值采用行业均值填充;
  2. 标准化处理:通过 StandardScaler 消除不同指标的量纲差异(如营业收入与净资产收益率的单位不同);
  3. 主成分分析
    • 提取主成分,保留累计方差贡献率达 95% 的成分,在降维的同时最大限度保留原始信息;
    • 计算各主成分的方差贡献率,作为加权系数;
  4. 综合得分计算:以各主成分的方差贡献率为权重,对标准化后的主成分得分进行加权求和,得到公司综合得分;
  5. 结果输出:按综合得分排序,生成行业内公司排名榜单,包含股票代码、名称、核心财务指标及综合得分。
2.5.3 核心代码示例

python

运行

def calculate_comprehensive_score(data, year):
    """
    基于主成分分析的公司综合评价函数
    参数:
        data: 财务数据集(DataFrame),包含股票代码、年度、营业收入、净利润、净资产收益率等字段
        year: 评价年度(int)
    返回:
        综合评价结果(DataFrame),包含股票代码、名称、综合得分及排名
    """
    # 筛选指定年度数据
    year_data = data[data['年度'] == year].copy()
    if len(year_data) < 10:
        raise ValueError(f"年度{year}的有效数据样本不足10个,无法进行综合评价")
    
    # 数据清洗
    # 1. 选择核心财务指标
    financial_indicators = ['营业收入', '净利润', '净资产收益率', '资产负债率', '毛利率', '净利率']
    data_x = year_data[financial_indicators].copy()
    # 2. 过滤负值和0
    for col in financial_indicators:
        data_x = data_x[data_x[col] > 0]
    # 3. 处理缺失值
    data_x = data_x.fillna(data_x.mean())
    # 4. 关联股票代码和名称
    data_x['股票代码'] = year_data.loc[data_x.index, '股票代码']
    data_x['股票名称'] = year_data.loc[data_x.index, '股票名称']
    
    # 标准化处理
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(data_x[financial_indicators])
    
    # 主成分分析
    pca = PCA(n_components=0.95)  # 保留95%的信息
    X_pca = pca.fit_transform(X_scaled)
    explained_variance_ratio = pca.explained_variance_ratio_  # 各主成分方差贡献率
    
    # 计算综合得分:加权求和(权重为方差贡献率)
    comprehensive_score = np.dot(X_pca, explained_variance_ratio)
    
    # 构建结果DataFrame
    result = pd.DataFrame({
        '股票代码': data_x['股票代码'],
        '股票名称': data_x['股票名称'],
        '综合得分': comprehensive_score
    })
    
    # 添加排名
    result['排名'] = result['综合得分'].rank(ascending=False, method='min').astype(int)
    
    # 合并原始财务指标
    result = pd.merge(result, data_x[['股票代码'] + financial_indicators], on='股票代码', how='left')
    
    return result.sort_values('综合得分', ascending=False)

2.6 收益率计算模块

2.6.1 设计理念

针对不同数据源中股票代码格式不一致的问题,采用 "渐进式匹配" 策略,提高数据查找成功率,同时通过多重有效性校验,确保收益率计算的准确性。

2.6.2 核心功能
  1. 股票代码多格式匹配
    • 精确匹配:直接匹配完整股票代码(如 601919.SH);
    • 纯数字匹配:提取代码中的数字部分进行匹配(如 601919);
    • 后缀补充匹配:为纯数字代码添加常见交易所后缀(.SH/.SZ)后匹配;
  2. 数据有效性校验
    • 时间区间校验:确保指定的开始日期和结束日期在数据覆盖范围内;
    • 交易日数校验:确保区间内有效交易日数不少于 2 天(避免无法计算收益率);
    • 价格有效性校验:过滤收盘价为 0 或负数的异常数据;
  3. 收益率计算:采用区间收益率公式(期末收盘价-期初收盘价)/期初收盘价×100%,计算指定时间区间内的累计收益率。
2.6.3 核心代码示例

python

运行

def calculate_stock_return(rdata, rank_data, start_date, end_date, trade_data):
    """
    股票收益率计算函数,支持多格式股票代码匹配与数据有效性校验
    参数:
        rdata: 综合评价结果(DataFrame)
        rank_data: 排名数量(int)
        start_date: 开始日期(datetime)
        end_date: 结束日期(datetime)
        trade_data: 交易数据集(DataFrame)
    返回:
        收益率计算结果(DataFrame),包含股票代码、名称、综合得分、持有期收益率等
    """
    # 数据格式统一化
    trade_data_clean = trade_data.copy()
    trade_data_clean['trade_date'] = pd.to_datetime(trade_data_clean['trade_date'])
    trade_data_clean['ts_code'] = trade_data_clean['ts_code'].astype(str).str.strip().str.upper()
    start_date_str = start_date.strftime('%Y-%m-%d')
    end_date_str = end_date.strftime('%Y-%m-%d')
    
    # 筛选排名前N的股票
    top_stocks = rank_data.head(rank_data).copy()
    return_results = []
    
    for _, stock in top_stocks.iterrows():
        stock_code = str(stock['股票代码']).strip().upper()
        stock_name = stock['股票名称']
        comprehensive_score = stock['综合得分']
        
        # 股票代码多格式匹配策略
        search_codes = []
        if '.' in stock_code:
            # 带后缀的代码:保留原格式和纯数字格式
            search_codes.append(stock_code)
            search_codes.append(stock_code.split('.')[0])
        else:
            # 纯数字代码:保留原格式和添加后缀格式
            search_codes.append(stock_code)
            search_codes.append(f"{stock_code}.SH")
            search_codes.append(f"{stock_code}.SZ")
        
        # 查找匹配的交易数据
        stk_data = None
        for code in search_codes:
            temp_data = trade_data_clean[
                (trade_data_clean['ts_code'].str.contains(code, na=False)) &
                (trade_data_clean['trade_date'] >= start_date) &
                (trade_data_clean['trade_date'] <= end_date)
            ]
            if not temp_data.empty:
                stk_data = temp_data.sort_values('trade_date')
                break
        
        # 数据有效性校验
        if stk_data is None or len(stk_data) < 2:
            return_results.append({
                '股票代码': stock_code,
                '股票名称': stock_name,
                '综合得分': comprehensive_score,
                '持有期收益率(%)': 0,
                '备注': '有效交易数据不足'
            })
            continue
        
        # 计算收益率
        start_price = stk_data.iloc[0]['close']
        end_price = stk_data.iloc[-1]['close']
        if start_price <= 0:
            return_results.append({
                '股票代码': stock_code,
                '股票名称': stock_name,
                '综合得分': comprehensive_score,
                '持有期收益率(%)': 0,
                '备注': '期初价格异常'
            })
            continue
        
        holding_period_return = (end_price - start_price) / start_price * 100
        
        return_results.append({
            '股票代码': stock_code,
            '股票名称': stock_name,
            '综合得分': comprehensive_score,
            '持有期收益率(%)': round(holding_period_return, 2),
            '交易天数': len(stk_data),
            '期初价格': round(start_price, 2),
            '期末价格': round(end_price, 2),
            '备注': '正常'
        })
    
    return pd.DataFrame(return_results)

2.7 Streamlit 界面设计模块

2.7.1 设计理念

采用 "分层组织 + 交互式布局",以侧边栏为导航核心,主界面按功能模块动态加载内容,通过标签页、折叠面板、多列布局等组件优化信息展示,既保证功能的丰富性,又避免界面杂乱,提升用户操作体验。

2.7.2 核心布局
  1. 侧边栏
    • 模块导航:支持 "市场总览" 与各行业模块快速切换,采用按钮式布局,选中状态高亮显示;
    • 系统信息:展示行业数量、交易记录条数、股票总数等核心数据统计;
    • 功能按钮:包含数据刷新、帮助文档展开等功能;
    • 系统状态:显示数据加载状态(正常 / 异常)与版本信息;
  2. 主界面
    • 标题与样式:自定义 CSS 美化界面,包含渐变标题、卡片式布局、按钮美化等;
    • 市场总览页面:分为 "市场指数分析" 和 "行业统计分析" 两个标签页,包含指数走势图、龙虎榜统计、行业规模排名等内容;
    • 行业分析页面:分为 "行业指数交易数据"、"上市公司信息"、"股票交易数据"、"财务数据" 四个标签页,支持技术指标可视化、模型预测、策略回测等功能;
    • 交互组件:包含日期选择器、股票筛选框、模型参数配置区、图表交互控件等。
2.7.3 核心代码示例

python

运行

def build_main_interface():
    """构建平台主界面"""
    # 页面配置
    st.set_page_config(
        page_title="金融数据挖掘及其应用综合实训",
        page_icon="📊",
        layout="wide",
        initial_sidebar_state="expanded"
    )
    
    # 自定义CSS样式
    st.markdown("""
    <style>
        /* 全局样式 */
        .main-header {
            font-size: 2.5rem;
            color: #1E3A8A;
            text-align: center;
            margin-bottom: 1rem;
            font-weight: bold;
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            -webkit-background-clip: text;
            -webkit-text-fill-color: transparent;
            padding: 10px 0;
        }
        .sub-header {
            font-size: 1.8rem;
            color: #3B82F6;
            margin-top: 1.5rem;
            margin-bottom: 1rem;
            font-weight: 600;
            border-left: 5px solid #3B82F6;
            padding-left: 15px;
        }
        .card {
            background-color: #F8FAFC;
            padding: 1.5rem;
            border-radius: 10px;
            border-left: 5px solid #3B82F6;
            margin-bottom: 1.5rem;
            box-shadow: 0 4px 6px rgba(0,0,0,0.1);
            transition: transform 0.3s ease;
        }
        .card:hover {
            transform: translateY(-5px);
            box-shadow: 0 6px 12px rgba(0,0,0,0.15);
        }
        /* 按钮美化 */
        .stButton > button {
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white;
            border: none;
            border-radius: 8px !important;
            padding: 10px 20px;
            font-weight: 600;
            transition: all 0.3s ease;
        }
        .stButton > button:hover {
            transform: translateY(-2px);
            box-shadow: 0 4px 12px rgba(102, 126, 234, 0.4);
        }
    </style>
    """, unsafe_allow_html=True)
    
    # 加载数据
    if 'data' not in st.session_state:
        with st.spinner('正在加载数据...'):
            st.session_state.data = load_all_data()
    data = st.session_state.data
    
    # 侧边栏构建
    with st.sidebar:
        st.markdown('<div style="font-size: 1.5rem; color: #2e4057; font-weight: bold; margin-bottom: 1.5rem;">📋 选择分析模块</div>', unsafe_allow_html=True)
        
        # 准备行业列表
        industry_info = data.get('industry_info', pd.DataFrame())
        if not industry_info.empty and '新版一级行业' in industry_info.columns:
            industry_list = sorted(set(industry_info['新版一级行业'].dropna().values))
            industry_list = [ind for ind in industry_list if ind and ind != 'nan']
            module_options = ['📊 市场总览'] + [f"🏭 {ind}" for ind in industry_list]
        else:
            module_options = ['📊 市场总览', '🏭 国防军工', '🏭 电子', '🏭 医药生物', 
                           '🏭 计算机', '🏭 银行', '🏭 房地产']
        
        # 模块选择按钮
        selected_module = st.session_state.get('selected_module', '📊 市场总览')
        for option in module_options:
            if st.button(
                option,
                key=f"btn_{option}",
                use_container_width=True,
                type="primary" if option == selected_module else "secondary"
            ):
                selected_module = option
                st.session_state.selected_module = option
                st.rerun()
        
        # 系统信息
        st.markdown("---")
        st.markdown("### ℹ️ 系统信息")
        if not industry_info.empty and '新版一级行业' in industry_info.columns:
            st.metric("行业数量", len(industry_list))
        if 'trade_data' in data and not data['trade_data'].empty:
            st.metric("交易记录", f"{len(data['trade_data']):,}")
        if 'stock_basic' in data and not data['stock_basic'].empty:
            st.metric("股票总数", len(data['stock_basic']))
        
        # 数据刷新按钮
        st.markdown("---")
        if st.button("🔄 刷新数据", use_container_width=True):
            with st.spinner("正在刷新数据..."):
                st.session_state.data = load_all_data()
            st.success("数据刷新完成!")
            st.rerun()
    
    # 主内容区构建
    st.markdown('<h1 class="main-header">金融数据挖掘及其应用综合实训</h1>', unsafe_allow_html=True)
    module = st.session_state.selected_module
    if module.startswith("📊 "):
        # 市场总览页面
        display_market_overview(data)
    elif module.startswith("🏭 "):
        # 行业分析页面
        industry_name = module[2:]
        display_industry_analysis(data, industry_name)

2.8 量化策略回测模块

2.8.1 设计理念

将机器学习模型的预测信号转化为可执行的投资策略,模拟真实交易环境(包含交易成本、滑点),通过历史数据回测评估策略的盈利能力与风险水平,为实际投资提供参考。

2.8.2 核心功能
  1. 策略逻辑:基于模型预测信号执行全仓交易,预测上涨(信号 = 1)时全仓买入,预测下跌(信号 =-1)时全仓卖出,预测横盘(信号 = 0)时持仓不变;
  2. 交易成本模拟
    • 佣金率:默认 0.03%(可自定义),按交易金额的比例计算;
    • 滑点:默认 0.05%(可自定义),模拟实际交易中买价高于预期、卖价低于预期的情况;
  3. 绩效指标计算
    • 收益率指标:总收益率、年化收益率、超额收益率(相对沪深 300 指数);
    • 风险指标:夏普比率(衡量单位风险的超额收益)、最大回撤(衡量策略最坏表现)、胜率(盈利交易日占比);
  4. 结果可视化
    • 资产价值走势:展示策略执行过程中资产总值的变化趋势;
    • 交易记录详情:包含每笔交易的日期、价格、股数、佣金等信息;
    • 策略对比:与买入持有策略的收益率对比图表。
2.8.3 核心代码示例

python

运行

def backtest_strategy(prices, signals, initial_capital=1000000, commission_rate=0.0003, slippage=0.0005):
    """
    量化策略回测引擎,基于预测信号执行全仓交易
    参数:
        prices: 股票价格序列(np.array),与signals长度一致
        signals: 预测信号序列(np.array),1=上涨,-1=下跌,0=横盘
        initial_capital: 初始资金(float),默认100万元
        commission_rate: 佣金率(float),默认0.03%
        slippage: 滑点率(float),默认0.05%
    返回:
        回测结果字典,包含绩效指标、交易记录、资产价值走势等
    """
    # 初始化参数
    cash = initial_capital  # 现金余额
    holdings = 0  # 持仓股数
    portfolio_values = []  # 资产总值序列
    trades = []  # 交易记录
    daily_returns = []  # 日收益率序列
    
    # 遍历价格与信号序列
    for i in range(len(prices)):
        signal = signals[i]
        # 考虑滑点:实际交易价格 = 理论价格 × (1 ± 滑点率)
        slippage_adjustment = 1 + np.random.uniform(-slippage, slippage)
        current_price = prices[i] * slippage_adjustment
        
        # 计算当前资产总值
        current_portfolio_value = cash + holdings * current_price
        portfolio_values.append(current_portfolio_value)
        
        # 计算日收益率
        if i > 0:
            daily_return = (current_portfolio_value / portfolio_values[i-1] - 1) * 100
        else:
            daily_return = 0
        daily_returns.append(daily_return)
        
        # 执行交易(从第二个交易日开始,避免首日无历史数据)
        if i == 0:
            continue
        
        # 买入信号:现金>0且无持仓
        if signal == 1 and cash > 0 and holdings == 0:
            max_shares = int(cash // (current_price * (1 + commission_rate)))  # 最大可买股数(整数)
            if max_shares > 0:
                trade_value = max_shares * current_price  # 交易金额
                commission = trade_value * commission_rate  # 佣金
                cash -= (trade_value + commission)  # 扣除交易金额与佣金
                holdings += max_shares  # 增加持仓
                # 记录交易
                trades.append({
                    'trade_date_index': i,
                    'trade_type': 'BUY',
                    'price': current_price,
                    'shares': max_shares,
                    'trade_value': trade_value,
                    'commission': commission,
                    'cash_after_trade': cash,
                    'holdings_after_trade': holdings
                })
        
        # 卖出信号:有持仓
        elif signal == -1 and holdings > 0:
            trade_value = holdings * current_price  # 交易金额
            commission = trade_value * commission_rate  # 佣金
            cash += (trade_value - commission)  # 增加现金(扣除佣金)
            # 记录交易
            trades.append({
                'trade_date_index': i,
                'trade_type': 'SELL',
                'price': current_price,
                'shares': holdings,
                'trade_value': trade_value,
                'commission': commission,
                'cash_after_trade': cash,
                'holdings_after_trade': 0
            })
            holdings = 0  # 清空持仓
    
    # 计算绩效指标
    final_portfolio_value = portfolio_values[-1] if portfolio_values else initial_capital
    total_return = (final_portfolio_value - initial_capital) / initial_capital * 100  # 总收益率
    annual_return = total_return / (len(portfolio_values) / 252) if len(portfolio_values) > 0 else 0  # 年化收益率(假设年交易日252天)
    
    # 风险指标
    daily_returns_array = np.array(daily_returns)
    volatility = daily_returns_array.std() * np.sqrt(252)  # 年化波动率
    sharpe_ratio = annual_return / (volatility + 1e-8)  # 夏普比率(无风险利率假设为0)
    max_drawdown = calculate_max_drawdown(portfolio_values)  # 最大回撤
    win_rate = len([r for r in daily_returns if r > 0]) / len(daily_returns) if daily_returns else 0  # 胜率
    
    # 输出回测结果
    return {
        'initial_capital': initial_capital,
        'final_portfolio_value': final_portfolio_value,
        'total_return(%)': round(total_return, 2),
        'annual_return(%)': round(annual_return, 2),
        'volatility(%)': round(volatility, 2),
        'sharpe_ratio': round(sharpe_ratio, 2),
        'max_drawdown(%)': round(max_drawdown, 2),
        'win_rate(%)': round(win_rate * 100, 2),
        'trade_count': len(trades),
        'trades': pd.DataFrame(trades),
        'portfolio_values': portfolio_values,
        'daily_returns': daily_returns
    }

def calculate_max_drawdown(portfolio_values):
    """计算最大回撤"""
    if len(portfolio_values) < 2:
        return 0
    peak = portfolio_values[0]
    max_dd = 0
    for value in portfolio_values:
        if value > peak:
            peak = value
        drawdown = (peak - value) / peak * 100
        if drawdown > max_dd:
            max_dd = drawdown
    return max_dd

2.9 AI 集成模块

2.9.1 设计理念

将大语言模型(LLM)与传统量化分析相结合,通过结构化信息输入与专业提示词设计,引导 AI 从基本面、技术面、行业地位等多维度进行深度分析,同时通过完善的错误处理机制,确保 AI 服务异常时不影响平台整体运行。

2.9.2 核心功能
  1. 结构化信息整理:从平台数据库中提取股票核心信息,包括股票名称、代码、当前股价、市盈率、市净率、核心财务指标、技术指标等,形成标准化输入;
  2. 专业提示词设计:明确要求 AI 以资深证券投资顾问的身份,从基本面分析、技术面分析、行业地位、风险提示、投资建议五个维度输出专业分析报告;
  3. API 调用与错误处理
    • 支持主流大语言模型 API(如 DeepSeek、GPT 等)调用;
    • 处理网络超时、认证失败、服务限流、返回格式异常等多种错误场景;
  4. 结果展示:将 AI 生成的分析报告以结构化文本形式展示,支持复制、导出功能,同时提供报告要点提炼,方便快速阅读。
2.9.3 核心代码示例

python

运行

def ai_stock_analysis(stock_info, api_config):
    """
    集成大语言模型进行股票深度分析
    参数:
        stock_info: 股票信息字典,包含名称、代码、价格、市盈率、财务指标、技术指标等
        api_config: API配置字典,包含api_url、headers、model_name等
    返回:
        AI分析结果字典,包含完整报告、报告要点、生成状态
    """
    # 构建结构化提示词
    prompt = f"""作为资深证券投资顾问,基于以下股票的全面信息,提供专业、详细的分析报告:

    【股票基本信息】
    股票名称:{stock_info.get('name', '未知')}
    股票代码:{stock_info.get('code', '未知')}
    当前股价:{stock_info.get('price', '未知')}元
    市盈率(PE):{stock_info.get('pe_ratio', '未知')}
    市净率(PB):{stock_info.get('pb_ratio', '未知')}

    【核心财务指标】
    营业收入:{stock_info.get('revenue', '未知')}元
    净利润:{stock_info.get('net_profit', '未知')}元
    净资产收益率(ROE):{stock_info.get('roe', '未知')}%
    净利率:{stock_info.get('net_margin', '未知')}%
    资产负债率:{stock_info.get('debt_ratio', '未知')}%

    【技术指标(最新交易日)】
    移动平均线:MA5={stock_info.get('ma5', '未知')}元,MA20={stock_info.get('ma20', '未知')}元,MA60={stock_info.get('ma60', '未知')}元
    MACD:DIF={stock_info.get('dif', '未知')},DEA={stock_info.get('dea', '未知')},MACD柱={stock_info.get('macd_hist', '未知')}
    RSI:{stock_info.get('rsi', '未知')}
    KDJ:K={stock_info.get('k', '未知')},D={stock_info.get('d', '未知')},J={stock_info.get('j', '未知')}

    【分析要求】
    1. 基本面分析:重点分析财务状况、盈利能力、成长性及财务风险;
    2. 技术面分析:基于提供的技术指标,判断价格趋势、支撑阻力位及交易信号;
    3. 行业地位:分析公司在行业内的竞争优势、市场份额及行业发展前景;
    4. 风险提示:明确指出政策风险、行业风险、公司经营风险等潜在风险;
    5. 投资建议:分别给出短期(1-3个月)、中期(3-6个月)、长期(6个月以上)的投资建议,包括买入、持有、卖出评级及目标价位。

    要求:分析逻辑清晰,数据支撑充分,语言专业但不晦涩,报告结构完整,总字数控制在800-1000字。
    """
    
    # API调用配置
    api_url = api_config.get('api_url')
    headers = api_config.get('headers', {})
    model_name = api_config.get('model_name', 'deepseek-ai/DeepSeek-OCR')
    
    try:
        # 发送API请求
        response = requests.post(
            api_url,
            headers=headers,
            json={
                "model": model_name,
                "messages": [
                    {"role": "system", "content": "你是一名拥有10年以上经验的资深证券投资顾问,擅长股票基本面与技术面综合分析,提供专业、客观的投资建议"},
                    {"role": "user", "content": prompt}
                ],
                "temperature": 0.7,  # 控制输出随机性,0.7为适中值
                "max_tokens": 1500  # 限制最大输出字数
            },
            timeout=30  # 超时时间30秒
        )
        
        # 响应状态码检查
        response.raise_for_status()
        result = response.json()
        full_report = result.get('choices', [{}])[0].get('message', {}).get('content', '')
        
        if not full_report:
            return {
                'status': 'failed',
                'full_report': '',
                'key_points': [],
                'error_message': 'AI返回结果为空'
            }
        
        # 提炼报告要点
        key_points = extract_report_key_points(full_report)
        
        return {
            'status': 'success',
            'full_report': full_report,
            'key_points': key_points,
            'error_message': ''
        }
    
    except requests.exceptions.Timeout:
        return {
            'status': 'failed',
            'full_report': '',
            'key_points': [],
            'error_message': 'API请求超时,请稍后重试'
        }
    except requests.exceptions.ConnectionError:
        return {
            'status': 'failed',
            'full_report': '',
            'key_points': [],
            'error_message': '网络连接错误,请检查网络状态'
        }
    except requests.exceptions.HTTPError as e:
        return {
            'status': 'failed',
            'full_report': '',
            'key_points': [],
            'error_message': f'API请求失败,状态码:{e.response.status_code},详情:{e.response.text}'
        }
    except Exception as e:
        return {
            'status': 'failed',
            'full_report': '',
            'key_points': [],
            'error_message': f'未知错误:{str(e)}'
        }

def extract_report_key_points(report):
    """提炼AI分析报告的核心要点"""
    key_points = []
    # 基于报告结构提取要点,可根据实际返回格式调整
    if '基本面分析' in report:
        key_points.append('【基本面要点】' + report.split('基本面分析')[1].split('技术面分析')[0].strip()[:100] + '...')
    if '技术面分析' in report:
        key_points.append('【技术面要点】' + report.split('技术面分析')[1].split('行业地位')[0].strip()[:100] + '...')
    if '行业地位' in report:
        key_points.append('【行业地位要点】' + report.split('行业地位')[1].split('风险提示')[0].strip()[:100] + '...')
    if '风险提示' in report:
        key_points.append('【风险提示】' + report.split('风险提示')[1].split('投资建议')[0].strip()[:100] + '...')
    if '投资建议' in report:
        key_points.append('【投资建议】' + report.split('投资建议')[1].strip()[:100] + '...')
    return key_points

三、平台特色与优势

  1. 数据处理精细化:采用分年度加载、多格式匹配、异常值过滤等多重机制,确保数据质量与可靠性;
  2. 功能覆盖全面化:从数据加载、指标计算、模型预测到策略回测、AI 分析,覆盖金融数据挖掘全流程;
  3. 模型算法多样化:集成 6 种经典机器学习算法,支持多模型对比,适配不同场景下的预测需求;
  4. 界面交互友好化:采用分层布局、交互式图表、自定义样式,降低使用门槛,提升操作体验;
  5. 结果输出专业化:提供结构化报告、量化指标、可视化图表等多种输出形式,满足不同用户需求;
  6. 扩展能力强大:模块化设计便于新增数据源、指标或算法,支持功能横向扩展与纵向深化。

四、使用注意事项

  1. 数据准备:确保本地数据文件格式正确、路径规范,缺失关键文件可能导致部分功能无法使用;
  2. 参数配置:模型训练与策略回测时,建议根据数据量与场景需求合理调整参数(如预测天数、佣金率等);
  3. 结果解读:模型预测与策略回测结果仅供参考,不构成投资建议,实际投资需结合市场动态、政策环境等多因素综合判断;
  4. 性能优化:大数据量分析时(如全市场股票多年度数据),建议分批次处理或增加内存配置,避免加载延迟;
  5. 系统维护:定期刷新数据以保证数据时效性,发现异常时可通过错误日志定位问题,必要时重新加载数据。

五、版本迭代与扩展方向

  1. 数据源扩展:新增 API 接口对接实时行情数据,支持股票、基金、期货等多品类资产数据接入;
  2. 模型优化:引入深度学习模型(如 LSTM、Transformer),提升时间序列数据的预测精度;
  3. 策略丰富化:新增多因子策略、套利策略、风险管理策略等,支持复杂投资组合构建;
  4. 协作功能:添加用户权限管理、数据共享、报告协作编辑等功能,适配团队协作场景;
  5. 移动端适配:优化界面响应式设计,支持移动端访问与操作,提升使用便捷性。

金融数据挖掘及其应用综合实训 -视频

平台的技术指标深度分析功能有哪些具体的指标?

平台的可视化展示层支持哪些交互式图表?

平台的量化投资策略回测功能是如何实现的?

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐