玄同 765

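Large Language Model (LLM) Development Engineer | Communication University of China · Digital Media Technology (Intelligent Interaction and Game Design)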


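About the author

  • Focus areas: LLM development / RAG knowledge bases / AI Agent deployment / model fine-tuning
  • Tech stack: Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
  • Engineering: production deployment of models, building and optimizing knowledge bases, end-to-end solutions

"Make AI interaction smarter, make technology adoption more efficient"
Technical discussion and project collaboration are welcome.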


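Memory and Scheduling: Persistence for Agents

With a memory system that stores important information and a scheduler that runs timed tasks, nanobot gains persistence that outlasts a single conversation.

Overview

A genuinely useful AI agent must do more than answer requests on the spot; it also needs memory and scheduling. nanobot implements long-term memory and daily notes through MemoryStore, and scheduled tasks through CronService. This article examines the design and implementation of the memory and scheduling systems, along with the session and skill components that sit beside them.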

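Problem background

Core challenges of persistence

Giving an agent persistence means solving the following problems:

| Challenge | Description | nanobot's solution |
| --- | --- | --- |
| Memory storage | How to save important information | MemoryStore + Markdown files |
| Memory retrieval | How to fetch relevant memories | Time-range lookup + long-term memory |
| Scheduled tasks | How to run periodic tasks | CronService + croniter |
| Task persistence | How to save task state | JSON file storage |
| Error recovery | How to handle failed tasks | State recording + retry mechanism |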

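Persistence design goals

  • Simple and reliable: file storage, no database dependency
  • Easy to understand: Markdown format, human-readable
  • Flexible to extend: multiple scheduling modes, configurable callbacks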

Memory system

MemoryStore design

MemoryStore provides simple but effective memory storage:

class MemoryStore:
    """
    Memory system for the agent.
  
    Supports daily notes (memory/YYYY-MM-DD.md) and long-term memory (MEMORY.md).
    """
  
    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.memory_dir = ensure_dir(workspace / "memory")
        self.memory_file = self.memory_dir / "MEMORY.md"

Memory file layout

workspace/
└── memory/
    ├── MEMORY.md           # long-term memory
    ├── 2026-02-20.md       # daily notes
    ├── 2026-02-19.md
    └── 2026-02-18.md

Core methods

Today's notes

def get_today_file(self) -> Path:
    """Get path to today's memory file."""
    return self.memory_dir / f"{today_date()}.md"

def read_today(self) -> str:
    """Read today's memory notes."""
    today_file = self.get_today_file()
    if today_file.exists():
        return today_file.read_text(encoding="utf-8")
    return ""

def append_today(self, content: str) -> None:
    """Append content to today's memory notes."""
    today_file = self.get_today_file()
  
    if today_file.exists():
        existing = today_file.read_text(encoding="utf-8")
        content = existing + "\n" + content
    else:
        # Add header for new day
        header = f"# {today_date()}\n\n"
        content = header + content
  
    today_file.write_text(content, encoding="utf-8")

Long-term memory

def read_long_term(self) -> str:
    """Read long-term memory (MEMORY.md)."""
    if self.memory_file.exists():
        return self.memory_file.read_text(encoding="utf-8")
    return ""

def write_long_term(self, content: str) -> None:
    """Write to long-term memory (MEMORY.md)."""
    self.memory_file.write_text(content, encoding="utf-8")

Retrieving recent memories

def get_recent_memories(self, days: int = 7) -> str:
    """
    Get memories from the last N days.
  
    Args:
        days: Number of days to look back.
  
    Returns:
        Combined memory content.
    """
    from datetime import timedelta
  
    memories = []
    today = datetime.now().date()
  
    for i in range(days):
        date = today - timedelta(days=i)
        date_str = date.strftime("%Y-%m-%d")
        file_path = self.memory_dir / f"{date_str}.md"
      
        if file_path.exists():
            content = file_path.read_text(encoding="utf-8")
            memories.append(content)
  
    return "\n\n---\n\n".join(memories)
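To make the lookback window concrete, here is a self-contained sketch of the same loop with the current date passed in, so the result is reproducible. `recent_memories` and its `today` parameter are hypothetical names for this example only.

```python
from datetime import date, timedelta
from pathlib import Path


def recent_memories(memory_dir: Path, today: date, days: int = 7) -> str:
    """Join the daily notes of the last `days` days, newest first."""
    found = []
    for offset in range(days):  # offset 0 is today, 1 is yesterday, ...
        day = today - timedelta(days=offset)
        note_file = memory_dir / f"{day:%Y-%m-%d}.md"
        if note_file.exists():  # days without notes are skipped silently
            found.append(note_file.read_text(encoding="utf-8"))
    return "\n\n---\n\n".join(found)
```

Because the newest day comes first, truncating the result from the end drops the oldest notes first.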

Building the memory context

def get_memory_context(self) -> str:
    """
    Get memory context for the agent.
  
    Returns:
        Formatted memory context including long-term and recent memories.
    """
    parts = []
  
    # Long-term memory
    long_term = self.read_long_term()
    if long_term:
        parts.append("## Long-term Memory\n" + long_term)
  
    # Today's notes
    today = self.read_today()
    if today:
        parts.append("## Today's Notes\n" + today)
  
    return "\n\n".join(parts) if parts else ""

Memory system architecture

  • AgentLoop → ContextBuilder → MemoryStore
  • Write: Long-term Memory (MEMORY.md)
  • Append: Daily Notes (YYYY-MM-DD.md)
  • Recent Memories: last N days
  • Build Memory Context → Context for LLM

Session management

Session data structure

@dataclass
class Session:
    """
    A conversation session.
  
    Stores messages in JSONL format for easy reading and persistence.
    """
  
    key: str  # channel:chat_id
    messages: list[dict[str, Any]] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    metadata: dict[str, Any] = field(default_factory=dict)
  
    def add_message(self, role: str, content: str, **kwargs: Any) -> None:
        """Add a message to the session."""
        msg = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            **kwargs
        }
        self.messages.append(msg)
        self.updated_at = datetime.now()
  
    def get_history(self, max_messages: int = 50) -> list[dict[str, Any]]:
        """Get message history for LLM context."""
        recent = self.messages[-max_messages:] if len(self.messages) > max_messages else self.messages
        return [{"role": m["role"], "content": m["content"]} for m in recent]
  
    def clear(self) -> None:
        """Clear all messages in the session."""
        self.messages = []
        self.updated_at = datetime.now()
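The difference between what a session stores and what the model receives is easy to miss: `get_history` keeps only `role` and `content`. A small sketch of that trimming step as a standalone function follows; `history_for_llm` is a hypothetical name, and unlike the method above it returns an empty list when `max_messages` is zero.

```python
from typing import Any


def history_for_llm(
    messages: list[dict[str, Any]], max_messages: int = 50
) -> list[dict[str, Any]]:
    """Keep the newest messages and strip everything except role and content."""
    recent = messages[-max_messages:] if max_messages > 0 else []
    # Timestamps and any extra keys stay in the session, not in the prompt.
    return [{"role": m["role"], "content": m["content"]} for m in recent]


stored = [
    {"role": "user", "content": "Hello!", "timestamp": "2026-02-20T10:00:05"},
    {"role": "assistant", "content": "Hi!", "timestamp": "2026-02-20T10:00:10"},
    {"role": "user", "content": "Weather?", "timestamp": "2026-02-20T10:01:00"},
]
trimmed = history_for_llm(stored, max_messages=2)
```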

SessionManager implementation

class SessionManager:
    """Manages conversation sessions."""
  
    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.sessions_dir = ensure_dir(Path.home() / ".nanobot" / "sessions")
        self._cache: dict[str, Session] = {}
  
    def _get_session_path(self, key: str) -> Path:
        """Get the file path for a session."""
        safe_key = safe_filename(key.replace(":", "_"))
        return self.sessions_dir / f"{safe_key}.jsonl"
  
    def get_or_create(self, key: str) -> Session:
        """Get an existing session or create a new one."""
        # Check cache
        if key in self._cache:
            return self._cache[key]
      
        # Try to load from disk
        session = self._load(key)
        if session is None:
            session = Session(key=key)
      
        self._cache[key] = session
        return session
  
    def save(self, session: Session) -> None:
        """Save a session to disk."""
        path = self._get_session_path(session.key)
      
        with open(path, "w") as f:
            # Write metadata first
            metadata_line = {
                "_type": "metadata",
                "created_at": session.created_at.isoformat(),
                "updated_at": session.updated_at.isoformat(),
                "metadata": session.metadata
            }
            f.write(json.dumps(metadata_line) + "\n")
          
            # Write messages
            for msg in session.messages:
                f.write(json.dumps(msg) + "\n")

Session file format

{"_type": "metadata", "created_at": "2026-02-20T10:00:00", "updated_at": "2026-02-20T10:30:00", "metadata": {}}
{"role": "user", "content": "Hello!", "timestamp": "2026-02-20T10:00:05"}
{"role": "assistant", "content": "Hi! How can I help you?", "timestamp": "2026-02-20T10:00:10"}
{"role": "user", "content": "What's the weather?", "timestamp": "2026-02-20T10:01:00"}
{"role": "assistant", "content": "Let me check...", "timestamp": "2026-02-20T10:01:05"}
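`get_or_create` calls a `_load` method that this article does not show. As a rough sketch of how the format above could be read back, assuming the metadata line is marked by `"_type": "metadata"` exactly as `save` writes it, the hypothetical helper below splits a file's lines into metadata and messages.

```python
import json
from typing import Any


def parse_session_lines(
    lines: list[str],
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Split JSONL session lines into (metadata record, message records)."""
    metadata: dict[str, Any] = {}
    messages: list[dict[str, Any]] = []
    for line in lines:
        line = line.strip()
        if not line:  # tolerate blank lines, e.g. a trailing newline
            continue
        record = json.loads(line)
        if record.get("_type") == "metadata":
            metadata = record
        else:
            messages.append(record)
    return metadata, messages
```

The ISO timestamps in the metadata record can be turned back into `datetime` objects with `datetime.fromisoformat`.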

Scheduled task system

CronService design

class CronService:
    """Service for managing and executing scheduled jobs."""
  
    def __init__(
        self,
        store_path: Path,
        on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None
    ):
        self.store_path = store_path
        self.on_job = on_job  # Callback to execute job
        self._store: CronStore | None = None
        self._timer_task: asyncio.Task | None = None
        self._running = False

Schedule types

@dataclass
class CronSchedule:
    """Schedule for a cron job."""
    kind: str  # "at", "every", "cron"
    at_ms: int | None = None      # One-shot at timestamp
    every_ms: int | None = None   # Repeat every N ms
    expr: str | None = None       # Cron expression
    tz: str | None = None         # Timezone

Job data structures

@dataclass
class CronJob:
    """A scheduled job."""
    id: str
    name: str
    enabled: bool
    schedule: CronSchedule
    payload: CronPayload
    state: CronJobState
    created_at_ms: int
    updated_at_ms: int
    delete_after_run: bool = False

@dataclass
class CronPayload:
    """Payload for a cron job."""
    kind: str = "agent_turn"      # Type of job
    message: str = ""             # Message to send
    deliver: bool = False         # Deliver to channel
    channel: str | None = None    # Target channel
    to: str | None = None         # Target chat_id

@dataclass
class CronJobState:
    """State of a cron job."""
    next_run_at_ms: int | None = None
    last_run_at_ms: int | None = None
    last_status: str | None = None  # "ok", "error"
    last_error: str | None = None

Computing the next run

def _compute_next_run(schedule: CronSchedule, now_ms: int) -> int | None:
    """Compute next run time in ms."""
    if schedule.kind == "at":
        return schedule.at_ms if schedule.at_ms and schedule.at_ms > now_ms else None
  
    if schedule.kind == "every":
        if not schedule.every_ms or schedule.every_ms <= 0:
            return None
        return now_ms + schedule.every_ms
  
    if schedule.kind == "cron" and schedule.expr:
        try:
            from croniter import croniter
            cron = croniter(schedule.expr, now_ms / 1000)  # start from the caller's clock, in seconds
            next_time = cron.get_next()
            return int(next_time * 1000)
        except Exception:
            return None
  
    return None
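For an `every` schedule the function returns `now_ms + every_ms`, and `_execute_job` (shown later in this article) calls it after the job has finished. Each interval is therefore measured from completion time, so a job that takes 2 seconds on a 60-second interval runs roughly every 62 seconds. If a fixed grid is preferred, one alternative is to anchor on the previous due time. The sketch below is that alternative, not nanobot's behavior; `next_fixed_interval_run` and `prev_due_ms` are hypothetical names.

```python
def next_fixed_interval_run(prev_due_ms: int, every_ms: int, now_ms: int) -> int:
    """Return the first slot on the grid prev_due_ms + k * every_ms after now_ms."""
    if every_ms <= 0:
        raise ValueError("every_ms must be positive")
    if now_ms < prev_due_ms:
        return prev_due_ms  # the previous slot has not been reached yet
    # Skip every slot that has already passed, including the current one.
    steps = (now_ms - prev_due_ms) // every_ms + 1
    return prev_due_ms + steps * every_ms


# A job due at t=0 that finishes at t=2000 ms, on a 60 s interval:
drifting = 2_000 + 60_000                            # completion-anchored
on_grid = next_fixed_interval_run(0, 60_000, 2_000)  # grid-anchored
```

The grid variant also skips slots missed while the process was down instead of running them back to back.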

Timer mechanism

async def start(self) -> None:
    """Start the cron service."""
    self._running = True
    self._load_store()
    self._recompute_next_runs()
    self._save_store()
    self._arm_timer()
    logger.info(f"Cron service started with {len(self._store.jobs)} jobs")

def _arm_timer(self) -> None:
    """Schedule the next timer tick."""
    if self._timer_task:
        self._timer_task.cancel()
  
    next_wake = self._get_next_wake_ms()
    if not next_wake or not self._running:
        return
  
    delay_ms = max(0, next_wake - _now_ms())
    delay_s = delay_ms / 1000
  
    async def tick():
        await asyncio.sleep(delay_s)
        if self._running:
            await self._on_timer()
  
    self._timer_task = asyncio.create_task(tick())

async def _on_timer(self) -> None:
    """Handle timer tick - run due jobs."""
    if not self._store:
        return
  
    now = _now_ms()
    due_jobs = [
        j for j in self._store.jobs
        if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
    ]
  
    for job in due_jobs:
        await self._execute_job(job)
  
    self._save_store()
    self._arm_timer()
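The pattern above keeps a single sleeping task alive and re-arms it whenever the earliest deadline changes. The following stripped-down sketch shows that pattern in isolation, with plain labels instead of jobs. `OneShotTimer` and all of its members are hypothetical names, and unlike `_arm_timer` it avoids cancelling the task that is currently running.

```python
import asyncio
import time


class OneShotTimer:
    """Keeps at most one sleeping task, always armed for the earliest deadline."""

    def __init__(self) -> None:
        self.deadlines: list[tuple[float, str]] = []  # (monotonic time, label)
        self.fired: list[str] = []
        self._task: asyncio.Task | None = None

    def add(self, delay_s: float, label: str) -> None:
        self.deadlines.append((time.monotonic() + delay_s, label))
        self._arm()  # the new deadline may be earlier than the armed one

    def _arm(self) -> None:
        # Cancel a pending sleep, but never the task that is calling us.
        if self._task and self._task is not asyncio.current_task():
            self._task.cancel()
        if not self.deadlines:
            self._task = None
            return
        wake_at = min(deadline for deadline, _ in self.deadlines)
        delay_s = max(0.0, wake_at - time.monotonic())
        self._task = asyncio.create_task(self._tick(delay_s))

    async def _tick(self, delay_s: float) -> None:
        await asyncio.sleep(delay_s)
        now = time.monotonic()
        due = sorted(item for item in self.deadlines if item[0] <= now)
        self.deadlines = [item for item in self.deadlines if item[0] > now]
        self.fired.extend(label for _, label in due)
        self._arm()  # schedule the next wake-up, if any


async def demo() -> list[str]:
    timer = OneShotTimer()
    timer.add(0.05, "later")
    timer.add(0.01, "sooner")  # re-arms the timer for the earlier deadline
    await asyncio.sleep(0.2)
    return timer.fired
```

Sleeping until the next deadline, rather than polling on a fixed tick, means an idle service with no jobs does no work at all.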

Job execution

async def _execute_job(self, job: CronJob) -> None:
    """Execute a single job."""
    start_ms = _now_ms()
    logger.info(f"Cron: executing job '{job.name}' ({job.id})")
  
    try:
        response = None
        if self.on_job:
            response = await self.on_job(job)
      
        job.state.last_status = "ok"
        job.state.last_error = None
        logger.info(f"Cron: job '{job.name}' completed")
      
    except Exception as e:
        job.state.last_status = "error"
        job.state.last_error = str(e)
        logger.error(f"Cron: job '{job.name}' failed: {e}")
  
    job.state.last_run_at_ms = start_ms
    job.updated_at_ms = _now_ms()
  
    # Handle one-shot jobs
    if job.schedule.kind == "at":
        if job.delete_after_run:
            self._store.jobs = [j for j in self._store.jobs if j.id != job.id]
        else:
            job.enabled = False
            job.state.next_run_at_ms = None
    else:
        # Compute next run
        job.state.next_run_at_ms = _compute_next_run(job.schedule, _now_ms())
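The excerpt above records the outcome in `last_status` and `last_error`, but it does not show a retry inside `_execute_job`. One way to add bounded retries without touching the service would be to wrap the callback before passing it as `on_job`. The sketch below illustrates that idea; `with_retries`, `attempts` and `base_delay_s` are hypothetical names, not nanobot APIs.

```python
import asyncio
from typing import Any, Awaitable, Callable


def with_retries(
    callback: Callable[[Any], Awaitable[Any]],
    attempts: int = 3,
    base_delay_s: float = 1.0,
) -> Callable[[Any], Awaitable[Any]]:
    """Wrap an async job callback so that failures are retried with backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    async def wrapped(job: Any) -> Any:
        for attempt in range(1, attempts + 1):
            try:
                return await callback(job)
            except Exception:
                if attempt == attempts:
                    raise  # let the caller record the final error
                # Exponential backoff: base, 2 * base, 4 * base, ...
                await asyncio.sleep(base_delay_s * 2 ** (attempt - 1))

    return wrapped
```

Because the last failure is re-raised, the `except` branch in `_execute_job` would still set `last_status` to "error" once all attempts are exhausted.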

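Scheduled task flow

Participants: CLI, CronService, Timer, CronJob, AgentLoop

  • CLI → CronService: add_job(name, schedule, message)
  • CronService: _compute_next_run()
  • CronService → Timer: _arm_timer()
  • Timer loop:
      • Timer → CronService: _on_timer()
      • CronService: find due jobs
      • CronService → CronJob: execute job
      • CronJob → AgentLoop: on_job callback
      • AgentLoop → CronService: response
      • CronService: update state, _save_store(), _arm_timer()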

Public API

Adding a job

def add_job(
    self,
    name: str,
    schedule: CronSchedule,
    message: str,
    deliver: bool = False,
    channel: str | None = None,
    to: str | None = None,
    delete_after_run: bool = False,
) -> CronJob:
    """Add a new job."""
    store = self._load_store()
    now = _now_ms()
  
    job = CronJob(
        id=str(uuid.uuid4())[:8],
        name=name,
        enabled=True,
        schedule=schedule,
        payload=CronPayload(
            kind="agent_turn",
            message=message,
            deliver=deliver,
            channel=channel,
            to=to,
        ),
        state=CronJobState(next_run_at_ms=_compute_next_run(schedule, now)),
        created_at_ms=now,
        updated_at_ms=now,
        delete_after_run=delete_after_run,
    )
  
    store.jobs.append(job)
    self._save_store()
    self._arm_timer()
  
    return job

CLI usage examples

# Add a cron-expression job
nanobot cron add --name "daily" --message "Good morning!" --cron "0 9 * * *"

# Add an interval job
nanobot cron add --name "hourly" --message "Check status" --every 3600

# List all jobs
nanobot cron list

# Remove a job
nanobot cron remove <job_id>

Skill system

SkillsLoader design

class SkillsLoader:
    """
    Loader for agent skills.
  
    Skills are markdown files (SKILL.md) that teach the agent how to use
    specific tools or perform certain tasks.
    """
  
    def __init__(self, workspace: Path, builtin_skills_dir: Path | None = None):
        self.workspace = workspace
        self.workspace_skills = workspace / "skills"
        self.builtin_skills = builtin_skills_dir or BUILTIN_SKILLS_DIR

Skill directory structure

skills/
├── github/
│   └── SKILL.md
├── weather/
│   └── SKILL.md
├── tmux/
│   ├── SKILL.md
│   └── scripts/
│       ├── find-sessions.sh
│       └── wait-for-text.sh
└── summarize/
    └── SKILL.md

Loading a skill

def load_skill(self, name: str) -> str | None:
    """Load a skill by name."""
    # Check workspace first
    workspace_skill = self.workspace_skills / name / "SKILL.md"
    if workspace_skill.exists():
        return workspace_skill.read_text(encoding="utf-8")
  
    # Check built-in
    if self.builtin_skills:
        builtin_skill = self.builtin_skills / name / "SKILL.md"
        if builtin_skill.exists():
            return builtin_skill.read_text(encoding="utf-8")
  
    return None

Progressive loading

def build_skills_summary(self) -> str:
    """
    Build a summary of all skills (name, description, path, availability).
  
    This is used for progressive loading - the agent can read the full
    skill content using read_file when needed.
    """
    all_skills = self.list_skills(filter_unavailable=False)
    if not all_skills:
        return ""
  
    lines = ["<skills>"]
    for s in all_skills:
        name = s["name"]
        desc = self._get_skill_description(s["name"])
        available = self._check_requirements(self._get_skill_meta(s["name"]))
      
        lines.append(f'  <skill available="{str(available).lower()}">')
        lines.append(f"    <name>{name}</name>")
        lines.append(f"    <description>{desc}</description>")
        lines.append(f"    <location>{s['path']}</location>")
        lines.append("  </skill>")
    lines.append("</skills>")
  
    return "\n".join(lines)
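As excerpted, the summary interpolates names and descriptions straight into XML-like tags, so a description containing `&` or `<` would produce malformed markup. A small sketch of building one entry with escaping follows, using the standard library's `xml.sax.saxutils.escape`; `skill_entry` is a hypothetical helper, not part of SkillsLoader.

```python
from xml.sax.saxutils import escape


def skill_entry(name: str, description: str, location: str, available: bool) -> str:
    """Render one <skill> element with its text content escaped."""
    return "\n".join(
        [
            f'  <skill available="{str(available).lower()}">',
            f"    <name>{escape(name)}</name>",
            f"    <description>{escape(description)}</description>",
            f"    <location>{escape(location)}</location>",
            "  </skill>",
        ]
    )


entry = skill_entry("summarize", "Fetch & summarize <urls>", "skills/summarize/SKILL.md", True)
```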

Skill metadata

---
name: weather
description: Get weather information for any location
nanobot:
  requires:
    bins:
      - curl
    env:
      - WEATHER_API_KEY
  always: false
---

# Weather Skill

This skill allows you to get weather information...
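`build_skills_summary` calls a `_check_requirements` method that is not shown here. Assuming the `nanobot` block of the front matter has already been parsed into a dictionary, a check against the `bins` and `env` lists could be sketched as follows; `requirements_met` is a hypothetical stand-in, not the actual method.

```python
import os
import shutil
from typing import Any


def requirements_met(meta: dict[str, Any]) -> bool:
    """Check that every required binary is on PATH and every env var is set.

    `meta` is the parsed `nanobot` mapping from the skill's front matter.
    """
    requires = meta.get("requires") or {}
    bins_ok = all(shutil.which(b) is not None for b in requires.get("bins", []))
    # An empty value counts as missing, same as an unset variable.
    env_ok = all(os.environ.get(v) for v in requires.get("env", []))
    return bins_ok and env_ok
```

A skill that fails this check can still be listed with `available="false"`, which tells the agent the skill exists but cannot be used in the current environment.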

System integration

Integrating memory into the context

def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
    """Build the system prompt from bootstrap files, memory, and skills."""
    parts = []
  
    # Core identity
    parts.append(self._get_identity())
  
    # Bootstrap files
    bootstrap = self._load_bootstrap_files()
    if bootstrap:
        parts.append(bootstrap)
  
    # Memory context
    memory = self.memory.get_memory_context()
    if memory:
        parts.append(f"# Memory\n\n{memory}")
  
    # Skills
    # ...
  
    return "\n\n---\n\n".join(parts)

Integrating scheduled tasks into the agent

# When AgentLoop is initialized
if self.cron_service:
    self.tools.register(CronTool(self.cron_service))

# When the gateway starts
cron_service = CronService(
    store_path=workspace / "cron.json",
    on_job=lambda job: agent_loop.process_direct(job.payload.message)
)
await cron_service.start()

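Summary

nanobot's memory and scheduling systems together provide its persistence:

| System | Function | Implementation |
| --- | --- | --- |
| MemoryStore | Long-term memory + daily notes | Markdown file storage |
| SessionManager | Conversation history management | JSONL file storage + in-memory cache |
| CronService | Scheduled task execution | croniter + asyncio timer task |
| SkillsLoader | On-demand skill loading | Markdown + YAML metadata |

With these systems, nanobot can do more than respond to requests in the moment: it can remember important information, run scheduled tasks, and extend its capabilities dynamically.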

