收藏 | 程序员小白也能学会:一步步打造生产级大模型概念验证 (POC)
本文通过构建一个面向SaaS产品的实时AI支持助手,详细讲解了如何使用LangGraph、FastAPI和Streamlit技术栈来设计一个具备生产思维的概念验证(POC)。文章涵盖了项目搭建、强类型状态设计、分层架构、端到端数据流、可观测性、安全注意事项以及如何将POC扩展为生产系统等内容,旨在帮助开发者理解并实践大模型在真实场景中的应用。
一个可用于生产环境的概念验证(逐步详解)

1. 引言
大多数 AI 演示在第一眼看上去令人印象深刻,但在真实场景中往往难以落地。它们通常存在以下问题:
- • 只能处理单次 Prompt
- • 无法在多轮交互中保持上下文
- • 将 UI 与 AI 逻辑混在一起
- • 随着复杂度提升迅速失控
真正的 AI 应用需要 状态管理(state)、结构化设计(structure) 以及 显式控制流(explicit control flow)。
在本文中,我们将构建一个小型但具备生产思维的概念验证(POC),使用以下技术栈:
- • LangGraph —— 用于建模具备状态的多步骤 AI 工作流
- • FastAPI —— 将 AI 逻辑封装为清晰的后端服务
- • Streamlit —— 构建响应式、实时的用户界面
这个 POC 在功能上刻意保持简单,但在架构设计上遵循真实生产级 AI 系统所采用的原则。
2. 我们要构建什么?
—— 一个面向 SaaS 产品的实时(请求–响应式)AI 支持助手
问题(Problem)
- • 客服团队反复回答相同问题
- • 用户期望即时响应
- • 传统聊天机器人在多轮追问中容易失效
解决方案(Solution)
构建一个实时 AI 助手,它能够:
- • 理解用户意图(Intent Understanding)
- • 进行多步骤推理(Multi-step Reasoning)
- • 维护对话状态(Conversation State)
- • 通过 UI 实时返回响应
为什么这个用例重要
- • 在 SaaS、内部工具和 AI 平台中需求极高
- • 表面简单,但正确实现难度很大
- • 是展示 LangGraph 优势的理想场景
3. 为什么选择 LangGraph + FastAPI + Streamlit?
LangGraph(AI 工作流引擎)
LangGraph 允许你:
- • 将 AI 逻辑表示为图结构(Graph),而不是线性链式流程
- • 在多个步骤之间维护状态(Stateful Execution)
- • 添加验证(Validation)、路由(Routing)和控制逻辑(Control Flow)
- • 对每一个推理步骤进行调试与可观测分析
它本质上是一个 具备显式控制流的 AI 工作流编排引擎。
FastAPI(后端 API 层)
FastAPI 提供:
- • 高性能(基于 ASGI)
- • UI 与 AI 逻辑的清晰分离
- • 易于部署与横向扩展
它负责将 AI 能力封装为标准 API 服务。
Streamlit(前端 UI 层)
Streamlit 支持:
- • 快速构建交互式界面
- • 实时用户交互
- • 极低的前端开发复杂度
适合快速构建可运行的产品级 POC。
三者协同架构
Streamlit(UI) → FastAPI(API 层) → LangGraph(AI 逻辑层)
这种分层解耦对于生产环境至关重要:
- • UI 不承担 AI 逻辑
- • 后端不混入展示代码
- • AI 工作流可独立演进
此外,LangGraph 提供了节点级(node-level)的日志记录与调试能力,非常适合做系统可观测性(Observability)。
4. 高层架构(High-Level Architecture)
+--------------+| Streamlit UI |+------+-------+ | v+------------------+| FastAPI Server |+--------+---------+ | v+------------------+| LangGraph Engine |+------------------+ | v LLM Provider
5. 端到端数据流(End-to-End Data Flow)
User Input | v[Streamlit UI] | v[FastAPI Endpoint] | v[Intent Node] | v[Response Node] | v[Safety / Post-Process Node] | v[Final AI Response]
6. 项目结构
realtime_ai_poc/│├── backend/│ ├── __init__.py│ ├── main.py # FastAPI application│ ├── graph.py # LangGraph workflow definition│ ├── config.py # Environment & settings management│ └── logging_config.py # Logging configuration│├── frontend/│ ├── app.py # Streamlit UI│ └── api_client.py # Backend communication layer│├── .env # Local environment variables (not committed)├── .gitignore├── requirements.txt└── README.md# Note: backend/config.py:定义并管理 BaseSettings(统一配置入口)# backend/logging_config.py:集中化管理日志配置(Centralized Logging Configuration)
7. 环境搭建
创建虚拟环境
mamba create -n venv python=3.10mamba activate venv
安装依赖项
pip install -r requirements.txt### requirements.txtfastapi>=0.110uvicorn[standard]>=0.27streamlit>=1.32requests>=2.31langgraph>=0.0.30langchain-openai>=0.1.0langchain-community>=0.0.1 # if using langchain_community vectorstoreschromadb>=0.3.0 # Chroma backing storepython-dotenv>=1.0pydantic>=2.0
设置您的 API 密钥:
OPENAI_API_KEY=your_api_key_here
添加.gitignore:
.env__pycache__/.venv/
8. LangGraph:强类型状态(Typed State)
为什么强类型状态很重要?
- • 使数据流显式化(Explicit Data Flow)
- • 防止隐式错误(Silent Bugs)
- • 提升代码可读性与调试效率
在基于 LangGraph 构建的工作流中,状态(State)是贯穿多个节点的核心载体。使用强类型定义状态,可以:
- • 明确每个节点的输入与输出
- • 在编译阶段捕获类型错误
- • 避免运行时因字段缺失或格式错误导致的问题
相关实现位于:backend/graph.py
# backend/graph.pyfrom __future__ import annotationsimport loggingimport osfrom typing import Optional, Dict, Any, Listfrom dotenv import load_dotenvtry: from typing import TypedDictexcept ImportError: from typing_extensions import TypedDictfrom langgraph.graph import StateGraphfrom langchain_openai import ChatOpenAI, OpenAIEmbeddingsfrom langchain_community.vectorstores import Chromaload_dotenv()logger = logging.getLogger(__name__)logger.setLevel(logging.INFO)# ============================================================# Model Initialization# ============================================================OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")llm = ChatOpenAI( model=os.getenv("LLM_MODEL", "gpt-4o-mini"), temperature=float(os.getenv("LLM_TEMPERATURE", "0.0")), api_key=OPENAI_API_KEY or None,)# ============================================================# Vector Store (RAG Setup)# ============================================================EMBEDDINGS_MODEL = OpenAIEmbeddings(api_key=OPENAI_API_KEY)VECTOR_DB_DIR = os.getenv("VECTOR_DB_DIR", "./chroma_db")vectorstore = Chroma( persist_directory=VECTOR_DB_DIR, embedding_function=EMBEDDINGS_MODEL,)retriever = vectorstore.as_retriever(search_kwargs={"k": 3})# ============================================================# State Definition# ============================================================VALID_INTENTS = {"billing", "technical", "account", "general"}class ChatState(TypedDict, total=False): message: str intent: Optional[str] context: Optional[str] response: Optional[str]# ============================================================# Helper: Extract text safely from LLM result# ============================================================def _extract_text_from_result(res: Any) -> str: if res is None: return "" if hasattr(res, "content"): return (res.content or "").strip() if hasattr(res, "text"): return (res.text or "").strip() try: d = res.dict() if isinstance(d, dict): return (d.get("content") or d.get("text") or "").strip() except Exception: pass return str(res).strip()async def _llm_ainvoke_safe(prompt: str) -> str: if hasattr(llm, "ainvoke"): try: res = await llm.ainvoke(prompt) return _extract_text_from_result(res) except Exception: logger.exception("ainvoke failed") if hasattr(llm, "apredict"): try: res = await llm.apredict(prompt) if isinstance(res, str): return res.strip() return _extract_text_from_result(res) except Exception: logger.exception("apredict failed") if hasattr(llm, "invoke"): try: res = llm.invoke(prompt) return _extract_text_from_result(res) except Exception: logger.exception("invoke failed") return ""# ============================================================# Node 1: Intent Classification# ============================================================async def intent_node(state: ChatState) -> Dict[str, str]: message = state.get("message", "").strip() if not message: raise ValueError("Empty message received in intent_node") prompt = ( "You are an intent classifier for a SaaS support system.\n" "Classify the user message into exactly one of:\n" "billing, technical, account, general.\n\n" "Return ONLY the single word category.\n\n" f"User message:\n{message}\n\nCategory:" ) raw = await _llm_ainvoke_safe(prompt) intent = raw.lower().strip() if intent not in VALID_INTENTS: intent = "general" logger.info("Intent classified: %s", intent) return {"intent": intent}# ============================================================# Node 2: Retrieval (RAG)# ============================================================async def retrieve_node(state: ChatState) -> Dict[str, str]: message = state.get("message", "").strip() if not message: return {"context": ""} try: docs = retriever.get_relevant_documents(message) context_chunks: List[str] = [doc.page_content for doc in docs] context = "\n\n".join(context_chunks) logger.info("Retrieved %d context documents", len(context_chunks)) return {"context": context} except Exception: logger.exception("Retrieval failed") return {"context": ""}# ============================================================# Node 3: Response Generation (Grounded)# ============================================================async def response_node(state: ChatState) -> Dict[str, str]: message = state.get("message", "").strip() intent = state.get("intent", "general") context = state.get("context", "") prompt = ( "You are a professional SaaS support assistant.\n\n" "Use ONLY the provided product documentation to answer.\n" "If the documentation does not contain the answer, say:\n" "'I couldn't find this in our documentation. Let me connect you with support.'\n\n" "============================\n" "Product Documentation:\n" f"{context}\n" "============================\n\n" f"User intent: {intent}\n" f"User message: {message}\n\n" "Provide a concise, accurate, actionable answer." ) raw = await _llm_ainvoke_safe(prompt) response = raw.strip() logger.info("Response generated (len=%d)", len(response)) return {"response": response}# ============================================================# Node 4: Safety Layer# ============================================================def safety_node(state: ChatState) -> Dict[str, str]: response = state.get("response", "") restricted_keywords = ["medical advice", "legal advice"] if any(keyword in response.lower() for keyword in restricted_keywords): return { "response": ( "I'm unable to provide medical or legal advice. " "Please consult a qualified professional." ) } return {"response": response}# ============================================================# Build LangGraph Workflow# ============================================================graph = StateGraph(ChatState)graph.add_node("intent", intent_node)graph.add_node("retrieve", retrieve_node)graph.add_node("response", response_node)graph.add_node("safety", safety_node)graph.set_entry_point("intent")graph.add_edge("intent", "retrieve")graph.add_edge("retrieve", "response")graph.add_edge("response", "safety")graph.set_finish_point("safety")app_graph = graph.compile()
- FastAPI Backend
# backend/main.pyimport asyncioimport loggingimport uuidfrom typing import Optionalfrom fastapi import FastAPI, HTTPException, Requestfrom pydantic import BaseModel, BaseSettingsfrom fastapi.middleware.cors import CORSMiddleware# Ensure backend/__init__.py existsfrom .graph import app_graph# ============================================================# Logging Setup# ============================================================logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",)logger = logging.getLogger("realtime_ai_backend")# ============================================================# Settings# ============================================================class Settings(BaseSettings): ALLOWED_ORIGINS: list[str] = ["http://localhost:8501"] TIMEOUT_SECONDS: int = 20 MAX_INPUT_LENGTH: int = 2000 class Config: env_file = ".env"settings = Settings()# ============================================================# FastAPI App# ============================================================app = FastAPI( title="Real-Time AI Support Assistant API", version="1.0.0",)app.add_middleware( CORSMiddleware, allow_origins=settings.ALLOWED_ORIGINS, allow_credentials=True, allow_methods=["POST", "OPTIONS"], allow_headers=["*"],)# ============================================================# Models# ============================================================class Query(BaseModel): message: strclass ChatResponse(BaseModel): response: str request_id: str# ============================================================# Health Check (Production Standard)# ============================================================@app.get("/health")async def health(): return {"status": "ok"}# ============================================================# Chat Endpoint# ============================================================@app.post("/chat", response_model=ChatResponse)async def chat(query: Query, request: Request): request_id = str(uuid.uuid4()) logger.info("Request %s received", request_id) text = query.message.strip() # ---- Input Validation ---- if not text: raise HTTPException(status_code=400, detail="Please enter a valid question.") if len(text) > settings.MAX_INPUT_LENGTH: raise HTTPException( status_code=400, detail=f"Message too long (max {settings.MAX_INPUT_LENGTH} characters).", ) # ---- Invoke Graph ---- try: if hasattr(app_graph, "ainvoke"): result = await asyncio.wait_for( app_graph.ainvoke({"message": text}), timeout=settings.TIMEOUT_SECONDS, ) else: loop = asyncio.get_running_loop() result = await loop.run_in_executor( None, app_graph.invoke, {"message": text} ) except asyncio.TimeoutError: logger.exception("Request %s timed out", request_id) raise HTTPException( status_code=504, detail="Processing timed out. Please try again.", ) except Exception: logger.exception("Request %s failed internally", request_id) raise HTTPException( status_code=500, detail="Internal server error.", ) # ---- Extract Response ---- response_text: Optional[str] = None if isinstance(result, dict): response_text = result.get("response") else: response_text = getattr(result, "response", None) if not response_text: logger.error("Request %s returned empty response", request_id) raise HTTPException( status_code=500, detail="No response from model pipeline.", ) logger.info("Request %s completed successfully", request_id) return ChatResponse( response=response_text, request_id=request_id, )
- Streamlit Frontend
# frontend/app.pyimport streamlit as stimport requestsfrom requests.exceptions import RequestException# ============================================================# Configuration# ============================================================API_URL = "http://localhost:8000/chat"TIMEOUT_SECONDS = 25 # Slightly higher than backend timeoutMAX_INPUT_LENGTH = 2000 # Keep aligned with backend# ============================================================# Page Setup# ============================================================st.set_page_config( page_title="Real-Time AI Support Assistant", page_icon="🤖", layout="centered",)st.title("🤖 Real-Time AI Support Assistant")st.caption("Intent-aware • Retrieval-augmented • SaaS-ready architecture")# ============================================================# Session State# ============================================================if "history" not in st.session_state: st.session_state.history = []if "last_request_id" not in st.session_state: st.session_state.last_request_id = None# ============================================================# Input Form# ============================================================with st.form("chat_form", clear_on_submit=True): user_input = st.text_input( "Ask your question", max_chars=MAX_INPUT_LENGTH, placeholder="e.g., How do I upgrade my billing plan?", ) submitted = st.form_submit_button("Send")# ============================================================# Submit Handling# ============================================================if submitted: if not user_input or not user_input.strip(): st.warning("Please enter a valid question.") else: placeholder = st.empty() try: with placeholder.container(): st.info("Sending request to backend...") response = requests.post( API_URL, json={"message": user_input.strip()}, timeout=TIMEOUT_SECONDS, ) response.raise_for_status() data = response.json() answer = data.get("response", "No response returned.") request_id = data.get("request_id") # Store conversation st.session_state.history.append(("You", user_input.strip())) st.session_state.history.append(("Assistant", answer)) st.session_state.last_request_id = request_id except RequestException as e: st.error("Backend request failed.") st.exception(e) finally: placeholder.empty()# ============================================================# Chat History Rendering# ============================================================if st.session_state.history: st.divider() st.subheader("Conversation") for speaker, text in st.session_state.history: if speaker == "You": st.markdown(f"**🧑 You:** {text}") else: st.markdown(f"**🤖 Assistant:** {text}")# ============================================================# Footer Info# ============================================================if st.session_state.last_request_id: st.divider() st.caption(f"Request ID: `{st.session_state.last_request_id}`")
注意:关键词过滤仅用于演示级别。生产环境中应使用专业的内容审核(Moderation)API。
- 运行 POC
🔹 第 1 步:激活虚拟环境
在项目根目录执行:
mamba activate venv
🔹 第 2 步:启动后端(在项目根目录)
uvicorn backend.main:app --reload
为什么这样做?
- • 确保 backend 被视为 Python 包
- • 避免相对导入错误
- • 在不同环境下保持一致行为
后端服务运行地址:
http://localhost:8000
🔹 第 3 步:启动前端(在项目根目录)
打开一个新的终端窗口:
streamlit run frontend/app.py
前端访问地址:
http://localhost:8501
🔹 第 4 步:验证系统是否正常运行
打开:
📘 API 文档 → http://localhost:8000/docs
💬 UI 界面 → http://localhost:8501
- 关于“实时”的说明
本 POC 实际提供的能力
该实现提供的是近实时(near real-time)的请求–响应式 AI 交互,具体流程如下:
-
- 前端向 FastAPI 后端发送请求
-
- 后端调用 LangGraph 工作流
-
- 模型生成完整响应
-
- 返回结果并在 UI 中渲染
⚠ 注意:
- • 不支持逐 token 流式输出
- • 只有在生成完成后才返回完整响应
- 可观测性与调试(Observability & Debugging)
优秀的 AI 系统必须具备可观测性。
在本 POC 中:
- • 每个节点都会记录状态变更
- • 意图识别、响应生成与安全检查步骤可追踪
- • 可以轻松扩展接入 LangSmith 或 OpenTelemetry
在真实系统中,这一点至关重要。
- 性能优化建议
- • 保持 Prompt 简洁
- • 避免不必要的 LLM 调用
- • 缓存高频问题
- • 对简单请求进行短路处理(short-circuit)
- • 控制图结构复杂度
- 安全注意事项
- • 永远不要在前端暴露 API Key
- • 验证用户输入
- • 防御 Prompt 注入攻击
- • 为后端接口设置限流(Rate Limiting)
- • 避免记录敏感信息到日志
- 常见误区
❌ 误区 1:用 LangGraph 处理简单单步任务
如果只有单步 Prompt,LangGraph 会增加不必要的复杂性。
❌ 误区 2:UI 与 AI 逻辑强耦合
让 Streamlit 专注展示层,推理与编排逻辑交给后端。
❌ 误区 3:没有显式状态定义
没有清晰的状态模型,工作流会变得难以调试与推理。
❌ 误区 4:缺乏可观测性与日志
当系统出问题时,无法定位是哪个节点出错。
❌ 误区 5:模糊使用“实时”概念
需要明确区分:
- • 低延迟
- • 流式输出
- • 异步执行
- 什么时候不需要 LangGraph?
不适用场景:
- • 单次 Prompt
- • 无状态或无分支逻辑
- • 简单文本生成
适用场景:
- • 状态至关重要
- • 决策依赖前序步骤
- • 需要可控性与可靠性
- 如何将 POC 扩展为生产系统?
你可以通过以下方式升级该系统:
1️⃣ 流式响应
- • 使用 LangGraph 的
astream() - • 在 FastAPI 中返回
StreamingResponse - • 在 Streamlit 中逐 token 渲染
2️⃣ 会话记忆
- • 扩展
ChatState增加历史记录 - • 使用 Redis 或数据库持久化会话
- • 支持多轮上下文对话
3️⃣ 知识库(RAG)
- • 在响应生成前添加检索节点
- • 集成向量搜索(如 FAISS、Pinecone 等)
- • 基于公司真实数据进行回答
4️⃣ 身份认证与多用户支持
- • 在 FastAPI 中接入 JWT 或 OAuth
- • 将对话与用户 ID 关联
- • 添加限流机制
5️⃣ 容器化与部署
- • 为前后端创建 Dockerfile
- • 使用 Docker Compose 本地开发
- • 部署到云平台(AWS、GCP、Azure 等)
6️⃣ 分析与监控
- • 添加结构化日志
- • 集成 LangSmith 或 OpenTelemetry
- • 监控延迟、Token 使用量、成本与错误率
- 总结
这个 POC 展示了真实 AI 应用的构建方式:
- • 有状态的工作流,而非临时拼接 Prompt
- • 基于 LangGraph 节点的显式控制流
- • 清晰的前后端分离
- • 带超时控制的异步执行
- • 内建可观测性与安全意识
它规模不大,但设计遵循生产级原则。
如果你能够构建并理解这个系统,那么你不仅是在做一个聊天机器人。
你是在构建一个可扩展、实时 AI 应用的工程基础架构。
从这里开始,扩展规模只是工程问题,而不再是风险博弈。
普通人如何抓住AI大模型的风口?
领取方式在文末
为什么要学习大模型?
目前AI大模型的技术岗位与能力培养随着人工智能技术的迅速发展和应用 , 大模型作为其中的重要组成部分 , 正逐渐成为推动人工智能发展的重要引擎 。大模型以其强大的数据处理和模式识别能力, 广泛应用于自然语言处理 、计算机视觉 、 智能推荐等领域 ,为各行各业带来了革命性的改变和机遇 。
目前,开源人工智能大模型已应用于医疗、政务、法律、汽车、娱乐、金融、互联网、教育、制造业、企业服务等多个场景,其中,应用于金融、企业服务、制造业和法律领域的大模型在本次调研中占比超过 30%。
随着AI大模型技术的迅速发展,相关岗位的需求也日益增加。大模型产业链催生了一批高薪新职业:
人工智能大潮已来,不加入就可能被淘汰。如果你是技术人,尤其是互联网从业者,现在就开始学习AI大模型技术,真的是给你的人生一个重要建议!
最后
只要你真心想学习AI大模型技术,这份精心整理的学习资料我愿意无偿分享给你,但是想学技术去乱搞的人别来找我!
在当前这个人工智能高速发展的时代,AI大模型正在深刻改变各行各业。我国对高水平AI人才的需求也日益增长,真正懂技术、能落地的人才依旧紧缺。我也希望通过这份资料,能够帮助更多有志于AI领域的朋友入门并深入学习。
真诚无偿分享!!!
vx扫描下方二维码即可
加上后会一个个给大家发
【附赠一节免费的直播讲座,技术大佬带你学习大模型的相关知识、学习思路、就业前景以及怎么结合当前的工作发展方向等,欢迎大家~】
大模型全套学习资料展示
自我们与MoPaaS魔泊云合作以来,我们不断打磨课程体系与技术内容,在细节上精益求精,同时在技术层面也新增了许多前沿且实用的内容,力求为大家带来更系统、更实战、更落地的大模型学习体验。

