上一篇我们搞定了 “检索的核心技术”,但在检索之前,还有一个关键环节:如何把你的业务文档(PDF / 图片 / Word)变成可检索的向量索引

这个过程涉及 3 个核心步骤:

  1. 用 MinIO 存储原始文档(避免本地文件丢失);
  2. 用 OCR 处理扫描件 / 图片(提取文字);
  3. 文本分块(适配大模型上下文窗口)+ 向量入库(Milvus+PostgreSQL)。

今天我手把手实现这个全链路。

一、MinIO:分布式对象存储,原始文档的 “安全保险柜”

首先我们要知道MinIO 是 RAG 系统的 “原始文件仓库”,负责存储所有未经处理的文档(包括可直接解析的 Word/PDF,和需要 OCR 的扫描件 / 图片)。它的优势在于 “高可用 + 可扩展”,比本地文件夹更适合企业级场景。

1. MinIO 的核心功能

  • 支持所有文件类型:无论是 100MB 的 PDF,还是 5MB 的图片,都能稳定存储;
  • 生成唯一 URL:每个文件上传后会生成一个可访问的 URL,方便后续 OCR / 解析时调用;
  • 兼容 S3 协议:可无缝对接 AWS S3,也支持本地部署(成本低);
  • 权限管理:可设置桶(Bucket)的读写权限,避免文档泄露。

2. MinIO 部署与文件上传(实操)

步骤 1:本地部署 MinIO(Docker 方式)

# 1. 启动MinIO服务(设置用户名minioadmin,密码minioadmin,挂载本地data目录)
docker run -p 9000:9000 -p 9001:9001 --name minio \
  -v /home/yourname/minio_data:/data \
  -e "MINIO_ROOT_USER=minioadmin" \
  -e "MINIO_ROOT_PASSWORD=minioadmin" \
  minio/minio server /data --console-address ":9001"

# 2. 访问MinIO控制台:浏览器打开http://localhost:9001,用上述账号登录

步骤 2:创建存储桶(Bucket)

  1. 登录 MinIO 控制台→点击 “Create a Bucket”→命名为rag-documents(用于存储 RAG 的所有文档);
  2. 关闭 “Bucket Versioning”(测试阶段无需版本控制)→点击 “Create Bucket”。

步骤 3:MinIO 文件上传代码

minio_utils.py(用于封装 MinIO 操作)

# minio_utils.py:MinIO工具类
from minio import Minio
from minio.error import S3Error
from .config import rag_config  # 从配置读取MinIO参数

class MinIOUtils:
    def __init__(self):
        # 初始化MinIO客户端(从配置读取地址、账号、密码)
        self.client = Minio(
            endpoint=rag_config.minio_endpoint,  # 如"localhost:9000"
            access_key=rag_config.minio_access_key,  # 如"minioadmin"
            secret_key=rag_config.minio_secret_key,  # 如"minioadmin"
            secure=False  # 本地部署用HTTP,False;线上用HTTPS,True
        )
        self.bucket_name = rag_config.minio_bucket  # 如"rag-documents"

    def upload_file(self, local_file_path: str, remote_file_name: str) -> str:
        """
        上传本地文件到MinIO:
        local_file_path:本地文件路径(如"./temp/scan.pdf")
        remote_file_name:MinIO中的文件名(如"2024/05/scan_123.pdf")
        返回:文件在MinIO中的URL
        """
        try:
            # 检查桶是否存在,不存在则创建
            if not self.client.bucket_exists(self.bucket_name):
                self.client.make_bucket(self.bucket_name)
            
            # 上传文件(content_type自动识别)
            self.client.fput_object(
                bucket_name=self.bucket_name,
                object_name=remote_file_name,
                file_path=local_file_path
            )
            
            # 生成文件URL(有效期7天,可自定义)
            presigned_url = self.client.presigned_get_object(
                bucket_name=self.bucket_name,
                object_name=remote_file_name,
                expires=60*60*24*7  # 7天有效期
            )
            return presigned_url
        
        except S3Error as e:
            print(f"MinIO上传失败:{e}")
            raise

