会话压缩

文件: compact.ts

管理上下文窗口,避免超出限制。

export async function compactEmbeddedPiSession(
  params: CompactEmbeddedPiSessionParams,
): Promise<EmbeddedPiCompactResult> {
  const { sessionManager } = await prepareSessionManagerForRun({...});
  
  // 1. 检查是否需要压缩
  const needsCompaction = await sessionManager.needsCompaction();
  
  if (!needsCompaction) {
    return { ok: true, compacted: false };
  }
  
  // 2. 估算压缩后的令牌数
  const estimate = await sessionManager.compactionEstimate();
  
  // 3. 执行压缩
  const result = await sessionManager.compact({
    systemPrompt: buildSystemPrompt(),
    reserveTokens: resolveCompactionReserveTokensFloor(params.config),
  });
  
  if (result.success) {
    return {
      ok: true,
      compacted: true,
      originalTokens: estimate.originalTokens,
      compactedTokens: result.tokenCount,
    };
  }
  
  return {
    ok: false,
    compacted: false,
    reason: result.error,
  };
}

运行状态管理

文件: runs.ts

跟踪当前运行的会话状态。

// 活动运行映射
const ACTIVE_EMBEDDED_RUNS = new Map<string, EmbeddedPiQueueHandle>();

// 等待器映射
const EMBEDDED_RUN_WAITERS = new Map<string, Set<EmbeddedRunWaiter>>();

// 注册活动运行
export function setActiveEmbeddedRun(
  sessionId: string,
  handle: EmbeddedPiQueueHandle
) {
  ACTIVE_EMBEDDED_RUNS.set(sessionId, handle);
}

// 检查是否有活动运行
export function isEmbeddedPiRunActive(sessionId: string): boolean {
  return ACTIVE_EMBEDDED_RUNS.has(sessionId);
}

// 队列消息(用于流式响应)
export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
  const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
  if (!handle) return false;
  
  handle.queueMessage(text);
  return true;
}

// 中断运行
export function abortEmbeddedPiRun(sessionId: string): boolean {
  const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
  if (!handle) return false;
  
  handle.abort();
  return true;
}

消息处理流程

消息接收

SessionManager RunParams Gateway SessionManager RunParams Gateway 接收消息和会话信息 创建/加载会话 会话就绪
// 从参数构建运行上下文
async function prepareRunContext(params) {
  // 1. 解析会话
  const sessionManager = await prepareSessionManagerForRun({
    sessionId: params.sessionId,
    sessionKey: params.sessionKey,
    workspaceDir: params.workspaceDir,
    config: params.config,
  });
  
  // 2. 加载引导文件
  const { bootstrapFiles, contextFiles } = await resolveBootstrapContextForRun({
    workspaceDir: params.workspaceDir,
    config: params.config,
    sessionKey: params.sessionKey,
  });
  
  return { sessionManager, bootstrapFiles, contextFiles };
}

提示词构建

构建步骤

Bootstrap 文件

系统提示词

Skills

上下文文件

最终提示词

拼接身份信息

添加引导规则

合并 Skills

添加时间/环境信息

// 构建系统提示词
function buildEmbeddedSystemPrompt(params) {
  const { sessionManager, config, workspaceDir, bootstrapFiles } = params;
  
  // 1. 基础身份
  const identity = buildIdentitySection();
  
  // 2. 引导文件内容
  const bootstrap = loadBootstrapFiles(bootstrapFiles);
  
  // 3. Skills 提示
  const skills = resolveSkillsPromptForRun({
    config,
    workspaceDir,
  });
  
  // 4. 工具描述
  const tools = buildToolDescriptions();
  
  // 5. 合并所有部分
  return [
    identity,
    bootstrap,
    skills,
    tools,
  ].join("\n\n---\n\n");
}

LLM 调用

Tools LLM SessionManager Tools LLM SessionManager alt [有工具调用] [无工具调用] complete(message, tools, thinking) streaming response execute(tool_call) tool result 继续对话 final response
// 执行 LLM 调用
async function callLLM(params) {
  const { sessionManager, model, tools, thinking } = params;
  
  const response = await sessionManager.complete({
    model,
    tools,
    thinking,
    
    // 流式回调
    onChunk: (chunk) => {
      // 处理流式响应
      handleStreamingChunk(chunk);
    },
    
    onComplete: (response) => {
      // 处理完成
      handleComplete(response);
    },
  });
  
  return response;
}

工具执行循环

等待输入

用户消息

LLM 响应

有工具调用?

继续 LLM

返回结果

需要压缩?

Idle

