AI应用架构师揭秘:价值投资AI多智能体系统精准分析密码
自主性(Autonomy):智能体能在无外部控制的情况下自主决策self.capabilities = capabilities # 智能体能力描述self.state = "idle" # 自主状态管理result = await self.process_task(task) # 自主任务处理交互性(Interaction):通过通信语言实现智能体间信息交换协作性(Collaboration)
AI应用架构师揭秘:构建价值投资AI多智能体系统——从技术架构到精准分析实现
副标题:基于LangChain+知识图谱+强化学习的智能投研全流程解决方案
摘要/引言
问题陈述:价值投资分析的时代痛点
在信息爆炸的今天,传统价值投资面临着前所未有的挑战:
- 信息过载:单家上市公司的公开信息包括100+页的财报、每日数十条相关新闻、数千条社交媒体评论、上百个宏观经济指标关联,人工分析效率低下
- 分析维度局限:传统分析过度依赖财务数据,忽视产业链关系、政策影响、管理层动态等隐性因素
- 实时性不足:季度财报发布后2-3周才能完成深度分析,错过最佳投资窗口
- 认知偏差:分析师容易受锚定效应、从众心理影响,导致估值判断失真
据晨星研究显示,2023年主动管理型基金中,仅有37%跑赢基准指数,较2010年下降21个百分点。背后反映的正是传统投研模式在复杂市场环境下的适应性不足。
核心方案:AI多智能体系统的破局之道
本文提出的价值投资AI多智能体系统通过模拟人类投研团队协作模式,将复杂分析任务拆解为专业化子任务,由不同功能的AI智能体协同完成:
- 数据采集智能体:7x24小时监控全网信息源,实时获取财报、新闻、研报等数据
- 数据处理智能体:清洗结构化数据、提取非结构化文本关键信息、构建金融知识图谱
- 分析智能体集群:包括财务分析、行业分析、估值建模、风险评估等专业化智能体
- 协调智能体:基于强化学习动态分配任务,解决智能体间冲突,整合分析结果
- 交互智能体:通过自然语言界面与人类投资者交互,解释分析逻辑,接收反馈
主要成果/价值
通过本文,你将获得:
- 技术架构设计能力:掌握多智能体系统的分层架构、智能体通信协议、任务调度机制
- 金融AI应用开发技能:学习如何将NLP、知识图谱、强化学习等AI技术落地到投研场景
- 可复用代码库:包含10+核心模块、5000+行可运行代码,覆盖从数据采集到决策输出全流程
- 工程化实践经验:了解金融AI系统的容器化部署、性能优化、风险控制关键技术
文章导览
本文将按照"理论-架构-实现-优化"的逻辑展开:
- 第一部分:解析价值投资与多智能体系统的融合基础
- 第二部分:详细设计系统架构与核心智能体实现方案
- 第三部分:分步实现各智能体模块并进行集成测试
- 第四部分:探讨系统优化策略与未来发展方向
目标读者与前置知识
目标读者画像
本文适合以下读者群体:
- AI应用开发者:希望将多智能体技术应用于金融领域的工程师
- 量化投资从业者:寻求AI技术提升投研效率的量化分析师
- 金融科技创业者:计划构建智能投研系统的产品技术负责人
- 技术管理者:需要评估AI投研系统可行性的技术决策者
前置知识要求
为获得最佳阅读体验,建议读者具备:
- 编程基础:Python熟练应用(理解装饰器、异步编程),能阅读中等复杂度代码
- AI基础知识:了解机器学习基本概念,知道NLP、知识图谱的应用场景
- 金融基础:理解财务报表基本结构(资产负债表、利润表、现金流量表),知道PE、PB、ROE等估值指标含义
- 工程技能:熟悉RESTful API设计,了解Docker容器基本使用,知道数据库基本操作
无需深入掌握:复杂的金融衍生品知识、底层深度学习算法实现、分布式系统理论
文章目录
问题背景与动机
传统价值投资分析的局限性
本杰明·格雷厄姆创立的价值投资理论虽历经时间考验,但在数字化时代面临严峻挑战:
1. 信息处理能力不足
- 数据规模爆炸:单家上市公司年披露信息量从2000年的50页增至2023年的5000+页(含PDF财报、ESG报告、临时公告等)
- 信息维度扩展:传统分析聚焦财务数据,现代投资需考虑ESG表现、供应链关系、社交媒体舆情等20+维度
- 处理时效要求:财报发布后48小时内完成分析成为机构竞争焦点,人工分析难以满足
2. 分析深度与广度矛盾
- 专业分工困境:行业分析师平均覆盖15-20家公司,难以深入掌握各公司细节
- 跨领域知识壁垒:理解科技公司需AI知识,分析医药企业需生物医学背景,单一分析师知识结构有限
- 市场变化加速:行业生命周期从10年缩短至3-5年,传统分析框架更新滞后
3. 决策过程的认知偏差
行为金融学研究表明,即使专业投资者也存在系统性认知偏差:
- 确认偏差:倾向寻找支持既有观点的信息(如某券商连续12次上调某股票评级)
- 锚定效应:过度依赖初始信息(如分析师常受上季度财报数据锚定)
- 损失厌恶:对亏损的敏感度是盈利的2.5倍,导致非理性决策
现有AI解决方案的不足
当前金融AI应用存在明显局限性,难以满足价值投资的深度需求:
1. 单一模型的能力边界
- 技术路线局限:纯NLP模型擅长文本理解但缺乏财务建模能力;传统量化模型无法处理非结构化信息
- 泛化能力不足:在A股市场训练的模型难以迁移到港股/美股市场;单一行业模型无法跨行业应用
- 可解释性差:深度学习模型的"黑箱"特性,不符合监管要求和投资决策逻辑
2. 缺乏协作机制
- 功能单一:现有工具多聚焦单点能力(如财报解析、情感分析),缺乏整合
- 被动执行:需人工触发不同工具,无法形成自动化分析流程
- 冲突处理缺失:当不同模型给出矛盾结论时(如估值模型vs舆情模型),缺乏有效协调机制
3. 金融场景适配性不足
- 数据噪音处理弱:金融数据存在大量异常值(如突发政策影响),通用AI模型鲁棒性不足
- 专业知识融合难:难以将价值投资原则(如安全边际)编码为AI可执行规则
- 实时性与准确性平衡:金融决策要求低延迟,现有模型推理速度难以满足
多智能体系统的独特优势
多智能体系统(Multi-Agent System, MAS)通过模拟人类团队协作模式,为解决上述问题提供了理想方案:
1. 专业化分工与协作
- 功能模块化:不同智能体专注特定领域(如财务分析、行业研究),实现"术业有专攻"
- 动态协作:通过任务分配机制,实现多智能体协同完成复杂任务
- 知识共享:建立共享知识库,实现跨智能体信息互通
2. 应对复杂环境能力
- 分布式处理:多智能体并行工作,提升大规模数据处理效率
- 鲁棒性增强:单一智能体故障不影响整个系统运行
- 自适应调整:根据市场变化动态调整各智能体权重
3. 符合价值投资本质
- 深度分析:通过多智能体协作实现多维度穿透式分析
- 理性决策:避免人类认知偏差,严格执行价值投资纪律
- 长期视角:不受短期市场波动干扰,坚持长期投资逻辑
案例对比:某头部券商研究显示,采用多智能体系统的投研团队,在同等人力投入下,覆盖公司数量提升300%,分析报告产出速度提升400%,投资建议准确率提升15-20%。
核心概念与理论基础
多智能体系统(MAS)核心理论
1. 多智能体系统定义与特征
多智能体系统是由多个相互作用的智能体组成的集合,具有以下核心特征:
-
自主性(Autonomy):智能体能在无外部控制的情况下自主决策
class FinancialAgent: def __init__(self, agent_id, capabilities): self.agent_id = agent_id self.capabilities = capabilities # 智能体能力描述 self.state = "idle" # 自主状态管理 async def run(self): while True: task = await self.task_queue.get() self.state = "working" result = await self.process_task(task) # 自主任务处理 self.state = "idle"
-
交互性(Interaction):通过通信语言实现智能体间信息交换
-
协作性(Collaboration):为实现共同目标而协同工作
-
异质性(Heterogeneity):智能体可采用不同算法和数据结构
-
动态性(Dynamics):系统能适应环境和自身能力变化
2. 智能体通信机制
有效的通信是多智能体系统协作的基础,主要通信范式包括:
-
消息传递:基于ACL(Agent Communication Language)的标准化消息格式
# 金融领域扩展的ACL消息格式 class FinancialMessage: def __init__(self, sender, receiver, performative, content): self.sender = sender # 发送智能体ID self.receiver = receiver # 接收智能体ID self.performative = performative # 消息类型:请求/告知/承诺等 self.content = content # 消息内容 self.timestamp = datetime.now() # 时间戳 self.priority = 0 # 消息优先级 self.signature = None # 数字签名,确保完整性 def to_json(self): return { "sender": self.sender, "receiver": self.receiver, "performative": self.performative, "content": self.content, "timestamp": self.timestamp.isoformat(), "priority": self.priority, "signature": self.signature }
-
黑板系统:共享数据区域,智能体可读写信息(适合数据密集型协作)
-
发布-订阅:智能体订阅感兴趣的事件,系统自动推送相关信息
3. 智能体协作模型
根据价值投资分析特点,我们采用混合协作模型:
- 层次式协作:协调智能体位于顶层,负责任务分配与结果整合
- 市场机制:资源有限时,通过"任务拍卖"机制分配高优先级任务
- 合同网协议:协调智能体发布任务公告→智能体投标→中标者执行任务
价值投资的AI适配性分析
将价值投资原则转化为AI可执行的规则,是系统设计的关键:
1. 价值投资核心原则的形式化
需要将格雷厄姆、巴菲特的投资哲学转化为数学表达和逻辑规则:
-
安全边际原则
def calculate_safety_margin(intrinsic_value, market_price, uncertainty_factor): """ 计算安全边际:考虑不确定性的价值折扣 intrinsic_value: 内在价值 market_price: 市场价格 uncertainty_factor: 不确定性系数(0-1),值越高表示确定性越低 """ margin = (intrinsic_value * (1 - uncertainty_factor) - market_price) / market_price return max(margin, 0) # 安全边际不能为负
-
能力圈原则:通过行业分类和知识图谱表示智能体能力边界
-
市场先生理论:设计情绪波动检测模型识别市场非理性时机
2. 价值投资决策流程的模块化
将巴菲特的"投资决策树"拆解为可执行的模块流程:
价值投资决策流程
├── 企业分析
│ ├── 业务模式识别
│ ├── 竞争优势评估
│ └── 管理层能力分析
├── 财务分析
│ ├── 财务健康度评估
│ ├── 盈利质量分析
│ └── 增长潜力预测
├── 估值计算
│ ├── DCF模型
│ ├── 相对估值模型
│ └── 情景分析
└── 风险评估
├── 行业风险识别
├── 企业特定风险分析
└── 宏观风险评估
每个模块对应特定智能体的核心功能,通过标准化接口实现数据流转。
关键AI技术在价值投资中的应用
多种AI技术融合是实现精准分析的基础:
1. 自然语言处理(NLP)技术栈
处理金融文本信息的完整技术链:
-
文本预处理:
- 金融领域分词(基于jieba自定义词典添加金融术语)
- 财务报告表格提取(基于LayoutLM模型)
def financial_text_preprocessing(text): # 1. 特殊符号处理 text = re.sub(r'[\u2000-\u206F\u2E00-\u2E7F\\'!"#$%&()*+,\-.\/:;<=>?@\[\]^_`{|}~]+', ' ', text) # 2. 金融术语分词 jieba.load_userdict("financial_terms.txt") # 加载自定义金融词典 words = jieba.cut(text) # 3. 停用词过滤 stopwords = set(open("financial_stopwords.txt").read().splitlines()) filtered_words = [w for w in words if w not in stopwords and len(w) > 1] return filtered_words
-
关键信息抽取:
- 实体识别:公司名、人名、财务指标、产品名等
- 关系抽取:如"贵州茅台→子公司→习酒"、“净利润→增长→25%”
- 事件抽取:如"公司发布回购计划"、“高管离职”
-
深度语义理解:
- 财报情感分析(不同于普通文本,财务文本有特殊情感表达)
- 管理层讨论与分析(MD&A)部分的风险提示识别
- 政策文件对行业影响的预测
2. 知识图谱技术
构建金融领域知识图谱,实现关联关系推理:
-
核心实体类型:公司、人物、产品、行业、概念、事件
-
关系类型:股权关系、供应链关系、竞争关系、合作关系
-
属性定义:财务指标、行业分类、市场表现等
-
知识图谱构建流程:
async def build_financial_knowledge_graph(raw_data): # 1. 实体识别 entities = await entity_recognition_agent.process(raw_data) # 2. 关系抽取 relations = await relation_extraction_agent.process(entities) # 3. 属性抽取 attributes = await attribute_extraction_agent.process(entities) # 4. 知识融合 merged_graph = await knowledge_fusion_agent.merge(entities, relations, attributes) # 5. 知识存储 await neo4j_client.import_graph(merged_graph) # 6. 推理补全 inferred_relations = await reasoning_agent.infer(merged_graph) await neo4j_client.add_relations(inferred_relations) return merged_graph
3. 强化学习与决策优化
将投资决策过程建模为马尔可夫决策过程:
-
状态空间:包含公司基本面、行业状况、市场情绪等多维特征
-
动作空间:买入、持有、卖出等决策动作
-
奖励函数:基于长期投资回报和风险调整后收益设计
-
价值投资导向的强化学习目标:
def value_investing_reward(state, action, next_state): """ 价值投资特化的奖励函数 """ # 1. 基础回报(考虑持有周期,鼓励长期持有) holding_period = next_state.timestamp - state.timestamp base_return = next_state.price / state.price - 1 # 2. 安全边际奖励(安全边际越高,奖励越大) safety_margin_reward = max(0, state.safety_margin * 0.1) # 3. 风险惩罚(波动率越高,惩罚越大) volatility_penalty = next_state.volatility * 0.05 # 4. 综合奖励 total_reward = (base_return * min(holding_period.days / 365, 3) + # 持有3年以上不再增加权重 safety_margin_reward - volatility_penalty) return total_reward
系统架构设计
整体架构 overview
基于分层架构思想,系统采用"5层3横"的立体架构设计:
价值投资AI多智能体系统架构
┌─────────────────────────────────────────────────────────────┐
│ 应用层 (Application Layer) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Web客户端 │ │ 移动客户端 │ │ API服务接口 │ │
│ └──────────────┘ └──────────────┘ └──────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ 协调层 (Coordination Layer) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ 协调智能体 │ │ 任务调度器 │ │ 冲突解决机制 │ │
│ └──────────────┘ └──────────────┘ └──────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ 功能层 (Functional Layer) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────┐ │
│ │数据采集 │ │数据处理 │ │财务分析 │ │估值建模 │ │风险 │ │
│ │智能体 │ │智能体 │ │智能体 │ │智能体 │ │智能体│ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────┘ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │行业分析 │ │交互响应 │ │知识图谱智能体 │ │
│ │智能体 │ │智能体 │ │ │ │
│ └─────────┘ └─────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ 数据层 (Data Layer) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────┐ │
│ │关系型DB │ │文档DB │ │时序DB │ │知识图谱│ │缓存 │ │
│ │(财务数据)│ │(研报) │ │(行情) │ │(关系) │ │系统 │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────┘ │
├─────────────────────────────────────────────────────────────┤
│ 基础设施层 (Infrastructure Layer) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │容器编排 │ │监控告警 │ │日志系统 │ │CI/CD流水线 │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
核心架构特点
- 分层解耦:各层通过标准化接口通信,便于独立升级和替换
- 水平扩展:功能层智能体可根据业务需求横向扩展(如增加ESG分析智能体)
- 容错设计:关键智能体支持主备模式,单个智能体故障不影响整体系统
- 数据隔离:不同类型数据存储在专用数据库,优化查询性能
智能体功能划分与协作模式
1. 核心智能体功能矩阵
每个智能体具备明确定义的职责边界和能力范围:
智能体名称 | 核心功能 | 输入数据 | 输出结果 | 技术栈 |
---|---|---|---|---|
数据采集智能体 | 多源数据聚合 | 数据源配置、采集规则 | 原始结构化/非结构化数据 | Scrapy、Selenium、Requests |
数据处理智能体 | 数据清洗与特征工程 | 原始数据 | 标准化特征、知识三元组 | Pandas、PySpark、LayoutLM |
财务分析智能体 | 财务健康度评估 | 标准化财务数据 | 财务指标、健康评分、异常预警 | XGBoost、财务比率模型 |
行业分析智能体 | 行业趋势与竞争格局分析 | 行业数据、公司数据 | 行业景气度、竞争格局图谱 | LSTM、主题模型 |
估值建模智能体 | 多模型估值计算 | 财务数据、行业数据 | 内在价值、估值区间、敏感性分析 | 金融工程模型、随机森林 |
风险评估智能体 | 多维风险识别与量化 | 各类基础数据 | 风险因子、风险等级、影响评估 | 风险矩阵模型、蒙特卡洛模拟 |
协调智能体 | 任务调度与结果整合 | 用户需求、系统状态 | 任务分配指令、综合分析报告 | 强化学习、规则引擎 |
交互智能体 | 自然语言交互与解释 | 用户查询、分析结果 | 自然语言回答、可视化结果 | LangChain、Streamlit |
知识图谱智能体 | 关系推理与知识补全 | 实体数据、关系数据 | 推理结论、关联关系 | Neo4j、PyTorch Geometric |
2. 智能体协作时序图
以"个股深度分析"任务为例,展示智能体协作流程:
用户 → 交互智能体 → 协调智能体 → 任务分解与分配
↓
协调智能体
├→ 数据采集智能体 → [财报数据, 新闻数据, 行业数据] → 数据处理智能体 → 标准化数据
├→ 财务分析智能体 ←───────────────────────────────────┘ → 财务健康报告
├→ 行业分析智能体 ←───────────────────────────────────┘ → 行业分析报告
├→ 估值建模智能体 ←─────┬──────────────────────────────┘ → 估值报告
│ │
└→ 风险评估智能体 ←─────┴──────────────────────────────┘ → 风险报告
↓ ↓ ↓ ↓
协调智能体 ←─────────────────────────────────────────────────────────┘
↓
协调智能体 → 综合分析报告 → 交互智能体 → 用户
3. 核心数据流设计
系统数据流遵循"采集-处理-分析-决策"的价值投资逻辑:
数据采集层 → 原始数据缓冲区 → 数据处理层 → 标准化数据存储
↓
知识图谱构建 ←─────────────────────────┘
↓
各专业智能体 ← 任务队列 ← 协调智能体 ← 用户请求
↓
分析结果存储 → 协调智能体 → 综合报告生成 → 交互展示
技术栈选型与理由
基于系统需求和金融场景特性,技术栈选型遵循"稳定优先、专业适配"原则:
1. 核心开发语言与框架
-
主语言:Python 3.9+
- 理由:金融数据处理库丰富;AI框架支持完善;开发效率高
- 补充:关键性能模块用Rust编写Python扩展
-
多智能体框架:LangChain + 自定义扩展
- 理由:提供Agent基类和工具调用能力;支持多智能体协作;社区活跃
- 扩展:增加金融领域专用智能体类型和协作协议
-
API服务:FastAPI
- 理由:高性能异步支持;自动生成API文档;类型提示友好
2. 数据处理与存储
-
关系型数据库:PostgreSQL + TimescaleDB扩展
- 用途:存储结构化财务数据、用户配置信息
- 优势:支持复杂SQL查询;TimescaleDB扩展优化时序数据存储
-
文档数据库:Elasticsearch
- 用途:存储非结构化文本(研报、新闻);支持全文检索
- 优势:金融文本搜索性能优异;可构建向量索引支持语义搜索
-
知识图谱数据库:Neo4j
- 用途:存储实体关系数据;支持关联推理
- 优势:Cypher查询语言适合金融关系分析;图算法丰富
-
缓存系统:Redis
- 用途:智能体通信消息队列;热点数据缓存;任务状态存储
- 优势:支持复杂数据结构;发布订阅功能适合智能体通信
3. AI模型与框架
-
NLP基础框架:Hugging Face Transformers
- 预训练模型:bert-base-chinese-finance(金融领域微调版本)
- 优化策略:量化推理(INT8);模型蒸馏
-
机器学习框架:PyTorch + Scikit-learn
- 用途:自定义模型开发;传统机器学习模型训练
- 优势:动态图调试方便;金融特征工程支持好
-
强化学习框架:Stable Baselines3
- 用途:协调智能体任务调度策略训练
- 优势:算法丰富;与PyTorch无缝集成
4. 部署与运维
-
容器化:Docker + Docker Compose
- 优势:环境一致性;隔离性好;部署便捷
-
监控系统:Prometheus + Grafana
- 监控指标:各智能体性能;模型准确率;系统响应时间
- 告警策略:多级告警;智能降噪
-
日志系统:ELK Stack
- 用途:系统日志;智能体行为审计;问题排查
- 优势:全文检索;可视化分析
环境准备
开发环境配置
1. 基础环境要求
- 操作系统:Linux (Ubuntu 20.04+) 或 macOS 12+
- 硬件配置:
- CPU:8核以上(推荐16核)
- 内存:32GB以上(智能体并发运行需要)
- GPU:NVIDIA Tesla T4/RTX 3090(模型训练推理加速)
- 存储:500GB SSD(金融数据存储需求)
2. Python环境配置
推荐使用conda管理Python环境:
# 创建并激活环境
conda create -n value-investment-agent python=3.9 -y
conda activate value-investment-agent
# 安装基础依赖
pip install -U pip setuptools wheel
pip install -r requirements.txt
requirements.txt完整内容:
# 核心框架
langchain==0.0.304
fastapi==0.104.1
uvicorn==0.23.2
pydantic==2.4.2
# 数据处理
pandas==2.1.1
numpy==1.26.0
scipy==1.11.3
pyspark==3.4.1
python-dotenv==1.0.0
python-multipart==0.0.6
# NLP处理
transformers==4.34.0
tokenizers==0.14.1
sentence-transformers==2.2.2
spacy==3.6.1
jieba==0.42.1
layoutparser==0.3.4
paddleocr==2.7.0.3
# 机器学习
torch==2.0.1
torch-geometric==2.3.1
scikit-learn==1.3.0
xgboost==2.0.0
lightgbm==4.1.0
stable-baselines3==2.0.0
# 数据库
psycopg2-binary==2.9.9
redis==5.0.1
neo4j==5.13.0
elasticsearch==8.10.1
influxdb-client==1.36.1
# 数据采集
scrapy==2.11.0
selenium==4.12.0
requests==2.31.0
beautifulsoup4==4.12.2
feedparser==6.0.10
# 可视化
streamlit==1.27.2
plotly==5.17.0
matplotlib==3.8.0
seaborn==0.12.2
# 工具类
loguru==0.7.2
tqdm==4.66.1
tenacity==8.2.3
schedule==1.2.0
pyarrow==14.0.1
cryptography==41.0.4
# 测试
pytest==7.4.2
pytest-asyncio==0.21.1
3. 代码仓库结构
采用模块化设计的代码组织结构:
value-investment-agent/
├── agent/ # 智能体实现
│ ├── base/ # 基础智能体类
│ ├── collector/ # 数据采集智能体
│ ├── processor/ # 数据处理智能体
│ ├── financial/ # 财务分析智能体
│ ├── industry/ # 行业分析智能体
│ ├── valuation/ # 估值建模智能体
│ ├── risk/ # 风险评估智能体
│ ├── coordinator/ # 协调智能体
│ ├── interaction/ # 交互智能体
│ └── knowledge/ # 知识图谱智能体
├── api/ # API服务
│ ├── v1/ # v1版本接口
│ └── docs/ # API文档
├── data/ # 数据相关
│ ├── raw/ # 原始数据
│ ├── processed/ # 处理后数据
│ └── models/ # 模型文件
├── database/ # 数据库配置
│ ├── migrations/ # 数据库迁移脚本
│ └── scripts/ # 数据库操作脚本
├── framework/ # 系统框架
│ ├── communication/ # 通信模块
│ ├── scheduling/ # 调度模块
│ ├── storage/ # 存储模块
│ └── utils/ # 工具函数
├── frontend/ # 前端界面
│ ├── streamlit/ # Streamlit界面
│ └── static/ # 静态资源
├── config/ # 配置文件
├── tests/ # 测试代码
├── scripts/ # 部署脚本
├── docker/ # Docker配置
├── requirements.txt # 依赖文件
├── run.py # 入口文件
└── README.md # 项目说明
基础设施搭建
使用Docker Compose一键部署基础设施:
1. Docker Compose配置
创建docker-compose.yml
文件:
version: '3.8'
services:
# PostgreSQL数据库 - 存储结构化数据
postgres:
image: timescale/timescaledb:latest-pg14
container_name: vi-postgres
environment:
POSTGRES_USER: ${DB_USER}
POSTGRES_PASSWORD: ${DB_PASSWORD}
POSTGRES_DB: ${DB_NAME}
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${DB_USER} -d ${DB_NAME}"]
interval: 10s
timeout: 5s
retries: 5
# Redis - 消息队列和缓存
redis:
image: redis:7.2-alpine
container_name: vi-redis
command: redis-server --requirepass ${REDIS_PASSWORD}
volumes:
- redis_data:/data
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "ping"]
interval: 10s
timeout: 5s
retries: 5
# Neo4j - 知识图谱数据库
neo4j:
image: neo4j:5.13-community
container_name: vi-neo4j
environment:
NEO4J_AUTH: ${NEO4J_USER}/${NEO4J_PASSWORD}
NEO4J_PLUGINS: '["apoc", "graph-data-science"]'
NEO4J_dbms_memory_heap_initial__size: 2G
NEO4J_dbms_memory_heap_max__size: 4G
volumes:
- neo4j_data:/data
- neo4j_plugins:/plugins
ports:
- "7474:7474" # Web界面
- "7687:7687" # Bolt协议
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:7474 || exit 1"]
interval: 30s
timeout: 10s
retries: 5
# Elasticsearch - 文本检索和存储
elasticsearch:
image: elasticsearch:8.10.1
container_name: vi-elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms2g -Xmx4g"
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
ports:
- "9200:9200"
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:9200/_cluster/health || exit 1"]
interval: 30s
timeout: 10s
retries: 5
# Kibana - Elasticsearch可视化
kibana:
image: kibana:8.10.1
container_name: vi-kibana
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
depends_on:
elasticsearch:
condition: service_healthy
volumes:
postgres_data:
redis_data:
neo4j_data:
neo4j_plugins:
elasticsearch_data:
2. 环境变量配置
创建.env
文件配置环境变量:
# 数据库配置
DB_USER=vi_user
DB_PASSWORD=vi_password
DB_NAME=value_investment
DB_HOST=localhost
DB_PORT=5432
# Redis配置
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=vi_redis_password
REDIS_DB=0
# Neo4j配置
NEO4J_USER=neo4j
NEO4J_PASSWORD=vi_neo4j_password
NEO4J_HOST=localhost
NEO4J_PORT=7687
# Elasticsearch配置
ES_HOST=localhost
ES_PORT=9200
ES_USER=
ES_PASSWORD=
# API配置
API_HOST=0.0.0.0
API_PORT=8000
API_DEBUG=False
# 代理配置
HTTP_PROXY=
HTTPS_PROXY=
# 日志配置
LOG_LEVEL=INFO
LOG_DIR=logs/
# 模型配置
MODEL_DIR=models/
EMBEDDING_MODEL=sentence-transformers/paraphrase-multilingual-MiniLM-Lxx-v2
FINANCIAL_MODEL=uer/roberta-base-finetuned-chinanews-chinese
3. 启动基础设施
执行以下命令启动所有基础设施服务:
# 构建并启动服务
docker-compose up -d
# 检查服务状态
docker-compose ps
# 查看日志
docker-compose logs -f
服务启动成功后,可以通过以下地址访问各服务:
- PostgreSQL: localhost:5432
- Redis: localhost:6379
- Neo4j: http://localhost:7474
- Elasticsearch: http://localhost:9200
- Kibana: http://localhost:5601
测试数据集准备
为方便开发测试,准备包含以下内容的测试数据集:
1. 财务数据样例
创建tests/data/financial_data_sample.csv
,包含5家上市公司3年财务数据:
公司代码,公司名称,年份,营业收入(亿),净利润(亿),资产负债率(%),ROE(%),经营现金流(亿),研发费用(亿)
600519,贵州茅台,2020,979.93,466.97,21.43,31.41,572.10,15.74
600519,贵州茅台,2021,1094.64,524.60,20.38,30.12,640.90,18.34
600519,贵州茅台,2022,1275.56,627.16,19.44,31.29,746.80,21.87
000858,五粮液,2020,573.21,199.55,27.50,25.26,237.04,2.87
000858,五粮液,2021,662.09,233.77,26.70,25.56,270.39,3.71
...
2. 新闻文本样例
创建tests/data/news_sample.json
,包含100条财经新闻:
[
{
"id": "news_001",
"title": "贵州茅台2022年净利润同比增长19.18%,拟10派259.11元",
"content": "贵州茅台(600519.SH)3月27日晚间披露2022年年报,公司去年实现营业收入1275.56亿元,同比增长16.53%;净利润627.16亿元,同比增长19.18%;基本每股收益49.93元。公司拟每10股派发现金红利259.11元(含税),共计分配股利325.49亿元,分红比例为51.9%。",
"source": "证券时报",
"publish_time": "2023-03-27 19:25:30",
"url": "http://example.com/news/001"
},
...
]
3. 研报PDF样例
准备5份不同行业的券商研报PDF文件,存放于tests/data/research_reports/
目录。
4. 初始化脚本
创建数据初始化脚本scripts/init_test_data.py
:
import pandas as pd
import json
import redis
import psycopg2
from neo4j import GraphDatabase
from elasticsearch import Elasticsearch
from dotenv import load_dotenv
import os
# 加载环境变量
load_dotenv()
def init_postgres():
"""初始化PostgreSQL数据库"""
conn = psycopg2.connect(
host=os.getenv("DB_HOST"),
port=os.getenv("DB_PORT"),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD"),
dbname=os.getenv("DB_NAME")
)
更多推荐
所有评论(0)