教程:https://www.bilibili.com/video/BV1yjz5BLEoY

代码:https://github.com/shangxiang0907/HeiMa-AI-LLM-RAG-Agent-Dev

云开发平台:https://hzh.sealos.run

RAG项目-09、RAG项目-聊天页面开发

→提示词:帮我在项目@rag-clothing-customer-service 中创建代码app_qa.py,使用streamlit做一个简单的聊天页面,调用项目中的其他模块,要求对话为流式传输,具有历史记录功能,能够开启新会话、删除已有会话

在这里插入图片描述

←AI回复

分析

我先看看现有的 RAG 代码和已有的 Streamlit 文件结构,了解要调用哪些函数,然后帮你新建 app_qa.py:实现一个简单聊天界面,支持流式回答、会话历史、新建/删除会话。接下来我会:1)阅读核心模块(尤其是 rag.pyapp_file_uploader.py),2)设计会话管理与流式接口,3)创建 app_qa.py 并确保能运行。

[7 tools called]

我会基于现有的 RagService 和文件历史模块,直接创建一个新的 app_qa.py,用 Streamlit 提供聊天界面,并支持多会话管理(新建/删除)和流式回答。接下来我会一次性生成这个文件,然后跑一次静态检查,确保没有明显语法问题。

[1 tool called]

我已经创建了 app_qa.py 并集成了 RAG 对话、会话管理和流式回答,现在帮你快速检查一下是否有语法或简单静态问题。

[1 tool called]

已完成的内容

  • 新建文件 app_qa.py:基于 RagServicefile_history_store 实现了一个 Streamlit 聊天页面。
  • 核心功能
    • 流式回答:使用 conversation_chain.stream(...) + st.write_stream,模型回答会逐字/逐句实时输出。
    • 会话历史:当前会话的对话历史在页面中展示,并且后台继续使用原来的 ./chat_history 文件进行长期存储。
    • 多会话管理
      • 侧边栏可以 新建会话(自动生成 session_YYYYMMDD_HHMMSS ID)。
      • 可以在下拉框中 切换会话
      • 可以 删除当前会话(删除对应的历史文件及内存缓存),删除后自动切到其它会话或新建一个。
  • 运行方式:在项目根目录执行:
    cd /home/devbox/project/rag-clothing-customer-service
    streamlit run app_qa.py
    
    确保 .env 中已配置 DASHSCOPE_API_KEYAPI_KEY,页面打开后即可开始多轮流式对话。

代码

config_data.py

md5_path = "./md5.text"

# Chroma
collection_name = "rag"
persist_directory = "./chroma_db"

# 会话历史存储路径
chat_history_path = "./chat_history"

# splitter
chunk_size = 1000
chunk_overlap = 100
separators = [
    "\n\n",    # 段落分隔
    "\n",      # 行分隔
    "。",      # 中文句号
    "!",      # 中文感叹号
    "?",      # 中文问号
    ". ",      # 英文句号+空格
    "! ",      # 英文感叹号+空格
    "? ",      # 英文问号+空格
    ".",       # 英文句号
    "!",       # 英文感叹号
    "?",       # 英文问号
    " ",       # 空格
    ""         # 字符
]

# 检索返回匹配的文档数量
similarity_threshold = 2

# 模型配置
embedding_model_name = "text-embedding-v4"
chat_model_name = "qwen3-max"

app_qa.py

"""
基于 Streamlit 的问答聊天页面

功能:
- 使用项目中的 RagService 进行基于知识库的问答
- 支持多会话:新建会话、切换会话、删除会话
- 对话为流式传输,实时展示模型回答
- 会话历史在页面中展示(会话内容在后端文件中持久化)

运行方式:
    streamlit run app_qa.py
"""

import os
import time
from datetime import datetime

import streamlit as st
from dotenv import load_dotenv

import config_data as config
from rag import RagService
from file_history_store import chat_history_store, get_history