CallingLLM

ProcessingLLM

HasToolCalls

|是|

执行工具

|否|

生成回复

压缩上下文

// 主执行循环
async function executeLoop(params) {
  const { sessionManager, tools } = params;
  
  // 1. 发送用户消息
  await sessionManager.appendUserMessage(params.message);
  
  // 2. 循环直到没有工具调用
  while (true) {
    // 2.1 调用 LLM
    const response = await sessionManager.complete({
      model: params.modelId,
      tools,
      thinking: params.thinkLevel,
    });
    
    // 2.2 检查工具调用
    if (response.tool_calls?.length > 0) {
      // 2.3 执行所有工具调用
      for (const toolCall of response.tool_calls) {
        const result = await executeTool(toolCall);
        
        // 2.4 添加工具结果到会话
        await sessionManager.appendToolResult(
          toolCall.id,
          toolCall.name,
          result
        );
      }
      
      // 2.5 继续循环
      continue;
    }
    
    // 3. 没有工具调用,完成
    return {
      success: true,
      reply: response.content,
      usage: response.usage,
    };
  }
}

关键机制

流式响应

LLM 流式响应

接收 chunk

解析内容

增量更新回复

还有数据?

完成

// 流式响应处理
async function handleStreaming(params) {
  const { sessionManager, onChunk } = params;
  
  // 累积响应
  let accumulatedContent = "";
  
  await sessionManager.complete({
    model: params.modelId,
    tools: params.tools,
    thinking: params.thinking,
    
    // 流式回调
    onChunk: async (chunk) => {
      if (chunk.content) {
        accumulatedContent += chunk.content;
        
        // 发送增量更新
        onChunk?.({
          type: "content",
          content: chunk.content,
          fullContent: accumulatedContent,
        });
      }
      
      if (chunk.tool_use) {
        // 工具调用开始
        onChunk?.({
          type: "tool_call",
          tool: chunk.tool_use.name,
          id: chunk.tool_use.id,
        });
      }
    },
  });
}

上下文管理

新消息

添加到历史

超出窗口?

压缩

继续

LLM 调用

// 上下文窗口管理
async function manageContext(params) {
  const { sessionManager, config } = params;
  
  // 1. 检查当前上下文大小
  const info = await sessionManager.contextInfo();
  
  if (info.tokenCount > config.contextWindow * 0.9) {
    // 2. 需要压缩
    const result = await sessionManager.compact({
      systemPrompt: buildSystemPrompt(),
      reserveTokens: resolveCompactionReserveTokensFloor(config),
    });
    
    if (!result.success) {
      throw new Error("Compaction failed: " + result.error);
    }
    
    return { compacted: true, originalTokens: info.tokenCount };
  }
  
  return { compacted: false, originalTokens: info.tokenCount };
}

工具策略

工具调用请求

解析工具名称

在白名单?

拒绝调用

在黑名单?

执行工具

返回结果

// 工具策略检查
async function checkToolPolicy(params) {
  const { toolName, config, session } = params;
  
  // 获取工具策略
  const policy = resolveSandboxToolPolicyForAgent(
    config,
    session.agentId
  );
  
  // 检查是否允许
  if (!isToolAllowed(policy, toolName)) {
    return {
      allowed: false,
      reason: `Tool "${toolName}" is blocked by policy`,
    };
  }
  
  return { allowed: true };
}

错误处理

可恢复

上下文溢出

认证失败

其他

发生错误

错误类型?

重试

压缩重试

切换认证

返回错误

重试次数?

等待后重试

压缩上下文

使用备选密钥

// 错误处理和重试
async function handleError(params) {
  const { error, sessionManager, retryCount } = params;
  
  if (isContextOverflowError(error)) {
    // 1. 上下文溢出,尝试压缩
    const compactResult = await sessionManager.compact({
      systemPrompt: buildSystemPrompt(),
      reserveTokens: resolveCompactionReserveTokensFloor(),
    });
    
    if (compactResult.success && retryCount < MAX_RETRIES) {
      return { action: "retry", reason: "compacted" };
    }
  }
  
  if (isAuthError(error)) {
    // 2. 认证错误,尝试切换
    const failover = await attemptAuthFailover(sessionManager);
    if (failover.success) {
      return { action: "retry", reason: "auth_failover" };
    }
  }
  
  if (isRateLimitError(error)) {
    // 3. 速率限制,等待后重试
    await sleep(getRetryDelay(error));
    return { action: "retry", reason: "rate_limit" };
  }
  
  // 4. 其他错误
  return { action: "fail", error: error.message };
}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