# 测试上传:把本地的扫描PDF上传到MinIO
if __name__ == "__main__":
    minio_util = MinIOUtils()
    local_path = "./scan_device_manual.pdf"  # 本地文件
    remote_name = f"device_manual/scan_{datetime.now().strftime('%Y%m%d%H%M%S')}.pdf"  # 远程文件名(加时间戳避免重复)
    file_url = minio_util.upload_file(local_path, remote_name)
    print(f"MinIO文件URL:{file_url}")  # 输出如"http://localhost:9000/rag-documents/device_manual/scan_20240520143000.pdf"

二、OCR 处理:让扫描件 / 图片 “说话” 的关键

如果用户上传的是扫描 PDF(本质是图片集合)或 JPG 图片,直接解析会得到空文本 —— 这时候必须用 OCR(光学字符识别)技术提取文字。我们以 PaddleOCR 为例(中文识别精度更高)。

1. OCR 的核心流程

  1. 触发条件:系统检测到文件类型是 “image/jpeg”“application/pdf(扫描件)” 时,自动调用 OCR;
  2. 处理步骤
    • 扫描 PDF→拆分成单张图片;
    • 图片→OCR 引擎→识别文字;
    • 输出纯文本 + 文字位置信息(可选);
  3. 下游衔接:OCR 输出的文本,会进入 “文本分块” 环节,后续流程与纯文本文档一致。

2. PaddleOCR 安装与集成(实操)

步骤 1:安装 PaddleOCR 依赖

# 1. 安装PaddlePaddle(CPU版本,适合本地测试)
pip install paddlepaddle==2.5.2 -i https://pypi.tuna.tsinghua.edu.cn/simple

# 2. 安装PaddleOCR
pip install paddleocr==2.7.0

# 3. 安装PDF处理依赖(用于拆分扫描PDF)
pip install PyPDF2==3.0.1

步骤 2:OCR 工具类( ocr.py)

ocr.py封装了 “图片 OCR” 和 “扫描 PDF OCR” 两个方法,核心代码如下:

# ocr.py:OCR工具类(基于PaddleOCR)
from paddleocr import PaddleOCR
from PyPDF2 import PdfReader
from PIL import Image
import os
import tempfile

class OCRUtils:
    def __init__(self):
        # 初始化PaddleOCR(设置语言为中文,使用CPU)
        self.ocr = PaddleOCR(
            lang="ch",  # 中文识别
            use_angle_cls=True,  # 自动纠正文字方向(如倒置的文本)
            use_gpu=False  # 本地测试用CPU,有GPU可设为True
        )

    def ocr_image(self, image_path: str) -> str:
        """识别单张图片中的文字"""
        result = self.ocr.ocr(image_path, cls=True)  # cls=True:方向检测
        # 提取文字(result是嵌套列表,需遍历解析)
        text = ""
        for line in result:
            if line:  # 过滤空结果
                for word_info in line:
                    text += word_info[1][0] + "\n"  # word_info[1][0]是识别到的文字
        return text.strip()

    def ocr_scan_pdf(self, pdf_path: str) -> str:
        """识别扫描PDF中的文字(先拆分成图片)"""
        reader = PdfReader(pdf_path)
        total_text = ""
        # 创建临时目录存储拆分后的图片
        with tempfile.TemporaryDirectory() as temp_dir:
            for page_num, page in enumerate(reader.pages):
                # 把PDF页转为图片(需安装pdf2image:pip install pdf2image)
                from pdf2image import convert_from_path
                images = convert_from_path(pdf_path, first_page=page_num+1, last_page=page_num+1)
                for img in images:
                    img_path = os.path.join(temp_dir, f"page_{page_num+1}.jpg")
                    img.save(img_path)
                    # 调用图片OCR
                    page_text = self.ocr_image(img_path)
                    total_text += f"【第{page_num+1}页】\n{page_text}\n\n"
        return total_text.strip()

# 测试OCR:处理扫描PDF
if __name__ == "__main__":
    ocr_util = OCRUtils()
    # 1. 处理图片
    img_text = ocr_util.ocr_image("./device_image.jpg")
    print(f"图片OCR结果:{img_text}")
    
    # 2. 处理扫描PDF
    pdf_text = ocr_util.ocr_scan_pdf("./scan_device_manual.pdf")
    print(f"扫描PDF OCR结果(前500字):{pdf_text[:500]}")

