Design and Implementation of a Professional Historical Knowledge Agent System

Overview

This article describes the design and implementation of a Python-based professional historical knowledge agent. The system ingests historical documents and images, supports accurate knowledge-grounded dialogue, and generates high-fidelity posters or images of historical artifacts from its knowledge base, while ensuring that the image generation process does not alter the appearance of the artifacts themselves.

System Architecture

Overall Architecture Design

+------------------------------+
|  User Interaction Layer      |
|  (Web / API / CLI interface) |
+------------------------------+
               |
               v
+------------------------------+
|  Core Processing Engine      |
|  (Dialogue mgmt / Image gen) |
+------------------------------+
               |
               v
+------------------------------+    +------------------------+
|  Knowledge Base Layer        |<-->|  Vector Database       |
|  (Document / Image handling) |    |  (Chroma / Weaviate)   |
+------------------------------+    +------------------------+
               |
               v
+------------------------------+
|  External Integration Layer  |
|  (LLM / Image generation API)|
+------------------------------+

Technology Stack

  • Language: Python 3.9+
  • Knowledge base storage: Chroma/Weaviate vector database
  • Large language model: OpenAI GPT-4/GPT-3.5 or a local LLM (Llama 2)
  • Image processing: OpenCV, Pillow
  • Image generation: Stable Diffusion (custom fine-tuned)
  • Web framework: FastAPI
  • Frontend: Streamlit/Gradio
  • Task queue: Celery with Redis
  • Document processing: Unstructured, PyMuPDF

System Implementation

1. Environment Setup and Dependency Installation

First, create the project structure and install the required dependencies:

# 创建项目目录
mkdir historical-knowledge-agent
cd historical-knowledge-agent

# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Linux/Mac
# 或
venv\Scripts\activate  # Windows

# 安装核心依赖
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
pip install transformers diffusers accelerate
pip install chromadb unstructured pymupdf opencv-python
pip install fastapi uvicorn celery redis
pip install streamlit gradio
pip install python-multipart
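
A requirements.txt is referenced later by the Dockerfile but never shown. The following is a minimal sketch based on the packages installed above, plus a few libraries the code below imports (sentence-transformers for the Chroma embedding function, pytesseract and exifread for OCR/EXIF extraction, and openai for the hosted-LLM option); add version pins as appropriate for your environment:

# requirements.txt (sketch)
torch
torchvision
transformers
diffusers
accelerate
chromadb
sentence-transformers
unstructured
pymupdf
opencv-python
pillow
pytesseract
exifread
openai
fastapi
uvicorn
celery
redis
streamlit
gradio
python-multipart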

2. Knowledge Base Management System Implementation

Document Processing Module
import os
import fitz  # PyMuPDF
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.image import partition_image
import chromadb
from chromadb.utils import embedding_functions
from typing import List, Dict, Any
import hashlib

class DocumentProcessor:
    def __init__(self, persist_directory: str = "./chroma_db"):
        self.persist_directory = persist_directory
        self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name="all-MiniLM-L6-v2"
        )
        self.client = chromadb.Client(
            chromadb.config.Settings(
                persist_directory=persist_directory,
                is_persistent=True
            )
        )
        self.collection = self.client.get_or_create_collection(
            name="historical_documents",
            embedding_function=self.embedding_function
        )
    
    def extract_text_from_pdf(self, file_path: str) -> List[Dict[str, Any]]:
        """从PDF提取文本内容"""
        elements = partition_pdf(
            filename=file_path,
            extract_images_in_pdf=False,
            infer_table_structure=True,
            chunking_strategy="by_title",
            max_characters=2000,
            new_after_n_chars=1500,
            combine_text_under_n_chars=1000
        )
        
        documents = []
        for i, element in enumerate(elements):
            if hasattr(element, "text") and element.text.strip():
                doc_id = hashlib.md5(f"{file_path}_{i}".encode()).hexdigest()
                documents.append({
                    "id": doc_id,
                    "text": element.text,
                    "metadata": {
                        "source": file_path,
                        "page": getattr(element, "page_number", None),
                        "type": type(element).__name__
                    }
                })
        
        return documents
    
    def extract_text_from_image(self, image_path: str) -> List[Dict[str, Any]]:
        """从图片中提取文本内容(OCR)"""
        try:
            from PIL import Image
            import pytesseract
            
            image = Image.open(image_path)
            text = pytesseract.image_to_string(image, lang='eng+chi_sim')
            
            doc_id = hashlib.md5(image_path.encode()).hexdigest()
            return [{
                "id": doc_id,
                "text": text,
                "metadata": {
                    "source": image_path,
                    "type": "image_ocr"
                }
            }]
        except Exception as e:
            print(f"OCR处理失败: {e}")
            return []
    
    def add_documents(self, file_path: str, file_type: str = "auto") -> bool:
        """添加文档到知识库"""
        try:
            if file_type == "auto":
                if file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.tiff', '.bmp')):
                    file_type = "image"
                elif file_path.lower().endswith('.pdf'):
                    file_type = "pdf"
                else:
                    return False
            
            if file_type == "pdf":
                documents = self.extract_text_from_pdf(file_path)
            elif file_type == "image":
                documents = self.extract_text_from_image(file_path)
            else:
                return False
            
            if documents:
                ids = [doc["id"] for doc in documents]
                texts = [doc["text"] for doc in documents]
                metadatas = [doc["metadata"] for doc in documents]
                
                self.collection.add(
                    ids=ids,
                    documents=texts,
                    metadatas=metadatas
                )
                
                return True
            return False
        except Exception as e:
            print(f"添加文档失败: {e}")
            return False
    
    def query_documents(self, query: str, n_results: int = 5) -> List[Dict]:
        """查询相关文档"""
        try:
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results
            )
            
            formatted_results = []
            for i in range(len(results['ids'][0])):
                formatted_results.append({
                    "id": results['ids'][0][i],
                    "text": results['documents'][0][i],
                    "metadata": results['metadatas'][0][i],
                    "distance": results['distances'][0][i]
                })
            
            return formatted_results
        except Exception as e:
            print(f"查询失败: {e}")
            return []
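
A minimal usage sketch for the class above; the file paths and the query string are placeholders that only illustrate the ingest-then-query flow:

# Example usage (paths and query are placeholders)
processor = DocumentProcessor(persist_directory="./chroma_db")
processor.add_documents("samples/bronze_ware_catalog.pdf")   # PDF is chunked and embedded
processor.add_documents("samples/shang_dynasty_ding.jpg")    # image goes through OCR

for hit in processor.query_documents("Shang dynasty bronze ritual vessels", n_results=3):
    print(f"{hit['distance']:.3f}  {hit['metadata']['source']}")
    print(hit["text"][:120])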
Image Metadata Extraction Module
import os
import cv2
import exifread
from PIL import Image, ExifTags
from typing import Any, Dict
import json
from datetime import datetime

class ImageMetadataExtractor:
    def __init__(self):
        pass
    
    def extract_metadata(self, image_path: str) -> Dict[str, Any]:
        """提取图像的元数据"""
        metadata = {
            "basic": {},
            "exif": {},
            "historical_context": {}
        }
        
        # 基本图像信息
        try:
            img = Image.open(image_path)
            metadata["basic"]["format"] = img.format
            metadata["basic"]["size"] = img.size
            metadata["basic"]["mode"] = img.mode
        except Exception as e:
            print(f"无法读取图像基本信息: {e}")
        
        # EXIF数据
        try:
            with open(image_path, 'rb') as f:
                tags = exifread.process_file(f)
                for tag, value in tags.items():
                    if tag not in ('JPEGThumbnail', 'TIFFThumbnail', 'Filename', 'EXIF MakerNote'):
                        metadata["exif"][tag] = str(value)
        except Exception as e:
            print(f"无法读取EXIF数据: {e}")
        
        # 使用OpenCV分析图像特征
        try:
            img_cv = cv2.imread(image_path)
            if img_cv is not None:
                metadata["basic"]["shape"] = img_cv.shape
                metadata["basic"]["dtype"] = str(img_cv.dtype)
                
                # 计算颜色直方图(简化版)
                hist = cv2.calcHist([img_cv], [0], None, [256], [0, 256])
                metadata["image_analysis"] = {
                    "histogram": hist.flatten().tolist()
                }
        except Exception as e:
            print(f"OpenCV分析失败: {e}")
        
        return metadata
    
    def extract_historical_context(self, image_path: str, text_description: str = "") -> Dict[str, Any]:
        """尝试从图像中提取历史背景信息"""
        # 这里可以集成专门的文物识别模型
        # 简化版实现:基于文件名和文本描述生成一些假设
        
        context = {
            "era": "unknown",
            "cultural_origin": "unknown",
            "artifact_type": "unknown",
            "materials": "unknown",
            "estimated_period": "unknown"
        }
        
        # 基于文件名的简单启发式规则
        filename = os.path.basename(image_path).lower()
        
        # 时代检测(简化版)
        era_keywords = {
            "ancient": ["ancient", "antique", "old", "classical"],
            "medieval": ["medieval", "middle ages", "gothic"],
            "renaissance": ["renaissance", "baroque"],
            "modern": ["modern", "contemporary", "20th", "21st"]
        }
        
        for era, keywords in era_keywords.items():
            if any(keyword in filename for keyword in keywords):
                context["era"] = era
                break
        
        # 文物类型检测
        artifact_keywords = {
            "pottery": ["pottery", "ceramic", "vase", "urn"],
            "sculpture": ["sculpture", "statue", "bust"],
            "painting": ["painting", "canvas", "oil"],
            "weapon": ["weapon", "sword", "spear", "armor"],
            "document": ["document", "manuscript", "scroll"]
        }
        
        for artifact_type, keywords in artifact_keywords.items():
            if any(keyword in filename for keyword in keywords):
                context["artifact_type"] = artifact_type
                break
        
        return context
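
A short sketch of how the extractor can be used alongside document ingestion; the image path is a placeholder:

# Example usage (path is a placeholder)
extractor = ImageMetadataExtractor()
image_path = "samples/shang_dynasty_ding.jpg"

metadata = extractor.extract_metadata(image_path)
context = extractor.extract_historical_context(image_path)

print(json.dumps(metadata["basic"], ensure_ascii=False, indent=2))
print("Heuristic context:", context)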

3. Dialogue System Implementation

Dialogue Management Module
from typing import List, Dict, Any
from datetime import datetime
import openai
# or use a local LLM
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import re

class DialogueManager:
    def __init__(self, knowledge_base: DocumentProcessor, 
                 use_openai: bool = True, 
                 openai_api_key: str = None,
                 local_model_path: str = None):
        self.knowledge_base = knowledge_base
        self.use_openai = use_openai
        self.conversation_history = []
        
        if use_openai and openai_api_key:
            openai.api_key = openai_api_key
            self.llm = self._call_openai
        elif local_model_path:
            # 加载本地LLM
            self.tokenizer = AutoTokenizer.from_pretrained(local_model_path)
            self.model = AutoModelForCausalLM.from_pretrained(local_model_path)
            self.llm = self._call_local_llm
        else:
            raise ValueError("必须提供OpenAI API密钥或本地模型路径")
    
    def _call_openai(self, prompt: str, max_tokens: int = 500) -> str:
        """调用OpenAI API"""
        try:
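            # Note: this targets the legacy openai<1.0 SDK; with openai>=1.0 the equivalent call is client.chat.completions.create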
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
                temperature=0.7
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"调用OpenAI API时出错: {e}"
    
    def _call_local_llm(self, prompt: str, max_tokens: int = 500) -> str:
        """调用本地LLM"""
        try:
            inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=2048)
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )
            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            return response[len(prompt):]  # 返回新生成的部分
        except Exception as e:
            return f"调用本地LLM时出错: {e}"
    
    def generate_response(self, user_query: str, use_knowledge_base: bool = True) -> str:
        """生成对用户查询的响应"""
        # 1. 检索相关知识
        relevant_info = ""
        if use_knowledge_base:
            kb_results = self.knowledge_base.query_documents(user_query, n_results=3)
            if kb_results:
                relevant_info = "相关知识:\n"
                for i, result in enumerate(kb_results):
                    relevant_info += f"{i+1}. {result['text'][:500]}...\n"
                    relevant_info += f"来源: {result['metadata']['source']}\n\n"
        
        # 2. 构建提示
        prompt = f"""
        你是一个专业的历史知识助手,专门回答关于历史文物、事件和人物的相关问题。

        {relevant_info}

        用户问题: {user_query}

        请根据以上信息提供一个准确、专业的回答。如果你不确定或信息不足,请如实说明,不要编造信息。
        回答时请保持专业性和客观性,引用已知事实。

        回答:
        """
        
        # 3. 调用LLM生成回答
        response = self.llm(prompt, max_tokens=800)
        
        # 4. 保存对话历史
        self.conversation_history.append({
            "user": user_query,
            "assistant": response,
            "timestamp": datetime.now().isoformat()
        })
        
        # 5. 限制历史记录长度
        if len(self.conversation_history) > 10:
            self.conversation_history = self.conversation_history[-10:]
        
        return response
    
    def handle_follow_up(self, follow_up_query: str) -> str:
        """处理后续问题,考虑对话上下文"""
        # 构建包含上下文的提示
        context = "对话历史:\n"
        for turn in self.conversation_history[-3:]:  # last 3 turns
            context += f"用户: {turn['user']}\n"
            context += f"助手: {turn['assistant']}\n\n"
        
        prompt = f"""
        {context}
        
        当前问题: {follow_up_query}
        
        请根据对话历史上下文回答当前问题,保持回答的连贯性和相关性。
        """
        
        response = self.llm(prompt, max_tokens=600)
        
        # 更新对话历史
        self.conversation_history.append({
            "user": follow_up_query,
            "assistant": response,
            "timestamp": datetime.now().isoformat()
        })
        
        return response
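
A usage sketch for the dialogue manager; the API key and local model path below are placeholders, and only one of the two backends needs to be configured:

# Example usage (API key / model path are placeholders)
kb = DocumentProcessor("./chroma_db")
manager = DialogueManager(
    knowledge_base=kb,
    use_openai=True,
    openai_api_key="sk-..."   # or: use_openai=False, local_model_path="./local_llm_model"
)

print(manager.generate_response("商代青铜鼎有哪些典型纹饰?"))
print(manager.handle_follow_up("这些纹饰有什么象征意义?"))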

4. Image Generation System Implementation

Accuracy-Preserving Image Generation Workflow
import torch
from diffusers import StableDiffusionPipeline, StableDiffusionImg2ImgPipeline
from diffusers import DPMSolverMultistepScheduler
from PIL import Image, ImageDraw, ImageFont
from typing import Any, Dict, List
import numpy as np
import cv2

class HistoricalImageGenerator:
    def __init__(self, model_path: str = "runwayml/stable-diffusion-v1-5", 
                 device: str = "cuda" if torch.cuda.is_available() else "cpu"):
        self.device = device
        
        # 加载文本到图像模型
        self.txt2img_pipe = StableDiffusionPipeline.from_pretrained(
            model_path,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            safety_checker=None,
            requires_safety_checker=False
        )
        self.txt2img_pipe.scheduler = DPMSolverMultistepScheduler.from_config(
            self.txt2img_pipe.scheduler.config
        )
        self.txt2img_pipe = self.txt2img_pipe.to(device)
        
        # 加载图像到图像模型
        self.img2img_pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
            model_path,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            safety_checker=None,
            requires_safety_checker=False
        )
        self.img2img_pipe = self.img2img_pipe.to(device)
        
        # 文物保护提示词
        self.preservation_prompt = (
            "historical accuracy, museum quality, archeological precision, "
            "authentic details, no anachronisms, no modern elements, "
            "professional photograph, expert restoration"
        )
        
        # 负面提示词
        self.negative_prompt = (
            "blurry, distorted, inaccurate, modern, fictional, fantasy, "
            "anachronism, fake, cartoon, animation, video game, CGI, "
            "incorrect proportions, wrong colors, unrealistic"
        )
    
    def generate_from_description(self, description: str, 
                                 additional_context: str = "",
                                 width: int = 512, 
                                 height: int = 512,
                                 num_inference_steps: int = 30,
                                 guidance_scale: float = 7.5) -> Image.Image:
        """根据文本描述生成历史文物图像"""
        
        # 构建完整的提示词
        full_prompt = f"{description}. {additional_context}. {self.preservation_prompt}"
        
        # 生成图像
        result = self.txt2img_pipe(
            prompt=full_prompt,
            negative_prompt=self.negative_prompt,
            width=width,
            height=height,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            num_images_per_prompt=1
        )
        
        return result.images[0]
    
    def enhance_existing_image(self, base_image: Image.Image, 
                              enhancement_description: str,
                              strength: float = 0.7) -> Image.Image:
        """基于现有图像进行增强,保持文物原貌"""
        
        # 构建保护性提示词
        protective_prompt = f"Enhance and clarify this historical artifact while preserving all original details. {enhancement_description}. {self.preservation_prompt}"
        
        # 使用img2img进行增强
        result = self.img2img_pipe(
            prompt=protective_prompt,
            negative_prompt=self.negative_prompt,
            image=base_image,
            strength=strength,
            num_inference_steps=30,
            guidance_scale=7.5
        )
        
        return result.images[0]
    
    def create_educational_poster(self, historical_data: Dict[str, Any], 
                                 template_image: Image.Image = None) -> Image.Image:
        """创建教育海报"""
        
        if template_image is None:
            # 创建默认海报模板
            poster_width, poster_height = 1024, 1536
            poster = Image.new('RGB', (poster_width, poster_height), color='white')
            draw = ImageDraw.Draw(poster)
            
            # 添加标题
            title = historical_data.get('title', 'Historical Artifact')
            try:
                font_large = ImageFont.truetype("arial.ttf", 48)
                font_medium = ImageFont.truetype("arial.ttf", 24)
                font_small = ImageFont.truetype("arial.ttf", 18)
            except:
                font_large = ImageFont.load_default()
                font_medium = ImageFont.load_default()
                font_small = ImageFont.load_default()
            
            # 绘制标题
            draw.text((poster_width//2, 50), title, fill='black', font=font_large, anchor='mm')
            
            # 生成文物图像
            artifact_description = historical_data.get('description', 'historical artifact')
            artifact_image = self.generate_from_description(
                artifact_description,
                width=512,
                height=512
            )
            
            # 将文物图像添加到海报
            poster.paste(artifact_image, (poster_width//2 - 256, 120))
            
            # 添加描述文本
            description = historical_data.get('detailed_description', '')
            y_position = 650
            for line in self._wrap_text(description, font_medium, poster_width - 100):
                draw.text((50, y_position), line, fill='black', font=font_medium)
                y_position += 30
            
            # 添加元数据信息
            metadata = historical_data.get('metadata', {})
            y_position += 50
            for key, value in metadata.items():
                info_text = f"{key}: {value}"
                draw.text((50, y_position), info_text, fill='darkgray', font=font_small)
                y_position += 25
            
            return poster
        else:
            # 使用提供的模板
            # 这里可以实现更复杂的模板处理逻辑
            return template_image
    
    def _wrap_text(self, text: str, font: ImageFont.FreeTypeFont, max_width: int) -> List[str]:
        """将文本换行以适应宽度"""
        lines = []
        words = text.split()
        current_line = []
        
        for word in words:
            test_line = ' '.join(current_line + [word])
            bbox = font.getbbox(test_line)
            width = bbox[2] - bbox[0]
            
            if width <= max_width:
                current_line.append(word)
            else:
                lines.append(' '.join(current_line))
                current_line = [word]
        
        if current_line:
            lines.append(' '.join(current_line))
        
        return lines
    
    def verify_historical_accuracy(self, generated_image: Image.Image, 
                                  reference_data: Dict[str, Any]) -> Dict[str, Any]:
        """验证生成图像的历史准确性"""
        # 这里可以实现更复杂的验证逻辑
        # 简化版:基于颜色和形状的基本分析
        
        accuracy_report = {
            "overall_score": 0.0,
            "color_consistency": 0.0,
            "shape_consistency": 0.0,
            "anachronisms_detected": False,
            "warnings": []
        }
        
        # 将图像转换为numpy数组进行分析
        img_array = np.array(generated_image)
        
        # 简单的颜色分析
        avg_color = np.mean(img_array, axis=(0, 1))
        
        # 这里可以添加更复杂的历史准确性检查逻辑
        # 例如与参考图像比较,使用分类器检测时代特征等
        
        return accuracy_report
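
A sketch of the end-to-end generation flow; the artifact description and metadata are illustrative, and the accuracy check is the simplified placeholder defined above:

# Example usage (description and metadata are illustrative)
generator = HistoricalImageGenerator()

artifact = generator.generate_from_description(
    "Ancient Greek red-figure amphora depicting a chariot race",
    additional_context="Attic workshop, 5th century BCE"
)
artifact.save("amphora.png")

print(generator.verify_historical_accuracy(artifact, reference_data={}))

poster = generator.create_educational_poster({
    "title": "Attic Red-Figure Amphora",
    "description": "Ancient Greek red-figure amphora depicting a chariot race",
    "detailed_description": "Red-figure pottery produced in Athens during the Classical period ...",
    "metadata": {"Period": "Classical Greece", "Material": "Terracotta"}
})
poster.save("amphora_poster.png")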

5. System Integration and API Implementation

FastAPI Backend Implementation
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel
from typing import List, Optional
import uuid
import json
import os
from celery import Celery

# Celery配置
celery_app = Celery(
    'historical_agent',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

app = FastAPI(title="Historical Knowledge Agent API")

# CORS配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 全局组件实例
doc_processor = None
dialogue_manager = None
image_generator = None

class QueryRequest(BaseModel):
    question: str
    use_knowledge_base: bool = True

class ImageGenerationRequest(BaseModel):
    description: str
    context: Optional[str] = None
    width: int = 512
    height: int = 512

class UploadResponse(BaseModel):
    file_id: str
    status: str
    message: str

@app.on_event("startup")
async def startup_event():
    """初始化系统组件"""
    global doc_processor, dialogue_manager, image_generator
    
    # 初始化文档处理器
    doc_processor = DocumentProcessor("./chroma_db")
    
    # 初始化对话管理器(这里需要实际配置API密钥或模型路径)
    try:
        dialogue_manager = DialogueManager(
            knowledge_base=doc_processor,
            use_openai=False,  # 根据实际情况调整
            local_model_path="./local_llm_model"  # 需要提前下载模型
        )
    except Exception as e:
        print(f"对话管理器初始化失败,将使用模拟模式: {e}")
        dialogue_manager = None
    
    # 初始化图像生成器
    try:
        image_generator = HistoricalImageGenerator()
    except Exception as e:
        print(f"图像生成器初始化失败: {e}")
        image_generator = None

@app.post("/upload", response_model=UploadResponse)
async def upload_file(file: UploadFile = File(...)):
    """上传文档或图片到知识库"""
    try:
        # 创建上传目录
        os.makedirs("uploads", exist_ok=True)
        
        # 生成唯一文件名
        file_extension = os.path.splitext(file.filename)[1]
        file_id = str(uuid.uuid4())
        file_path = f"uploads/{file_id}{file_extension}"
        
        # 保存文件
        with open(file_path, "wb") as f:
            content = await file.read()
            f.write(content)
        
        # 处理文件
        file_type = "auto"
        success = doc_processor.add_documents(file_path, file_type)
        
        if success:
            return UploadResponse(
                file_id=file_id,
                status="success",
                message="文件上传并处理成功"
            )
        else:
            return UploadResponse(
                file_id=file_id,
                status="error",
                message="文件处理失败"
            )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}")

@app.post("/query")
async def query_knowledge_base(request: QueryRequest):
    """查询知识库并生成回答"""
    if dialogue_manager is None:
        return JSONResponse(
            content={"error": "对话系统未初始化"},
            status_code=503
        )
    
    try:
        response = dialogue_manager.generate_response(
            request.question, 
            request.use_knowledge_base
        )
        
        return {"answer": response}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"查询失败: {str(e)}")

@app.post("/generate_image")
async def generate_image(request: ImageGenerationRequest, background_tasks: BackgroundTasks):
    """生成历史文物图像"""
    if image_generator is None:
        return JSONResponse(
            content={"error": "图像生成系统未初始化"},
            status_code=503
        )
    
    try:
        image = image_generator.generate_from_description(
            request.description,
            request.context or "",
            request.width,
            request.height
        )
        
        # 保存图像
        image_id = str(uuid.uuid4())
        image_path = f"generated_images/{image_id}.png"
        os.makedirs("generated_images", exist_ok=True)
        image.save(image_path)
        
        # 在后台进行准确性验证 (BackgroundTasks runs this in-process; use verify_image_accuracy.delay(...) to dispatch it to the Celery worker instead)
        background_tasks.add_task(
            verify_image_accuracy,
            image_path,
            request.description
        )
        
        return FileResponse(
            image_path,
            media_type="image/png",
            filename=f"historical_artifact_{image_id}.png"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"图像生成失败: {str(e)}")

@celery_app.task
def verify_image_accuracy(image_path: str, description: str):
    """后台任务:验证图像历史准确性"""
    try:
        # 这里可以实现复杂的验证逻辑
        print(f"开始验证图像准确性: {image_path}")
        # 模拟验证过程
        import time
        time.sleep(2)
        print(f"图像验证完成: {image_path}")
    except Exception as e:
        print(f"图像验证失败: {e}")

@app.get("/health")
async def health_check():
    """健康检查端点"""
    components_ok = {
        "document_processor": doc_processor is not None,
        "dialogue_manager": dialogue_manager is not None,
        "image_generator": image_generator is not None
    }
    
    status = "healthy" if all(components_ok.values()) else "degraded"
    
    return {
        "status": status,
        "components": components_ok
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
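
The knowledge-base tab in the Streamlit frontend expects search and statistics endpoints that are left as "to be implemented" below. A possible sketch, reusing only the DocumentProcessor methods already defined (collection.count() assumes a reasonably recent chromadb release):

class SearchRequest(BaseModel):
    query: str
    n_results: int = 5

@app.post("/search")
async def search_knowledge_base(request: SearchRequest):
    """Search the knowledge base directly, without LLM generation"""
    if doc_processor is None:
        raise HTTPException(status_code=503, detail="Knowledge base not initialized")
    return {"results": doc_processor.query_documents(request.query, request.n_results)}

@app.get("/stats")
async def knowledge_base_stats():
    """Return basic knowledge base statistics"""
    if doc_processor is None:
        raise HTTPException(status_code=503, detail="Knowledge base not initialized")
    # count() returns the number of stored chunks
    return {"document_chunks": doc_processor.collection.count()}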
Streamlit Frontend Interface
# frontend.py
import os
import io

import requests
import streamlit as st
from PIL import Image

# API endpoint; the docker-compose service overrides this via the API_BASE environment variable
API_BASE = os.environ.get("API_BASE", "http://localhost:8000")

st.set_page_config(
    page_title="历史知识智能体",
    page_icon="🏛️",
    layout="wide"
)

st.title("🏛️ 专业历史知识智能体系统")

# 初始化会话状态
if "conversation_history" not in st.session_state:
    st.session_state.conversation_history = []
if "uploaded_files" not in st.session_state:
    st.session_state.uploaded_files = []

# 创建侧边栏
with st.sidebar:
    st.header("系统设置")
    
    # 文件上传
    uploaded_file = st.file_uploader(
        "上传历史文档或图片",
        type=["pdf", "png", "jpg", "jpeg", "tiff"]
    )
    
    if uploaded_file:
        with st.spinner("上传并处理文件中..."):
            files = {"file": (uploaded_file.name, uploaded_file.getvalue())}
            response = requests.post(f"{API_BASE}/upload", files=files)
            
            if response.status_code == 200:
                st.success("文件上传成功!")
                st.session_state.uploaded_files.append(uploaded_file.name)
            else:
                st.error("文件上传失败")
    
    # 显示已上传文件
    if st.session_state.uploaded_files:
        st.subheader("已上传文件")
        for file in st.session_state.uploaded_files:
            st.write(f"• {file}")
    
    # 系统状态检查
    if st.button("检查系统状态"):
        try:
            response = requests.get(f"{API_BASE}/health")
            if response.status_code == 200:
                status = response.json()
                st.json(status)
            else:
                st.error("无法获取系统状态")
        except:
            st.error("无法连接到API服务器")

# 创建主界面选项卡
tab1, tab2, tab3 = st.tabs(["对话系统", "图像生成", "知识库管理"])

with tab1:
    st.header("历史知识对话系统")
    
    # 显示对话历史
    for turn in st.session_state.conversation_history:
        with st.chat_message("user"):
            st.write(turn["user"])
        with st.chat_message("assistant"):
            st.write(turn["assistant"])
    
    # 用户输入
    user_query = st.chat_input("请输入关于历史文物的问题...")
    
    if user_query:
        # 添加用户消息到历史
        st.session_state.conversation_history.append({
            "user": user_query,
            "assistant": ""
        })
        
        # 显示用户消息
        with st.chat_message("user"):
            st.write(user_query)
        
        # 获取助手响应
        with st.chat_message("assistant"):
            with st.spinner("思考中..."):
                try:
                    response = requests.post(
                        f"{API_BASE}/query",
                        json={"question": user_query, "use_knowledge_base": True}
                    )
                    
                    if response.status_code == 200:
                        answer = response.json()["answer"]
                        st.write(answer)
                        # 更新对话历史
                        st.session_state.conversation_history[-1]["assistant"] = answer
                    else:
                        st.error("获取回答时出错")
                except:
                    st.error("无法连接到对话服务")

with tab2:
    st.header("历史文物图像生成")
    
    col1, col2 = st.columns(2)
    
    with col1:
        st.subheader("生成参数")
        
        description = st.text_area(
            "文物描述",
            height=100,
            placeholder="详细描述要生成的历史文物,包括时代、材质、用途等信息"
        )
        
        additional_context = st.text_area(
            "附加背景信息",
            height=60,
            placeholder="可选:提供更多历史背景或特定要求"
        )
        
        width = st.slider("图像宽度", 256, 1024, 512, 64)
        height = st.slider("图像高度", 256, 1024, 512, 64)
        
        if st.button("生成图像", disabled=not description):
            with st.spinner("生成历史文物图像中..."):
                try:
                    response = requests.post(
                        f"{API_BASE}/generate_image",
                        json={
                            "description": description,
                            "context": additional_context,
                            "width": width,
                            "height": height
                        }
                    )
                    
                    if response.status_code == 200:
                        # 显示生成的图像
                        image_data = response.content
                        image = Image.open(io.BytesIO(image_data))
                        st.session_state.generated_image = image
                    else:
                        st.error("图像生成失败")
                except:
                    st.error("无法连接到图像生成服务")
    
    with col2:
        st.subheader("生成结果")
        
        if "generated_image" in st.session_state:
            st.image(st.session_state.generated_image, caption="生成的历史文物图像")
            
            # 提供下载链接
            buf = io.BytesIO()
            st.session_state.generated_image.save(buf, format="PNG")
            byte_im = buf.getvalue()
            
            st.download_button(
                label="下载图像",
                data=byte_im,
                file_name="historical_artifact.png",
                mime="image/png"
            )
        else:
            st.info("请输入文物描述并点击生成按钮")

with tab3:
    st.header("知识库管理")
    
    st.info("""
    知识库当前包含的历史文档和图像:
    - 使用左侧边栏上传新的文档或图片
    - 系统会自动提取文本内容并建立索引
    - 上传的文档将用于增强对话系统的回答准确性
    """)
    
    # 显示知识库统计信息
    if st.button("刷新知识库状态"):
        try:
            # 这里可以添加获取知识库统计信息的API端点
            st.write("知识库统计信息功能待实现")
        except:
            st.error("无法获取知识库状态")
    
    # 知识库搜索功能
    search_query = st.text_input("搜索知识库")
    if search_query:
        with st.spinner("搜索中..."):
            try:
                # 这里可以添加搜索知识库的API端点
                st.write("知识库搜索功能待实现")
            except:
                st.error("搜索失败")

# Note: Streamlit apps are launched with `streamlit run frontend.py`; st.run() does not exist and no explicit entry point is needed

System Deployment and Optimization

Production Deployment

Docker Containerized Deployment

Create a Dockerfile:

FROM python:3.9-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    poppler-utils \
    tesseract-ocr \
    tesseract-ocr-chi-sim \
    libgl1 \
    libglib2.0-0 \
    && rm -rf /var/lib/apt/lists/*

# 复制项目文件
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# 创建必要的目录
RUN mkdir -p uploads generated_images chroma_db

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Create docker-compose.yml:

version: '3.8'

services:
  historical-agent:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./uploads:/app/uploads
      - ./generated_images:/app/generated_images
      - ./chroma_db:/app/chroma_db
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    restart: unless-stopped

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    restart: unless-stopped

  celery-worker:
    build: .
    command: celery -A main.celery_app worker --loglevel=info
    volumes:
      - ./uploads:/app/uploads
      - ./generated_images:/app/generated_images
      - ./chroma_db:/app/chroma_db
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis
      - historical-agent
    restart: unless-stopped

  streamlit-ui:
    build: .
    command: streamlit run frontend.py --server.port 8501 --server.address 0.0.0.0
    ports:
      - "8501:8501"
    volumes:
      - ./uploads:/app/uploads
      - ./generated_images:/app/generated_images
      - ./chroma_db:/app/chroma_db
    environment:
      - API_BASE=http://historical-agent:8000
    depends_on:
      - historical-agent
    restart: unless-stopped
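
The compose file reads OPENAI_API_KEY from the environment, typically via a local .env file next to docker-compose.yml. A usage sketch (the key value is a placeholder):

# .env
OPENAI_API_KEY=sk-...

# build and start all services
docker compose up -d --build

# API: http://localhost:8000   Streamlit UI: http://localhost:8501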

Performance Optimization

Knowledge Base Query Optimization
class OptimizedDocumentProcessor(DocumentProcessor):
    def __init__(self, persist_directory: str = "./chroma_db", 
                 cache_size: int = 1000):
        super().__init__(persist_directory)
        self.cache = {}
        self.cache_size = cache_size
        self.query_cache = {}
    
    def query_documents(self, query: str, n_results: int = 5) -> List[Dict]:
        """带缓存的文档查询"""
        # 生成缓存键
        cache_key = f"{query}_{n_results}"
        
        # 检查缓存
        if cache_key in self.query_cache:
            return self.query_cache[cache_key]
        
        # 执行查询
        results = super().query_documents(query, n_results)
        
        # 更新缓存
        if len(self.query_cache) >= self.cache_size:
            # 移除最旧的条目
            oldest_key = next(iter(self.query_cache))
            del self.query_cache[oldest_key]
        
        self.query_cache[cache_key] = results
        return results
    
    def prewarm_cache(self, common_queries: List[str]):
        """预热缓存"""
        for query in common_queries:
            self.query_documents(query)
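
A short sketch of warming the cache at service startup; the query strings are illustrative:

# Example: pre-populate the query cache (queries are placeholders)
optimized_processor = OptimizedDocumentProcessor("./chroma_db", cache_size=1000)
optimized_processor.prewarm_cache([
    "Shang dynasty bronze vessels",
    "Tang dynasty sancai pottery",
    "Ancient Greek red-figure pottery"
])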
Image Generation Optimization
class OptimizedImageGenerator(HistoricalImageGenerator):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.prompt_cache = {}
    
    def generate_from_description(self, description: str, 
                                 additional_context: str = "",
                                 width: int = 512, 
                                 height: int = 512,
                                 num_inference_steps: int = 30,
                                 guidance_scale: float = 7.5) -> Image.Image:
        """带提示词优化的图像生成"""
        
        # 生成缓存键
        cache_key = f"{description}_{additional_context}_{width}_{height}"
        
        # 检查缓存
        if cache_key in self.prompt_cache:
            return self.prompt_cache[cache_key]
        
        # 优化提示词
        optimized_prompt = self._optimize_prompt(description, additional_context)
        
        # 生成图像
        result = self.txt2img_pipe(
            prompt=optimized_prompt,
            negative_prompt=self.negative_prompt,
            width=width,
            height=height,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            num_images_per_prompt=1
        )
        
        image = result.images[0]
        
        # 更新缓存
        if len(self.prompt_cache) >= 100:  # 限制缓存大小
            oldest_key = next(iter(self.prompt_cache))
            del self.prompt_cache[oldest_key]
        
        self.prompt_cache[cache_key] = image
        return image
    
    def _optimize_prompt(self, description: str, context: str) -> str:
        """优化提示词以提高历史准确性"""
        # 这里可以实现提示词优化逻辑
        # 例如添加时代特定的关键词,增强历史准确性
        
        era_keywords = self._detect_historical_era(description)
        material_keywords = self._detect_materials(description)
        
        optimized_prompt = f"{description}. {context}. {self.preservation_prompt}"
        
        if era_keywords:
            optimized_prompt += f", {era_keywords}"
        if material_keywords:
            optimized_prompt += f", {material_keywords}"
        
        return optimized_prompt
    
    def _detect_historical_era(self, description: str) -> str:
        """检测历史时代关键词"""
        # 简化版实现
        era_mapping = {
            "ancient": ["ancient", "egypt", "roman", "greek", "mesopotamia"],
            "medieval": ["medieval", "middle ages", "gothic", "crusade"],
            "renaissance": ["renaissance", "baroque", "reformation"],
            "modern": ["modern", "contemporary", "20th", "21st"]
        }
        
        description_lower = description.lower()
        for era, keywords in era_mapping.items():
            if any(keyword in description_lower for keyword in keywords):
                return f"{era} era, historically accurate"
        
        return ""
    
    def _detect_materials(self, description: str) -> str:
        """检测材料关键词"""
        materials = [
            "clay", "ceramic", "porcelain", "stone", "marble", "granite",
            "bronze", "iron", "gold", "silver", "wood", "ivory", "paper"
        ]
        
        description_lower = description.lower()
        detected_materials = []
        
        for material in materials:
            if material in description_lower:
                detected_materials.append(material)
        
        if detected_materials:
            return f"made of {', '.join(detected_materials)}"
        
        return ""

Testing and Validation

Unit Tests

import unittest
from unittest.mock import Mock, patch
import tempfile
import os

# Assumes the classes defined above live in main.py
from main import DocumentProcessor, DialogueManager

class TestHistoricalAgent(unittest.TestCase):
    def setUp(self):
        """设置测试环境"""
        self.test_dir = tempfile.mkdtemp()
        self.doc_processor = DocumentProcessor(
            persist_directory=os.path.join(self.test_dir, "test_db")
        )
    
    def test_document_processing(self):
        """测试文档处理功能"""
        # 创建测试PDF
        test_pdf_path = os.path.join(self.test_dir, "test.pdf")
        # 这里应该添加创建测试PDF的代码
        
        # 测试文档添加
        success = self.doc_processor.add_documents(test_pdf_path, "pdf")
        self.assertTrue(success)
        
        # 测试查询
        results = self.doc_processor.query_documents("test query", n_results=1)
        self.assertIsInstance(results, list)
    
    @patch('main.openai.ChatCompletion.create')
    def test_dialogue_generation(self, mock_openai):
        """测试对话生成功能"""
        # 模拟OpenAI响应
        mock_response = Mock()
        mock_response.choices = [Mock()]
        mock_response.choices[0].message.content = "Test response"
        mock_openai.return_value = mock_response
        
        dialogue_manager = DialogueManager(
            knowledge_base=self.doc_processor,
            use_openai=True,
            openai_api_key="test_key"
        )
        
        response = dialogue_manager.generate_response("Test question")
        self.assertEqual(response, "Test response")
    
    def tearDown(self):
        """清理测试环境"""
        import shutil
        shutil.rmtree(self.test_dir)

if __name__ == '__main__':
    unittest.main()

Integration Tests

import pytest
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_health_check():
    """测试健康检查端点"""
    response = client.get("/health")
    assert response.status_code == 200
    assert "status" in response.json()

def test_query_endpoint():
    """测试查询端点"""
    response = client.post("/query", json={
        "question": "Test question",
        "use_knowledge_base": True
    })
    assert response.status_code in [200, 503]  # 200成功或503服务不可用

def test_image_generation():
    """测试图像生成端点"""
    response = client.post("/generate_image", json={
        "description": "Ancient Greek pottery",
        "context": "Red figure style",
        "width": 256,
        "height": 256
    })
    assert response.status_code in [200, 503]
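
Assuming the unit tests are saved as test_agent.py and the integration tests as test_api.py (both filenames are placeholders), they can be run with:

# unit tests
python -m unittest test_agent.py -v

# API integration tests
pytest test_api.py -q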

Conclusion and Future Work

This article has presented the design and implementation of a Python-based professional historical knowledge agent. The system processes historical documents and images, supports accurate knowledge-grounded dialogue, and can generate high-fidelity posters or images of historical artifacts from its knowledge base.

System Highlights

  1. Strong knowledge integration: handles historical documents and images in multiple formats and builds a unified knowledge base
  2. Accurate dialogue: retrieval-augmented generation (RAG) grounds answers in retrieved sources to keep them historically accurate
  3. Faithful image generation: a dedicated workflow keeps generated artifact images from drifting away from the historical record
  4. Good extensibility: the modular design makes it straightforward to add new capabilities
  5. Flexible deployment: supports everything from local development to containerized production environments

Future Improvements

  1. Multimodal model integration: adopt stronger multimodal models to improve image understanding and generation
  2. Expert validation: collaborate with historians to build a more rigorous historical-accuracy verification process
  3. Multilingual support: extend coverage to more languages of historical source material
  4. Real-time collaboration: add multi-user collaboration and annotation features
  5. Mobile optimization: build a mobile app for on-site artifact recognition and lookup

The system provides a powerful tool for historical research, education, and cultural heritage preservation, and should be useful to both professional researchers and history enthusiasts.