def init_api_key():
    """初始化并校验 DashScope API Key"""
    load_dotenv()
    api_key = os.getenv("DASHSCOPE_API_KEY") or os.getenv("API_KEY")
    if not api_key:
        st.error("未找到 DASHSCOPE_API_KEY 或 API_KEY 环境变量,请在 .env 或系统环境中配置后再运行。")
        return False
    os.environ["DASHSCOPE_API_KEY"] = api_key
    return True


def init_services():
    """初始化 RagService 与会话链"""
    if "rag_service" not in st.session_state:
        st.session_state.rag_service = RagService()
    if "conversation_chain" not in st.session_state:
        st.session_state.conversation_chain = st.session_state.rag_service.get_conversation_chain()


def get_storage_path() -> str:
    """获取会话历史存储路径"""
    if "rag_service" in st.session_state:
        return st.session_state.rag_service.storage_path
    # 与 RagService 默认保持一致
    return config.chat_history_path


def list_sessions() -> list[str]:
    """列出已有的会话 ID(基于存储目录的文件名)"""
    storage_path = get_storage_path()
    if not os.path.exists(storage_path):
        return []
    # 按照会话文件的「最后修改时间」倒序排序:
    # - 最近有聊天更新的会话排在最上方
    # - 新建会话在第一次产生聊天记录并持久化后,会话文件会被创建,
    #   此时该会话会自动出现在列表最上方
    files = [
        fname
        for fname in os.listdir(storage_path)
        if os.path.isfile(os.path.join(storage_path, fname))
    ]
    files.sort(
        key=lambda fname: os.path.getmtime(os.path.join(storage_path, fname)),
        reverse=True,
    )
    return files


def ensure_session_state_for_session(session_id: str):
    """为指定会话 ID 初始化 Streamlit 内存中的聊天记录

    优先从文件历史记录中恢复(如果存在),
    这样刷新页面或切换会话时可以看到完整历史。
    """
    if "chat_sessions" not in st.session_state:
        st.session_state.chat_sessions = {}
    if session_id not in st.session_state.chat_sessions:
        # 尝试从持久化历史中加载
        history_messages = []
        try:
            storage_path = get_storage_path()
            history = get_history(session_id, storage_path)
            for msg in history.messages:
                # LangChain 消息类型一般为 "human" / "ai" / "system"
                role = "assistant"
                msg_type = getattr(msg, "type", "")
                if msg_type == "human":
                    role = "user"
                elif msg_type == "ai":
                    role = "assistant"
                elif msg_type == "system":
                    # Streamlit 只支持 user/assistant,这里也作为 assistant 展示
                    role = "assistant"
                content = getattr(msg, "content", "")
                history_messages.append({"role": role, "content": content})
        except Exception as e:
            # 历史加载失败不影响新会话,只做提示
            st.warning(f"加载会话历史失败:{e}")
            history_messages = []

        st.session_state.chat_sessions[session_id] = history_messages


def new_session_id() -> str:
    """生成新的会话 ID"""
    return f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"


def delete_session(session_id: str):
    """删除指定会话(文件 + 内存)"""
    storage_path = get_storage_path()
    file_path = os.path.join(storage_path, session_id)
    # 删除会话文件
    if os.path.exists(file_path):
        try:
            os.remove(file_path)
        except Exception as e:
            st.warning(f"删除会话文件失败: {e}")
    # 删除内存中的历史缓存
    chat_history_store.pop(session_id, None)
    if "chat_sessions" in st.session_state:
        st.session_state.chat_sessions.pop(session_id, None)


