【智能体学习】MinerU 集成与应用指南:从 API 调用到 LangGraph 节点
MinerU 是一款将复杂文档转化为机器可读格式的开源工具,能够从 PDF、图片、Office 文档中精准提取包括表格、公式、图表在内的多模态内容,并转换为高质量的 Markdown 和 JSON 结构化数据。相较于依赖 OCR 的传统方案,MinerU 拥有独立知识库级别的版面分析能力,能智能识别文档中各元素的所属关系,在保留原始结构方面表现突出,尤其适合处理学术论文、产品手册等复杂格式的文档。
本文系统整理 MinerU 精准解析 API 的调用流程、代码实现、常见问题及 LangGraph 节点集成,帮助你在 AI 应用中快速落地文档解析能力。你在阅读过程中发现的代码用法、API 结构等疑问,都已整理进相应章节的 Q&A 中,方便日后查阅。
📌 注意:MinerU 原 v2、v3 版本 API 已于 2025-01-17 停止服务,本文所有示例均基于 v4 版 API。如使用旧版,请更新 Token 并更换接口域名。
一、MinerU 简介与 API 选型
MinerU 提供两种文档解析 API,满足不同场景需求。
1. 两种 API 模式对比
| 对比维度 | 精准解析 API | Agent 轻量解析 API |
|---|---|---|
| 是否需要 Token | ✅ 需要 | ❌ 无需(IP 限频) |
| 接口地址 | /api/v4/extract/task 或 /api/v4/file-urls/batch |
/api/v1/agent/parse/url 或 /api/v1/agent/parse/file |
| 模型版本 | pipeline(默认)/ vlm(推荐)/ MinerU-HTML |
固定 pipeline 轻量模型 |
| 文件大小限制 | ≤ 200MB | ≤ 10MB |
| 页数限制 | ≤ 600 页 | ≤ 20 页 |
| 批量支持 | ✅ 支持(≤ 200 个) | ❌ 仅单文件 |
| 输出格式 | Zip 包(Markdown + JSON),可导出为 docx/html/latex | 仅 Markdown(CDN 链接) |
| 调用方式 | 异步(提交 → 轮询) | 异步(提交 → 轮询) |
选型建议:处理完整产品手册、学术论文等大型复杂文档时,推荐使用精准解析 API;快速测试或处理小文档,可选择Agent 轻量解析 API,无需 Token、免配置、开箱即用。
2. 申请 Token
- 访问 MinerU 官网,注册并登录。
- 进入“个人中心”或“API 管理”页面,生成专属 Token。
- Token 有效期为 14 天,过期需重新生成。
3. 核心限制提醒
根据官方文档,当前服务策略如下:
| 限制项 | 限制值 |
|---|---|
| 单文件大小 | ≤ 200MB |
| 单文件页数 | ≤ 600 页 |
| 高优先级解析额度 | 每日 2000 页(超出部分优先级降低) |
| 批量支持 | ≤ 200 个文件 |
| 提交任务频控 | 300 次/分钟 |
| 获取结果频控 | 1000 次/分钟 |
| 单日上传上限 | 1 万个文件 |
| 上传链接有效期 | 上传链接有效期为 24 小时 |
注意:因网络限制,GitHub、AWS 等海外服务的 URL 可能请求超时,建议使用自备的可公开访问 URL 或上传后再解析。
二、精准解析 API:单文件调用流程
若已有远程文件 URL,可直接调用 /api/v4/extract/task 提交解析任务。
1. 创建解析任务
import requests
token = "官网申请的api token"
url = "https://mineru.net/api/v4/extract/task"
header = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
"token": "用户的唯一标识" # 可选,用于用户额度管理
}
data = {
"url": "https://your-server.com/example.pdf",
"model_version": "vlm", # 推荐 vlm 模型
"enable_formula": True, # 开启公式识别
"enable_table": True, # 开启表格识别
"is_ocr": False # 是否启用 OCR(扫描件可开启)
}
response = requests.post(url, headers=header, json=data)
if response.status_code == 200:
result = response.json()
if result["code"] == 0:
task_id = result["data"]["task_id"]
print(f"任务创建成功,task_id: {task_id}")
else:
print(f"任务创建失败: {result['msg']}")
else:
print(f"HTTP 错误: {response.status_code}")
请求体参数说明:
url(必填):文件 URL,支持 pdf/doc/docx/ppt/pptx/png/jpg/jpeg/html 等格式。model_version:模型版本,pipeline(默认)/vlm(推荐)/MinerU-HTML。is_ocr:是否启动 OCR,默认 false,仅对 pipeline/vlm 有效。enable_formula:是否开启公式识别,默认 true。enable_table:是否开启表格识别,默认 true。language:文档语言,可选,帮助提升识别精度。page_ranges:指定解析页码范围,如"2,4-6"或"2–2"(倒数)。
2. 轮询结果直至完成
import time
result_url = f"https://mineru.net/api/v4/extract-results/task/{task_id}"
headers = {"Authorization": f"Bearer {token}"}
while True:
resp = requests.get(result_url, headers=headers)
data = resp.json()["data"]
state = data["state"]
if state == "done":
zip_url = data["full_zip_url"]
print(f"✅ 解析完成,下载地址: {zip_url}")
break
elif state == "failed":
raise Exception(f"❌ 解析失败: {data.get('err_msg', '未知错误')}")
else:
print(f"⏳ 状态: {state},等待中...")
time.sleep(5)
3. 轮询结果响应结构
{
"code": 0,
"msg": "ok",
"data": {
"batch_id": "task_id",
"state": "done",
"full_zip_url": "https://...",
"err_msg": ""
}
}
关键字段:
state:任务状态,可选值pending/processing/running/done/failed。full_zip_url:仅当state='done'时才存在,是结果包的完整下载链接。err_msg:当state='failed'时返回详细错误信息。
注意:提交任务频控为 300 次/分钟,获取结果频控为 1000 次/分钟,请合理安排轮询频率。
三、精准解析 API:批量解析本地文件
对于本地文件批量解析,需先通过 /api/v4/file-urls/batch 获取上传链接并上传文件,系统收到文件后会自动触发解析任务,无需额外调用 /api/v4/extract/task。
1. 申请上传链接
import requests
token = "官网申请的api token"
url = "https://mineru.net/api/v4/file-urls/batch"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
"token": "用户的唯一标识"
}
data = {
"files": [
{"name": "device_manual.pdf"},
{"name": "product_spec.pdf"}
],
"model_version": "vlm"
}
response = requests.post(url, headers=headers, json=data)
result = response.json()
if result["code"] == 0:
batch_id = result["data"]["batch_id"]
upload_urls = result["data"]["file_urls"]
print(f"批量任务 batch_id: {batch_id}")
else:
print(f"申请失败: {result['msg']}")
2. 上传本地文件
def upload_file(file_path, upload_url):
with open(file_path, "rb") as f:
put_resp = requests.put(upload_url, data=f)
return put_resp.status_code == 200
for i, file_path in enumerate(["device_manual.pdf", "product_spec.pdf"]):
success = upload_file(file_path, upload_urls[i])
if success:
print(f"✅ {file_path} 上传成功")
else:
print(f"❌ {file_path} 上传失败")
上传链接有效期 24 小时,需在此期间完成上传。
3. 轮询批量任务状态
当所有文件上传完成后,系统自动提交解析任务,可通过 batch_id 轮询 /api/v4/extract-results/batch/{batch_id}。
poll_url = f"https://mineru.net/api/v4/extract-results/batch/{batch_id}"
headers = {"Authorization": f"Bearer {token}"}
while True:
resp = requests.get(poll_url, headers=headers)
data = resp.json()["data"]
extract_results = data["extract_result"]
all_done = True
for item in extract_results:
if item["state"] == "done":
zip_url = item["full_zip_url"]
print(f"✅ {item['file_name']} 解析完成")
elif item["state"] == "failed":
print(f"❌ {item['file_name']} 解析失败: {item.get('err_msg')}")
else:
all_done = False
if all_done:
break
time.sleep(5)
4. 批量轮询响应结构
{
"code": 0,
"msg": "ok",
"data": {
"batch_id": "batch_id",
"extract_result": [
{
"file_name": "device_manual.pdf",
"state": "done",
"err_msg": "",
"full_zip_url": "https://..."
},
{
"file_name": "product_spec.pdf",
"state": "running",
"err_msg": "",
"extract_progress": {
"extracted_pages": 5,
"total_pages": 20,
"start_time": "2025-01-20 11:43:20"
}
}
]
}
}
关键字段:
extract_result:每个文件独立的解析状态列表,与上传顺序一一对应。state:可取pending(等待)、processing(处理中)、running(运行中)、done(成功)、failed(失败)。full_zip_url:仅state='done'时才有,是该文件的独立结果包 URL。extract_progress:解析进度信息,仅在state='running'时返回。
四、ZIP 结果包处理
1. 三级 MD 文件匹配策略
import zipfile
def extract_markdown(zip_path, expected_stem):
with zipfile.ZipFile(zip_path, "r") as zf:
md_files = [f for f in zf.namelist() if f.endswith(".md")]
# 1. 优先匹配 <stem>.md
target = next((f for f in md_files if f.startswith(expected_stem)), None)
# 2. 其次匹配 full.md
if not target:
target = next((f for f in md_files if f == "full.md"), None)
# 3. 最终取第一个 .md 文件
if not target and md_files:
target = md_files[0]
if target:
return zf.read(target).decode("utf-8")
return None
2. ZIP 包目录结构
full.pdf.zip
├── images/ # 提取的图片资源
├── full.md # 完整 Markdown 内容(主文件名不一定是 full.md)
├── content_list.json # 结构化数据(含图片位置、表格信息等)
└── layout.json # 页面布局信息
3. 常见问题:解压后文件夹里是否有 _manifest.json?
官方 API 返回的 ZIP 包中不包含 _manifest.json。 该文件仅存在于本地部署 SDK 的输出中,云端 API 返回的 ZIP 包结构仅有 content_list.json 和 full.md 等基础文件,通过轮询接口 extract_result 中按顺序或 file_name 与原始文件匹配即可建立对应关系,不要依赖 _manifest.json。
五、LangGraph 节点实现(单文件演示 + 验证)
以下是一个可直接运行的 MinerU 解析节点,包含完整的申请上传、文件上传、轮询结果、解压提取和三级匹配逻辑。代码中已集成重试、超时控制、临时文件清理等生产级特性。你可以将其直接复制到 Python 环境中测试。
1. 核心客户端实现(改进版)
import os
import time
import zipfile
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception
# ---------- 自定义异常 ----------
class MinerUAPIError(Exception):
pass
# ---------- 可重试条件 ----------
def is_retryable_exception(exception):
"""判断异常是否可重试(网络问题或 5xx)"""
if isinstance(exception, requests.exceptions.Timeout):
return True
if isinstance(exception, requests.exceptions.ConnectionError):
return True
if isinstance(exception, MinerUAPIError):
return "5xx" in str(exception) # 简单判断,可扩展
return False
# ---------- 批量解析客户端 ----------
class MinerUBatchClient:
def __init__(self, api_token: str, base_url: str = "https://mineru.net/api/v4",
output_dir: Optional[str] = None, timeout: int = 600):
self.base_url = base_url
self.headers = {"Authorization": f"Bearer {api_token}", "Content-Type": "application/json"}
self.output_dir = Path(output_dir) if output_dir else Path.cwd() / "mineru_output"
self.timeout = timeout
self.session = self._create_session()
def _create_session(self) -> requests.Session:
"""创建带重试和代理绕过的 Session"""
session = requests.Session()
session.trust_env = False # 避免代理干扰
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
return session
def parse_batch(self, file_paths: List[str]) -> Dict[str, str]:
"""
批量解析本地文件,返回 {原始文件路径: 最终MD文件路径}
"""
# 1. 申请上传 URL
batch_id, upload_urls = self._request_upload_urls(file_paths)
# 2. 上传文件,收集失败
uploaded_urls, failed_uploads = self._upload_files(file_paths, upload_urls)
if failed_uploads:
err_msg = "以下文件上传失败:\n" + "\n".join(failed_uploads)
raise MinerUAPIError(err_msg)
# 3. 轮询结果(超时控制)
results = self._poll_results(batch_id, file_paths)
# 4. 保存 MD 文件到 output_dir 并返回路径映射
return self._save_markdowns(results)
def _request_upload_urls(self, file_paths: List[str]) -> Tuple[str, List[str]]:
"""步骤1:为每个文件申请上传 URL"""
files_payload = [{"name": Path(p).name} for p in file_paths]
payload = {"files": files_payload, "model_version": "vlm"}
resp = self.session.post(f"{self.base_url}/file-urls/batch", json=payload)
resp.raise_for_status()
result = resp.json()
if result.get("code") != 0:
raise MinerUAPIError(f"获取上传链接失败: {result.get('msg')}")
data = result["data"]
batch_id = data["batch_id"]
upload_urls = data["file_urls"]
return batch_id, upload_urls
def _upload_files(self, file_paths: List[str], upload_urls: List[str]) -> Tuple[List[str], List[str]]:
"""步骤2:上传文件,返回成功URL列表和失败信息列表"""
uploaded_urls = []
failed_uploads = []
for path, url in zip(file_paths, upload_urls):
fname = Path(path).name
try:
with open(path, 'rb') as f:
put_resp = self.session.put(url, data=f)
put_resp.raise_for_status()
uploaded_urls.append(url)
print(f"✅ {fname} 上传成功")
except Exception as e:
failed_uploads.append(f"{fname} (错误: {str(e)})")
print(f"❌ {fname} 上传失败: {e}")
return uploaded_urls, failed_uploads
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception(is_retryable_exception))
def _poll_results(self, batch_id: str, original_paths: List[str]) -> Dict[str, str]:
"""步骤3:轮询解析结果,返回 {原始路径: zip_url} 映射(还未解压)"""
result_url = f"{self.base_url}/extract-results/batch/{batch_id}"
start_time = time.time()
processed = {} # {原始路径: zip_url}
pending_indices = list(range(len(original_paths))) # 待处理的索引
while pending_indices:
if time.time() - start_time > self.timeout:
raise TimeoutError(f"批量解析超时(>{self.timeout}秒)")
resp = self.session.get(result_url, headers=self.headers)
if resp.status_code >= 500:
time.sleep(2)
continue
resp.raise_for_status()
data = resp.json()
if data.get("code") != 0:
raise MinerUAPIError(f"查询结果失败: {data.get('msg')}")
extract_results = data["data"]["extract_result"]
# 遍历待处理的索引
for idx in pending_indices[:]:
item = extract_results[idx]
state = item.get("state")
if state == "done":
zip_url = item.get("full_zip_url")
if zip_url:
processed[original_paths[idx]] = zip_url
pending_indices.remove(idx)
print(f"✅ {Path(original_paths[idx]).name} 解析完成")
elif state == "failed":
err_msg = item.get("err_msg", "未知错误")
raise MinerUAPIError(f"文件 {Path(original_paths[idx]).name} 解析失败: {err_msg}")
# 其他状态(pending/processing)继续等待
if pending_indices:
time.sleep(3)
return processed
def _save_markdowns(self, zip_url_map: Dict[str, str]) -> Dict[str, str]:
"""步骤4:下载 ZIP,解压,提取 MD 文件,保存到 output_dir,返回 {原始路径: MD文件路径}"""
self.output_dir.mkdir(parents=True, exist_ok=True)
result_map = {}
for original_path, zip_url in zip_url_map.items():
stem = Path(original_path).stem
# 下载 ZIP 到临时文件
resp = self.session.get(zip_url)
resp.raise_for_status()
with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:
tmp.write(resp.content)
tmp_path = tmp.name
# 解压到临时目录
import shutil
extract_dir = Path(tempfile.mkdtemp())
with zipfile.ZipFile(tmp_path, 'r') as zf:
zf.extractall(extract_dir)
# 查找 MD 文件(三级匹配)
md_files = list(extract_dir.glob("*.md"))
target_md = None
# 1. 优先匹配 <stem>.md
for md in md_files:
if md.stem == stem:
target_md = md
break
# 2. 匹配 full.md
if not target_md:
for md in md_files:
if md.name.lower() == "full.md":
target_md = md
break
# 3. 取第一个
if not target_md and md_files:
target_md = md_files[0]
if not target_md:
raise MinerUAPIError(f"在 ZIP 中未找到任何 .md 文件: {original_path}")
# 重命名(如果名称不一致)
final_md_name = f"{stem}.md"
final_md_path = self.output_dir / final_md_name
# 若目标已存在,先删除
if final_md_path.exists():
final_md_path.unlink()
# 移动并重命名
shutil.move(str(target_md), str(final_md_path))
# 清理临时文件
os.unlink(tmp_path)
shutil.rmtree(extract_dir)
result_map[original_path] = str(final_md_path)
print(f"📄 {Path(original_path).name} -> {final_md_path}")
return result_map
2. LangGraph 节点封装
from typing import Dict, Any
def mineru_batch_parse_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
LangGraph 节点:批量解析 PDF 文件
输入 state 需包含:
- "local_files": List[str] # 本地 PDF 文件路径列表
- "output_dir": str (可选)
- "api_token": str (可选,也可从环境变量读取)
输出 state 增加:
- "md_files": Dict[str, str] # 原始路径 -> MD 文件路径
- "error": str (如果有错误)
"""
api_token = state.get("api_token") or os.getenv("MINERU_API_TOKEN")
if not api_token:
return {**state, "error": "缺少 MINERU_API_TOKEN"}
local_files = state.get("local_files", [])
if not local_files:
return {**state, "error": "未提供 local_files 列表"}
output_dir = state.get("output_dir") or "./mineru_output"
try:
client = MinerUBatchClient(
api_token=api_token,
output_dir=output_dir,
timeout=600
)
md_files = client.parse_batch(local_files)
return {**state, "md_files": md_files, "error": None}
except Exception as e:
return {**state, "md_files": None, "error": str(e)}
3. 单节点演示与验证
if __name__ == "__main__":
# 设置环境变量或直接赋值
os.environ["MINERU_API_TOKEN"] = "你的API_TOKEN"
# 准备测试文件(请确保文件存在)
base_dir = Path(__file__).parent
pdf_path = base_dir / "doc" / "说明文档.pdf"
if not pdf_path.exists():
print(f"测试文件不存在: {pdf_path}")
import sys
sys.exit(1)
# 模拟 LangGraph 的初始状态
state = {
"local_files": [str(pdf_path)],
"output_dir": str(base_dir / "output")
}
result_state = mineru_batch_parse_node(state)
if result_state.get("error"):
print(f"❌ 解析失败: {result_state['error']}")
else:
for orig, md in result_state["md_files"].items():
print(f"✅ {orig} -> {md}")
将上述代码保存为 .py 文件,填写正确的 API Token 和本地 PDF 路径后即可运行测试。
六、常见问题与最佳实践
1. 为什么只重试 5xx 错误?
5xx 服务端错误通常是临时性的(服务过载、网络抖动),重试极大概率可恢复;而 4xx 客户端错误(如 400 参数错误、401 Token 失效、403 权限不足、404 资源不存在)表示请求本身有问题,重试只会得到相同错误,应直接排查代码。这种区分在轮询逻辑中尤其重要:面对 5xx 可自动重试,面对 4xx 则应立即停止并报错。
批量轮询代码中对
5xx的自动重试,正体现了这一设计原则。
2. 轮询时发现 state 总是 running?
- 检查文件大小和页数是否在限制范围内(200MB / 600 页)。
- 查看
err_msg字段是否有提示。 - 检查网络环境:GitHub、AWS 等海外 URL 可能超时,建议使用自备的可公网访问的 URL。
- 适当延长轮询间隔至 10 秒,避免频繁请求触发限流。
3. 上传成功后长时间无返回?
检查轮询逻辑是否正确提取了 extract_result[*]["state"],并确保已设置合理的超时控制(推荐 600 秒),避免无限等待。注意批量响应中 full_zip_url 只存在于 state='done' 的结果中,未遍历完所有文件前不能中断轮询。
4. enable_formula 和 enable_table 是否对所有模型生效?
这两个参数仅对 pipeline 和 vlm 模型有效,对 MinerU-HTML 模型不适用。特别地,对于 vlm 模型,enable_formula 只影响行内公式的解析,块级公式的解析不受此参数控制。
5. 关于 @retry 装饰器和 tempfile 的说明
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception(is_retryable_exception))表示:最多重试 3 次,每次重试等待时间呈指数增长(2秒、4秒、8秒),仅当抛出is_retryable_exception返回True的异常时才重试。with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp:创建一个带.zip后缀的临时文件,delete=False确保退出with块后文件不被自动删除,以便后续使用。需要手动清理os.unlink(tmp_path)。
七、代码与实现对比
| 维度 | 说明 |
|---|---|
| 流程控制 | 单文件可用 /api/v4/extract/task 直接提交;批量需先走 upload‑batch 流程。轮询时单文件按 task_id 取结果,批量按 batch_id 取 extract_result 列表 |
| 错误收集 | 上传失败清单:_upload_files 方法在批量上传时收集所有失败文件和原因,最后统一抛出包含清单的异常 |
| MD 文件匹配 | 三级匹配:优先 <stem>.md → 其次 full.md → 最后取第一个 .md |
| 重试与超时 | tenacity 指数退避重试(3 次)+ 可配置超时保护(默认 600 秒) |
| 资源管理 | tempfile 临时文件 + 手动 unlink 和 shutil.rmtree 确保清理 |
| 批量支持 | 原生批量:同时处理多个文件,一个 batch_id 统管所有文件解析状态 |
| 异常处理 | 节点内部捕获并存入 state["error"],便于 LangGraph 分支路由或错误恢复 |
八、最佳实践总结
| 实践 | 说明 |
|---|---|
| Token 管理 | 通过环境变量 MINERU_API_TOKEN 管理,定期检查有效期 |
| 超时设置 | 轮询超时建议 300-600 秒,批量任务取 600 秒以上 |
| 三级匹配 | 优先 <stem>.md → 其次 full.md → 最后取第一个 .md,适应不同文件命名 |
| 错误分类 | 仅对 5xx 和网络/超时异常实施指数退避重试,4xx 直接报错 |
| 批量上传 | 先收集失败清单,最后统一报错,便于一次性修复 |
| 资源清理 | tempfile 与 NamedTemporaryFile(delete=False) 配合手动清理,确保临时文件不残留 |
| 请求复用 | 使用 requests.Session 复用连接,设置 trust_env=False 避免代理干扰 |
| LangGraph 集成 | 节点内捕获所有异常并返回 state["error"],便于流程图分支路由;同步调用场景可直接抛出异常 |
| 参数选择 | 复杂文档用 vlm 模型,学术论文/技术文档需表格公式高精度时用 pipeline,HTML 文档用 MinerU-HTML |
将这份指南收藏好,以后遇到 PDF 解析需求时,可以快速查阅 API 调用、代码实现、问题排查等核心要点。
更多推荐

所有评论(0)