Memory and Scheduling: An Agent's Persistence Capabilities (nanobot Architecture Analysis)
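With a memory system that saves important information and a scheduler that runs timed tasks, nanobot gains persistence that goes beyond the immediate conversation.
Overview
A genuinely useful AI agent must do more than respond to user requests in real time; it also needs memory and scheduling. nanobot implements long-term memory and daily notes through MemoryStore, and scheduled tasks through CronService. This article examines the design and implementation of the memory and scheduling systems.
Background
Core challenges of persistence
Building persistence into an agent means solving the following problems: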
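| Challenge | Description | nanobot's approach |
|---|---|---|
| Memory storage | How to save important information | MemoryStore + Markdown files |
| Memory retrieval | How to fetch relevant memories | Date-range lookup over daily notes + long-term memory |
| Scheduled tasks | How to run recurring tasks | CronService + croniter |
| Task persistence | How to save task state | JSON file storage |
| Error recovery | How to handle failed tasks | Status recording + retry (in the code shown here, a failed recurring job is simply run again at its next scheduled time) |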
Persistence design goals
Memory system
MemoryStore design
MemoryStore provides simple but effective memory storage:
from pathlib import Path

class MemoryStore:
    """
    Memory system for the agent.
    Supports daily notes (memory/YYYY-MM-DD.md) and long-term memory (MEMORY.md).
    """

    def __init__(self, workspace: Path):
        self.workspace = workspace
        # ensure_dir: nanobot helper (not shown) that returns the directory path,
        # presumably creating it if it is missing
        self.memory_dir = ensure_dir(workspace / "memory")
        self.memory_file = self.memory_dir / "MEMORY.md"
Memory file layout
workspace/
└── memory/
    ├── MEMORY.md        # long-term memory
    ├── 2026-02-20.md    # daily notes
    ├── 2026-02-19.md
    └── 2026-02-18.md
Core methods
Today's notes
def get_today_file(self) -> Path:
    """Get path to today's memory file."""
    return self.memory_dir / f"{today_date()}.md"

def read_today(self) -> str:
    """Read today's memory notes."""
    today_file = self.get_today_file()
    if today_file.exists():
        return today_file.read_text(encoding="utf-8")
    return ""

def append_today(self, content: str) -> None:
    """Append content to today's memory notes."""
    today_file = self.get_today_file()
    if today_file.exists():
        existing = today_file.read_text(encoding="utf-8")
        content = existing + "\n" + content
    else:
        # Add header for new day
        header = f"# {today_date()}\n\n"
        content = header + content
    today_file.write_text(content, encoding="utf-8")
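The append behaviour can be tried in isolation. The following is a minimal sketch rather than nanobot's code: `append_note` is a hypothetical standalone function that takes the date as an argument (so the result is reproducible) and appends in place instead of reading and rewriting the whole file. The resulting file content is the same as with `append_today`.

```python
from pathlib import Path
import tempfile


def append_note(memory_dir: Path, day: str, content: str) -> Path:
    """Append `content` to memory_dir/<day>.md, writing a header for a new file."""
    memory_dir.mkdir(parents=True, exist_ok=True)
    note = memory_dir / f"{day}.md"
    is_new = not note.exists()
    with note.open("a", encoding="utf-8") as f:
        # New file: start with a date header. Existing file: separate entries with a newline.
        f.write(f"# {day}\n\n" if is_new else "\n")
        f.write(content)
    return note


def demo_append() -> str:
    """Write two notes for the same (made-up) day and return the file content."""
    with tempfile.TemporaryDirectory() as tmp:
        memory_dir = Path(tmp) / "memory"
        append_note(memory_dir, "2026-02-20", "User prefers concise answers.")
        note = append_note(memory_dir, "2026-02-20", "Project deadline is Friday.")
        return note.read_text(encoding="utf-8")
```

Opening the file in append mode avoids rewriting earlier notes on every call, which may matter once a daily file grows large.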
Long-term memory
def read_long_term(self) -> str:
    """Read long-term memory (MEMORY.md)."""
    if self.memory_file.exists():
        return self.memory_file.read_text(encoding="utf-8")
    return ""

def write_long_term(self, content: str) -> None:
    """Write to long-term memory (MEMORY.md), replacing its previous content."""
    self.memory_file.write_text(content, encoding="utf-8")
Recent memory retrieval
def get_recent_memories(self, days: int = 7) -> str:
    """
    Get memories from the last N days.

    Args:
        days: Number of days to look back.

    Returns:
        Combined memory content.
    """
    from datetime import timedelta

    memories = []
    today = datetime.now().date()
    for i in range(days):
        date = today - timedelta(days=i)
        date_str = date.strftime("%Y-%m-%d")
        file_path = self.memory_dir / f"{date_str}.md"
        if file_path.exists():
            content = file_path.read_text(encoding="utf-8")
            memories.append(content)
    return "\n\n---\n\n".join(memories)
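To make the look-back window concrete, here is a small self-contained sketch of the same idea. `recent_memories` is a hypothetical free function (not part of nanobot) that receives the reference date as a parameter, and the file names and contents are invented for the demonstration.

```python
from datetime import date, timedelta
from pathlib import Path
import tempfile


def recent_memories(memory_dir: Path, today: date, days: int = 7) -> str:
    """Join the daily notes from the last `days` days, newest first."""
    chunks = []
    for offset in range(days):
        day = today - timedelta(days=offset)
        path = memory_dir / f"{day.isoformat()}.md"  # isoformat() yields YYYY-MM-DD
        if path.exists():
            chunks.append(path.read_text(encoding="utf-8"))
    return "\n\n---\n\n".join(chunks)


def demo_recent() -> str:
    with tempfile.TemporaryDirectory() as tmp:
        memory_dir = Path(tmp)
        (memory_dir / "2026-02-20.md").write_text("today", encoding="utf-8")
        (memory_dir / "2026-02-18.md").write_text("two days ago", encoding="utf-8")
        (memory_dir / "2026-02-10.md").write_text("too old", encoding="utf-8")
        # A 7-day window ending on 2026-02-20 covers 02-14 through 02-20.
        return recent_memories(memory_dir, date(2026, 2, 20), days=7)
```

Days without a note are skipped silently, so the result contains only the two notes that fall inside the window.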
Building the memory context
def get_memory_context(self) -> str:
    """
    Get memory context for the agent.

    Returns:
        Formatted memory context including long-term memory and today's notes.
    """
    parts = []

    # Long-term memory
    long_term = self.read_long_term()
    if long_term:
        parts.append("## Long-term Memory\n" + long_term)

    # Today's notes
    today = self.read_today()
    if today:
        parts.append("## Today's Notes\n" + today)

    return "\n\n".join(parts) if parts else ""
Memory system architecture
Session management
Session data structure
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class Session:
    """
    A conversation session.
    Stores messages in JSONL format for easy reading and persistence.
    """
    key: str  # channel:chat_id
    messages: list[dict[str, Any]] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    metadata: dict[str, Any] = field(default_factory=dict)

    def add_message(self, role: str, content: str, **kwargs: Any) -> None:
        """Add a message to the session."""
        msg = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            **kwargs
        }
        self.messages.append(msg)
        self.updated_at = datetime.now()

    def get_history(self, max_messages: int = 50) -> list[dict[str, Any]]:
        """Get message history for LLM context (role and content only)."""
        recent = self.messages[-max_messages:] if len(self.messages) > max_messages else self.messages
        return [{"role": m["role"], "content": m["content"]} for m in recent]

    def clear(self) -> None:
        """Clear all messages in the session."""
        self.messages = []
        self.updated_at = datetime.now()
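A short usage sketch may help show what `get_history` hands to the LLM. `MiniSession` below is a hypothetical, trimmed-down stand-in for `Session` (only the fields needed for the demonstration), and the `model="demo"` keyword is an invented extra field.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any


@dataclass
class MiniSession:
    """Hypothetical reduced version of Session, for illustration only."""
    key: str
    messages: list[dict[str, Any]] = field(default_factory=list)

    def add_message(self, role: str, content: str, **extra: Any) -> None:
        self.messages.append(
            {"role": role, "content": content,
             "timestamp": datetime.now().isoformat(), **extra}
        )

    def get_history(self, max_messages: int = 50) -> list[dict[str, Any]]:
        # Keep only the newest entries and strip everything except role/content.
        recent = self.messages[-max_messages:]
        return [{"role": m["role"], "content": m["content"]} for m in recent]


session = MiniSession(key="telegram:42")
session.add_message("user", "Hello!")
session.add_message("assistant", "Hi! How can I help you?", model="demo")
session.add_message("user", "What's the weather?")
history = session.get_history(max_messages=2)
```

Timestamps and extra keys stay in the stored messages but are not sent to the model; only the two most recent role/content pairs end up in `history`.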
SessionManager implementation
import json

class SessionManager:
    """Manages conversation sessions."""

    def __init__(self, workspace: Path):
        self.workspace = workspace
        # Sessions are stored under the user's home directory, not inside the workspace
        self.sessions_dir = ensure_dir(Path.home() / ".nanobot" / "sessions")
        self._cache: dict[str, Session] = {}

    def _get_session_path(self, key: str) -> Path:
        """Get the file path for a session."""
        safe_key = safe_filename(key.replace(":", "_"))
        return self.sessions_dir / f"{safe_key}.jsonl"

    def get_or_create(self, key: str) -> Session:
        """Get an existing session or create a new one."""
        # Check cache
        if key in self._cache:
            return self._cache[key]

        # Try to load from disk
        session = self._load(key)
        if session is None:
            session = Session(key=key)

        self._cache[key] = session
        return session

    def save(self, session: Session) -> None:
        """Save a session to disk."""
        path = self._get_session_path(session.key)
        # Explicit encoding, consistent with the UTF-8 reads and writes in MemoryStore
        with open(path, "w", encoding="utf-8") as f:
            # Write metadata first
            metadata_line = {
                "_type": "metadata",
                "created_at": session.created_at.isoformat(),
                "updated_at": session.updated_at.isoformat(),
                "metadata": session.metadata
            }
            f.write(json.dumps(metadata_line) + "\n")

            # Write messages
            for msg in session.messages:
                f.write(json.dumps(msg) + "\n")
Session file format
{"_type": "metadata", "created_at": "2026-02-20T10:00:00", "updated_at": "2026-02-20T10:30:00", "metadata": {}}
{"role": "user", "content": "Hello!", "timestamp": "2026-02-20T10:00:05"}
{"role": "assistant", "content": "Hi! How can I help you?", "timestamp": "2026-02-20T10:00:10"}
{"role": "user", "content": "What's the weather?", "timestamp": "2026-02-20T10:01:00"}
{"role": "assistant", "content": "Let me check...", "timestamp": "2026-02-20T10:01:05"}
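The article does not show `SessionManager._load`, so the following is only a sketch of how a file in this format could be read back. `parse_session` and `SAMPLE` are hypothetical names; the one assumption taken from the format above is that the metadata record is marked with `"_type": "metadata"` and every other line is a message.

```python
import json
from typing import Any


def parse_session(text: str) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Split JSONL session text into (metadata record, list of messages)."""
    metadata: dict[str, Any] = {}
    messages: list[dict[str, Any]] = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        if record.get("_type") == "metadata":
            metadata = record
        else:
            messages.append(record)
    return metadata, messages


SAMPLE = "\n".join([
    '{"_type": "metadata", "created_at": "2026-02-20T10:00:00", '
    '"updated_at": "2026-02-20T10:30:00", "metadata": {}}',
    '{"role": "user", "content": "Hello!", "timestamp": "2026-02-20T10:00:05"}',
    '{"role": "assistant", "content": "Hi! How can I help you?", '
    '"timestamp": "2026-02-20T10:00:10"}',
])

meta, msgs = parse_session(SAMPLE)
```

One record per line means a damaged line affects only that record, and the file stays readable with ordinary text tools.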
Scheduled task system
CronService design
class CronService:
    """Service for managing and executing scheduled jobs."""

    def __init__(
        self,
        store_path: Path,
        on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None
    ):
        self.store_path = store_path
        self.on_job = on_job  # Callback to execute job
        self._store: CronStore | None = None
        self._timer_task: asyncio.Task | None = None
        self._running = False
Schedule types
@dataclass
class CronSchedule:
    """Schedule for a cron job."""
    kind: str                    # "at", "every", "cron"
    at_ms: int | None = None     # One-shot at timestamp
    every_ms: int | None = None  # Repeat every N ms
    expr: str | None = None      # Cron expression
    tz: str | None = None        # Timezone
Job data structures
# CronPayload and CronJobState are defined first because CronJob's
# field annotations reference them.
@dataclass
class CronPayload:
    """Payload for a cron job."""
    kind: str = "agent_turn"    # Type of job
    message: str = ""           # Message to send
    deliver: bool = False       # Deliver to channel
    channel: str | None = None  # Target channel
    to: str | None = None       # Target chat_id

@dataclass
class CronJobState:
    """State of a cron job."""
    next_run_at_ms: int | None = None
    last_run_at_ms: int | None = None
    last_status: str | None = None  # "ok", "error"
    last_error: str | None = None

@dataclass
class CronJob:
    """A scheduled job."""
    id: str
    name: str
    enabled: bool
    schedule: CronSchedule
    payload: CronPayload
    state: CronJobState
    created_at_ms: int
    updated_at_ms: int
    delete_after_run: bool = False
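The store that writes these jobs to JSON is not shown in the article. As a rough sketch of how nested dataclasses like these can round-trip through JSON, the example below uses reduced, hypothetical classes (`Schedule`, `JobState`, `Job`) and helper names (`job_to_json`, `job_from_json`); none of them are nanobot's own.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class Schedule:
    kind: str
    every_ms: Optional[int] = None


@dataclass
class JobState:
    next_run_at_ms: Optional[int] = None
    last_status: Optional[str] = None


@dataclass
class Job:
    id: str
    name: str
    schedule: Schedule
    state: JobState = field(default_factory=JobState)


def job_to_json(job: Job) -> str:
    # asdict() converts nested dataclasses into plain dicts recursively.
    return json.dumps(asdict(job))


def job_from_json(text: str) -> Job:
    raw = json.loads(text)
    # Nested dicts have to be rebuilt into their dataclass types by hand.
    return Job(
        id=raw["id"],
        name=raw["name"],
        schedule=Schedule(**raw["schedule"]),
        state=JobState(**raw["state"]),
    )


original = Job(id="a1b2c3d4", name="hourly",
               schedule=Schedule(kind="every", every_ms=3_600_000),
               state=JobState(next_run_at_ms=1_771_581_600_000))
restored = job_from_json(job_to_json(original))
```

Storing timestamps as integer milliseconds, as the dataclasses above do, keeps the JSON free of date parsing concerns.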
Computing the next run
def _compute_next_run(schedule: CronSchedule, now_ms: int) -> int | None:
    """Compute next run time in ms."""
    if schedule.kind == "at":
        # One-shot: only valid while the timestamp is still in the future
        return schedule.at_ms if schedule.at_ms and schedule.at_ms > now_ms else None

    if schedule.kind == "every":
        if not schedule.every_ms or schedule.every_ms <= 0:
            return None
        return now_ms + schedule.every_ms

    if schedule.kind == "cron" and schedule.expr:
        try:
            from croniter import croniter
            # Start from now_ms rather than time.time() so all branches use the caller's clock.
            # Note: schedule.tz is not applied in this function.
            cron = croniter(schedule.expr, now_ms / 1000)
            next_time = cron.get_next()
            return int(next_time * 1000)
        except Exception:
            return None

    return None
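One consequence of the "every" branch is worth spelling out: the next run is computed as "now plus interval", so if it is evaluated after a job finishes, the effective period is the interval plus the job's run time. The sketch below contrasts that with a fixed-grid alternative. Both function names are hypothetical, and `next_on_grid` is not something the article attributes to nanobot; it is shown only for comparison.

```python
def next_after_completion(finished_ms: int, every_ms: int) -> int:
    """Behaviour of the "every" branch when called with the completion time."""
    return finished_ms + every_ms


def next_on_grid(anchor_ms: int, now_ms: int, every_ms: int) -> int:
    """Alternative (assumption, not nanobot's code): next multiple of every_ms after now."""
    elapsed = max(0, now_ms - anchor_ms)
    return anchor_ms + (elapsed // every_ms + 1) * every_ms


# A job scheduled every 60 s from t=0 fires at 60_000 ms and takes 1_500 ms to finish.
drifting = next_after_completion(finished_ms=61_500, every_ms=60_000)
on_grid = next_on_grid(anchor_ms=0, now_ms=61_500, every_ms=60_000)
```

Here `drifting` is 121_500 while `on_grid` is 120_000. For reminders and periodic checks the drift is usually harmless; it matters mainly when runs must align to wall-clock boundaries, which is what the "cron" kind is for.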
Timer mechanism
async def start(self) -> None:
    """Start the cron service."""
    self._running = True
    self._load_store()
    self._recompute_next_runs()
    self._save_store()
    self._arm_timer()
    logger.info(f"Cron service started with {len(self._store.jobs)} jobs")

def _arm_timer(self) -> None:
    """Schedule the next timer tick."""
    # Replace any pending timer so only one sleep is outstanding
    if self._timer_task:
        self._timer_task.cancel()

    next_wake = self._get_next_wake_ms()
    if not next_wake or not self._running:
        return

    delay_ms = max(0, next_wake - _now_ms())
    delay_s = delay_ms / 1000

    async def tick():
        await asyncio.sleep(delay_s)
        if self._running:
            await self._on_timer()

    self._timer_task = asyncio.create_task(tick())

async def _on_timer(self) -> None:
    """Handle timer tick - run due jobs."""
    if not self._store:
        return

    now = _now_ms()
    due_jobs = [
        j for j in self._store.jobs
        if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
    ]

    # Due jobs run one after another, not concurrently
    for job in due_jobs:
        await self._execute_job(job)

    self._save_store()
    self._arm_timer()
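The re-arming pattern above (one sleeping task that always targets the earliest deadline) can be reproduced in a few lines. This is a simplified sketch under its own assumptions, not CronService itself: `MiniTimer`, its in-memory `due` dict, and the use of `time.monotonic()` are all choices made for the example.

```python
import asyncio
import time
from typing import Optional


class MiniTimer:
    """Hypothetical single-task timer: sleep until the earliest deadline, fire, re-arm."""

    def __init__(self) -> None:
        self.due: dict[str, float] = {}  # job name -> monotonic deadline
        self.fired: list[str] = []
        self._task: Optional[asyncio.Task] = None

    def schedule(self, name: str, delay_s: float) -> None:
        self.due[name] = time.monotonic() + delay_s
        self._arm()  # the new job may be due before the pending wake-up

    def _arm(self) -> None:
        task = self._task
        # Cancel a pending sleep, but never the task that is currently running _tick.
        if task and not task.done() and task is not asyncio.current_task():
            task.cancel()
        if not self.due:
            self._task = None
            return
        delay = max(0.0, min(self.due.values()) - time.monotonic())
        self._task = asyncio.create_task(self._tick(delay))

    async def _tick(self, delay: float) -> None:
        await asyncio.sleep(delay)
        now = time.monotonic()
        # Fire everything that is due, earliest deadline first.
        ready = sorted((n for n, t in self.due.items() if t <= now), key=self.due.get)
        for name in ready:
            del self.due[name]
            self.fired.append(name)
        self._arm()


async def demo_timer() -> list[str]:
    timer = MiniTimer()
    timer.schedule("slow", 0.05)
    timer.schedule("fast", 0.01)  # replaces the 0.05 s sleep with a shorter one
    await asyncio.sleep(0.3)
    return timer.fired
```

Running `asyncio.run(demo_timer())` fires "fast" before "slow" even though "slow" was scheduled first, because every call to `schedule` re-arms the single timer for the earliest deadline. Compared with polling at a fixed interval, this keeps the process idle until something is actually due.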
Job execution
async def _execute_job(self, job: CronJob) -> None:
    """Execute a single job."""
    start_ms = _now_ms()
    logger.info(f"Cron: executing job '{job.name}' ({job.id})")

    try:
        response = None
        if self.on_job:
            response = await self.on_job(job)
        job.state.last_status = "ok"
        job.state.last_error = None
        logger.info(f"Cron: job '{job.name}' completed")
    except Exception as e:
        # A failure is recorded on the job; it does not stop the scheduler
        job.state.last_status = "error"
        job.state.last_error = str(e)
        logger.error(f"Cron: job '{job.name}' failed: {e}")

    job.state.last_run_at_ms = start_ms
    job.updated_at_ms = _now_ms()

    # Handle one-shot jobs
    if job.schedule.kind == "at":
        if job.delete_after_run:
            self._store.jobs = [j for j in self._store.jobs if j.id != job.id]
        else:
            job.enabled = False
            job.state.next_run_at_ms = None
    else:
        # Compute next run
        job.state.next_run_at_ms = _compute_next_run(job.schedule, _now_ms())
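Scheduled task flow
Putting the pieces together: start() loads the job store, recomputes each job's next run time, and arms a timer for the earliest one. When the timer fires, _on_timer runs every due job, saves the store, and re-arms the timer.
Public API
Adding a job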
def add_job(
    self,
    name: str,
    schedule: CronSchedule,
    message: str,
    deliver: bool = False,
    channel: str | None = None,
    to: str | None = None,
    delete_after_run: bool = False,
) -> CronJob:
    """Add a new job."""
    store = self._load_store()
    now = _now_ms()

    job = CronJob(
        id=str(uuid.uuid4())[:8],
        name=name,
        enabled=True,
        schedule=schedule,
        payload=CronPayload(
            kind="agent_turn",
            message=message,
            deliver=deliver,
            channel=channel,
            to=to,
        ),
        state=CronJobState(next_run_at_ms=_compute_next_run(schedule, now)),
        created_at_ms=now,
        updated_at_ms=now,
        delete_after_run=delete_after_run,
    )

    store.jobs.append(job)
    self._save_store()
    self._arm_timer()
    return job
CLI usage examples
# Add a job with a cron expression
nanobot cron add --name "daily" --message "Good morning!" --cron "0 9 * * *"
# Add an interval job
nanobot cron add --name "hourly" --message "Check status" --every 3600
# List all jobs
nanobot cron list
# Remove a job
nanobot cron remove <job_id>
Skill system
SkillsLoader design
class SkillsLoader:
    """
    Loader for agent skills.
    Skills are markdown files (SKILL.md) that teach the agent how to use
    specific tools or perform certain tasks.
    """

    def __init__(self, workspace: Path, builtin_skills_dir: Path | None = None):
        self.workspace = workspace
        self.workspace_skills = workspace / "skills"
        self.builtin_skills = builtin_skills_dir or BUILTIN_SKILLS_DIR
Skill directory layout
skills/
├── github/
│   └── SKILL.md
├── weather/
│   └── SKILL.md
├── tmux/
│   ├── SKILL.md
│   └── scripts/
│       ├── find-sessions.sh
│       └── wait-for-text.sh
└── summarize/
    └── SKILL.md
Loading a skill
def load_skill(self, name: str) -> str | None:
    """Load a skill by name."""
    # Check workspace first
    workspace_skill = self.workspace_skills / name / "SKILL.md"
    if workspace_skill.exists():
        return workspace_skill.read_text(encoding="utf-8")

    # Check built-in
    if self.builtin_skills:
        builtin_skill = self.builtin_skills / name / "SKILL.md"
        if builtin_skill.exists():
            return builtin_skill.read_text(encoding="utf-8")

    return None
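Because the workspace is checked before the built-in directory, a user can override a bundled skill by placing a file with the same name in the workspace. The sketch below demonstrates that precedence with temporary directories; `find_skill` is a hypothetical generalisation over an ordered list of directories, and the file contents are invented.

```python
from pathlib import Path
import tempfile
from typing import Optional


def find_skill(name: str, search_dirs: list[Path]) -> Optional[str]:
    """Return the first SKILL.md found for `name`, searching directories in order."""
    for base in search_dirs:
        candidate = base / name / "SKILL.md"
        if candidate.exists():
            return candidate.read_text(encoding="utf-8")
    return None


def demo_lookup() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp) / "workspace" / "skills"
        builtin = Path(tmp) / "builtin"
        # "weather" exists in both places; "github" only as a built-in.
        for base, skill, text in (
            (workspace, "weather", "custom weather"),
            (builtin, "weather", "default weather"),
            (builtin, "github", "default github"),
        ):
            (base / skill).mkdir(parents=True)
            (base / skill / "SKILL.md").write_text(text, encoding="utf-8")
        dirs = [workspace, builtin]  # order defines precedence
        return (find_skill("weather", dirs),
                find_skill("github", dirs),
                find_skill("missing", dirs))
```

The workspace copy of "weather" wins, "github" falls back to the built-in copy, and an unknown name yields `None`.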
Progressive loading
def build_skills_summary(self) -> str:
    """
    Build a summary of all skills (name, description, path, availability).
    This is used for progressive loading - the agent can read the full
    skill content using read_file when needed.
    """
    all_skills = self.list_skills(filter_unavailable=False)
    if not all_skills:
        return ""

    lines = ["<skills>"]
    for s in all_skills:
        name = s["name"]
        desc = self._get_skill_description(s["name"])
        available = self._check_requirements(self._get_skill_meta(s["name"]))

        lines.append(f'  <skill available="{str(available).lower()}">')
        lines.append(f"    <name>{name}</name>")
        lines.append(f"    <description>{desc}</description>")
        lines.append(f"    <location>{s['path']}</location>")
        lines.append("  </skill>")
    lines.append("</skills>")

    return "\n".join(lines)
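The summary as shown interpolates names and descriptions directly into the XML-like text. If a description contains `<` or `&`, the result is no longer well-formed. Whether that matters for an LLM prompt is debatable, but escaping is cheap. The sketch below shows one way to do it with the standard library; `skill_entry` is a hypothetical helper and the sample skill is invented.

```python
from xml.sax.saxutils import escape


def skill_entry(name: str, description: str, location: str, available: bool) -> str:
    """Render one <skill> element with its text content escaped."""
    return "\n".join([
        f'  <skill available="{str(available).lower()}">',
        f"    <name>{escape(name)}</name>",
        f"    <description>{escape(description)}</description>",
        f"    <location>{escape(location)}</location>",
        "  </skill>",
    ])


entry = skill_entry(
    name="compare",
    description="Check whether a < b && b < c",
    location="skills/compare/SKILL.md",
    available=True,
)
```

`escape` replaces `&`, `<`, and `>` with their entities, so the description above is emitted as `Check whether a &lt; b &amp;&amp; b &lt; c`.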
Skill metadata
---
name: weather
description: Get weather information for any location
nanobot:
  requires:
    bins:
      - curl
    env:
      - WEATHER_API_KEY
  always: false
---
# Weather Skill
This skill allows you to get weather information...
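The `requires` block lists executables and environment variables a skill depends on. The article does not show `_check_requirements`, so the following is only a guess at how such a check could work: `missing_requirements` is a hypothetical function that takes the already-parsed `requires` mapping and reports what is absent.

```python
import os
import shutil
from typing import Mapping, Optional


def missing_requirements(
    requires: Mapping[str, list],
    environ: Optional[Mapping[str, str]] = None,
) -> list[str]:
    """Return the unmet requirements, e.g. ["bin:curl", "env:WEATHER_API_KEY"]."""
    environ = os.environ if environ is None else environ
    # shutil.which returns None when the executable is not on PATH.
    missing = [f"bin:{b}" for b in requires.get("bins", []) if shutil.which(b) is None]
    # An unset or empty variable counts as missing.
    missing += [f"env:{v}" for v in requires.get("env", []) if not environ.get(v)]
    return missing


no_key = missing_requirements({"env": ["WEATHER_API_KEY"]}, environ={})
with_key = missing_requirements({"env": ["WEATHER_API_KEY"]},
                                environ={"WEATHER_API_KEY": "secret"})
no_bin = missing_requirements({"bins": ["definitely-not-a-real-binary-xyz"]}, environ={})
```

A skill would count as available when the returned list is empty. Returning the missing items rather than a bare boolean makes it possible to tell the user what to install or configure.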
System integration
Integrating memory into the context
def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
    """Build the system prompt from bootstrap files, memory, and skills."""
    parts = []

    # Core identity
    parts.append(self._get_identity())

    # Bootstrap files
    bootstrap = self._load_bootstrap_files()
    if bootstrap:
        parts.append(bootstrap)

    # Memory context
    memory = self.memory.get_memory_context()
    if memory:
        parts.append(f"# Memory\n\n{memory}")

    # Skills
    # ...

    return "\n\n---\n\n".join(parts)
Integrating scheduled tasks into the agent
# When AgentLoop is initialized
if self.cron_service:
    self.tools.register(CronTool(self.cron_service))

# When the gateway starts
cron_service = CronService(
    store_path=workspace / "cron.json",
    # The lambda returns the coroutine from process_direct, which CronService awaits
    on_job=lambda job: agent_loop.process_direct(job.payload.message)
)
await cron_service.start()
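Summary
nanobot's memory and scheduling systems provide the following persistence capabilities:
| System | Function | Implementation |
|---|---|---|
| MemoryStore | Long-term memory + daily notes | Markdown files |
| SessionManager | Conversation history | JSONL files + in-memory cache |
| CronService | Scheduled tasks | croniter + an asyncio task that sleeps until the next due job |
| SkillsLoader | On-demand skill loading | Markdown + YAML metadata |
Together these systems give nanobot its persistence: beyond responding to requests in real time, it can remember important information, run scheduled tasks, and extend its capabilities dynamically.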