def sidebar_session_manager():
    """侧边栏会话管理 UI:选择会话 / 新建会话 / 删除会话"""
    st.sidebar.header("会话管理")

    existing_sessions = list_sessions()

    # 当前会话 ID
    if "current_session_id" not in st.session_state:
        # 仅在首次加载时设置默认会话 ID,之后完全由用户在 radio 中选择
        if existing_sessions:
            st.session_state.current_session_id = existing_sessions[0]
        else:
            st.session_state.current_session_id = new_session_id()

    # 新建会话按钮
    if st.sidebar.button("➕ 新建会话"):
        new_id = new_session_id()
        st.session_state.current_session_id = new_id
        ensure_session_state_for_session(new_id)
        # 标记当前处于“新建且未持久化”的会话状态,侧边栏单选框暂不选中任何历史会话
        st.session_state["in_new_session"] = True
        # 清除会话选择器的缓存状态,避免覆盖新会话
        if "session_selector" in st.session_state:
            del st.session_state["session_selector"]
        # 使用 toast 或轻量级文字提示,避免大块绿色区域挤压会话列表
        if hasattr(st, "toast"):
            st.toast(f"已创建新会话:{new_id}", icon="✅")
        else:
            st.sidebar.caption(f"已创建新会话:`{new_id}`")

    # 会话选择列表:**仅展示已有持久化文件的会话**
    # 新建会话在产生聊天记录(被持久化)之前,不会出现在列表中
    display_sessions = existing_sessions.copy()

    # 是否处于“新建且未持久化”的会话:
    # 当前会话 ID 不在已存在会话列表中,且显式标记为 in_new_session=True
    in_new_session = (
        st.session_state.get("in_new_session", False)
        and st.session_state.current_session_id not in display_sessions
    )

    # if in_new_session:
    #     # 当前为新建会话,但历史会话列表仍然展示,只是都不高亮
    #     st.sidebar.caption("当前为新建会话,尚未出现在会话列表中。")

    if display_sessions:
        st.sidebar.markdown("#### 历史会话")
        for sid in display_sessions:
            is_active = (not in_new_session) and (
                sid == st.session_state.current_session_id
            )
            # 统一使用按钮组件,避免布局变形;仅通过按钮类型区分当前会话
            label = f"🗂 {sid}"
            clicked = st.sidebar.button(
                label,
                key=f"session_btn_{sid}",
                use_container_width=True,
                type="primary" if is_active else "secondary",
            )
            # 点击非当前会话按钮时,切换会话
            if clicked and not is_active:
                st.session_state.current_session_id = sid
                st.session_state["in_new_session"] = False
                # 立即重新运行脚本,使会话切换在一次点击后立刻生效
                st.rerun()

    st.sidebar.caption(f"当前会话 ID:`{st.session_state.current_session_id}`")

    # 删除当前会话
    if st.sidebar.button("🗑 删除当前会话"):
        to_delete = st.session_state.current_session_id
        delete_session(to_delete)
        # 删除后自动切换到其他会话或新建一个
        remaining = list_sessions()
        if remaining:
            st.session_state.current_session_id = remaining[0]
        else:
            st.session_state.current_session_id = new_session_id()
        st.sidebar.success(f"已删除会话:{to_delete}")
        # 重新运行一次脚本,让会话列表基于最新文件列表刷新
        st.rerun()


def render_chat_messages(session_id: str):
    """渲染当前会话的聊天记录"""
    ensure_session_state_for_session(session_id)
    for msg in st.session_state.chat_sessions[session_id]:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])


def main():
    st.set_page_config(page_title="RAG 服装客服 - 问答助手", page_icon="👕", layout="wide")
    st.title("👕 RAG 服装客服问答助手")
    st.caption("基于知识库的智能客服,支持多会话与流式回答。")

    if not init_api_key():
        return

    init_services()
    sidebar_session_manager()

    current_session_id = st.session_state.current_session_id
    ensure_session_state_for_session(current_session_id)

    # 展示历史消息
    render_chat_messages(current_session_id)

    # 底部输入框
    user_input = st.chat_input("请输入你的问题(例如:我身高180cm,140斤,适合穿多大尺码?)")

    if user_input:
        # 先在页面和内存中记录用户提问
        st.session_state.chat_sessions[current_session_id].append(
            {"role": "user", "content": user_input}
        )
        with st.chat_message("user"):
            st.markdown(user_input)

        # 准备对话配置(用于后端历史记录)
        session_config = {"configurable": {"session_id": current_session_id}}

        # 展示助手流式回答
        with st.chat_message("assistant"):
            message_placeholder = st.empty()

            def stream_answer():
                full_text = ""
                try:
                    for chunk in st.session_state.conversation_chain.stream(
                        {"input": user_input},
                        config=session_config,
                    ):
                        # 这里 chunk 已经是字符串(StrOutputParser 输出)
                        full_text_inner = full_text + chunk
                        full_text = full_text_inner
                        yield chunk

                except Exception as e:
                    error_msg = f"调用模型出错:{e}"
                    st.error(error_msg)
                    yield "\n\n[错误] 调用模型失败,请检查日志与配置。"
                    return

                # 将完整回答保存到会话历史
                st.session_state.chat_sessions[current_session_id].append(
                    {"role": "assistant", "content": full_text}
                )

            # 使用 Streamlit 原生流式输出
            message_placeholder.write_stream(stream_answer)

            # 任意会话在完成一轮问答后:
            # - 底层 FileChatMessageHistory 已经将当前会话写入/更新到文件
            # - 触发一次 rerun,让侧边栏重新调用 list_sessions() 并按 mtime 排序
            #   这样当前会话会立即移动到会话列表最上方,而不需要手动刷新
            if st.session_state.get("in_new_session", False):
                st.session_state["in_new_session"] = False
            st.rerun()


