OpenClaw Agent Loop 机制源码深度分析(二)
管理上下文窗口,避免超出限制。跟踪当前运行的会话状态。Bootstrap 文件。
·
会话压缩
文件: compact.ts
管理上下文窗口,避免超出限制。
export async function compactEmbeddedPiSession(
params: CompactEmbeddedPiSessionParams,
): Promise<EmbeddedPiCompactResult> {
const { sessionManager } = await prepareSessionManagerForRun({...});
// 1. 检查是否需要压缩
const needsCompaction = await sessionManager.needsCompaction();
if (!needsCompaction) {
return { ok: true, compacted: false };
}
// 2. 估算压缩后的令牌数
const estimate = await sessionManager.compactionEstimate();
// 3. 执行压缩
const result = await sessionManager.compact({
systemPrompt: buildSystemPrompt(),
reserveTokens: resolveCompactionReserveTokensFloor(params.config),
});
if (result.success) {
return {
ok: true,
compacted: true,
originalTokens: estimate.originalTokens,
compactedTokens: result.tokenCount,
};
}
return {
ok: false,
compacted: false,
reason: result.error,
};
}
运行状态管理
文件: runs.ts
跟踪当前运行的会话状态。
// 活动运行映射
const ACTIVE_EMBEDDED_RUNS = new Map<string, EmbeddedPiQueueHandle>();
// 等待器映射
const EMBEDDED_RUN_WAITERS = new Map<string, Set<EmbeddedRunWaiter>>();
// 注册活动运行
export function setActiveEmbeddedRun(
sessionId: string,
handle: EmbeddedPiQueueHandle
) {
ACTIVE_EMBEDDED_RUNS.set(sessionId, handle);
}
// 检查是否有活动运行
export function isEmbeddedPiRunActive(sessionId: string): boolean {
return ACTIVE_EMBEDDED_RUNS.has(sessionId);
}
// 队列消息(用于流式响应)
export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) return false;
handle.queueMessage(text);
return true;
}
// 中断运行
export function abortEmbeddedPiRun(sessionId: string): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) return false;
handle.abort();
return true;
}
消息处理流程
消息接收
// 从参数构建运行上下文
async function prepareRunContext(params) {
// 1. 解析会话
const sessionManager = await prepareSessionManagerForRun({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
workspaceDir: params.workspaceDir,
config: params.config,
});
// 2. 加载引导文件
const { bootstrapFiles, contextFiles } = await resolveBootstrapContextForRun({
workspaceDir: params.workspaceDir,
config: params.config,
sessionKey: params.sessionKey,
});
return { sessionManager, bootstrapFiles, contextFiles };
}
提示词构建
// 构建系统提示词
function buildEmbeddedSystemPrompt(params) {
const { sessionManager, config, workspaceDir, bootstrapFiles } = params;
// 1. 基础身份
const identity = buildIdentitySection();
// 2. 引导文件内容
const bootstrap = loadBootstrapFiles(bootstrapFiles);
// 3. Skills 提示
const skills = resolveSkillsPromptForRun({
config,
workspaceDir,
});
// 4. 工具描述
const tools = buildToolDescriptions();
// 5. 合并所有部分
return [
identity,
bootstrap,
skills,
tools,
].join("\n\n---\n\n");
}
LLM 调用
// 执行 LLM 调用
async function callLLM(params) {
const { sessionManager, model, tools, thinking } = params;
const response = await sessionManager.complete({
model,
tools,
thinking,
// 流式回调
onChunk: (chunk) => {
// 处理流式响应
handleStreamingChunk(chunk);
},
onComplete: (response) => {
// 处理完成
handleComplete(response);
},
});
return response;
}
工具执行循环
// 主执行循环
async function executeLoop(params) {
const { sessionManager, tools } = params;
// 1. 发送用户消息
await sessionManager.appendUserMessage(params.message);
// 2. 循环直到没有工具调用
while (true) {
// 2.1 调用 LLM
const response = await sessionManager.complete({
model: params.modelId,
tools,
thinking: params.thinkLevel,
});
// 2.2 检查工具调用
if (response.tool_calls?.length > 0) {
// 2.3 执行所有工具调用
for (const toolCall of response.tool_calls) {
const result = await executeTool(toolCall);
// 2.4 添加工具结果到会话
await sessionManager.appendToolResult(
toolCall.id,
toolCall.name,
result
);
}
// 2.5 继续循环
continue;
}
// 3. 没有工具调用,完成
return {
success: true,
reply: response.content,
usage: response.usage,
};
}
}
关键机制
流式响应
// 流式响应处理
async function handleStreaming(params) {
const { sessionManager, onChunk } = params;
// 累积响应
let accumulatedContent = "";
await sessionManager.complete({
model: params.modelId,
tools: params.tools,
thinking: params.thinking,
// 流式回调
onChunk: async (chunk) => {
if (chunk.content) {
accumulatedContent += chunk.content;
// 发送增量更新
onChunk?.({
type: "content",
content: chunk.content,
fullContent: accumulatedContent,
});
}
if (chunk.tool_use) {
// 工具调用开始
onChunk?.({
type: "tool_call",
tool: chunk.tool_use.name,
id: chunk.tool_use.id,
});
}
},
});
}
上下文管理
// 上下文窗口管理
async function manageContext(params) {
const { sessionManager, config } = params;
// 1. 检查当前上下文大小
const info = await sessionManager.contextInfo();
if (info.tokenCount > config.contextWindow * 0.9) {
// 2. 需要压缩
const result = await sessionManager.compact({
systemPrompt: buildSystemPrompt(),
reserveTokens: resolveCompactionReserveTokensFloor(config),
});
if (!result.success) {
throw new Error("Compaction failed: " + result.error);
}
return { compacted: true, originalTokens: info.tokenCount };
}
return { compacted: false, originalTokens: info.tokenCount };
}
工具策略
// 工具策略检查
async function checkToolPolicy(params) {
const { toolName, config, session } = params;
// 获取工具策略
const policy = resolveSandboxToolPolicyForAgent(
config,
session.agentId
);
// 检查是否允许
if (!isToolAllowed(policy, toolName)) {
return {
allowed: false,
reason: `Tool "${toolName}" is blocked by policy`,
};
}
return { allowed: true };
}
错误处理
// 错误处理和重试
async function handleError(params) {
const { error, sessionManager, retryCount } = params;
if (isContextOverflowError(error)) {
// 1. 上下文溢出,尝试压缩
const compactResult = await sessionManager.compact({
systemPrompt: buildSystemPrompt(),
reserveTokens: resolveCompactionReserveTokensFloor(),
});
if (compactResult.success && retryCount < MAX_RETRIES) {
return { action: "retry", reason: "compacted" };
}
}
if (isAuthError(error)) {
// 2. 认证错误,尝试切换
const failover = await attemptAuthFailover(sessionManager);
if (failover.success) {
return { action: "retry", reason: "auth_failover" };
}
}
if (isRateLimitError(error)) {
// 3. 速率限制,等待后重试
await sleep(getRetryDelay(error));
return { action: "retry", reason: "rate_limit" };
}
// 4. 其他错误
return { action: "fail", error: error.message };
}
更多推荐



所有评论(0)