前言

在AI Agent技术快速发展的今天,如何将大语言模型的能力真正落地到实际业务场景中,是每个开发者都在思考的问题。本文将深入剖析博主近期完成的一个完整的Multi-Agent系统开源项目——GoldMind,它基于LangChain框架构建,融合了GLM-4-Plus的实时搜索能力和DeepSeek-V3的深度推理能力,实现了对黄金市场的智能分析。

项目地址GoldMind

如果本文对你有帮助,欢迎给项目一个⭐ Star,你的支持是我持续更新的动力!


一、项目背景与技术选型

1.1 为什么选择黄金市场分析场景

黄金市场具有以下特点,非常适合作为AI Agent的落地场景:

  • 数据维度丰富:价格数据、新闻舆情、机构研报、宏观经济指标

  • 时效性要求高:市场变化快,需要实时分析

  • 分析逻辑复杂:需要综合技术面、基本面、情绪面、机构观点

  • 决策支持价值高:直接关联投资决策

    1.2 技术栈选择

    技术组件

    选型

    理由

    Agent框架

    LangChain

    生态成熟,支持多Agent编排,工具链丰富

    实时搜索模型

    GLM-4-Plus

    支持Web Search API,时效性强

    深度推理模型

    DeepSeek-V3

    671B参数,长文本理解能力强

    后端框架

    FastAPI

    高性能异步框架,自动生成API文档

    前端框架

    React 18

    组件化开发,生态完善

    数据库

    MySQL 8.0

    关系型数据,支持复杂查询

    1.3 为什么选择双引擎架构

    在项目初期,我们尝试使用单一模型完成所有任务,但遇到了以下问题:

    1. 实时性 vs 深度性:需要实时搜索获取最新信息,同时需要深度推理生成高质量分析

    2. 成本 vs 质量:DeepSeek-V3成本低推理能力强但是不支持联网检索,GLM-4-Plus价格高但是检索能力极强

    3. 专业化分工:不同Agent需要不同的能力侧重

      因此,我们采用了双引擎架构

      • GLM-4-Plus:负责数据采集、实时搜索、初步分析

      • DeepSeek-V3:负责深度推理、多源信息融合、投资建议生成

         

        二、系统架构设计

        2.1 整体架构图

        架构解析:

        GoldMind系统采用分层架构设计,遵循关注点分离原则,将系统划分为五个清晰的层次:

        第一层:用户界面层

        采用React 18构建,负责与用户交互,展示分析结果。使用Ant Design组件库确保响应式设计,通过React Hooks管理状态,使用axios进行异步HTTP请求。

        第二层:API网关层

        基于FastAPI框架构建,作为系统的统一入口。FastAPI的优势在于:

        • 基于Python类型提示,自动生成交互式API文档

        • 原生支持异步编程,性能接近Node.js和Go

        • 内置数据验证和序列化,减少样板代码

          第三层:Multi-Agent核心层

          包含五个专用Agent,采用并行分析 + 结果融合的协作模式:

          1. 并行分析阶段:Market Analysis、News Intelligence、Institution Research三个Agent同时工作

          2. 数据收集阶段:各Agent将分析结果以结构化JSON格式返回

          3. 融合分析阶段:Investment Advisory Agent接收前三者的输出,进行初步融合

          4. 综合生成阶段:Comprehensive Analysis Agent生成最终报告

            第四层:数据层

            负责数据的持久化和访问,包含MySQL数据库和外部API。数据库设计包括:

            • gold_prices表:存储黄金历史价格

            • news_cache表:缓存新闻数据

            • analysis_history表:存储分析历史

              第五层:外部服务层

              集成第三方AI服务,包括GLM-4-Plus API和DeepSeek-V3 API。通过统一的API封装层隐藏实现细节,实现错误处理、重试机制和成本控制。

              2.2 Agent协作流程

              系统采用并行分析 + 结果融合的协作模式:

              用户请求
                  ↓
              API Gateway (FastAPI)
                  ↓
              并行触发三个Agent:
                  ├─ Market Analysis Agent (GLM-4-Plus)
                  ├─ News Intelligence Agent (GLM-4-Plus)
                  └─ Institution Research Agent (GLM-4-Plus)
                  ↓
              结构化数据收集
                  ↓
              Investment Advisory Agent (DeepSeek-V3)
                  ↓
              Comprehensive Analysis Agent (DeepSeek-V3)
                  ↓
              返回综合分析报告

              并行处理的优势:

              1. 时间节省:三个Agent同时运行,总时间约为单个Agent的1/3

              2. 容错性:单个Agent失败不影响整体系统

              3. 资源优化:充分利用多核CPU和I/O等待时间

                2.3 数据流设计

                数据流解析:

                整个数据流可以分为四个阶段:

                阶段一:请求接收与预处理

                API网关接收到用户请求后,执行参数验证、用户认证、请求限流和日志记录,然后创建唯一的任务ID。

                阶段二:并行分析处理

                三个Agent并行工作,互不干扰:

                • Market Analysis Agent:查询历史价格数据 → 计算技术指标(RSI、MACD、布林带) → 调用GLM-4-Plus进行技术面分析

                • News Intelligence Agent:调用GLM-4-Plus Web Search API → 解析新闻 → 情感分析 → 提取多空因子

                • Institution Research Agent:搜索机构研报 → 提取目标价位和观点 → 分析共识

                  阶段三:结果融合与深度推理

                  Investment Advisory Agent接收多源数据,使用DeepSeek-V3进行深度推理,生成投资策略。Comprehensive Analysis Agent整合所有分析,进行交叉验证和逻辑一致性校验。

                  阶段四:响应返回与展示

                  将所有分析结果封装为统一的JSON响应,前端进行数据可视化和卡片展示。


                  三、核心Agent实现详解

                  3.1 BaseAgent抽象基类设计

                  为了统一Agent的调用接口,我们设计了BaseAgent抽象基类,采用模板方法模式

                  from abc import ABC, abstractmethod
                  from langchain.llms.base import LLM
                  
                  class BaseAgent(ABC):
                      def __init__(self, llm: LLM, name: str):
                          self.llm = llm
                          self.name = name
                          self.prompt_template = self._build_prompt_template()
                      
                      @abstractmethod
                      def _build_prompt_template(self):
                          pass
                      
                      @abstractmethod
                      def analyze(self, context: dict) -> dict:
                          pass
                      
                      def _call_llm(self, prompt: str) -> str:
                          try:
                              response = self.llm(prompt)
                              return response
                          except Exception as e:
                              print(f"Error calling LLM for {self.name}: {e}")
                              return ""

                  设计要点:

                  • 模板方法模式:定义算法骨架,子类实现具体步骤

                  • 统一接口:所有Agent都实现analyze()方法

                  • 错误处理:统一的LLM调用异常处理

                    3.2 Market Analysis Agent实现

                    市场分析Agent负责技术面量化分析,核心流程如下:

                    class MarketAnalysisAgent(BaseAgent):
                        def analyze(self, context: dict) -> dict:
                            df = pd.DataFrame(context.get('price_data', []))
                            
                            technical_indicators = {
                                'rsi': self._calculate_rsi(df['close']),
                                'macd': self._calculate_macd(df['close']),
                                'bb_upper': self._calculate_bollinger_bands(df['close'])[0],
                                'bb_lower': self._calculate_bollinger_bands(df['close'])[1]
                            }
                            
                            prompt = self.prompt_template.format(
                                price_data=df.tail(10).to_json(),
                                **technical_indicators
                            )
                            
                            response = self._call_llm(prompt)
                            return self._parse_response(response)

                    技术亮点:

                    1. 技术指标计算:实现了RSI、MACD、布林带等经典技术指标

                    2. 结构化Prompt:使用PromptTemplate确保输入格式一致

                    3. JSON输出:强制LLM返回结构化JSON,便于后续处理

                      3.3 News Intelligence Agent实现

                      新闻分析Agent利用GLM-4-Plus的Web Search能力,实时抓取并分析新闻:

                      class NewsIntelligenceAgent(BaseAgent):
                          def analyze(self, context: dict) -> dict:
                              search_query = context.get('search_query', '黄金市场 最新新闻')
                              
                              news_list = self._search_news(search_query)
                              
                              prompt = self.prompt_template.format(
                                  news_list=self._format_news(news_list)
                              )
                              
                              response = self._call_llm(prompt)
                              return self._parse_response(response)
                          
                          def _search_news(self, query: str) -> list:
                              from zhipuai import ZhipuAI
                              client = ZhipuAI(api_key=os.getenv("ZHIPU_API_KEY"))
                              
                              response = client.chat.completions.create(
                                  model="glm-4-plus",
                                  messages=[{"role": "user", "content": f"搜索关于'{query}'的最新新闻"}],
                                  tools=[{"type": "web_search", "web_search": {"enable": True}}]
                              )
                              
                              return self._parse_search_results(response.choices[0].message.content)

                      技术亮点:

                      1. 实时搜索:利用GLM-4-Plus的Web Search API获取最新新闻

                      2. 情感分析:自动判断市场情绪倾向

                      3. 多空因子提取:智能识别看涨和看跌因素

                        3.4 Institution Research Agent实现

                        机构研究Agent追踪主流机构的黄金预测观点:

                        class InstitutionResearchAgent(BaseAgent):
                            def analyze(self, context: dict) -> dict:
                                institutions = ['高盛', '瑞银', '摩根士丹利', '花旗', '汇丰']
                                
                                institution_views = []
                                for institution in institutions:
                                    query = f"{institution} 黄金价格预测 2025"
                                    view = self._search_institution_view(query, institution)
                                    if view:
                                        institution_views.append(view)
                                
                                prompt = self.prompt_template.format(
                                    institution_views=self._format_views(institution_views)
                                )
                                
                                response = self._call_llm(prompt)
                                return self._parse_response(response)

                        3.5 Investment Advisory Agent实现(DeepSeek-V3)

                        投资建议Agent使用DeepSeek-V3进行深度推理,融合多源信息:

                        class InvestmentAdvisoryAgent(BaseAgent):
                            def __init__(self):
                                self.llm = DeepSeek(
                                    api_key=os.getenv("DEEPSEEK_API_KEY"),
                                    model="deepseek-chat",
                                    temperature=0.3
                                )
                                super().__init__(self.llm, "InvestmentAdvisoryAgent")
                            
                            def analyze(self, context: dict) -> dict:
                                prompt = self.prompt_template.format(
                                    technical_analysis=json.dumps(context.get('technical_analysis', {})),
                                    news_analysis=json.dumps(context.get('news_analysis', {})),
                                    institution_analysis=json.dumps(context.get('institution_analysis', {}))
                                )
                                
                                response = self._call_llm(prompt)
                                return self._parse_response(response)

                        技术亮点:

                        1. 多源信息融合:整合技术面、基本面、情绪面、机构观点

                        2. 深度推理:利用DeepSeek-V3的671B参数进行复杂推理

                        3. 结构化输出:生成可操作的投资建议

                          3.6 Comprehensive Analysis Agent实现

                          综合分析Agent生成最终的市场分析报告:

                          class ComprehensiveAnalysisAgent(BaseAgent):
                              def analyze(self, context: dict) -> dict:
                                  prompt = self.prompt_template.format(
                                      technical_analysis=json.dumps(context.get('technical_analysis', {})),
                                      news_analysis=json.dumps(context.get('news_analysis', {})),
                                      institution_analysis=json.dumps(context.get('institution_analysis', {})),
                                      investment_advice=json.dumps(context.get('investment_advice', {}))
                                  )
                                  
                                  response = self._call_llm(prompt)
                                  return self._parse_response(response)


                          四、ReAct推理与RAG检索增强

                          4.1 ReAct推理模式实现

                          ReAct(Reasoning + Acting)是Agent的核心推理模式,我们实现了完整的ReAct循环:

                          from langchain.agents import create_react_agent, AgentExecutor
                          
                          class ReActAgent:
                              def __init__(self, llm, tools):
                                  self.agent_executor = AgentExecutor(
                                      agent=create_react_agent(llm, tools, prompt),
                                      tools=tools,
                                      verbose=True,
                                      max_iterations=5
                                  )
                              
                              def run(self, query: str) -> dict:
                                  result = self.agent_executor.invoke({"input": query})
                                  return result

                          ReAct推理流程示例:

                          Thought: 我需要分析当前黄金市场的技术面,首先应该获取最近的价格数据
                          Action: get_historical_price
                          Action Input: {"period": "1M"}
                          Observation: 获取到最近30天的价格数据...
                          
                          Thought: 有了价格数据,现在需要计算技术指标
                          Action: calculate_technical_indicators
                          Action Input: {"price_data": "..."}
                          Observation: RSI=65.3, MACD=12.5...
                          
                          Thought: 综合技术面和消息面分析,可以给出结论
                          Final Answer: 基于技术指标RSI接近超买区域但MACD金叉,建议短期看涨但需警惕回调风险...

                          ...

                          4.2 RAG检索增强实现

                          RAG核心原理:

                          RAG(Retrieval-Augmented Generation)通过从知识库中检索相关上下文来增强LLM的生成能力,解决以下问题:

                          1. 实时信息:从实时更新的知识库中检索最新信息

                          2. 事实依据:检索到的文档作为生成答案的事实依据

                          3. 减少幻觉:基于检索到的真实内容生成答案

                          4. 可追溯性:可以引用检索到的文档来源

                            RAG系统架构:

                            from langchain.vectorstores import FAISS
                            from langchain.embeddings import OpenAIEmbeddings
                            from langchain.chains import RetrievalQA
                            
                            class RAGEnhancedAgent:
                                def __init__(self, llm, vector_store_path="./vector_store"):
                                    self.llm = llm
                                    self.vector_store = FAISS.load_local(
                                        vector_store_path,
                                        OpenAIEmbeddings()
                                    )
                                    self.qa_chain = RetrievalQA.from_chain_type(
                                        llm=llm,
                                        chain_type="stuff",
                                        retriever=self.vector_store.as_retriever(search_kwargs={"k": 3})
                                    )
                                
                                def query(self, question: str) -> str:
                                    result = self.qa_chain({"query": question})
                                    return result["result"]

                            RAG在GoldMind中的应用:

                            1. 历史分析报告检索:当用户询问历史类似市场情况时,检索相关报告

                            2. 新闻舆情分析:检索过去24小时的新闻缓存和历史事件影响

                            3. 机构观点对比:检索机构历史预测准确率和观点

                              性能优化:

                              • 缓存机制:相同查询直接返回缓存结果

                              • 批量检索:合并多个用户的相同请求

                              • 异步处理:使用asyncio实现异步查询


                                五、FastAPI后端实现

                                5.1 API路由设计

                                FastAPI的路由设计遵循RESTful API设计原则,提供清晰、直观的API接口。

                                核心路由:

                                from fastapi import FastAPI, BackgroundTasks
                                from pydantic import BaseModel
                                
                                app = FastAPI(title="GoldMind API", version="1.0.0")
                                
                                class AnalysisRequest(BaseModel):
                                    timeframe: str = "1D"
                                    include_news: bool = True
                                    include_institutions: bool = True
                                
                                @app.post("/api/analysis/comprehensive")
                                async def comprehensive_analysis(
                                    request: AnalysisRequest,
                                    background_tasks: BackgroundTasks
                                ):
                                    task_id = str(uuid.uuid4())
                                    
                                    background_tasks.add_task(
                                        run_analysis_task,
                                        task_id,
                                        request
                                    )
                                    
                                    return {"success": True, "task_id": task_id}
                                
                                @app.get("/api/analysis/status/{task_id}")
                                async def get_task_status(task_id: str):
                                    return get_task_info(task_id)
                                
                                @app.get("/api/price/current")
                                async def get_current_price():
                                    engine = create_engine(os.getenv("DATABASE_URL"))
                                    df = pd.read_sql("SELECT * FROM gold_prices ORDER BY date DESC LIMIT 1", engine)
                                    return {"success": True, "data": df.iloc[0].to_dict()}

                                路由设计要点:

                                1. 统一响应格式:所有API返回统一的JSON格式

                                2. 参数验证:使用Pydantic模型自动验证参数

                                3. 异步处理:使用BackgroundTasks实现异步任务

                                4. 错误处理:区分404(无数据)和500(服务器错误)

                                  中间件设计:

                                  from fastapi.middleware.cors import CORSMiddleware
                                  
                                  app.add_middleware(
                                      CORSMiddleware,
                                      allow_origins=["http://localhost:5173"],
                                      allow_credentials=True,
                                      allow_methods=["*"],
                                      allow_headers=["*"],
                                  )
                                  
                                  @app.middleware("http")
                                  async def log_requests(request, call_next):
                                      start_time = time.time()
                                      response = await call_next(request)
                                      process_time = time.time() - start_time
                                      response.headers["X-Process-Time"] = str(process_time)
                                      return response

                                  5.2 异步任务处理

                                  综合分析任务通常需要30-60秒,使用同步处理会导致HTTP请求超时。因此,我们实现了异步任务处理机制。

                                  任务队列实现:

                                  import redis
                                  import json
                                  
                                  class RedisTaskQueue:
                                      def __init__(self, redis_url: str):
                                          self.redis = redis.from_url(redis_url)
                                          self.task_queue_key = "goldmind:task:queue"
                                      
                                      async def add_task(self, task_id: str, task_data: dict):
                                          task_info = {
                                              "task_id": task_id,
                                              "status": "pending",
                                              "created_at": datetime.now().isoformat()
                                          }
                                          
                                          self.redis.lpush(self.task_queue_key, json.dumps(task_info))
                                          self.redis.set(f"task:info:{task_id}", json.dumps(task_info), ex=3600)
                                      
                                      async def get_task(self) -> dict:
                                          task_json = self.redis.brpop(self.task_queue_key, timeout=5)
                                          if task_json:
                                              return json.loads(task_json[1])
                                          return None

                                  任务执行器:

                                  import asyncio
                                  from concurrent.futures import ThreadPoolExecutor
                                  
                                  class TaskExecutor:
                                      def __init__(self, task_queue, max_workers=5):
                                          self.task_queue = task_queue
                                          self.executor = ThreadPoolExecutor(max_workers=max_workers)
                                      
                                      async def start(self):
                                          while True:
                                              task = await self.task_queue.get_task()
                                              if task:
                                                  asyncio.create_task(self._execute_task(task))
                                      
                                      async def _execute_task(self, task: dict):
                                          task_id = task["task_id"]
                                          
                                          try:
                                              results = await self._run_parallel_analysis(task)
                                              await self.task_queue.complete_task(task_id, results)
                                          except Exception as e:
                                              await self.task_queue.fail_task(task_id, str(e))
                                      
                                      async def _run_parallel_analysis(self, task: dict) -> dict:
                                          loop = asyncio.get_event_loop()
                                          
                                          tasks = [
                                              loop.run_in_executor(self.executor, market_agent.analyze, {}),
                                              loop.run_in_executor(self.executor, news_agent.analyze, {}),
                                              loop.run_in_executor(self.executor, institution_agent.analyze, {})
                                          ]
                                          
                                          results = await asyncio.gather(*tasks, return_exceptions=True)
                                          
                                          return {
                                              "technical": results[0],
                                              "news": results[1],
                                              "institution": results[2]
                                          }

                                  进度推送(WebSocket):

                                  from fastapi import WebSocket
                                  
                                  @app.websocket("/ws/analysis/{task_id}")
                                  async def websocket_analysis_progress(websocket: WebSocket, task_id: str):
                                      await websocket.accept()
                                      
                                      try:
                                          while True:
                                              task_info = get_task_info(task_id)
                                              await websocket.send_json(task_info)
                                              
                                              if task_info["status"] in ["completed", "failed"]:
                                                  break
                                              
                                              await asyncio.sleep(2)
                                      finally:
                                          await websocket.close()

                                  异步任务处理的优势:

                                  1. 用户体验:用户无需长时间等待,可以立即得到响应

                                  2. 系统稳定性:避免长时间连接导致的超时问题

                                  3. 资源利用:可以并发处理多个任务

                                  4. 容错性:任务失败可以重试,不影响其他任务

                                  六、前端可视化实现

                                  6.1 React组件架构

                                  前端采用React 18构建,使用Ant Design组件库和Recharts图表库。

                                  核心组件结构:

                                  import React, { useState, useEffect } from 'react';
                                  import axios from 'axios';
                                  import { LineChart, Line, XAxis, YAxis, CartesianGrid, Tooltip, Legend } from 'recharts';
                                  import { Card, Row, Col, Spin, Tag, Progress } from 'antd';
                                  
                                  const Dashboard = () => {
                                    const [loading, setLoading] = useState(false);
                                    const [priceData, setPriceData] = useState([]);
                                    const [analysisData, setAnalysisData] = useState(null);
                                    const [taskId, setTaskId] = useState(null);
                                  
                                    const startAnalysis = async () => {
                                      const response = await axios.post('/api/analysis/async', {
                                        timeframe: '1D',
                                        include_news: true,
                                        include_institutions: true
                                      });
                                      
                                      setTaskId(response.data.task_id);
                                      pollTaskStatus(response.data.task_id);
                                    };
                                  
                                    const pollTaskStatus = async (taskId) => {
                                      const interval = setInterval(async () => {
                                        const response = await axios.get(`/api/analysis/status/${taskId}`);
                                        const data = response.data.data;
                                        
                                        if (data.status === 'completed') {
                                          clearInterval(interval);
                                          setAnalysisData(data.result);
                                          setLoading(false);
                                        }
                                      }, 2000);
                                    };
                                  
                                    return (
                                      <div className="dashboard">
                                        <Row gutter={[16, 16]}>
                                          <Col span={24}>
                                            <Card title="黄金价格走势">
                                              <ResponsiveContainer width="100%" height={400}>
                                                <LineChart data={priceData}>
                                                  <CartesianGrid strokeDasharray="3 3" />
                                                  <XAxis dataKey="date" />
                                                  <YAxis />
                                                  <Tooltip />
                                                  <Legend />
                                                  <Line type="monotone" dataKey="close" stroke="#8884d8" name="收盘价" />
                                                </LineChart>
                                              </ResponsiveContainer>
                                            </Card>
                                          </Col>
                                          
                                          {analysisData && (
                                            <>
                                              <Col span={12}>
                                                <Card title="技术面分析">
                                                  <AnalysisCard data={analysisData.technical_analysis} />
                                                </Card>
                                              </Col>
                                              <Col span={12}>
                                                <Card title="投资建议">
                                                  <InvestmentAdviceCard data={analysisData.investment_advice} />
                                                </Card>
                                              </Col>
                                            </>
                                          )}
                                        </Row>
                                      </div>
                                    );
                                  };
                                  
                                  

                                  组件设计要点:

                                  1. 状态管理:使用React Hooks管理组件状态

                                  2. 异步处理:使用axios进行HTTP请求,配合async/await

                                  3. 数据可视化:使用Recharts绘制价格走势图

                                  4. 实时更新:通过轮询实现分析进度的实时展示


                                    七、部署与运维

                                    7.1 Docker部署方案

                                    使用Docker和Docker Compose实现容器化部署,简化部署流程。

                                    Dockerfile(后端):

                                    FROM python:3.11-slim
                                    
                                    WORKDIR /app
                                    
                                    COPY requirements.txt .
                                    RUN pip install --no-cache-dir -r requirements.txt
                                    
                                    COPY . .
                                    
                                    CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

                                    Dockerfile(前端):

                                    FROM node:18-alpine
                                    
                                    WORKDIR /app
                                    
                                    COPY package*.json ./
                                    RUN npm install
                                    
                                    COPY . .
                                    
                                    RUN npm run build
                                    
                                    CMD ["npm", "run", "preview"]

                                    docker-compose.yml:

                                    version: '3.8'
                                    
                                    services:
                                      mysql:
                                        image: mysql:8.0
                                        environment:
                                          MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
                                          MYSQL_DATABASE: gold_analysis
                                        volumes:
                                          - mysql_data:/var/lib/mysql
                                        ports:
                                          - "3306:3306"
                                    
                                      backend:
                                        build: ./backend
                                        ports:
                                          - "8000:8000"
                                        environment:
                                          DATABASE_URL: mysql+pymysql://root:${MYSQL_ROOT_PASSWORD}@mysql:3306/gold_analysis
                                          ZHIPU_API_KEY: ${ZHIPU_API_KEY}
                                          DEEPSEEK_API_KEY: ${DEEPSEEK_API_KEY}
                                        depends_on:
                                          - mysql
                                    
                                      frontend:
                                        build: ./app
                                        ports:
                                          - "80:80"
                                        depends_on:
                                          - backend
                                    
                                    volumes:
                                      mysql_data:

                                    部署步骤:

                                    # 1. 配置环境变量
                                    cp backend/.env.example backend/.env
                                    # 编辑.env文件,填入必要的API密钥
                                    
                                    # 2. 启动服务
                                    docker-compose up -d --build
                                    
                                    # 3. 查看日志
                                    docker-compose logs -f backend
                                    
                                    # 4. 访问应用
                                    # 前端: http://localhost
                                    # 后端: http://localhost:8000
                                    # API文档: http://localhost:8000/docs

                                    7.2 监控与日志

                                    实现完整的监控和日志系统,便于问题排查和性能优化。

                                    日志记录

                                    import logging
                                    from prometheus_client import Counter, Histogram, generate_latest
                                    
                                    logging.basicConfig(
                                        level=logging.INFO,
                                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
                                    )
                                    
                                    logger = logging.getLogger(__name__)
                                    
                                    request_count = Counter('api_requests_total', 'Total API requests', ['method', 'endpoint'])
                                    request_duration = Histogram('api_request_duration_seconds', 'API request duration')
                                    
                                    @app.middleware("http")
                                    async def log_requests(request, call_next):
                                        start_time = time.time()
                                        
                                        logger.info(f"Request: {request.method} {request.url.path}")
                                        request_count.labels(method=request.method, endpoint=request.url.path).inc()
                                        
                                        with request_duration.time():
                                            response = await call_next(request)
                                        
                                        duration = time.time() - start_time
                                        logger.info(f"Response: {response.status_code} - Duration: {duration:.2f}s")
                                        
                                        return response
                                    
                                    @app.get("/metrics")
                                    async def metrics():
                                        return Response(content=generate_latest(), media_type="text/plain")

                                    监控指标:

                                    • 请求计数:记录API调用次数

                                    • 响应时间:记录API响应时间

                                    • 错误率:记录API错误率

                                    • 任务队列长度:监控任务队列积压情况


                                      八、性能优化与最佳实践

                                      8.1 缓存策略

                                      实现多级缓存策略,减少重复计算和API调用。

                                      Redis缓存:

                                      import redis
                                      import json
                                      from functools import wraps
                                      
                                      redis_client = redis.Redis(host='localhost', port=6379, db=0)
                                      
                                      def cache_result(key: str, ttl: int = 3600):
                                          def decorator(func):
                                              @wraps(func)
                                              def wrapper(*args, **kwargs):
                                                  cache_key = f"{key}:{str(args)}:{str(kwargs)}"
                                                  
                                                  cached = redis_client.get(cache_key)
                                                  if cached:
                                                      return json.loads(cached)
                                                  
                                                  result = func(*args, **kwargs)
                                                  
                                                  redis_client.setex(cache_key, ttl, json.dumps(result))
                                                  
                                                  return result
                                              return wrapper
                                          return decorator
                                      
                                      @cache_result("market_analysis", ttl=1800)
                                      def analyze_market(context: dict) -> dict:
                                          pass

                                      缓存策略:

                                      1. 价格数据:永久缓存,每日更新

                                      2. 新闻数据:缓存1小时,每小时更新

                                      3. 机构观点:缓存4小时,每4小时更新

                                      4. 分析结果:缓存30分钟,相同参数直接返回

                                        8.2 并发控制

                                        实现并发控制和资源管理,提升系统性能。

                                        并发处理:

                                        import asyncio
                                        from concurrent.futures import ThreadPoolExecutor
                                        
                                        executor = ThreadPoolExecutor(max_workers=10)
                                        
                                        async def parallel_analysis(context: dict) -> dict:
                                            loop = asyncio.get_event_loop()
                                            
                                            tasks = [
                                                loop.run_in_executor(executor, market_agent.analyze, context),
                                                loop.run_in_executor(executor, news_agent.analyze, context),
                                                loop.run_in_executor(executor, institution_agent.analyze, context)
                                            ]
                                            
                                            results = await asyncio.gather(*tasks, return_exceptions=True)
                                            
                                            return {
                                                "technical": results[0],
                                                "news": results[1],
                                                "institution": results[2]
                                            }

                                        并发控制策略:

                                        1. 线程池:限制并发线程数,避免资源耗尽

                                        2. 异步I/O:使用asyncio实现异步I/O操作

                                        3. 连接池:使用数据库连接池,减少连接开销

                                        4. 限流:使用令牌桶算法实现API限流


                                          九、踩坑经验总结

                                          9.1 LLM调用稳定性问题

                                          问题:LLM API调用经常超时或失败

                                          解决方案

                                          from tenacity import retry, stop_after_attempt, wait_exponential
                                          
                                          @retry(
                                              stop=stop_after_attempt(3),
                                              wait=wait_exponential(multiplier=1, min=4, max=10)
                                          )
                                          def call_llm_with_retry(prompt: str) -> str:
                                              try:
                                                  response = llm(prompt, timeout=30)
                                                  return response
                                              except Exception as e:
                                                  logger.error(f"LLM call failed: {e}")
                                                  raise

                                          9.2 JSON解析失败问题

                                          问题:LLM返回的JSON格式不规范

                                          解决方案

                                          import json
                                          import re
                                          
                                          def robust_json_parse(response: str) -> dict:
                                              try:
                                                  return json.loads(response)
                                              except json.JSONDecodeError:
                                                  try:
                                                      json_match = re.search(r'\{.*\}', response, re.DOTALL)
                                                      if json_match:
                                                          return json.loads(json_match.group())
                                                  except:
                                                      pass
                                                  
                                                  return {"raw_response": response, "parse_error": "Failed to parse as JSON"}

                                          十、未来规划

                                          1. 增加更多Agent:宏观经济分析Agent、地缘政治分析Agent

                                          2. 强化学习优化:基于历史数据优化Agent决策

                                          3. 多语言支持:支持英文、日文等多语言分析

                                          4. 移动端适配:开发移动端应用

                                          5. 社区功能:用户分享、讨论、跟单功能


                                            结语

                                            本文详细介绍了如何从零构建一个基于LangChain Multi-Agent的黄金市场智能分析引擎。通过GLM-4-Plus和DeepSeek-V3的双引擎架构,我们实现了实时搜索、深度推理、多源信息融合的完整分析流程。

                                            希望这篇文章能够帮助到正在学习AI Agent技术的开发者。如果你对项目有任何问题或建议,欢迎在GitHub上提Issue或PR。

                                            项目地址GoldMind

                                            如果本文对你有帮助,欢迎给项目一个⭐ Star,你的支持是我持续更新的动力!


                                            参考资料:

                                            Logo

                                            有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

                                            更多推荐