构建生产级AI票据处理系统全流程指南
本文像为队友编写代码一样编写:小巧、清晰、可靠的组件,附带审计追踪和操作控制。它很紧凑,所以你可以快速掌握构建生产就绪AI系统的关键实践,这些系统是安全、可审计和可维护的。将此视为你进行真实世界AI工程的实用蓝图。更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)或者 我的个人博客 https://blog.qife122.com/对网络安全、黑客技术感兴趣的朋友可以关注我的安全公众
从PDF到决策:构建生产就绪的AI系统自动化票据处理流程
TL;DR — 你将获得的内容
一套可运行、面向生产的设计方案,包含完整的代码片段,用于构建一个AI代理。该代理能够处理发票PDF文件、运行OCR、使用大语言模型提取结构化字段、验证数据、应用确定性业务逻辑并存储可审计的结果。所有内容都经过精心编写,工程师可以直接复制粘贴到代码仓库中并在本地运行。包含测试、数据库迁移、指标监控和操作控制。
语气:实用且有主见。本文旨在通过代码审查、合规性问题和凌晨三点的生产告警——而不是为了赢得奖项竞赛。
AI获胜的关键不在于聪明,而在于可靠。
为什么你应该关注
大多数生成式AI文章都止步于单一的大语言模型调用。在实践中,真正的系统会在那些看似无聊但至关重要的基础设施方面出问题——例如数据摄取的边界情况、重试机制、可观测性缺口以及静默的验证错误。
本文展示了如何构建一个AI系统——而不是一个演示——来替代人工发票处理流程,同时保持可审计性、安全性和可维护性。
仓库结构
ai-agent-invoice/
├─ docker-compose.yml
├─ Dockerfile
├─ requirements.txt
├─ alembic/
│ └─ versions/001_create_tables.py
├─ app/
│ ├─ main.py
│ ├─ config.py
│ ├─ services/
│ │ ├─ ingestion.py
│ │ ├─ preprocess.py
│ │ ├─ ocr.py
│ │ ├─ extractor.py
│ │ ├─ llm_adapters/
│ │ ├─ validator.py
│ │ ├─ decision.py
│ │ └─ storage.py
│ ├─ models/invoice.py
│ ├─ db/repo.py
│ └─ utils/
│ ├─ logging.py
│ ├─ metrics.py
│ └─ retry.py
└─ infra/prometheus.yml
快速浏览清单
- 架构先行:Pydantic v2模型是唯一的真相来源。
- 大语言模型作为提取器,而非决策器。业务规则存在于代码中。
- 使用Decimal处理金额;使用dateutil处理日期。
- 使用流式上传;采用幂等性和内容哈希。
- 持久化原始PDF、OCR输出和模型元数据以便审计。
- 使用后台工作器(开发环境:BackgroundTasks;生产环境:Celery/RQ/Kafka)。
- 全面监控:Prometheus指标、结构化JSON日志和追踪。
- 如果在六个月后无法解释某个决策的原因,那么这个系统就是不完整的。
高层流程(运行时顺序)
- 客户端上传PDF →
/ingest端点(流式传输文件并返回document_id) - 后台工作器将PDF转换为图像(进行去歪斜和清理)
- 对每个页面运行OCR(本地Tesseract或第三方服务)→ 页面级文本和置信度分数
- 大语言模型使用严格的模式提取结构化字段 → JSON(优先使用函数调用/结构化输出;自由文本解析是最后的手段)
- 使用Pydantic进行规范化与验证(Decimal用于金额,ISO格式日期)
- 确定性决策引擎自动批准或将发票加入人工审核队列
- 持久化结果(process_results JSONB + provider_meta)并发出指标和结构化日志
关键设计原则(简洁清晰)
- 架构先行。架构是真相的来源。大语言模型提取到架构字段中;Pydantic验证它们。
- 关注点分离。在不改变业务逻辑的情况下,更换OCR或大语言模型提供商。
- 确定性决策。如果你需要确定性的结果,请使用版本化的策略将其编码在代码中。
- 审计一切。持久化原始PDF、OCR文本、大语言模型输出、模型和修订详情,以及所有审核操作。
- 默认安全失败。低置信度的数据路由到人工审核。没有静默的自动批准。
- 幂等性。使用X-Idempotency-Key加上内容哈希来避免重复处理。
核心文件
这里只包含最核心的文件——那些审查者会首先阅读的文件。将它们放入仓库布局中所示的路径,根据你的环境进行调整,并根据需要进行修改。
app/models/invoice.py — 架构(Pydantic v2)
# app/models/invoice.py
from pydantic import BaseModel, Field
from decimal import Decimal
from datetime import date
class Invoice(BaseModel):
invoice_number: str
invoice_date: date
vendor_name: str
total_amount: Decimal = Field(..., gt=Decimal("0"))
currency: str
注意:使用Decimal避免金额四舍五入错误。日期存储为ISO格式。
app/config.py — 设置(环境驱动)
# app/config.py
from pydantic import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str
MISTRAL_API_KEY: str | None = None
MAX_UPLOAD_BYTES: int = 20 * 1024 * 1024 # 20MB default
APPROVAL_THRESHOLD: float = 5000.0
class Config:
env_file = ".env"
settings = Settings()
app/services/ingestion.py — 流式传输 + 幂等性 + 内容哈希
# app/services/ingestion.py
import os, uuid, hashlib
from fastapi import UploadFile
from app.db.repo import save_document, get_document_by_idempotency
from app.utils.retry import retry_backoff
from app.config import settings
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "/data/documents")
os.makedirs(UPLOAD_DIR, exist_ok=True)
@retry_backoff()
async def save_uploaded_file(file: UploadFile, idempotency_key: str | None = None) -> str:
if idempotency_key:
existing = get_document_by_idempotency(idempotency_key)
if existing:
return existing.document_id
document_id = str(uuid.uuid4())
path = os.path.join(UPLOAD_DIR, f"{document_id}.pdf")
size = 0
with open(path, "wb") as out_f:
while True:
chunk = await file.read(1024 * 1024)
if not chunk:
break
size += len(chunk)
if size > settings.MAX_UPLOAD_BYTES:
out_f.close()
os.remove(path)
raise ValueError("File too large")
out_f.write(chunk)
sha256 = hashlib.sha256()
with open(path, "rb") as f:
for block in iter(lambda: f.read(65536), b""):
sha256.update(block)
content_hash = sha256.hexdigest()
save_document(document_id=document_id, path=path,
idempotency_key=idempotency_key, content_hash=content_hash)
return document_id
流式传输可防止内存不足问题;内容哈希和幂等键可防止重复处理。我们在同一个发票以不同文件名上传两次并自动处理两次后,艰难地学到了这一点。
app/services/preprocess.py — PDF→图像 + 去歪斜
# app/services/preprocess.py
from pdf2image import convert_from_path, convert_from_bytes
import cv2, numpy as np
import tempfile, os
def preprocess_pdf_to_images(pdf_path: str, dpi: int = 300) -> list[str]:
images = convert_from_path(pdf_path, dpi=dpi)
out_paths = []
for i, pil_img in enumerate(images):
img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
img = _deskew_image_safe(img)
tmp_path = os.path.join(tempfile.gettempdir(), f"{os.path.basename(pdf_path)}_page_{i}.png")
cv2.imwrite(tmp_path, img)
out_paths.append(tmp_path)
return out_paths
def _deskew_image_safe(img):
try:
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
coords = np.column_stack(np.where(gray > 0))
if coords.size == 0:
return img
angle = cv2.minAreaRect(coords)[-1]
if angle < -45:
angle = -(90 + angle)
else:
angle = -angle
(h, w) = img.shape[:2]
M = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
rotated = cv2.warpAffine(img, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
return rotated
except Exception:
return img
注意:pdf2image有系统级依赖。这是一个无聊的设置步骤,如果跳过,它会悄悄降低OCR质量。DPI 300是扫描文档的安全默认值。
app/services/ocr.py
# app/services/ocr.py
from PIL import Image
import pytesseract
from typing import NamedTuple
class OCRResult(NamedTuple):
text: str
confidence: float
def local_tesseract_ocr(image_path: str) -> OCRResult:
img = Image.open(image_path)
data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
lines = []
confidences = []
for i, text in enumerate(data.get("text", [])):
if str(text).strip():
lines.append(text)
try:
conf_val = data.get("conf", [])[i]
conf = float(conf_val)
if conf >= 0:
confidences.append(conf)
except Exception:
continue
text = "\n".join(lines)
avg_conf = (sum(confidences) / len(confidences) / 100.0) if confidences else 0.0
return OCRResult(text=text, confidence=avg_conf)
注意:实现提供商适配器(例如,某机构的Textract或Vision LLM),返回相同的OCRResult接口。这使得管道的其余部分与提供商无关。
app/services/extractor.py — 适配器 + 稳健的JSON提取
# app/services/extractor.py
import json, re
from app.config import settings
def extract_json_from_text(text: str) -> str:
text = re.sub(r"```(?:json)?", "", text)
brace_idx = text.find("{")
if brace_idx == -1:
raise ValueError("No JSON object found in response")
stack = 0
start = -1
for i, ch in enumerate(text[brace_idx:], start=brace_idx):
if ch == "{":
if start == -1:
start = i
stack += 1
elif ch == "}":
stack -= 1
if stack == 0:
json_str = text[start:i+1]
try:
parsed = json.loads(json_str)
return json.dumps(parsed)
except Exception:
break
m = re.search(r"\{.*\}", text, flags=re.DOTALL)
if m:
return m.group(0)
raise ValueError("Could not extract JSON from response")
# Provider adapter example (conceptual)
class OpenAIAdapter:
def __init__(self, client):
self.client = client
def extract_with_schema(self, text: str, schema: dict) -> dict:
# Prefer function-calling / structured outputs if provider supports it.
resp_raw = self.client.call_model(text, schema=schema, temperature=0.0)
json_str = extract_json_from_text(resp_raw)
return json.loads(json_str)
注意:当提供商支持时,始终优先使用结构化输出(函数调用)。自由文本解析之所以存在,只是因为真实的模型仍然会以令人惊讶的方式失败。
app/services/validator.py — 规范化 & 验证(Decimal, dateutil)
# app/services/validator.py
from app.models.invoice import Invoice
from pydantic import ValidationError
from dateutil import parser as date_parser
from decimal import Decimal
import re
def normalize_numbers_and_dates(raw: dict) -> dict:
amount = raw.get("total_amount")
if isinstance(amount, str):
amount = amount.replace(",", "").strip()
amount = re.sub(r"[^\d.\-]", "", amount)
raw["total_amount"] = Decimal(amount) if amount else None
elif isinstance(amount, (int, float)):
raw["total_amount"] = Decimal(str(amount))
date_val = raw.get("invoice_date")
if isinstance(date_val, str):
try:
d = date_parser.parse(date_val, dayfirst=False)
raw["invoice_date"] = d.date().isoformat()
except Exception:
pass
return raw
def validate_invoice(raw: dict) -> Invoice:
raw = normalize_numbers_and_dates(raw)
try:
invoice = Invoice.model_validate(raw)
except ValidationError as e:
raise RuntimeError(f"Validation failed: {e}")
return invoice
注意:在生产环境中明确处理区域设置差异(例如,日期格式和小数点分隔符)。
app/services/decision.py — 确定性策略
# app/services/decision.py
from decimal import Decimal
from app.config import settings
APPROVAL_THRESHOLD = Decimal(str(settings.APPROVAL_THRESHOLD))
def decide(invoice):
if invoice.total_amount < APPROVAL_THRESHOLD:
return {"decision": "AUTO_APPROVED", "reason": "Amount under threshold", "policy_version": "v1"}
return {"decision": "NEEDS_REVIEW", "reason": "Amount exceeds threshold", "policy_version": "v1"}
重要:每个决策都持久化policy_version。当财务或合规部门问“为什么批准了这个?”,这个字段是你唯一可以辩护的答案。
app/db/repo.py — 简化持久化(概念性)
# app/db/repo.py
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from app.config import settings
engine = create_engine(settings.DATABASE_URL)
Session = sessionmaker(bind=engine)
def save_document(document_id, path, idempotency_key=None, content_hash=None):
# Implement insert with unique constraints and transactions
pass
def save_process_result(document_id, result_json, provider_meta):
# store JSONB record
pass
def get_document_by_idempotency(key):
# return document row if exists
pass
确保存储库包含完整的SQL/ORM实现和Alembic迁移(见下文),以获得完整、可运行的设置。
数据库DDL(Postgres) — infra/db_schema.sql
CREATE TABLE documents (
document_id TEXT PRIMARY KEY,
path TEXT NOT NULL,
status TEXT NOT NULL,
idempotency_key TEXT UNIQUE,
content_hash TEXT UNIQUE,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE process_results (
id BIGSERIAL PRIMARY KEY,
document_id TEXT REFERENCES documents(document_id),
result_json JSONB NOT NULL,
provider_meta JSONB,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE review_queue (
id BIGSERIAL PRIMARY KEY,
document_id TEXT,
reason TEXT,
added_at TIMESTAMPTZ DEFAULT now(),
resolved BOOLEAN DEFAULT FALSE
);
提示:对result_json和provider_meta使用JSONB来存储所有相关元数据,包括llm_provider、model、revision和prompt_hash。
可观测性(快速指南)
- 结构化日志:使用structlog发出JSON日志,包括trace_id、document_id、step和status。
- 指标(Prometheus):跟踪计数器,如documents_ingested_total和documents_processed_total,并使用直方图记录processing_duration_seconds。
- 追踪:使用OpenTelemetry检测FastAPI和数据库,导出到OTLP收集器。在日志中包含trace_id以实现端到端的可追溯性。
Alembic示例(迁移框架)
alembic/versions/001_create_tables.py
from alembic import op
import sqlalchemy as sa
revision = '001'
def upgrade():
op.create_table(
'documents',
sa.Column('document_id', sa.Text, primary_key=True),
sa.Column('path', sa.Text, nullable=False),
sa.Column('status', sa.Text, nullable=False),
sa.Column('idempotency_key', sa.Text, nullable=True),
sa.Column('content_hash', sa.Text, nullable=True),
sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'))
)
# create other tables...
def downgrade():
op.drop_table('documents')
人工干预(审核)端点 — 框架
GET /reviews/queue— 获取待审核文档列表POST /reviews/{document_id}/resolve— 提交带有 { decision, note, override_by } 的操作
记录每次审核操作,附带user_id和时间戳以便审计。
UI:显示原始PDF、页面级OCR结果、带有置信度得分的提取JSON,并提供操作按钮:批准、拒绝和编辑。
数据保留和删除(合规性)
实现 DELETE /documents/{id} 作为软删除:将文档标记为已删除,并在保留TTL(生存时间)后安排数据块清除。
始终尊重法律保留要求,并维护完整的删除审计追踪。
必须包含的测试
- 验证器测试:覆盖日期格式和货币格式的所有边界情况。
- 提取器测试:稳健地解析JSON,包括围栏式代码块和额外注释。
- 端到端测试:使用提供商模拟器来模拟上传 → 处理 → 数据库断言。
- 安全测试:验证RBAC端点和权限。
快速本地运行
- 启动应用:
docker-compose up --build - 上传示例发票:
curl -F "file=@tests/fixtures/invoice_sample.pdf" http://localhost:8000/ingest - 通过在process_results表中检查JSON和provider_meta来验证结果。
- 爬取 /metrics 端点(Prometheus)查看计数器是否递增。
清单
- 上传使用流式传输(不要将整个文件加载到内存中)。
- 一致使用Invoice.model_validate / model_dump。
- 对total_amount使用Decimal并稳健地处理日期解析。
- 确保提供商适配器存在,并在测试中正确模拟。
- 在content_hash和idempotency_key上应用唯一约束。
- 为审计持久化原始OCR和大语言模型输出。
- 包含Prometheus /metrics和示例仪表板。
- Alembic迁移应被包含并经过测试。
- 记录后台处理/队列模式。
潜在陷阱
- 扫描图像质量极差:OCR可能会失败。考虑添加Vision LLM或云端OCR以获得更好的结果。
- 模糊的日期格式(DD/MM vs MM/DD):确保你的策略明确选择一个区域设置。
- 意外的大语言模型输出结构:如果可用,使用函数调用或架构强制执行;或者进行防御性解析并记录原始输出。
注意:这些不是错误——它们是你在生产的头几周内会遇到的操作现实。
总结
本文像为队友编写代码一样编写:小巧、清晰、可靠的组件,附带审计追踪和操作控制。它很紧凑,所以你可以快速掌握构建生产就绪AI系统的关键实践,这些系统是安全、可审计和可维护的。将此视为你进行真实世界AI工程的实用蓝图。
更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)或者 我的个人博客 https://blog.qife122.com/
对网络安全、黑客技术感兴趣的朋友可以关注我的安全公众号(网络安全技术点滴分享)
更多推荐

所有评论(0)