3. OCR 结果的质量控制

  • 多引擎对比:如果 PaddleOCR 识别精度不够,可集成 Tesseract(需安装 tesseract-ocr 引擎),用 “双引擎交叉验证”;
  • 文字清洗:OCR 结果可能包含乱码(如 “100MPa” 识别为 “100MPα”),需用正则表达式过滤:
    import re
    def clean_ocr_text(text: str) -> str:
        # 保留中文、英文、数字、常见符号(如MPa、%)
        cleaned = re.sub(r"[^\u4e00-\u9fa5a-zA-Z0-9\.\,\:\;\%\-\_MPa]", "", text)
        # 去除多余空行
        cleaned = re.sub(r"\n+", "\n", cleaned)
        return cleaned.strip()
    

三、文本分块:适配大模型上下文的 “黄金分割”

即使是 OCR 提取的纯文本,也不能直接向量化 —— 如果文档是 1 万字的手册,直接转向量会丢失局部语义(比如 “维护周期” 和 “工作压力” 混在一起)。必须将文本拆成 “小块”,这就是 “文本分块”。

1. 分块的核心原则

  • 语义完整性:拆分后的块要保持 “完整语义”,比如一个段落、一个小标题下的内容,不能把 “设备最大工作压力 100MPa” 拆成 “设备最大工作压力” 和 “100MPa”;
  • 适配上下文窗口:块大小不能超过大模型的上下文限制(如 GPT-3.5 是 4096token,约 3000 汉字),建议设为 “512~1024 字符”;
  • 重叠度:块之间保留 10%~20% 的重叠(如块 1 结尾是 “设备的维护流程”,块 2 开头重复 “维护流程”),避免拆分导致的语义断裂。

2. 分块工具:LlamaIndex 的 SentenceSplitter( base_rag.py )

base_rag.py用了 LlamaIndex 的SentenceSplitter,支持按 “句子边界” 智能分块,核心代码如下:

# base_rag.py:文本分块逻辑
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import Document

def split_text_into_chunks(documents: list[Document]) -> list[Document]:
    """
    文本分块:
    documents:OCR后的文档列表(每个Document含text和metadata)
    返回:分块后的文档节点列表
    """
    # 初始化分块器
    splitter = SentenceSplitter(
        chunk_size=512,  # 每个块512字符(可根据模型调整)
        chunk_overlap=50,  # 块之间重叠50字符(避免语义断裂)
        separator="\n",  # 按换行符分割(优先保持段落完整)
        paragraph_separator="\n\n",  # 按空行识别段落
        secondary_chunking_regex="([\u4e00-\u9fa5。,;!?])"  # 中文标点辅助分割
    )

    # 执行分块:将Document拆成Node(节点,含分块文本和元数据)
    nodes = splitter.get_nodes_from_documents(documents)
    
    # 给每个节点添加分块序号(方便溯源)
    for i, node in enumerate(nodes):
        node.metadata["chunk_index"] = i  # 分块序号
        node.metadata["total_chunks"] = len(nodes)  # 总块数
    
    return nodes

# 测试分块:OCR后的文本拆成块
if __name__ == "__main__":
    from ocr import OCRUtils
    ocr_util = OCRUtils()
    # 1. OCR获取文本
    pdf_text = ocr_util.ocr_scan_pdf("./scan_device_manual.pdf")
    # 2. 封装成Document对象
    doc = Document(text=pdf_text, metadata={"file_path": "scan_device_manual.pdf", "page_num": 1})
    # 3. 分块
    nodes = split_text_into_chunks([doc])
    print(f"原文本长度:{len(pdf_text)}字符")
    print(f"分块数量:{len(nodes)}")
    print(f"第1块文本:{nodes[0].text[:200]}...")
    print(f"第1块元数据:{nodes[0].metadata}")  # 包含file_path、chunk_index等

四、全链路串联:从 “上传文件” 到 “向量入库”

现在我们把 MinIO、OCR、分块、Milvus/PostgreSQL 串联起来,实现 “一键上传文档→自动生成索引” 的流程,来自base_rag.py