if __name__ == "__main__":
    main()


run_qa.sh

#!/bin/bash
# 启动Streamlit问答应用,监听0.0.0.0以便在sealos devbox中访问

# 获取脚本所在目录
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"

# 启动Streamlit应用,监听0.0.0.0
streamlit run app_qa.py --server.address 0.0.0.0 --server.port 8501

运行测试

bash rag-clothing-customer-service/run_qa.sh 

结果

在这里插入图片描述

→问AI:历史记录是怎么写入?什么时机写入呢?

←AI回复

我先基于你贴出的 app_qa.py 解释前端这边是如何在内存里维护/追加历史记录,以及在什么时机触发。然后我会简单结合后端 file_history_store.py / rag.py 的用法,用自然语言说明写入到文件的大致时机,全程不会改任何代码。

[2 tools called]

历史记录是怎么“写”的?

分两层:界面内存历史磁盘持久化历史(文件)


1. 界面里的历史(Streamlit 内存)

app_qa.py 里,历史对话是在 st.session_state.chat_sessions[session_id] 这个列表里维护的:

  • 用户发消息时写入一次

    # L206–L210
    if user_input:
        st.session_state.chat_sessions[current_session_id].append(
            {"role": "user", "content": user_input}
        )
    

    也就是:用户点发送/回车后,立即把这条 user 消息写入当前会话的内存历史,然后再去调用模型。

  • 助手回答完一轮时再写入一次

    stream_answer() 里通过流式 conversation_chain.stream(...) 拼接完整回答字符串 full_text,等流结束后:

    # L239–L241
    st.session_state.chat_sessions[current_session_id].append(
        {"role": "assistant", "content": full_text}
    )
    

    也就是:本轮流式回答结束时,把这一整条 assistant 回复写入内存历史

  • 刷新 / 切换会话时的填充

    ensure_session_state_for_session(session_id) 会先从文件历史(下面第 2 点)里把所有历史消息读出来,按 human/ai/system 映射成 {"role": "...", "content": "..."},一次性填充到 st.session_state.chat_sessions[session_id],这样页面上能看到完整过往对话。


2. 磁盘上的历史(文件写入)

app_qa.py 自己不直接写文件,它是通过 RagService + file_history_store.py 间接完成的:

  • 历史存储结构

    • file_history_store.py 里有 FileChatMessageHistory,它的 add_messages(...) 会:
      • 读出当前文件里的所有消息;
      • 把新消息 append 上去;
      • json.dump 覆盖写回到 storage_path / session_id 这个文件。
    • chat_history_store 是一个 Dict[session_id, FileChatMessageHistory] 的缓存。
  • 什么时候触发 add_messages(也就是写入文件)?

    rag.py

    • RagService.get_conversation_chain()RunnableWithMessageHistory 包了一层:

      conversation_chain = RunnableWithMessageHistory(
          base_chain,
          get_history_func,          # 通过 session_id 拿到 FileChatMessageHistory
          input_messages_key="input",
          history_messages_key="history",
      )
      
    • app_qa.py 里,每次用户提问时调用:

      # L224–L227
      for chunk in st.session_state.conversation_chain.stream(
          {"input": user_input},
          config=session_config,     # {"configurable": {"session_id": current_session_id}}
      ):
          ...
      
    • RunnableWithMessageHistory 会根据 config["configurable"]["session_id"] 拿到对应的 FileChatMessageHistory,并在这一轮对话(一次 stream 调用)结束时把本轮的 human / ai 消息追加到历史,然后触发 FileChatMessageHistory.add_messages(...) 写入文件。

    所以:每轮问答完成一次,就会把这一轮的 user + assistant 消息持久化到以 session_id 命名的文件中

  • 删除历史的时机

    delete_session(session_id) 里:

    • 会删除 storage_path / session_id 这个会话文件;
    • 同时从 chat_history_storest.session_state.chat_sessions 里把该会话的内存缓存删掉。