希望这份系统、实用的大模型学习路径,能够帮助你从零入门,进阶到实战,真正掌握AI时代的核心技能!
01 教学内容

-
从零到精通完整闭环:【基础理论 →RAG开发 → Agent设计 → 模型微调与私有化部署调→热门技术】5大模块,内容比传统教材更贴近企业实战!
-
大量真实项目案例: 带你亲自上手搞数据清洗、模型调优这些硬核操作,把课本知识变成真本事!
02适学人群
应届毕业生: 无工作经验但想要系统学习AI大模型技术,期待通过实战项目掌握核心技术。
零基础转型: 非技术背景但关注AI应用场景,计划通过低代码工具实现“AI+行业”跨界。
业务赋能突破瓶颈: 传统开发者(Java/前端等)学习Transformer架构与LangChain框架,向AI全栈工程师转型。

vx扫描下方二维码即可
【附赠一节免费的直播讲座,技术大佬带你学习大模型的相关知识、学习思路、就业前景以及怎么结合当前的工作发展方向等,欢迎大家~】
本教程比较珍贵,仅限大家自行学习,不要传播!更严禁商用!
03 入门到进阶学习路线图
大模型学习路线图,整体分为5个大的阶段:
04 视频和书籍PDF合集

从0到掌握主流大模型技术视频教程(涵盖模型训练、微调、RAG、LangChain、Agent开发等实战方向)

新手必备的大模型学习PDF书单来了!全是硬核知识,帮你少走弯路(不吹牛,真有用)
05 行业报告+白皮书合集
收集70+报告与白皮书,了解行业最新动态!
06 90+份面试题/经验
AI大模型岗位面试经验总结(谁学技术不是为了赚$呢,找个好的岗位很重要)

07 deepseek部署包+技巧大全

由于篇幅有限
只展示部分资料
并且还在持续更新中…
真诚无偿分享!!!
vx扫描下方二维码即可
加上后会一个个给大家发
【附赠一节免费的直播讲座,技术大佬带你学习大模型的相关知识、学习思路、就业前景以及怎么结合当前的工作发展方向等,欢迎大家~】
更多推荐

所有评论(0)