# base_rag.py:文档处理全链路
from .minio_utils import MinIOUtils
from .ocr import OCRUtils
from .milvus_utils import MilvusUtils
from .postgresql_utils import PostgreSQLUtils
from .config import rag_config

class RAGDocumentProcessor:
    def __init__(self):
        self.minio_util = MinIOUtils()
        self.ocr_util = OCRUtils()
        self.milvus_util = MilvusUtils()
        self.pg_util = PostgreSQLUtils()

    async def process_document(self, local_file_path: str) -> str:
        """
        文档处理全链路:
        1. 上传到MinIO → 2. OCR提取文本 → 3. 文本分块 → 4. 向量化+入库
        返回:索引ID(用于后续查询)
        """
        # 1. 上传到MinIO
        remote_file_name = f"processed/{os.path.basename(local_file_path).split('.')[0]}_{datetime.now().strftime('%Y%m%d%H%M%S')}.{local_file_path.split('.')[-1]}"
        minio_url = self.minio_util.upload_file(local_file_path, remote_file_name)
        print(f"MinIO存储完成:{minio_url}")

        # 2. 判断文件类型,决定是否OCR
        file_ext = local_file_path.split(".")[-1].lower()
        if file_ext in ["jpg", "jpeg", "png"] or (file_ext == "pdf" and self._is_scan_pdf(local_file_path)):
            # 扫描件/图片:调用OCR
            if file_ext == "pdf":
                text = self.ocr_util.ocr_scan_pdf(local_file_path)
            else:
                text = self.ocr_util.ocr_image(local_file_path)
        else:
            # 纯文本文档(如Word、普通PDF):直接解析
            from llama_index.readers.file import PDFReader, DocxReader
            if file_ext == "pdf":
                reader = PDFReader()
            elif file_ext in ["docx", "doc"]:
                reader = DocxReader()
            text = "\n".join([d.text for d in reader.load_data(local_file_path)])
        
        # 3. 文本清洗与分块
        cleaned_text = clean_ocr_text(text)  # 之前定义的清洗函数
        doc = Document(text=cleaned_text, metadata={"file_path": minio_url, "file_type": file_ext})
        nodes = split_text_into_chunks([doc])
        print(f"文本分块完成:{len(nodes)}个块")

        # 4. 向量化+Milvus/PostgreSQL入库
        # 4.1 向量化(用全局配置的BGE模型)
        from llama_index.core import Settings
        for node in nodes:
            node.embedding = Settings.embed_model.get_text_embedding(node.text)
        
        # 4.2 存入Milvus(向量)和PostgreSQL(原文+元数据)
        index_id = await self.milvus_util.insert_vectors([node.embedding for node in nodes])
        # 批量插入PostgreSQL
        pg_data = [
            {
                "node_id": index_id + f"_{node.metadata['chunk_index']}",
                "text": node.text,
                "file_path": node.metadata["file_path"],
                "chunk_index": node.metadata["chunk_index"],
                "total_chunks": node.metadata["total_chunks"]
            }
            for node in nodes
        ]
        self.pg_util.bulk_insert(pg_data)
        print(f"向量入库完成:索引ID={index_id}")

        return index_id

    def _is_scan_pdf(self, pdf_path: str) -> bool:
        """判断PDF是否为扫描件(无文本层)"""
        reader = PdfReader(pdf_path)
        try:
            # 尝试提取第一页文本,若为空则视为扫描件
            first_page_text = reader.pages[0].extract_text()
            return len(first_page_text.strip()) == 0
        except:
            return True

# 测试全链路:上传一个扫描PDF并处理
if __name__ == "__main__":
    processor = RAGDocumentProcessor()
    index_id = asyncio.run(processor.process_document("./scan_device_manual.pdf"))
    print(f"全链路处理完成,索引ID:{index_id}")

小结

这一篇我们实现了 RAG 的 “数据准备阶段”:用 MinIO 保障原始文档安全,用 OCR 突破非文本限制,用智能分块保留语义完整性,最后将向量和元数据分别存入 Milvus 和 PostgreSQL。下一篇我们会进入 “查询阶段”:如何用 Chainlit 搭建 Web 界面,实现 “用户提问→流式返回答案” 的完整交互。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