三、从 MinIO 存储到 OCR 提取,再到向量索引生成
本文手把手实现了RAG系统中的文档处理全链路,包含三个核心环节:首先使用MinIO分布式存储作为文档"保险柜",支持多格式文件存储和高可用访问;其次通过PaddleOCR处理扫描件/图片,提取文字内容并清洗;最后采用智能分块技术将文本切割为适配大模型窗口的语义单元。系统实现了从文档上传、OCR识别、文本分块到向量化入库的自动化流程,为后续检索环节提供结构化数据支持。关键技术包括
上一篇我们搞定了 “检索的核心技术”,但在检索之前,还有一个关键环节:如何把你的业务文档(PDF / 图片 / Word)变成可检索的向量索引?
这个过程涉及 3 个核心步骤:
- 用 MinIO 存储原始文档(避免本地文件丢失);
- 用 OCR 处理扫描件 / 图片(提取文字);
- 文本分块(适配大模型上下文窗口)+ 向量入库(Milvus+PostgreSQL)。
今天我手把手实现这个全链路。
一、MinIO:分布式对象存储,原始文档的 “安全保险柜”
首先我们要知道MinIO 是 RAG 系统的 “原始文件仓库”,负责存储所有未经处理的文档(包括可直接解析的 Word/PDF,和需要 OCR 的扫描件 / 图片)。它的优势在于 “高可用 + 可扩展”,比本地文件夹更适合企业级场景。
1. MinIO 的核心功能
- 支持所有文件类型:无论是 100MB 的 PDF,还是 5MB 的图片,都能稳定存储;
- 生成唯一 URL:每个文件上传后会生成一个可访问的 URL,方便后续 OCR / 解析时调用;
- 兼容 S3 协议:可无缝对接 AWS S3,也支持本地部署(成本低);
- 权限管理:可设置桶(Bucket)的读写权限,避免文档泄露。
2. MinIO 部署与文件上传(实操)
步骤 1:本地部署 MinIO(Docker 方式)
# 1. 启动MinIO服务(设置用户名minioadmin,密码minioadmin,挂载本地data目录)
docker run -p 9000:9000 -p 9001:9001 --name minio \
-v /home/yourname/minio_data:/data \
-e "MINIO_ROOT_USER=minioadmin" \
-e "MINIO_ROOT_PASSWORD=minioadmin" \
minio/minio server /data --console-address ":9001"
# 2. 访问MinIO控制台:浏览器打开http://localhost:9001,用上述账号登录
步骤 2:创建存储桶(Bucket)
- 登录 MinIO 控制台→点击 “Create a Bucket”→命名为
rag-documents
(用于存储 RAG 的所有文档); - 关闭 “Bucket Versioning”(测试阶段无需版本控制)→点击 “Create Bucket”。
步骤 3:MinIO 文件上传代码
minio_utils.py
(用于封装 MinIO 操作)
# minio_utils.py:MinIO工具类
from minio import Minio
from minio.error import S3Error
from .config import rag_config # 从配置读取MinIO参数
class MinIOUtils:
def __init__(self):
# 初始化MinIO客户端(从配置读取地址、账号、密码)
self.client = Minio(
endpoint=rag_config.minio_endpoint, # 如"localhost:9000"
access_key=rag_config.minio_access_key, # 如"minioadmin"
secret_key=rag_config.minio_secret_key, # 如"minioadmin"
secure=False # 本地部署用HTTP,False;线上用HTTPS,True
)
self.bucket_name = rag_config.minio_bucket # 如"rag-documents"
def upload_file(self, local_file_path: str, remote_file_name: str) -> str:
"""
上传本地文件到MinIO:
local_file_path:本地文件路径(如"./temp/scan.pdf")
remote_file_name:MinIO中的文件名(如"2024/05/scan_123.pdf")
返回:文件在MinIO中的URL
"""
try:
# 检查桶是否存在,不存在则创建
if not self.client.bucket_exists(self.bucket_name):
self.client.make_bucket(self.bucket_name)
# 上传文件(content_type自动识别)
self.client.fput_object(
bucket_name=self.bucket_name,
object_name=remote_file_name,
file_path=local_file_path
)
# 生成文件URL(有效期7天,可自定义)
presigned_url = self.client.presigned_get_object(
bucket_name=self.bucket_name,
object_name=remote_file_name,
expires=60*60*24*7 # 7天有效期
)
return presigned_url
except S3Error as e:
print(f"MinIO上传失败:{e}")
raise
# 测试上传:把本地的扫描PDF上传到MinIO
if __name__ == "__main__":
minio_util = MinIOUtils()
local_path = "./scan_device_manual.pdf" # 本地文件
remote_name = f"device_manual/scan_{datetime.now().strftime('%Y%m%d%H%M%S')}.pdf" # 远程文件名(加时间戳避免重复)
file_url = minio_util.upload_file(local_path, remote_name)
print(f"MinIO文件URL:{file_url}") # 输出如"http://localhost:9000/rag-documents/device_manual/scan_20240520143000.pdf"
二、OCR 处理:让扫描件 / 图片 “说话” 的关键
如果用户上传的是扫描 PDF(本质是图片集合)或 JPG 图片,直接解析会得到空文本 —— 这时候必须用 OCR(光学字符识别)技术提取文字。我们以 PaddleOCR 为例(中文识别精度更高)。
1. OCR 的核心流程
- 触发条件:系统检测到文件类型是 “image/jpeg”“application/pdf(扫描件)” 时,自动调用 OCR;
- 处理步骤:
- 扫描 PDF→拆分成单张图片;
- 图片→OCR 引擎→识别文字;
- 输出纯文本 + 文字位置信息(可选);
- 下游衔接:OCR 输出的文本,会进入 “文本分块” 环节,后续流程与纯文本文档一致。
2. PaddleOCR 安装与集成(实操)
步骤 1:安装 PaddleOCR 依赖
# 1. 安装PaddlePaddle(CPU版本,适合本地测试)
pip install paddlepaddle==2.5.2 -i https://pypi.tuna.tsinghua.edu.cn/simple
# 2. 安装PaddleOCR
pip install paddleocr==2.7.0
# 3. 安装PDF处理依赖(用于拆分扫描PDF)
pip install PyPDF2==3.0.1
步骤 2:OCR 工具类( ocr.py)
ocr.py
封装了 “图片 OCR” 和 “扫描 PDF OCR” 两个方法,核心代码如下:
# ocr.py:OCR工具类(基于PaddleOCR)
from paddleocr import PaddleOCR
from PyPDF2 import PdfReader
from PIL import Image
import os
import tempfile
class OCRUtils:
def __init__(self):
# 初始化PaddleOCR(设置语言为中文,使用CPU)
self.ocr = PaddleOCR(
lang="ch", # 中文识别
use_angle_cls=True, # 自动纠正文字方向(如倒置的文本)
use_gpu=False # 本地测试用CPU,有GPU可设为True
)
def ocr_image(self, image_path: str) -> str:
"""识别单张图片中的文字"""
result = self.ocr.ocr(image_path, cls=True) # cls=True:方向检测
# 提取文字(result是嵌套列表,需遍历解析)
text = ""
for line in result:
if line: # 过滤空结果
for word_info in line:
text += word_info[1][0] + "\n" # word_info[1][0]是识别到的文字
return text.strip()
def ocr_scan_pdf(self, pdf_path: str) -> str:
"""识别扫描PDF中的文字(先拆分成图片)"""
reader = PdfReader(pdf_path)
total_text = ""
# 创建临时目录存储拆分后的图片
with tempfile.TemporaryDirectory() as temp_dir:
for page_num, page in enumerate(reader.pages):
# 把PDF页转为图片(需安装pdf2image:pip install pdf2image)
from pdf2image import convert_from_path
images = convert_from_path(pdf_path, first_page=page_num+1, last_page=page_num+1)
for img in images:
img_path = os.path.join(temp_dir, f"page_{page_num+1}.jpg")
img.save(img_path)
# 调用图片OCR
page_text = self.ocr_image(img_path)
total_text += f"【第{page_num+1}页】\n{page_text}\n\n"
return total_text.strip()
# 测试OCR:处理扫描PDF
if __name__ == "__main__":
ocr_util = OCRUtils()
# 1. 处理图片
img_text = ocr_util.ocr_image("./device_image.jpg")
print(f"图片OCR结果:{img_text}")
# 2. 处理扫描PDF
pdf_text = ocr_util.ocr_scan_pdf("./scan_device_manual.pdf")
print(f"扫描PDF OCR结果(前500字):{pdf_text[:500]}")
3. OCR 结果的质量控制
- 多引擎对比:如果 PaddleOCR 识别精度不够,可集成 Tesseract(需安装 tesseract-ocr 引擎),用 “双引擎交叉验证”;
- 文字清洗:OCR 结果可能包含乱码(如 “100MPa” 识别为 “100MPα”),需用正则表达式过滤:
import re def clean_ocr_text(text: str) -> str: # 保留中文、英文、数字、常见符号(如MPa、%) cleaned = re.sub(r"[^\u4e00-\u9fa5a-zA-Z0-9\.\,\:\;\%\-\_MPa]", "", text) # 去除多余空行 cleaned = re.sub(r"\n+", "\n", cleaned) return cleaned.strip()
三、文本分块:适配大模型上下文的 “黄金分割”
即使是 OCR 提取的纯文本,也不能直接向量化 —— 如果文档是 1 万字的手册,直接转向量会丢失局部语义(比如 “维护周期” 和 “工作压力” 混在一起)。必须将文本拆成 “小块”,这就是 “文本分块”。
1. 分块的核心原则
- 语义完整性:拆分后的块要保持 “完整语义”,比如一个段落、一个小标题下的内容,不能把 “设备最大工作压力 100MPa” 拆成 “设备最大工作压力” 和 “100MPa”;
- 适配上下文窗口:块大小不能超过大模型的上下文限制(如 GPT-3.5 是 4096token,约 3000 汉字),建议设为 “512~1024 字符”;
- 重叠度:块之间保留 10%~20% 的重叠(如块 1 结尾是 “设备的维护流程”,块 2 开头重复 “维护流程”),避免拆分导致的语义断裂。
2. 分块工具:LlamaIndex 的 SentenceSplitter( base_rag.py )
base_rag.py
用了 LlamaIndex 的SentenceSplitter
,支持按 “句子边界” 智能分块,核心代码如下:
# base_rag.py:文本分块逻辑
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import Document
def split_text_into_chunks(documents: list[Document]) -> list[Document]:
"""
文本分块:
documents:OCR后的文档列表(每个Document含text和metadata)
返回:分块后的文档节点列表
"""
# 初始化分块器
splitter = SentenceSplitter(
chunk_size=512, # 每个块512字符(可根据模型调整)
chunk_overlap=50, # 块之间重叠50字符(避免语义断裂)
separator="\n", # 按换行符分割(优先保持段落完整)
paragraph_separator="\n\n", # 按空行识别段落
secondary_chunking_regex="([\u4e00-\u9fa5。,;!?])" # 中文标点辅助分割
)
# 执行分块:将Document拆成Node(节点,含分块文本和元数据)
nodes = splitter.get_nodes_from_documents(documents)
# 给每个节点添加分块序号(方便溯源)
for i, node in enumerate(nodes):
node.metadata["chunk_index"] = i # 分块序号
node.metadata["total_chunks"] = len(nodes) # 总块数
return nodes
# 测试分块:OCR后的文本拆成块
if __name__ == "__main__":
from ocr import OCRUtils
ocr_util = OCRUtils()
# 1. OCR获取文本
pdf_text = ocr_util.ocr_scan_pdf("./scan_device_manual.pdf")
# 2. 封装成Document对象
doc = Document(text=pdf_text, metadata={"file_path": "scan_device_manual.pdf", "page_num": 1})
# 3. 分块
nodes = split_text_into_chunks([doc])
print(f"原文本长度:{len(pdf_text)}字符")
print(f"分块数量:{len(nodes)}")
print(f"第1块文本:{nodes[0].text[:200]}...")
print(f"第1块元数据:{nodes[0].metadata}") # 包含file_path、chunk_index等
四、全链路串联:从 “上传文件” 到 “向量入库”
现在我们把 MinIO、OCR、分块、Milvus/PostgreSQL 串联起来,实现 “一键上传文档→自动生成索引” 的流程,来自base_rag.py
:
# base_rag.py:文档处理全链路
from .minio_utils import MinIOUtils
from .ocr import OCRUtils
from .milvus_utils import MilvusUtils
from .postgresql_utils import PostgreSQLUtils
from .config import rag_config
class RAGDocumentProcessor:
def __init__(self):
self.minio_util = MinIOUtils()
self.ocr_util = OCRUtils()
self.milvus_util = MilvusUtils()
self.pg_util = PostgreSQLUtils()
async def process_document(self, local_file_path: str) -> str:
"""
文档处理全链路:
1. 上传到MinIO → 2. OCR提取文本 → 3. 文本分块 → 4. 向量化+入库
返回:索引ID(用于后续查询)
"""
# 1. 上传到MinIO
remote_file_name = f"processed/{os.path.basename(local_file_path).split('.')[0]}_{datetime.now().strftime('%Y%m%d%H%M%S')}.{local_file_path.split('.')[-1]}"
minio_url = self.minio_util.upload_file(local_file_path, remote_file_name)
print(f"MinIO存储完成:{minio_url}")
# 2. 判断文件类型,决定是否OCR
file_ext = local_file_path.split(".")[-1].lower()
if file_ext in ["jpg", "jpeg", "png"] or (file_ext == "pdf" and self._is_scan_pdf(local_file_path)):
# 扫描件/图片:调用OCR
if file_ext == "pdf":
text = self.ocr_util.ocr_scan_pdf(local_file_path)
else:
text = self.ocr_util.ocr_image(local_file_path)
else:
# 纯文本文档(如Word、普通PDF):直接解析
from llama_index.readers.file import PDFReader, DocxReader
if file_ext == "pdf":
reader = PDFReader()
elif file_ext in ["docx", "doc"]:
reader = DocxReader()
text = "\n".join([d.text for d in reader.load_data(local_file_path)])
# 3. 文本清洗与分块
cleaned_text = clean_ocr_text(text) # 之前定义的清洗函数
doc = Document(text=cleaned_text, metadata={"file_path": minio_url, "file_type": file_ext})
nodes = split_text_into_chunks([doc])
print(f"文本分块完成:{len(nodes)}个块")
# 4. 向量化+Milvus/PostgreSQL入库
# 4.1 向量化(用全局配置的BGE模型)
from llama_index.core import Settings
for node in nodes:
node.embedding = Settings.embed_model.get_text_embedding(node.text)
# 4.2 存入Milvus(向量)和PostgreSQL(原文+元数据)
index_id = await self.milvus_util.insert_vectors([node.embedding for node in nodes])
# 批量插入PostgreSQL
pg_data = [
{
"node_id": index_id + f"_{node.metadata['chunk_index']}",
"text": node.text,
"file_path": node.metadata["file_path"],
"chunk_index": node.metadata["chunk_index"],
"total_chunks": node.metadata["total_chunks"]
}
for node in nodes
]
self.pg_util.bulk_insert(pg_data)
print(f"向量入库完成:索引ID={index_id}")
return index_id
def _is_scan_pdf(self, pdf_path: str) -> bool:
"""判断PDF是否为扫描件(无文本层)"""
reader = PdfReader(pdf_path)
try:
# 尝试提取第一页文本,若为空则视为扫描件
first_page_text = reader.pages[0].extract_text()
return len(first_page_text.strip()) == 0
except:
return True
# 测试全链路:上传一个扫描PDF并处理
if __name__ == "__main__":
processor = RAGDocumentProcessor()
index_id = asyncio.run(processor.process_document("./scan_device_manual.pdf"))
print(f"全链路处理完成,索引ID:{index_id}")
小结
这一篇我们实现了 RAG 的 “数据准备阶段”:用 MinIO 保障原始文档安全,用 OCR 突破非文本限制,用智能分块保留语义完整性,最后将向量和元数据分别存入 Milvus 和 PostgreSQL。下一篇我们会进入 “查询阶段”:如何用 Chainlit 搭建 Web 界面,实现 “用户提问→流式返回答案” 的完整交互。
更多推荐
所有评论(0)