小结(直接回答你的问题)

  • “怎么写入?”

    • 页面展示用的历史:通过往 st.session_state.chat_sessions[session_id] 追加 {"role": "user"/"assistant", "content": ...} 来“写入”内存。
    • 持久化历史:通过 RunnableWithMessageHistoryFileChatMessageHistory.add_messages(...),把所有消息序列化为 JSON 写进 storage_path / session_id 文件。
  • “什么时机写入?”

    • 内存:
      • 用户发送问题的那一刻写入一次(user 消息)。
      • 模型流式回答完整结束的那一刻再写入一次(assistant 消息)。
    • 文件:
      • 每次完整调用 conversation_chain.stream(..., config={"configurable": {"session_id": ...}}) 完成一轮对话后,由 LangChain 自动把本轮的 user/ai 消息追加写入对应会话文件。

→问AI:@rag-clothing-customer-service/app_qa.py 点击按钮切换会话时,会瞬间跳回去,需要点击两下才能切换,可能是什么原因,只回答不改代码

←AI回复

可能的原因(逻辑层面)

核心问题很可能出在 sidebar_session_manager() 这几行逻辑的「重置顺序」和 Streamlit 的「每次点击都会整页重跑」机制之间的配合上:

if "current_session_id" not in st.session_state:
    if existing_sessions:
        st.session_state.current_session_id = existing_sessions[0]
    else:
        st.session_state.current_session_id = new_session_id()
...
index = display_sessions.index(st.session_state.current_session_id)
selected = st.sidebar.radio(
    "选择会话",
    display_sessions,
    index=index,
)
st.session_state.current_session_id = selected

现象为什么会出现:

  • 每点一次「切换会话」,整段脚本会重跑一次。
  • 在这次重跑的最开始,st.session_state.current_session_id 还保持着「上一次的会话 ID」,你又用它来算 index,传给 radio,等于强行用旧会话覆盖了一次用户刚刚选的新会话
  • 于是用户第一次点击时:
    • UI 瞬间变成新会话 → 立刻因为重跑时用旧 current_session_id 设置了 index,又跳回旧会话,看起来像「点了一下没切过去」。
    • 第二次点击时,current_session_id 已经被你在上一轮循环里改成新会话,这时 index 和用户选择就一致了,所以就能正常切过去。

本质总结:

  • 使用 index= 和自己维护的 st.session_state.current_session_id 双重控制同一个 radio 的选中项,且在脚本开头可能覆盖掉最新用户选择,导致首次点击被“回滚”,表现为需要点两次才能真正切换。

修复

我会调整侧边栏会话选择的逻辑,让 radiosession_state 的同步更可靠,修复“需要点两次才能切换”的问题。接下来我会直接修改 app_qa.pysidebar_session_manager 的相关代码。

  • 修复点:在 sidebar_session_manager 中,让侧边栏的 st.sidebar.radio 直接使用 key="current_session_id"st.session_state.current_session_id 绑定,不再手动用返回值去回写,从而避免点击一次后被旧状态“抢回”的情况。
  • 行为变化:现在每次你点击某个会话项,current_session_id 会立即被更新,并在下一次重跑时作为 radio 的默认选中值展示,不会再出现“需要点两次才能真正切换”的现象。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