rpc框架实现

整体流程(大概)

在这里插入图片描述
上面的图太模糊了,可以查看原图
以用户流式请求为例,先说一下大致的流程
首先用户输入数据后agent-communication\client\src\main.cpp中的getline接收输入数据,client.queryStream处理消息

int main(int argc, char* argv[]) {
    ...
    std::string line;
    while (g_running) {
        std::cout << "[" << context_id << (stream_mode ? "/流式" : "") << "] > ";
        std::cout.flush();
        //接收消息
        if (!std::getline(std::cin, line)) {
            // EOF 或输入错误,退出循环
            if (std::cin.eof()) {
                std::cout << "\n" << std::endl;
            }
            break;
        }
        
        // 检查是否收到退出信号
        if (!g_running) {
            break;
        }
        
        if (line == "/stream" || line == "/s") {
            stream_mode = !stream_mode;
            std::cout << "流式模式: " << (stream_mode ? "开启" : "关闭") << std::endl;
            continue;
        }
        
        
        ...
        
        // 发送 AI 查询
        std::cout << "\n思考中..." << std::endl;
        
        if (stream_mode) {
            // 流式查询
            std::cout << "\nAI: ";
            std::cout.flush();
            //客户端开始调用处理函数
            bool success = client.queryStream(line, 
                [](const agent_communication::AIStreamEvent& event) {
                    if (event.event_type() == "partial") {
                        std::cout << event.content();
                        std::cout.flush();
                    } else if (event.event_type() == "complete") {
                        std::cout << std::endl;
                    } else if (event.event_type() == "error") {
                        std::cout << "\n错误: " << event.content() << std::endl;
                    }
                }, context_id, timeout_seconds);
            
            if (!success) {
                std::cout << "\n流式查询失败" << std::endl;
            }
        } else ...
}

agent-communication\client\src\ai_query_client.cppAIQueryClient::queryStream封装request消息,将request请求发送给server,读取返回的数据,调用callback进行处理

bool AIQueryClient::queryStream(
    const std::string& question,
    StreamEventCallback callback,
    const std::string& context_id,
    int timeout_seconds) {
    //封装成request
    agent_communication::AIQueryRequest request;
    request.set_request_id(generateRequestId());
    request.set_question(question);
    request.set_context_id(context_id);
    request.set_timeout_seconds(timeout_seconds);
    
    return queryStream(request, callback);
}

bool AIQueryClient::queryStream(
    const agent_communication::AIQueryRequest& request,
    StreamEventCallback callback) {
    
    if (!connected_) {
        LOG_ERROR("AIQueryClient not connected");
        return false;
    }
    
    if (!callback) {
        LOG_ERROR("No callback provided for streaming query");
        return false;
    }
    
    auto start_time = std::chrono::steady_clock::now();
    
    grpc::ClientContext context;
    
    // Set deadline
    int timeout = request.timeout_seconds() > 0 ? request.timeout_seconds() : 60;
    auto deadline = std::chrono::system_clock::now() +
                   std::chrono::seconds(timeout);
    context.set_deadline(deadline);
    
    LOG_INFO("Starting streaming AI query: " + request.request_id());
    //发起gRPC的流式rpc调用并在客户端创建一个client对象
    std::unique_ptr<grpc::ClientReader<agent_communication::AIStreamEvent>> reader(
        stub_->QueryStream(&context, request));
    
    if (!reader) {
        LOG_ERROR("Failed to create stream reader");
        return false;
    }
    
    agent_communication::AIStreamEvent event;
    int event_count = 0;
    //gRPC C++ 的流式读取接口
    while (reader->Read(&event)) {
        event_count++;
        //读取数据后调用callback进行处理
        callback(event);
        
        // Check for completion or error
        if (event.event_type() == "complete" || event.event_type() == "error") {
            break;
        }
    }
    
    grpc::Status status = reader->Finish();
    
    auto end_time = std::chrono::steady_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        end_time - start_time);
    
    if (status.ok()) {
        LOG_INFO("Streaming AI query completed: " + request.request_id() +
                " with " + std::to_string(event_count) + " events" +
                " in " + std::to_string(duration.count()) + "ms");
        return true;
    } else {
        LOG_ERROR("Streaming AI query failed: " + request.request_id() +
                 " - " + status.error_message());
        return false;
    }
}

server中的agent-communication\server\src\ai_query_service.cpp中的AIQueryServiceImpl::QueryStream接着进行处理,调用a2a_adapter_->processQueryStreaming接着处理

grpc::Status AIQueryServiceImpl::QueryStream(
    grpc::ServerContext* context,
    const agent_communication::AIQueryRequest* request,
    grpc::ServerWriter<agent_communication::AIStreamEvent>* writer) {
    
    ...
    bool success = true;
    std::string error_message;
    
    // Process streaming query
    a2a_adapter_->processQueryStreaming(*request,
        [&context, &writer, &success, &error_message](
            const agent_communication::AIStreamEvent& event) {
            
            // Check for cancellation
            if (context->IsCancelled()) {
                success = false;
                error_message = "Request cancelled";
                return;
            }
            
            // Write event to stream
            if (!writer->Write(event)) {
                success = false;
                error_message = "Failed to write stream event";
            }
        });
   ...
}

agent-communication\a2a_adapter\src\a2a_adapter.cpp中的A2AAdapter::processQueryStreaming将request请求传换成A2A格式,然后发送给a2a_client_

void A2AAdapter::processQueryStreaming(
    const agent_communication::AIQueryRequest& request,
    std::function<void(const agent_communication::AIStreamEvent&)> callback) {
    
    if (!initialized_ || !callback || !config_.enable_streaming) {
        return;
    }
    
    try {
        // Convert RPC request to A2A format
        a2a::MessageSendParams params = request_adapter_->convertToA2A(request);
        std::string context_id = params.context_id().value_or("");
        
        // Use streaming API
        // 注意:http_client按双换行符切分数据,每次回调收到完整的SSE事件
        a2a_client_->send_message_streaming(params, 
            [this, &callback, &context_id](const std::string& event_line) {
                // 跳过空行
                if (event_line.empty() || event_line == "\n" || event_line == "\r\n") {
                    return;
                }
                
                // 解析SSE格式: "data: {...}\n" 或 "data: {...}"
                std::string event_data = event_line;
                
                // 移除行尾换行符
                while (!event_data.empty() && 
                       (event_data.back() == '\n' || event_data.back() == '\r')) {
                    event_data.pop_back();
                }
                
                // 提取data:后面的内容
                const std::string data_prefix = "data: ";
                if (event_data.find(data_prefix) == 0) {
                    event_data = event_data.substr(data_prefix.length());
                }
                
                // 跳过空数据
                if (event_data.empty()) {
                    return;
                }
                
                // 解析 JSON 响应,捕获所有 JSON 异常(包括 UTF-8 错误)
                json j;
                try {
                    j = json::parse(event_data);
                } catch (const json::exception& e) {
                    // JSON 解析失败(包括 UTF-8 错误),跳过这个事件
                    return;
                }
                
                try {
                    // 检查是否有错误
                    if (j.contains("error")) {
                        agent_communication::AIStreamEvent event;
                        std::string error_msg = j["error"].value("message", "Unknown error");
                        response_adapter_->buildStreamEvent(
                            error_msg, context_id, "error", &event);
                        callback(event);
                        return;
                    }
                    
                    // 检查是否有结果
                    if (j.contains("result")) {
                        auto& result = j["result"];
                        std::string type = result.value("type", "");
                        
                        if (type == "chunk") {
                            // 流式内容块
                            std::string content = result.value("content", "");
                            agent_communication::AIStreamEvent event;
                            response_adapter_->buildStreamEvent(
                                content, context_id, "partial", &event);
                            callback(event);
                        } else if (type == "stream_start") {
                            // 流开始事件
                            agent_communication::AIStreamEvent event;
                            response_adapter_->buildStreamEvent(
                                "", context_id, "status", &event);
                            event.set_task_state("processing");
                            callback(event);
                        } else if (type == "stream_end") {
                            // 流结束事件 - 不在这里发送 complete,让外层处理
                        } else if (type == "intent") {
                            // 意图识别事件
                            agent_communication::AIStreamEvent event;
                            std::string intent = result.value("intent", "");
                            response_adapter_->buildStreamEvent(
                                "Intent: " + intent, context_id, "status", &event);
                            callback(event);
                        }
                    }
                } catch (const std::exception& e) {
                    // 处理结果时出错,跳过
                }
            });
        
        // Send completion event
        agent_communication::AIStreamEvent complete_event;
        response_adapter_->buildStreamEvent(
            "", context_id, "complete", &complete_event);
        callback(complete_event);
        
    } catch (const std::exception& e) {
        // Send error event
        agent_communication::AIStreamEvent error_event;
        response_adapter_->buildStreamEvent(
            e.what(), request.context_id(), "error", &error_event);
        callback(error_event);
    }
}

agent-communication\a2a\src\client\a2a_client.cpp中的A2AClient::send_message_streaming发送post_stream消息

void A2AClient::send_message_streaming(const MessageSendParams& params,
                                       std::function<void(const std::string&)> callback) {
    // Serialize params to JSON
    std::string params_json = params.to_json();
    
    // Create JSON-RPC request
    JsonRpcRequest request(generate_uuid(), A2AMethods::MESSAGE_STREAM, params_json);
    std::string request_json = request.to_json();
    
    // Send streaming POST request
    impl_->http_client_.post_stream(
        impl_->base_url_,
        request_json,
        "application/json",
        callback
    );
}

agent-communication\a2a\src\core\http_client.cpp中的HttpClient::post_stream就是封装的流式请求

void HttpClient::post_stream(const std::string& url,
                             const std::string& body,
                             const std::string& content_type,
                             std::function<void(const std::string&)> callback) {
    CURL* curl = curl_easy_init();
    if (!curl) {
        throw A2AException("Failed to initialize CURL", ErrorCode::InternalError);
    }
    
    // 创建流式处理上下文,处理UTF-8边界问题
    StreamContext ctx;
    ctx.callback = &callback;
    
    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
    curl_easy_setopt(curl, CURLOPT_POST, 1L);
    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
    curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, body.length());
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, stream_callback);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &ctx);  // 传递上下文而非直接传递callback
    
    // 流式请求:禁用总超时,使用低速超时
    // 如果60秒内没有收到任何数据才超时
    curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);  // 禁用总超时
    curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1L);  // 最低速度 1 byte/s
    curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 60L);  // 60秒内低于最低速度则超时
    
    // Set headers
    struct curl_slist* header_list = nullptr;
    header_list = curl_slist_append(header_list, ("Content-Type: " + content_type).c_str());
    header_list = curl_slist_append(header_list, "Accept: text/event-stream");
    
    for (const auto& [key, value] : impl_->headers_) {
        std::string header = key + ": " + value;
        header_list = curl_slist_append(header_list, header.c_str());
    }
    
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, header_list);
    
    CURLcode res = curl_easy_perform(curl);
    
    // 刷新剩余缓冲区
    ctx.flush();
    
    curl_slist_free_all(header_list);
    curl_easy_cleanup(curl);
    
    if (res != CURLE_OK) {
        throw A2AException(
            std::string("CURL error: ") + curl_easy_strerror(res),
            ErrorCode::InternalError
        );
    }
}

之后agent-communication\examples\ai_orchestrator\orchestrator_main.cpp中的start(int port)会启动agent-communication\a2a\include\a2a\examples\http_server.hppstart()会开始监听消息,收到消息后this->handle_client来进行处理

// 接受连接
        while (running_) {
            struct sockaddr_in client_addr;
            socklen_t client_len = sizeof(client_addr);
            // 程序运行到这行时,操作系统会让当前线程“睡觉”(挂起,Sleep)
            //client_fd 对应的 内核中的TCP 连接(也就是 Orchestrator HTTP Server 与客户端之间的连接)。
            //accept() 会返回一个 已经建立连接的 socket 描述符
            int client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
            if (client_fd < 0) {
                continue;
            }
            
            // 处理请求(在新线程中)
            std::thread([this, client_fd]() {
                this->handle_client(client_fd);
            }).detach();
        }

agent-communication\a2a\include\a2a\examples\http_server.hpp中的handle_client处理请求,找到对应请求的处理函数

void handle_client(int client_fd) {
        char buffer[8192] = {0};
        ssize_t bytes_read = read(client_fd, buffer, sizeof(buffer) - 1);
        
        if (bytes_read <= 0) {
            close(client_fd);
            return;
        }
        
        std::string request(buffer, bytes_read);
        
        // 解析 HTTP 请求
        std::istringstream request_stream(request);
        std::string method, path, version;
        request_stream >> method >> path >> version;
        
        // 提取请求体
        std::string body;
        size_t body_pos = request.find("\r\n\r\n");
        if (body_pos != std::string::npos) {
            body = request.substr(body_pos + 4);
        }
        
        // 检查是否需要流式响应(通过检查请求体中的 method)
        bool is_stream_request = false;
        if (!body.empty()) {
            // 简单检查是否包含 message/stream 方法
            is_stream_request = (body.find("\"message/stream\"") != std::string::npos);
        }
        
        // 优先检查流式处理器
        auto stream_it = stream_handlers_.find(path);
        if (is_stream_request && stream_it != stream_handlers_.end()) {
            handle_stream_request(client_fd, body, stream_it->second);
            return;
        }

        // 查找普通处理器
        std::string response_body;
        int status_code = 200;
        
        auto it = handlers_.find(path);
        if (it != handlers_.end()) {
            try {
                response_body = it->second(body);
            } catch (const std::exception& e) {
                status_code = 500;
                response_body = std::string("{\"error\":\"") + e.what() + "\"}";
            }
        } else {
            status_code = 404;
            response_body = "{\"error\":\"Not Found\"}";
        }
        
        // 构造 HTTP 响应
        std::ostringstream response;
        response << "HTTP/1.1 " << status_code << " OK\r\n";
        response << "Content-Type: application/json\r\n";
        response << "Content-Length: " << response_body.length() << "\r\n";
        response << "Access-Control-Allow-Origin: *\r\n";
        response << "\r\n";
        response << response_body;
        
        std::string response_str = response.str();
        write(client_fd, response_str.c_str(), response_str.length());
        
        close(client_fd);
    }

handle_stream_request设置响应头,调用处理器handler

void handle_stream_request(int client_fd, const std::string& body, StreamHandler& handler) {
        // 发送 SSE 响应头
        std::ostringstream header;
        header << "HTTP/1.1 200 OK\r\n";
        header << "Content-Type: text/event-stream\r\n";
        header << "Cache-Control: no-cache\r\n";
        header << "Connection: keep-alive\r\n";
        header << "Access-Control-Allow-Origin: *\r\n";
        header << "\r\n";
        
        std::string header_str = header.str();
        if (write(client_fd, header_str.c_str(), header_str.length()) < 0) {
            close(client_fd);
            return;
        }
        
        // 调用流式处理器,传入写入回调
        try {
            handler(body, [client_fd](const std::string& event_data) -> bool {
                // 格式化为 SSE 事件
                std::string sse_event = "data: " + event_data + "\n\n";
                ssize_t written = write(client_fd, sse_event.c_str(), sse_event.length());
                return written > 0;
            });
        } catch (const std::exception& e) {
            // 发送错误事件
            std::string error_event = "data: {\"error\":\"" + std::string(e.what()) + "\"}\n\n";
            write(client_fd, error_event.c_str(), error_event.length());
        }
        
        close(client_fd);
    }

agent-communication\examples\ai_orchestrator\orchestrator_main.cpp中的handle_stream_request处理流式请求 (message/stream)

void handle_stream_request(const std::string& body, 
                               std::function<bool(const std::string&)> write_callback) {
        try {
            auto request_json = json::parse(body);
            auto request = JsonRpcRequest::from_json(body);
            
            if (request.method() != "message/stream") {
                // 非流式方法,返回错误
                json error_response = {
                    {"jsonrpc", "2.0"},
                    {"id", request.id()},
                    {"error", {
                        {"code", -32601},
                        {"message", "Method not found for streaming"}
                    }}
                };
                write_callback(error_response.dump());
                return;
            }
            
            auto params_json = request_json["params"];
            auto message = AgentMessage::from_json(params_json["message"].dump());
            
            // 获取文本内容
            std::string user_text;
            if (!message.parts().empty()) {
                auto text_part = dynamic_cast<TextPart*>(message.parts()[0].get());
                if (text_part) {
                    user_text = text_part->text();
                }
            }
            
            std::string context_id = message.context_id().value_or("default");
            
            std::cout << "[Orchestrator] 收到流式消息: " << user_text << std::endl;
            
            // 保存用户消息
            save_message(context_id, message);
            
            // 发送开始事件
            json start_event = {
                {"jsonrpc", "2.0"},
                {"id", request.id()},
                {"result", {
                    {"type", "stream_start"},
                    {"contextId", context_id}
                }}
            };
            write_callback(start_event.dump());
            
            // 识别意图
            std::string intent = analyze_intent(user_text);
            std::cout << "[Orchestrator] 识别意图: " << intent << std::endl;
            
            // 发送意图识别事件
            json intent_event = {
                {"jsonrpc", "2.0"},
                {"id", request.id()},
                {"result", {
                    {"type", "intent"},
                    {"intent", intent}
                }}
            };
            write_callback(intent_event.dump());
            
            // 处理查询并流式返回
            std::string response_text;
            
            if (intent == "math") {
                response_text = call_math_agent(user_text, context_id);
            } else if (intent == "code") {
                response_text = call_code_agent(user_text, context_id);
            } else {
                response_text = handle_general_query(user_text, context_id);
            }
            
            // UTF-8 安全的分块函数
            auto utf8_safe_chunk = [](const std::string& text, size_t start, size_t max_len) -> std::string {
                if (start >= text.length()) return "";
                
                size_t end = std::min(start + max_len, text.length());
                
                // 确保不在 UTF-8 多字节字符中间切断
                while (end > start && end < text.length()) {
                    unsigned char c = static_cast<unsigned char>(text[end]);
                    // 如果是 UTF-8 后续字节 (10xxxxxx),向前移动
                    if ((c & 0xC0) == 0x80) {
                        end--;
                    } else {
                        break;
                    }
                }
                
                return text.substr(start, end - start);
            };
            
            // 流式输出:UTF-8 安全分块
            const size_t chunk_size = 50;
            size_t pos = 0;
            while (pos < response_text.length()) {
                std::string chunk = utf8_safe_chunk(response_text, pos, chunk_size);
                if (chunk.empty()) break;
                
                pos += chunk.length();
                
                json chunk_event = {
                    {"jsonrpc", "2.0"},
                    {"id", request.id()},
                    {"result", {
                        {"type", "chunk"},
                        {"content", chunk}
                    }}
                };
                
                if (!write_callback(chunk_event.dump())) {
                    std::cerr << "[Orchestrator] 流式写入失败" << std::endl;
                    return;
                }
                
                // 小延迟模拟流式效果
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
            
            // 保存 Agent 响应
            auto response_msg = AgentMessage::create()
                .with_role(MessageRole::Agent)
                .with_context_id(context_id);
            response_msg.add_text_part(response_text);
            save_message(context_id, response_msg);
            
            // 发送完成事件
            json complete_event = {
                {"jsonrpc", "2.0"},
                {"id", request.id()},
                {"result", {
                    {"type", "stream_end"},
                    {"message", response_msg.to_json()}
                }}
            };
            write_callback(complete_event.dump());
            
            std::cout << "[Orchestrator] 流式响应完成" << std::endl;
            
        } catch (const std::exception& e) {
            std::cerr << "[Orchestrator] 流式处理错误: " << e.what() << std::endl;
            json error_event = {
                {"jsonrpc", "2.0"},
                {"id", "1"},
                {"error", {
                    {"code", -32603},
                    {"message", e.what()}
                }}
            };
            write_callback(error_event.dump());
        }
    }

根据意图识别选择对应的工作agent,比如是code agent

std::string call_code_agent(const std::string& query, const std::string& context_id) {
        return call_agent_by_tag("code", query, context_id);
    }

call_agent_by_tag查找指定的agent并发送请求

std::string call_agent_by_tag(const std::string& tag, 
                                   const std::string& query, 
                                   const std::string& context_id) {
        try {
            // 从注册中心查找 Agent
            std::string agent_url = registry_client_.select_agent_by_tag(tag);
            
            std::cout << "[Orchestrator] 调用 " << tag << " Agent: " << agent_url << std::endl;
            
            // 构造请求
            json request = {
                {"jsonrpc", "2.0"},
                {"id", "1"},
                {"method", "message/send"},
                {"params", {
                    {"message", {
                        {"role", "user"},
                        {"contextId", context_id},
                        {"parts", {{{"kind", "text"}, {"text", query}}}}
                    }},
                    {"historyLength", 5}
                }}
            };
            
            // 发送请求
            std::string response_body = SimpleHttpClient::post(agent_url, request.dump());
            auto response_json = json::parse(response_body);
            
            if (response_json.contains("result") &&
                response_json["result"].contains("parts") &&
                !response_json["result"]["parts"].empty()) {
                return response_json["result"]["parts"][0]["text"].get<std::string>();
            }
            
            return "无法解析响应";
            
        } catch (const std::exception& e) {
            std::cerr << "[Orchestrator] 调用 " << tag << " Agent 失败: " << e.what() << std::endl;
            return "抱歉," + tag + " 服务暂时不可用,使用通用模型回答";
        }
    }

这样流式请求的链路就通了

四种通讯模式

简单rpc时序图
服务端流式rpc时序图

心跳机制

心跳机制时序图
客户端实现
1.启动心跳进程

// 文件: src/rpc_client.cpp:642-650
void RpcClient::startHeartbeat() {
    if (heartbeat_running_) {
        return;  // 防止重复启动
    }
    
    heartbeat_running_ = true;
    heartbeat_thread_ = std::thread([this]() {
        heartbeatLoop();  // 在独立线程中运行心跳循环
    });
}

2.心跳循环主题

// 文件: src/rpc_client.cpp:662-677
void RpcClient::heartbeatLoop() {
    while (heartbeat_running_) {
        if (connected_ && !current_agent_id_.empty()) {
            // 发送心跳
            if (!sendHeartbeat(current_agent_id_, current_agent_info_)) {
                // ❌ 心跳失败处理
                LOG_WARN("Heartbeat failed, attempting reconnection");
                connected_ = false;
                
                if (!reconnect()) {
                    LOG_ERROR("Failed to reconnect, stopping heartbeat");
                    break;  // 重连失败,停止心跳
                }
            }
        }
        
        // 等待下一次心跳间隔(默认30秒)
        std::this_thread::sleep_for(std::chrono::seconds(config_.heartbeat_interval));
    }
}

3.发送心跳请求

// 文件: src/rpc_client.cpp:420-455
bool RpcClient::sendHeartbeat(const std::string& agent_id,
                             const ServiceEndpoint& agent_info) {
    if (!connected_) {
        return false;
    }
    
    try {
        // 构建心跳请求
        agent_communication::HeartbeatRequest request;
        request.set_agent_id(agent_id);
        auto* info = request.mutable_agent_info();
        info->set_host(agent_info.host);
        info->set_port(agent_info.port);
        info->set_service_name(agent_info.service_name);
        info->set_version(agent_info.version);
        
        agent_communication::HeartbeatResponse response;
        grpc::ClientContext context;
        
        // 设置超时(10秒)
        auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(10);
        context.set_deadline(deadline);
        
        // 发送 gRPC 请求
        grpc::Status status = stub_->Heartbeat(&context, request, &response);
        
        // 检查结果
        if (status.ok() && response.status().code() == 0) {
            return true;
        } else {
            LOG_WARN("Heartbeat failed: " + 
                (status.ok() ? response.status().message() : status.error_message()));
            return false;
        }
        
    } catch (const std::exception& e) {
        LOG_WARN("Heartbeat error: " + std::string(e.what()));
        return false;
    }
}

服务端实现
1.处理心跳请求

// 文件: src/rpc_server.cpp:404-429
grpc::Status AgentCommunicationServiceImpl::Heartbeat(
    grpc::ServerContext* context,
    const agent_communication::HeartbeatRequest* request,
    agent_communication::HeartbeatResponse* response) {
    
    try {
        // 记录指标
        auto metrics = Metrics::getInstance();
        metrics->recordRpcRequest("AgentCommunicationService", "Heartbeat", 0);
        
        // 更新 Agent 心跳时间
        updateAgentHeartbeat(request->agent_id());
        
        // 返回成功响应
        response->mutable_status()->set_code(0);
        response->mutable_status()->set_message("Success");
        response->set_server_time(/* 当前时间戳 */);
        
        return grpc::Status::OK;
        
    } catch (const std::exception& e) {
        // 服务器错误
        LOG_ERROR("Error in Heartbeat: " + std::string(e.what()));
        response->mutable_status()->set_code(2);  // 错误码2:服务器内部错误
        response->mutable_status()->set_message("Internal server error");
        
        metrics->recordRpcError("AgentCommunicationService", "Heartbeat", "InternalError");
        return grpc::Status::OK;
    }
}

2.更新心跳时间戳

// 文件: src/rpc_server.cpp:105-111
void AgentCommunicationServiceImpl::updateAgentHeartbeat(const std::string& agent_id) {
    std::lock_guard<std::mutex> lock(agents_mutex_);  // 线程安全
    auto it = agents_.find(agent_id);
    if (it != agents_.end()) {
        it->second.last_heartbeat = std::chrono::steady_clock::now();
    }
}

3.清理离线 Agent

// 文件: src/rpc_server.cpp:113-128
void AgentCommunicationServiceImpl::cleanupOfflineAgents() {
    std::lock_guard<std::mutex> lock(agents_mutex_);
    auto now = std::chrono::steady_clock::now();
    auto timeout = std::chrono::seconds(60);  // 60秒超时
    
    auto it = agents_.begin();
    while (it != agents_.end()) {
        if (now - it->second.last_heartbeat > timeout) {
            // Agent 超时,清理资源
            LOG_WARN("Agent offline, removing: " + it->first);
            agent_message_queues_.erase(it->first);  // 清理消息队列
            it = agents_.erase(it);                   // 移除 Agent
        } else {
            ++it;
        }
    }
}

重连机制

// 文件: client/src/rpc_client.cpp:234-255
bool RpcClient::reconnect() {
    // 检查重试次数上限
    if (connection_retry_count_ >= MAX_RETRY_COUNT) {
        LOG_ERROR("Max reconnection attempts reached");
        return false;
    }
    
    // 指数退避:第N次重连等待 N * RETRY_DELAY_MS 毫秒
    std::this_thread::sleep_for(
        std::chrono::milliseconds(RETRY_DELAY_MS * (connection_retry_count_ + 1)));
    
    try {
        setupChannel();                              // 重新建立 gRPC 通道
        connected_ = true;
        connection_retry_count_ = 0;                 // 重置计数器
        last_connection_time_ = std::chrono::steady_clock::now();
        
        LOG_INFO("Reconnected to server successfully");
        return true;
    } catch (const std::exception& e) {
        connection_retry_count_++;                   // 增加重试计数
        LOG_ERROR("Reconnection failed: " + std::string(e.what()));
        return false;
    }
}

监控与指标

架构总览

┌─────────────────────────────────────────────────────────────────────────────┐
│                          性能监控系统架构                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                        应用层(埋点)                                 │   │
│   │  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐        │   │
│   │  │rpc_server │  │rpc_client │  │ai_query   │  │a2a_adapter│        │   │
│   │  │    .cpp   │  │    .cpp   │  │service.cpp│  │    .cpp   │        │   │
│   │  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘        │   │
│   └────────┼──────────────┼──────────────┼──────────────┼──────────────┘   │
│            │              │              │              │                   │
│            ↓              ↓              ↓              ↓                   │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                     Metrics 管理器(单例)                           │   │
│   │  ┌────────────────────────────────────────────────────────────────┐ │   │
│   │  │ recordRpcRequest() | recordConnection() | recordMessage() | ...│ │   │
│   │  └────────────────────────────────────────────────────────────────┘ │   │
│   └─────────────────────────────┬───────────────────────────────────────┘   │
│                                 │                                           │
│                                 ↓                                           │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                     MetricsCollector(指标收集器)                    │   │
│   │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │   │
│   │  │ Counter  │  │  Gauge   │  │Histogram │  │ Summary  │            │   │
│   │  │  计数器  │  │ 仪表盘   │  │  直方图  │  │   摘要   │            │   │
│   │  └──────────┘  └──────────┘  └──────────┘  └──────────┘            │   │
│   └─────────────────────────────┬───────────────────────────────────────┘   │
│                                 │                                           │
│                                 ↓                                           │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         导出层                                       │   │
│   │      exportPrometheus()           exportJson()                      │   │
│   │           │                            │                            │   │
│   │           ↓                            ↓                            │   │
│   │    ┌────────────┐              ┌────────────┐                       │   │
│   │    │ Prometheus │              │ Grafana/   │                       │   │
│   │    │   Server   │              │ Dashboard  │                       │   │
│   │    └────────────┘              └────────────┘                       │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
四种指标类型详解

Counter(计数器)
特点:只能增加,不能减少,重启后归零

// 文件: include/agent_rpc/metrics.h:48-72
class CounterMetric : public Metric {
public:
    void increment(double value = 1.0);  // 增加
    void set(double value);               // 设置(只能比当前值大)
    double get() const;                   // 获取当前值
    void reset();                         // 重置
    
private:
    std::atomic<double> value_{0.0};      // 原子变量,线程安全
};

使用场景

  • RPC 请求总数
  • 错误发生次数
  • 消息发送/接收总数

Gauge(仪表盘)
特点:可增可减,表示瞬时值

// 文件: include/agent_rpc/metrics.h:74-99
class GaugeMetric : public Metric {
public:
    void set(double value);               // 设置
    void increment(double value = 1.0);   // 增加
    void decrement(double value = 1.0);   // 减少
    double get() const;                   // 获取当前值
    
private:
    std::atomic<double> value_{0.0};
};

使用场景

  • 当前活跃连接数
  • 内存使用量
  • CPU 使用率

Histogram(直方图)
特点:统计数据分布,按桶(bucket)分组

// 文件: include/agent_rpc/metrics.h:101-130
class HistogramMetric : public Metric {
public:
    // 默认桶: 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0, ∞
    HistogramMetric(const std::string& name, 
                    const std::vector<double>& buckets = {...});
    
    void observe(double value);           // 记录一个观测值
    double getSum() const;                // 所有观测值的总和
    uint64_t getCount() const;            // 观测次数
    std::vector<HistogramBucket> getBuckets() const;  // 获取桶分布
    
private:
    std::vector<HistogramBucket> buckets_;
    std::atomic<double> sum_{0.0};
    std::atomic<uint64_t> count_{0};
};

使用场景

  • RPC 请求延迟分布
  • 消息大小分布

Summary(摘要)
特点:计算分位数(P50, P90, P95, P99)

// 文件: include/agent_rpc/metrics.h:132-165
class SummaryMetric : public Metric {
public:
    // 默认分位数: 0.5, 0.9, 0.95, 0.99
    SummaryMetric(const std::string& name, 
                  const std::vector<double>& quantiles = {0.5, 0.9, 0.95, 0.99});
    
    void observe(double value);
    std::map<double, double> getQuantiles() const;  // 获取各分位数值
    
private:
    std::vector<double> values_;          // 存储所有观测值
    std::map<double, double> quantile_values_;  // 分位数结果
};

使用场景

  • 请求延迟的 P99 值
  • 精确的性能 SLA 监控
四类监控指标的具体实现

RPC 请求指标

// 文件: include/agent_rpc/metrics.h:225-228
class Metrics {
    // RPC相关指标
    void recordRpcRequest(const std::string& service, const std::string& method, double duration_ms);
    void recordRpcResponse(const std::string& service, const std::string& method, int status_code);
    void recordRpcError(const std::string& service, const std::string& method, const std::string& error_type);
    
private:
    std::shared_ptr<CounterMetric> rpc_request_counter_;      // 请求计数器
    std::shared_ptr<HistogramMetric> rpc_duration_histogram_; // 延迟直方图
    std::shared_ptr<CounterMetric> rpc_error_counter_;        // 错误计数器
};

//实现细节
// 文件: src/metrics.cpp:473-489
void Metrics::recordRpcRequest(const std::string& service, 
                               const std::string& method, 
                               double duration_ms) {
    if (!rpc_request_counter_) {
        initializeDefaultMetrics();
    }
    
    // 1. 增加请求计数
    rpc_request_counter_->increment();
    
    // 2. 记录延迟分布(转换为秒)
    if (rpc_duration_histogram_) {
        rpc_duration_histogram_->observe(duration_ms / 1000.0);
    }
}

连接指标

// 文件: include/agent_rpc/metrics.h:230-232
void recordConnection(const std::string& service, bool success);
void recordDisconnection(const std::string& service);

private:
    std::shared_ptr<GaugeMetric> active_connections_gauge_;  // 活跃连接数

//实现细节
// 文件: src/metrics.cpp:503-518
void Metrics::recordConnection(const std::string& service, bool success) {
    if (success) {
        // 连接成功,活跃连接数 +1
        if (active_connections_gauge_) {
            active_connections_gauge_->increment();
        }
    }
}

void Metrics::recordDisconnection(const std::string& service) {
    // 断开连接,活跃连接数 -1
    if (active_connections_gauge_) {
        active_connections_gauge_->decrement();
    }
}

消息指标

// 文件: include/agent_rpc/metrics.h:234-236
void recordMessageSent(const std::string& message_type, size_t size);
void recordMessageReceived(const std::string& message_type, size_t size);

private:
    std::shared_ptr<CounterMetric> message_counter_;  // 消息计数器

//实现细节
// 文件: src/metrics.cpp:519-535
void Metrics::recordMessageSent(const std::string& message_type, size_t size) {
    if (!message_counter_) {
        initializeDefaultMetrics();
    }
    
    // 记录发送消息数量
    message_counter_->increment();
    
    // 可以额外记录消息大小到直方图
}

void Metrics::recordMessageReceived(const std::string& message_type, size_t size) {
    if (!message_counter_) {
        initializeDefaultMetrics();
    }
    message_counter_->increment();
}

系统资源指标

// 文件: include/agent_rpc/metrics.h:238-240
void recordMemoryUsage(size_t bytes);
void recordCpuUsage(double percentage);

private:
    std::shared_ptr<GaugeMetric> memory_usage_gauge_;  // 内存使用
    std::shared_ptr<GaugeMetric> cpu_usage_gauge_;     // CPU 使用
//实现细节
// 文件: src/metrics.cpp:537-553
void Metrics::recordMemoryUsage(size_t bytes) {
    if (!memory_usage_gauge_) {
        initializeDefaultMetrics();
    }
    memory_usage_gauge_->set(static_cast<double>(bytes));
}

void Metrics::recordCpuUsage(double percentage) {
    if (!cpu_usage_gauge_) {
        initializeDefaultMetrics();
    }
    cpu_usage_gauge_->set(percentage);
}
项目中的实际使用场景

场景 1:服务端 RPC 方法监控

// 文件: src/rpc_server.cpp:131-186
grpc::Status AgentCommunicationServiceImpl::SendMessage(...) {
    try {
        // 1️⃣ 方法开始:记录请求
        auto metrics = Metrics::getInstance();
        metrics->recordRpcRequest("AgentCommunicationService", "SendMessage", 0);
        
        auto start_time = std::chrono::steady_clock::now();
        
        // ... 业务逻辑 ...
        
        // 2️⃣ 方法结束:记录耗时
        auto end_time = std::chrono::steady_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
            end_time - start_time);
        metrics->recordRpcRequest("AgentCommunicationService", "SendMessage", 
                                  duration.count());
        
        return grpc::Status::OK;
        
    } catch (const std::exception& e) {
        // 3️⃣ 异常时:记录错误
        auto metrics = Metrics::getInstance();
        metrics->recordRpcError("AgentCommunicationService", "SendMessage", 
                               "InternalError");
        return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
    }
}

场景 2:客户端 RPC 调用监控

// 文件: src/rpc_client.cpp:71-113
bool RpcClient::sendMessage(const std::string& message, ...) {
    try {
        auto metrics = Metrics::getInstance();
        auto start_time = std::chrono::steady_clock::now();
        
        // ... gRPC 调用 ...
        grpc::Status status = stub_->SendMessage(&context, request, &response);
        
        auto end_time = std::chrono::steady_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
            end_time - start_time);
        
        if (status.ok()) {
            // ✅ 成功:记录请求和响应
            metrics->recordRpcRequest("AgentCommunicationService", "SendMessage", 
                                      duration.count());
            metrics->recordRpcResponse("AgentCommunicationService", "SendMessage", 
                                       response.status().code());
            return true;
        } else {
            // ❌ gRPC 错误:记录错误类型
            metrics->recordRpcError("AgentCommunicationService", "SendMessage", 
                                   "GrpcError");
            return false;
        }
    } catch (const std::exception& e) {
        metrics->recordRpcError("AgentCommunicationService", "SendMessage", 
                               "Exception");
        return false;
    }
}

场景 3:AI 查询服务监控

// 文件: server/src/ai_query_service.cpp:229-242
void AIQueryServiceImpl::recordMetrics(const std::string& method, 
                                       int64_t duration_ms, 
                                       bool success) {
    auto& metrics = common::Metrics::getInstance();
    
    // 记录请求
    metrics.recordRpcRequest("AIQueryService", method, duration_ms);
    
    if (success) {
        // 成功
        metrics.recordRpcResponse("AIQueryService", method, 0);
    } else {
        // 失败
        metrics.recordRpcError("AIQueryService", method, "Error");
    }
}

场景 4:A2A 适配器专用指标

// 文件: a2a_adapter/include/agent_rpc/a2a_adapter/a2a_metrics.h
class A2AMetrics {
    // 查询指标
    void recordQueryRequest(const std::string& agent_id, bool streaming);
    void recordQueryComplete(const std::string& agent_id, int64_t duration_ms, bool success);
    void recordQueryError(const std::string& agent_id, const std::string& error_type);
    
    // 任务指标
    void recordTaskCreated(const std::string& task_id);
    void recordTaskStateChange(const std::string& task_id, const std::string& from, const std::string& to);
    void recordTaskComplete(const std::string& task_id, int64_t duration_ms, bool success);
    
    // Agent 指标
    void recordAgentRegistered(const std::string& agent_id);
    void recordAgentUnregistered(const std::string& agent_id);
    void recordAgentHealthCheck(const std::string& agent_id, bool healthy);
    void recordAgentRouting(const std::string& from, const std::string& to, const std::string& skill);
    
    // 连接指标
    void recordConnectionAttempt(const std::string& target_url, bool success);
    void recordMessageSent(const std::string& message_type, size_t size_bytes);
    void recordMessageReceived(const std::string& message_type, size_t size_bytes);
    
private:
    // 原子计数器
    std::atomic<uint64_t> total_queries_{0};
    std::atomic<uint64_t> successful_queries_{0};
    std::atomic<uint64_t> failed_queries_{0};
    std::atomic<uint64_t> streaming_queries_{0};
    std::atomic<uint64_t> total_query_latency_ms_{0};
    // ... 更多计数器
};

Prometheus 格式

// 文件: src/metrics.cpp:583-585
std::string Metrics::exportPrometheus() const {
    return MetricsCollector::getInstance().exportPrometheus();
}
//输出格式
# HELP rpc_requests_total Total number of RPC requests
# TYPE rpc_requests_total counter
rpc_requests_total{service="AgentCommunicationService",method="SendMessage"} 1523

# HELP rpc_request_duration_seconds RPC request duration in seconds
# TYPE rpc_request_duration_seconds histogram
rpc_request_duration_seconds_bucket{le="0.005"} 100
rpc_request_duration_seconds_bucket{le="0.01"} 250
rpc_request_duration_seconds_bucket{le="0.025"} 500
rpc_request_duration_seconds_bucket{le="0.05"} 800
rpc_request_duration_seconds_bucket{le="+Inf"} 1523
rpc_request_duration_seconds_sum 45.67
rpc_request_duration_seconds_count 1523

# HELP active_connections Current active connections
# TYPE active_connections gauge
active_connections{service="AgentCommunicationService"} 15

# HELP memory_usage_bytes Current memory usage in bytes
# TYPE memory_usage_bytes gauge
memory_usage_bytes 134217728

JSON 格式

// 文件: src/metrics.cpp:587-589
std::string Metrics::exportJson() const {
    return MetricsCollector::getInstance().exportJson();
}
//输出示例
{
  "metrics": [
    {
      "name": "rpc_requests_total",
      "type": "counter",
      "value": 1523,
      "labels": {
        "service": "AgentCommunicationService",
        "method": "SendMessage"
      }
    },
    {
      "name": "active_connections",
      "type": "gauge",
      "value": 15,
      "labels": {
        "service": "AgentCommunicationService"
      }
    },
    {
      "name": "memory_usage_bytes",
      "type": "gauge",
      "value": 134217728
    }
  ]
}

时序图

熔断机制开发

加入熔断机制的好处

1.快速失败,节省资源

// 无熔断:每次请求都要等超时
auto response = client.query(question);  // 等 30 秒...

// 有熔断:直接快速失败
if (circuit_breaker->getState() == CircuitState::OPEN) {
    return "服务暂不可用";  // 立即返回
}

2. 防止级联故障(雪崩效应)

无熔断时的雪崩:
┌────────┐    ┌────────┐    ┌────────┐    ┌────────┐
│ 用户端 │ → │ 网关   │ → │ 服务A  │ → │ 服务B  │ (宕机)
└────────┘    └────────┘    └────────┘    └────────┘
     ↑             ↑             ↑
  线程阻塞      线程阻塞      线程阻塞
  资源耗尽      资源耗尽      资源耗尽
     └─────────────┴─────────────┴── 全部崩溃!

有熔断时:
┌────────┐    ┌────────┐    ┌────────┐    ┌─────────┐
│ 用户端 │ → │ 网关   │ → │ 服务A  │ ✕ │ 熔断器  │ → 服务B(宕机)
└────────┘    └────────┘    └────────┘    └─────────┘
     ↑                            │
   正常返回                    直接拒绝
   "服务暂时不可用"             不再发送请求

3. 自动恢复能力

// 熔断器会自动尝试恢复
// 60秒后进入 HALF_OPEN 状态,放行少量请求测试
// 如果服务恢复正常,自动切回 CLOSED 状态

4. 可观测性

auto stats = circuit_breaker->getStats();
// 可以监控:
// - 总请求数、成功数、失败数
// - 被拒绝的请求数
// - 当前失败率
// - 熔断器状态
代码对应以及适用场景

熔断器头文件: -agent-communication/include/agent_rpc/circuit_breaker.h

1. 熔断器状态 (L9-14):

enum class CircuitState {
    CLOSED,     // 关闭状态 - 正常请求
    OPEN,       // 开启状态 - 熔断,拒绝请求
    HALF_OPEN   // 半开状态 - 尝试恢复
};

2. 熔断器配置 (L17-24):

struct CircuitBreakerConfig {
    int failure_threshold = 5;           // 失败阈值
    int success_threshold = 3;           // 半开状态下的成功阈值
    std::chrono::milliseconds timeout = std::chrono::milliseconds(60000);
    std::chrono::milliseconds half_open_timeout = std::chrono::milliseconds(30000);
    double failure_rate_threshold = 0.5; // 失败率阈值
    int min_request_count = 10;          // 最小请求数量
};

3. 熔断器实现: -agent-communication/src/circuit_breaker.cpp

方法 功能 代码行
execute() 执行请求,自动记录成功/失败 L15-29
recordSuccess() 记录成功,半开转关闭 L34-48
recordFailure() 记录失败,超阈值转开启 L50-63
isRequestAllowed() 判断是否允许请求 L65-87
transitionToOpen() 转换到开启状态 L109-115
transitionToHalfOpen() 转换到半开状态 L117-128
transitionToClosed() 转换到关闭状态 L130-140
updateFailureRate() 计算失败率 L142-151
shouldAttemptReset() 判断是否尝试恢复 L153-158

4. 熔断器管理器 (L160-221):

class CircuitBreakerManager {
    std::shared_ptr<CircuitBreaker> getCircuitBreaker(const std::string& service_name);
    void removeCircuitBreaker(const std::string& service_name);
    std::map<std::string, CircuitState> getAllStates() const;
    void resetAll();
    void updateConfig(const std::string& service_name, const CircuitBreakerConfig& config);
};

适用场景
路径:-agent-communication/examples/ai_query_client.cpp

/**
 * @file ai_query_client.cpp
 * @brief AI Query RPC Client - 通过 gRPC 发送 AI 查询请求
 * 
 * 这是项目的核心测试用例:
 * - rpc_client 通过 gRPC 连接 rpc_server
 * - rpc_server 通过 A2A 协议调用 Orchestrator
 * - Orchestrator 路由到各个专业 Agent (Math Agent 等)
 */

#include "agent_rpc/client/ai_query_client.h"
#include "agent_rpc/common/rpc_framework.h"
#include <iostream>
#include <signal.h>
#include <string>

// ============================================================================
// [熔断机制集成示例 - 可选]
// 如需启用熔断保护,取消以下注释:
// #include "agent_rpc/circuit_breaker.h"
// 或使用 common 模块:
// #include "agent_rpc/common/circuit_breaker.h"
// ============================================================================

using namespace agent_rpc::client;
using namespace agent_rpc::common;

// ============================================================================
// [熔断机制命名空间 - 可选]
// 如需使用熔断器,取消以下注释:
// using agent_rpc::CircuitBreaker;
// using agent_rpc::CircuitBreakerManager;
// using agent_rpc::CircuitBreakerConfig;
// using agent_rpc::CircuitState;
// ============================================================================

// 全局变量用于优雅关闭
std::atomic<bool> g_running{true};

void signalHandler(int signal) {
    std::cout << "\n收到信号 " << signal << ", 退出..." << std::endl;
    g_running = false;
}

void printHelp() {
    std::cout << "\n命令:" << std::endl;
    std::cout << "  /help     - 显示帮助" << std::endl;
    std::cout << "  /stream   - 切换流式模式" << std::endl;
    std::cout << "  /context <id> - 切换上下文" << std::endl;
    std::cout << "  /quit     - 退出" << std::endl;
    std::cout << "\n直接输入问题发送给 AI\n" << std::endl;
}

void printUsage(const char* program) {
    std::cout << "用法: " << program << " <RPC_SERVER_ADDRESS>" << std::endl;
    std::cout << std::endl;
    std::cout << "参数:" << std::endl;
    std::cout << "  RPC_SERVER_ADDRESS - gRPC 服务器地址 (例如: localhost:50051)" << std::endl;
    std::cout << std::endl;
    std::cout << "示例:" << std::endl;
    std::cout << "  " << program << " localhost:50051" << std::endl;
    std::cout << std::endl;
    std::cout << "注意: 需要先启动以下服务:" << std::endl;
    std::cout << "  1. ai_orchestrator 系统 (./start_system.sh)" << std::endl;
    std::cout << "  2. rpc_server (./rpc_server)" << std::endl;
}

int main(int argc, char* argv[]) {
    if (argc < 2) {
        printUsage(argv[0]);
        return 1;
    }
    
    std::string server_address = argv[1];
    
    // 设置信号处理
    signal(SIGINT, signalHandler);
    signal(SIGTERM, signalHandler);
    
    std::cout << "==========================================" << std::endl;
    std::cout << "AI Query RPC Client" << std::endl;
    std::cout << "==========================================" << std::endl;
    std::cout << "连接到 RPC Server: " << server_address << std::endl;
    
    // 创建 AI 查询客户端
    AIQueryClient client;
    
    // ========================================================================
    // [熔断器初始化示例 - 可选]
    // 为 AI 查询服务创建熔断器,提供服务容错保护:
    //
    // // 方式1:使用默认配置
    // auto& cb_manager = CircuitBreakerManager::getInstance();
    // auto circuit_breaker = cb_manager.getCircuitBreaker("ai_query_service");
    //
    // // 方式2:使用自定义配置
    // CircuitBreakerConfig cb_config;
    // cb_config.failure_threshold = 5;        // 连续5次失败触发熔断
    // cb_config.success_threshold = 3;        // 半开状态3次成功恢复
    // cb_config.timeout = std::chrono::milliseconds(30000);  // 30秒超时后尝试恢复
    // cb_config.failure_rate_threshold = 0.5; // 失败率超过50%触发熔断
    // cb_config.min_request_count = 10;       // 至少10次请求后才计算失败率
    // auto circuit_breaker = std::make_shared<CircuitBreaker>(cb_config);
    // ========================================================================
    
    if (!client.connect(server_address)) {
        std::cerr << "无法连接到服务器: " << server_address << std::endl;
        return 1;
    }
    
    // ========================================================================
    // [使用熔断器包装连接操作示例 - 可选]
    // 连接操作可以用熔断器保护,防止服务不可用时频繁重试:
    //
    // bool connected = false;
    // try {
    //     connected = circuit_breaker->execute([&]() {
    //         return client.connect(server_address);
    //     });
    // } catch (const std::runtime_error& e) {
    //     // 熔断器开启时会抛出 "Circuit breaker is open" 异常
    //     std::cerr << "服务熔断中,稍后重试: " << e.what() << std::endl;
    //     return 1;
    // }
    // if (!connected) {
    //     std::cerr << "无法连接到服务器" << std::endl;
    //     return 1;
    // }
    // ========================================================================
    
    std::cout << "连接成功!" << std::endl;
    printHelp();
    
    std::string context_id = "default";
    bool stream_mode = false;
    std::string line;
    
    while (g_running) {
        std::cout << "[" << context_id << (stream_mode ? "/流式" : "") << "] > ";
        
        if (!std::getline(std::cin, line)) {
            break;
        }
        
        if (line.empty()) continue;
        
        // 处理命令
        if (line == "/quit" || line == "/exit" || line == "/q") {
            std::cout << "再见!" << std::endl;
            break;
        }
        
        if (line == "/help" || line == "/h") {
            printHelp();
            continue;
        }
        
        if (line == "/stream" || line == "/s") {
            stream_mode = !stream_mode;
            std::cout << "流式模式: " << (stream_mode ? "开启" : "关闭") << std::endl;
            continue;
        }
        
        if (line.substr(0, 9) == "/context " || line.substr(0, 3) == "/c ") {
            size_t pos = line.find(' ');
            if (pos != std::string::npos) {
                context_id = line.substr(pos + 1);
                std::cout << "切换到上下文: " << context_id << std::endl;
            }
            continue;
        }
        
        // 发送 AI 查询
        std::cout << "\n思考中..." << std::endl;
        
        // ====================================================================
        // [熔断器状态检查示例 - 可选]
        // 在发送请求前可以检查熔断器状态:
        //
        // CircuitState state = circuit_breaker->getState();
        // if (state == CircuitState::OPEN) {
        //     std::cout << "⚠️ 服务熔断中,请稍后重试..." << std::endl;
        //     continue;
        // } else if (state == CircuitState::HALF_OPEN) {
        //     std::cout << "ℹ️ 服务恢复测试中..." << std::endl;
        // }
        // ====================================================================
        
        if (stream_mode) {
            // 流式查询
            std::cout << "\nAI: ";
            bool success = client.queryStream(line, 
                [](const agent_communication::AIStreamEvent& event) {
                    if (event.event_type() == "partial") {
                        std::cout << event.content();
                        std::cout.flush();
                    } else if (event.event_type() == "complete") {
                        std::cout << std::endl;
                    } else if (event.event_type() == "error") {
                        std::cout << "\n错误: " << event.content() << std::endl;
                    }
                }, context_id, 60);
            
            if (!success) {
                std::cout << "\n流式查询失败" << std::endl;
                // ============================================================
                // [熔断器记录失败示例 - 可选]
                // circuit_breaker->recordFailure();
                // ============================================================
            }
            // ================================================================
            // [熔断器记录成功示例 - 可选]
            // else {
            //     circuit_breaker->recordSuccess();
            // }
            // ================================================================
        } else {
            // 同步查询
            auto response = client.query(line, context_id, 60);
            
            // ================================================================
            // [使用熔断器包装同步查询示例 - 可选]
            // 推荐方式:使用 execute() 方法自动处理成功/失败记录
            //
            // try {
            //     auto response = circuit_breaker->execute([&]() {
            //         return client.query(line, context_id, 60);
            //     });
            //     // 处理响应...
            // } catch (const std::runtime_error& e) {
            //     if (std::string(e.what()) == "Circuit breaker is open") {
            //         std::cout << "⚠️ 服务熔断中: " << e.what() << std::endl;
            //     } else {
            //         throw;  // 重新抛出其他异常
            //     }
            // }
            // ================================================================
            
            if (response.status().code() == 0) {
                std::cout << "\nAI: " << response.answer() << std::endl;
                if (!response.agent_name().empty()) {
                    std::cout << "[处理 Agent: " << response.agent_name() 
                              << ", 耗时: " << response.processing_time_ms() << "ms]" << std::endl;
                }
                // ============================================================
                // [熔断器记录成功 - 可选]
                // circuit_breaker->recordSuccess();
                // ============================================================
            } else {
                std::cout << "\n错误: " << response.status().message() << std::endl;
                // ============================================================
                // [熔断器记录失败 - 可选]
                // circuit_breaker->recordFailure();
                // ============================================================
            }
        }
        std::cout << std::endl;
    }
    
    client.disconnect();
    std::cout << "已断开连接" << std::endl;
    
    // ========================================================================
    // [熔断器统计信息示例 - 可选]
    // 程序退出前可以打印熔断器统计信息:
    //
    // auto stats = circuit_breaker->getStats();
    // std::cout << "\n========== 熔断器统计 ==========" << std::endl;
    // std::cout << "总请求数: " << stats.total_requests << std::endl;
    // std::cout << "成功请求: " << stats.successful_requests << std::endl;
    // std::cout << "失败请求: " << stats.failed_requests << std::endl;
    // std::cout << "被拒绝请求: " << stats.rejected_requests << std::endl;
    // std::cout << "当前失败率: " << (stats.current_failure_rate * 100) << "%" << std::endl;
    // std::cout << "当前状态: ";
    // switch (circuit_breaker->getState()) {
    //     case CircuitState::CLOSED:    std::cout << "CLOSED (正常)"; break;
    //     case CircuitState::OPEN:      std::cout << "OPEN (熔断中)"; break;
    //     case CircuitState::HALF_OPEN: std::cout << "HALF_OPEN (恢复测试)"; break;
    // }
    // std::cout << std::endl;
    // std::cout << "=================================" << std::endl;
    // ========================================================================
    
    return 0;
}

负载均衡机制

负载均衡整体架构
┌─────────────────────────────────────────────────────────────────────────────┐
│                          负载均衡系统架构                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                           LoadBalancerFactory                               │
│                      createLoadBalancer(strategy)                           │
│                                 │                                           │
│         ┌───────────────────────┼───────────────────────┐                   │
│         │           │           │           │           │           │       │
│         ↓           ↓           ↓           ↓           ↓           ↓       │
│   ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│   │RoundRobin│ │  Random  │ │LeastConn │ │Weighted  │ │Consistent│ │LeastResp │
│   │LoadBal   │ │LoadBal   │ │LoadBal   │ │RoundRobin│ │Hash LB   │ │Time LB   │
│   └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘
│         │           │           │           │           │           │       │
│         └───────────┴───────────┴───────────┴───────────┴───────────┘       │
│                                 │                                           │
│                                 ↓                                           │
│                    LoadBalancer 接口 (抽象基类)                               │
│            ┌────────────────────────────────────────┐                       │
│            │ selectEndpoint(endpoints) → Endpoint  │                       │
│            │ updateEndpoints(endpoints)            │                       │
│            │ markEndpointStatus(id, healthy)       │                       │
│            └────────────────────────────────────────┘                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
轮询策略 (Round Robin)

原理

请求按顺序依次分配给每个服务器,循环往复

服务器:    A    B    C    A    B    C    A    B    C
请求:      1    2    3    4    5    6    7    8    9

实现代码

// 文件: src/load_balancer.cpp:11-33
ServiceEndpoint RoundRobinLoadBalancer::selectEndpoint(const std::vector<ServiceEndpoint>& endpoints) {
    // 1. 过滤健康端点
    std::vector<ServiceEndpoint> healthy_endpoints;
    for (const auto& endpoint : endpoints) {
        if (endpoint.is_healthy) {
            healthy_endpoints.push_back(endpoint);
        }
    }
    
    // 2. 轮询选择:原子操作获取并递增索引
    size_t index = current_index_.fetch_add(1) % healthy_endpoints.size();
    //             ↑ 原子操作:获取当前值并+1
    //                                    ↑ 取模确保循环
    
    return healthy_endpoints[index];
}

// 成员变量
std::atomic<size_t> current_index_{0};  // 原子计数器,线程安全

图解

┌─────────────────────────────────────────────────────────────────────────────┐
│  轮询策略工作流程                                                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  current_index_ = 0                                                         │
│                                                                             │
│  endpoints = [A, B, C]  (3个健康服务器)                                      │
│                                                                             │
│  请求1: index = 0 % 3 = 0  → 选择 A, current_index_ = 1                     │
│  请求2: index = 1 % 3 = 1  → 选择 B, current_index_ = 2                     │
│  请求3: index = 2 % 3 = 2  → 选择 C, current_index_ = 3                     │
│  请求4: index = 3 % 3 = 0  → 选择 A, current_index_ = 4                     │
│  请求5: index = 4 % 3 = 1  → 选择 B, current_index_ = 5                     │
│  ...                                                                        │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

使用场景

  • 服务器性能相近
  • 无状态服务
  • 简单高效
随机策略 (Random)

原理

每次请求随机选择一个服务器

服务器:    B    A    C    C    A    B    C    A    A
请求:      1    2    3    4    5    6    7    8    9
          ↑ 随机

实现代码

// 文件: src/load_balancer.cpp:54-76
ServiceEndpoint RandomLoadBalancer::selectEndpoint(const std::vector<ServiceEndpoint>& endpoints) {
    // 1. 过滤健康端点
    std::vector<ServiceEndpoint> healthy_endpoints;
    for (const auto& endpoint : endpoints) {
        if (endpoint.is_healthy) {
            healthy_endpoints.push_back(endpoint);
        }
    }
    
    // 2. 随机选择
    std::uniform_int_distribution<> dis(0, healthy_endpoints.size() - 1);
    //                              ↑ 均匀分布 [0, size-1]
    
    return healthy_endpoints[dis(gen_)];
    //                       ↑ 生成随机数
}

// 成员变量
std::random_device rd_;        // 硬件随机数种子
mutable std::mt19937 gen_;     // 梅森旋转随机数生成器

使用场景

  • 服务器性能相近
  • 简单场景
  • 不需要均匀分布时
轮询策略 (Round Robin)

原理

实现代码

图解

使用场景

  • 服务器性能相近
  • 服务器性能相近
  • 服务器性能相近
最少连接策略 (Least Connections)

原理

选择当前连接数最少的服务器

服务器 A: ████  (4 连接)
服务器 B: ██    (2 连接)  ← 选择这个
服务器 C: ███   (3 连接)

实现代码

// 文件: src/load_balancer.cpp:96-127
ServiceEndpoint LeastConnectionsLoadBalancer::selectEndpoint(const std::vector<ServiceEndpoint>& endpoints) {
    ServiceEndpoint* best_endpoint = nullptr;
    int min_connections = INT_MAX;
    
    for (const auto& endpoint : endpoints) {
        if (!endpoint.is_healthy) continue;
        
        std::string endpoint_id = endpoint.host + ":" + std::to_string(endpoint.port);
        int connections = connection_counts_[endpoint_id];  // 获取当前连接数
        
        if (connections < min_connections) {
            min_connections = connections;
            best_endpoint = const_cast<ServiceEndpoint*>(&endpoint);
        }
    }
    
    // 选中后增加连接计数
    connection_counts_[endpoint_id]++;
    
    return *best_endpoint;
}

// 连接管理
void incrementConnections(const std::string& endpoint_id);  // 建立连接时调用
void decrementConnections(const std::string& endpoint_id);  // 断开连接时调用

图解

┌─────────────────────────────────────────────────────────────────────────────┐
│  最少连接策略                                                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  connection_counts_ = {                                                     │
│      "A:8080": 4,                                                           │
│      "B:8080": 2,  ← 最小                                                   │
│      "C:8080": 3                                                            │
│  }                                                                          │
│                                                                             │
│  新请求到达 → 选择 B:8080                                                    │
│  connection_counts_["B:8080"]++  → 变为 3                                   │
│                                                                             │
│  请求完成后 → 调用 decrementConnections("B:8080")                           │
│  connection_counts_["B:8080"]--  → 变回 2                                   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

使用场景

  • 长连接服务
  • 请求处理时间差异大
  • 需要动态负载均衡
加权轮询策略 (Weighted Round Robin)

原理

根据服务器权重分配请求,权重高的分配更多

服务器 A (权重=5): █████
服务器 B (权重=3): ███
服务器 C (权重=2): ██

10个请求分配: A A A A A B B B C C

实现代码

// 文件: src/load_balancer.cpp:177-211
ServiceEndpoint WeightedRoundRobinLoadBalancer::selectEndpoint(const std::vector<ServiceEndpoint>& endpoints) {
    WeightedEndpoint* best_endpoint = nullptr;
    int max_current_weight = 0;
    
    // 1. 遍历所有端点,累加权重
    for (auto& endpoint : weighted_endpoints_) {
        if (!endpoint.endpoint.is_healthy) continue;
        
        // 当前权重 += 初始权重
        endpoint.current_weight += endpoint.weight;
        
        // 找出当前权重最大的
        if (endpoint.current_weight > max_current_weight) {
            max_current_weight = endpoint.current_weight;
            best_endpoint = &endpoint;
        }
    }
    
    // 2. 选中后减去自身权重
    best_endpoint->current_weight -= best_endpoint->weight;
    
    return best_endpoint->endpoint;
}

// 权重从 metadata 获取
auto it = endpoint.metadata.find("weight");
if (it != endpoint.metadata.end()) {
    weighted_endpoint.weight = std::stoi(it->second);
}

图解

┌─────────────────────────────────────────────────────────────────────────────┐
│  方式一:减去自身权重(代码实际实现)                                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  A=5, B=1, C=1                                                              │
│                                                                             │
│  请求1: A=5,B=1,C=1 → 选A → A=5-5=0 → A=0,B=1,C=1                          │
│  请求2: A=5,B=2,C=2 → 选A → A=5-5=0 → A=0,B=2,C=2                          │
│  请求3: A=5,B=3,C=3 → 选A → A=5-5=0 → A=0,B=3,C=3                          │
│  请求4: A=5,B=4,C=4 → 选A → A=5-5=0 → A=0,B=4,C=4                          │
│  请求5: A=5,B=5,C=5 → 选A → A=5-5=0 → A=0,B=5,C=5                          │
│  请求6: A=5,B=6,C=6 → 选B → B=6-1=5 → A=5,B=5,C=6                          │
│  ...                                                                        │
│                                                                             │
│  问题:A 连续被选中很多次,不够平滑!                                          │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│  方式二:减去总权重(标准Nginx算法)                                           │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  A=5, B=1, C=1, 总权重=7                                                    │
│                                                                             │
│  请求1: A=5,B=1,C=1 → 选A → A=5-7=-2 → A=-2,B=1,C=1                        │
│  请求2: A=3,B=2,C=2 → 选A → A=3-7=-4 → A=-4,B=2,C=2                        │
│  请求3: A=1,B=3,C=3 → 选B → B=3-7=-4 → A=1,B=-4,C=3                        │
│  请求4: A=6,B=-3,C=4 → 选A → A=6-7=-1 → A=-1,B=-3,C=4                      │
│  请求5: A=4,B=-2,C=5 → 选C → C=5-7=-2 → A=4,B=-2,C=-2                      │
│  请求6: A=9,B=-1,C=-1 → 选A → ...                                          │
│  请求7: A=7,B=0,C=0  → 选A → ...                                           │
│                                                                             │
│  7次请求分配:A A B A C A A  → A得5, B得1, C得1次 ✓                       │
│  而且分布是交错的,更平滑!                                                    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

使用场景

  • 服务器性能不同
  • 需要手动控制流量分配
  • 灰度发布
一致性哈希策略 (Consistent Hash)

原理

将服务器和请求都映射到一个哈希环上,请求顺时针找最近的服务器

       0°
        │
   ╭────┴────╮
  ╱    A      ╲         请求 X 的哈希值在 B 和 C 之间
 │              │       → 顺时针找到 C
 │      B       │
  ╲            ╱
   ╰────┬────╯
        │
       C

实现代码

// 文件: src/load_balancer.cpp:249-362
ConsistentHashLoadBalancer::ConsistentHashLoadBalancer(int virtual_nodes) 
    : virtual_nodes_(virtual_nodes) {}  // 默认 150 个虚拟节点

// 构建哈希环
void ConsistentHashLoadBalancer::buildHashRing() {
    hash_ring_.clear();
    
    for (const auto& pair : endpoints_) {
        if (!pair.second.is_healthy) continue;
        
        // 为每个物理节点创建 N 个虚拟节点
        for (int i = 0; i < virtual_nodes_; ++i) {
            std::string virtual_key = pair.first + "#" + std::to_string(i);
            //                        "A:8080#0", "A:8080#1", ... "A:8080#149"
            
            uint32_t hash_value = hash(virtual_key);
            
            HashNode node;
            node.key = virtual_key;
            node.endpoint = pair.second;
            node.hash = hash_value;
            
            hash_ring_.push_back(node);
        }
    }
    
    // 按哈希值排序
    std::sort(hash_ring_.begin(), hash_ring_.end(), 
              [](const HashNode& a, const HashNode& b) {
                  return a.hash < b.hash;
              });
}

// 查找端点(二分查找)
ServiceEndpoint ConsistentHashLoadBalancer::findEndpoint(uint32_t hash_value) {
    // 找第一个哈希值 >= 目标值的节点
    auto it = std::lower_bound(hash_ring_.begin(), hash_ring_.end(), hash_value,
                              [](const HashNode& node, uint32_t value) {
                                  return node.hash < value;
                              });
    
    // 环形结构:如果超出末尾,回到开头
    if (it == hash_ring_.end()) {
        it = hash_ring_.begin();
    }
    
    return it->endpoint;
}

图解

┌─────────────────────────────────────────────────────────────────────────────┐
│  一致性哈希环(含虚拟节点)                                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                        0                                                    │
│                        │                                                    │
│              A#2 ─────┬┴┬───── B#0                                         │
│                  ╱         ╲                                               │
│              A#0          B#1                                              │
│            ╱                   ╲                                           │
│         C#1                     A#1                                        │
│        │                           │                                       │
│        │      ← 请求X的哈希值       │                                       │
│        │        落在这里           │                                       │
│        │           │              │                                       │
│         C#0 ─────────────── B#2                                            │
│                  ╲         ╱                                               │
│                   ─────────                                                │
│                                                                             │
│  请求 X → hash("X") = 12345 → 顺时针找到 C#0 → 返回服务器 C                  │
│                                                                             │
│  为什么用虚拟节点?                                                          │
│  - 3个物理节点只有3个点,分布不均匀                                           │
│  - 150个虚拟节点/物理节点 → 450个点,分布更均匀                               │
│  - 添加/删除节点时,只影响相邻区域的请求                                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

使用场景

  • 分布式缓存(同一用户/数据总是路由到同一服务器)
  • 有状态服务
  • 需要会话保持
  • 动态扩缩容时最小化数据迁移

解释
先理解普通哈希的问题

┌─────────────────────────────────────────────────────────────────────────────┐
│  场景:你有3台缓存服务器,存放用户数据                                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  普通哈希方法:                                                              │
│  服务器编号 = hash(用户ID) % 服务器数量                                       │
│                                                                             │
│  用户"张三"hash("张三") = 1234512345 % 3 = 0 → 服务器A                │
│  用户"李四"hash("李四") = 6789067890 % 3 = 1 → 服务器B                │
│  用户"王五"hash("王五") = 1111111111 % 3 = 2 → 服务器C                │
│                                                                             │
│  一切正常...                                                                 │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│  灾难来了:加了一台服务器,变成4台                                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  现在要 % 4:                                                                │
│                                                                             │
│  用户"张三"12345 % 4 = 1 → 服务器B  (原来在A,变了!)                    │
│  用户"李四"67890 % 4 = 2 → 服务器C  (原来在B,变了!)                    │
│  用户"王五"11111 % 4 = 3 → 服务器D  (原来在C,变了!)                    │
│                                                                             │
│  问题:几乎所有用户的数据都要迁移!                                            │
│  - 缓存全部失效                                                              │
│  - 数据库被大量请求压垮                                                       │
│  - 系统崩溃!                                                                │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

一致性哈希的解决方案

┌─────────────────────────────────────────────────────────────────────────────┐
│  想象一个圆形的"哈希环"(像钟表一样)                                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                         12(哈希值=0)                                      │
│                              │                                              │
│                              │                                              │
│               10点 ──────────┼────────── 2点                                │
│                   ╲          │          ╱                                  │
│                    ╲    [服务器A]      ╱                                   │
│                     ╲        │        ╱                                    │
│         9点          ╲       │       ╱         3点                         │
│                       ╲      │      ╱                                      │
│                        ╲     │     ╱                                       │
│                         ╲    │    ╱                                        │
│         8[服务器B]              4点                         │
│                         ╱    │    ╲                                        │
│                        ╱     │     ╲                                       │
│         7点           ╱      │      ╲          5点                         │
│                      ╱  [服务器C]    ╲                                     │
│                     ╱        │        ╲                                    │
│                    ╱         │         ╲                                   │
│               6点 ───────────┴─────────── 4点                               │
│                                                                             │
│  规则:把服务器和用户都放到环上,用户顺时针找最近的服务器                        │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

具体工作流程

┌─────────────────────────────────────────────────────────────────────────────┐
│  Step 1: 把服务器放到环上                                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  hash("服务器A") = 1000  → 放在环的 1000 位置                                │
│  hash("服务器B") = 5000  → 放在环的 5000 位置                                │
│  hash("服务器C") = 8000  → 放在环的 8000 位置                                │
│                                                                             │
│                    0                                                        │
│                    │                                                        │
│             A(1000)┼                                                        │
│                   ╱ ╲                                                       │
│                  ╱   ╲                                                      │
│                 ╱     ╲                                                     │
│        C(8000)─┤       ├─B(5000)                                           │
│                 ╲     ╱                                                     │
│                  ╲   ╱                                                      │
│                   ╲ ╱                                                       │
│                    │                                                        │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│  Step 2: 用户请求来了,顺时针找最近的服务器                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  用户"张三"hash("张三") = 3000                                           │
│                                                                             │
│                    0                                                        │
│                    │                                                        │
│             A(1000)┼                                                        │
│                   ╱ ╲                                                       │
│          张三(3000)→  ╲                                                     │
│               ↓   ╱     ╲                                                   │
│        C(8000)─┤       ├─B(5000) ← 顺时针最近!                             │
│                 ╲     ╱                                                     │
│                  ╲   ╱                                                      │
│                   ╲ ╱                                                       │
│                                                                             │
│  结果:张三的数据存到服务器B                                                   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│  Step 3: 添加新服务器D时会发生什么?                                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  新增 hash("服务器D") = 4000                                                │
│                                                                             │
│                    0                                                        │
│                    │                                                        │
│             A(1000)┼                                                        │
│                   ╱ ╲                                                       │
│          张三(3000)   ╲                                                     │
│               ↓       ╲                                                     │
│        C(8000)─┤  D(4000)B(5000)                                          │
│                         ↑                                                   │
│                    张三现在顺时针最近的是D了                                   │
│                                                                             │
│  影响范围:只有 [3000, 4000] 区间的用户需要迁移到D                             │
│  其他用户完全不受影响!                                                       │
│                                                                             │
│  对比普通哈希:加一台服务器几乎所有数据都要迁移                                 │
│  一致性哈希:只有 1/N 的数据需要迁移                                          │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

虚拟节点的作用

┌─────────────────────────────────────────────────────────────────────────────┐
│  问题:如果只有3个点,分布可能很不均匀                                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│          A          不均匀的情况:                                           │
│          │          - A 覆盖的区间很大(从C到A)                              │
│          │          - C 覆盖的区间很小(从B到C)                              │
│      ────┼────      - 导致A的负载是C的好几倍                                 │
│         ╱ ╲                                                                 │
│        ╱   ╲                                                                │
│       C─────B                                                               │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│  解决:每个服务器创建多个"虚拟节点"                                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  服务器A → 创建 A#0, A#1, A#2, ... A#149 (150个虚拟节点)                     │
│  服务器B → 创建 B#0, B#1, B#2, ... B#149                                    │
│  服务器C → 创建 C#0, C#1, C#2, ... C#149                                    │
│                                                                             │
│  环上现在有 450 个点,分布更均匀:                                            │
│                                                                             │
│            A#0  B#47                                                        │
│              ╲  │  ╱                                                        │
│         C#12 ─┼─┼─┼─ A#88                                                  │
│              ╱  │  ╲                                                        │
│            B#3  │  C#99                                                    │
│                 │                                                           │
│                A#45                                                         │
│                                                                             │
│  虽然点很多,但 A#xxx 都指向服务器A                                           │
│  用户找到 A#88,实际请求发到服务器A                                           │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
最短响应时间策略 (Least Response Time)

原理

选择平均响应时间最短的服务器

服务器 A: 平均响应 50ms
服务器 B: 平均响应 20ms  ← 选择这个
服务器 C: 平均响应 35ms

实现代码

// 文件: src/load_balancer.cpp:367-456
ServiceEndpoint LeastResponseTimeLoadBalancer::selectEndpoint(const std::vector<ServiceEndpoint>& endpoints) {
    ServiceEndpoint* best_endpoint = nullptr;
    std::chrono::milliseconds min_response_time = std::chrono::milliseconds::max();
    
    for (const auto& endpoint : endpoints) {
        if (!endpoint.is_healthy) continue;
        
        std::string endpoint_id = endpoint.host + ":" + std::to_string(endpoint.port);
        auto response_time = calculateAverageResponseTime(endpoint_id);
        
        if (response_time < min_response_time) {
            min_response_time = response_time;
            best_endpoint = const_cast<ServiceEndpoint*>(&endpoint);
        }
    }
    
    return *best_endpoint;
}

// 更新响应时间(请求完成后调用)
void LeastResponseTimeLoadBalancer::updateResponseTime(const std::string& endpoint_id, 
                                                       std::chrono::milliseconds response_time) {
    auto& stats = endpoint_stats_[endpoint_id];
    stats.request_count++;
    stats.last_update = std::chrono::steady_clock::now();
    
    // 指数移动平均:新值占 20%,旧值占 80%
    if (stats.avg_response_time.count() == 0) {
        stats.avg_response_time = response_time;
    } else {
        stats.avg_response_time = std::chrono::milliseconds(
            (stats.avg_response_time.count() * 0.8) + (response_time.count() * 0.2)
        );
        // 平滑过渡,避免单次异常值影响太大
    }
}

图解

┌─────────────────────────────────────────────────────────────────────────────┐
│  指数移动平均(EMA)计算示例                                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  初始: avg = 0                                                              │
│                                                                             │
│  请求1完成,耗时 100ms:                                                      │
│    avg = 100  (第一次直接赋值)                                               │
│                                                                             │
│  请求2完成,耗时 50ms:                                                       │
│    avg = 100 * 0.8 + 50 * 0.2 = 80 + 10 = 90ms                             │
│                                                                             │
│  请求3完成,耗时 60ms:                                                       │
│    avg = 90 * 0.8 + 60 * 0.2 = 72 + 12 = 84ms                              │
│                                                                             │
│  请求4完成,耗时 200ms(异常慢):                                             │
│    avg = 84 * 0.8 + 200 * 0.2 = 67.2 + 40 = 107.2ms                        │
│    (异常值只贡献 20%,不会剧烈波动)                                           │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

使用场景

  • 服务器性能波动大
  • 需要自适应负载均衡
  • 对响应时间敏感的服务
工厂模式使用
// 文件: src/load_balancer.cpp:458-476
std::unique_ptr<LoadBalancer> LoadBalancerFactory::createLoadBalancer(LoadBalanceStrategy strategy) {
    switch (strategy) {
        case LoadBalanceStrategy::ROUND_ROBIN:
            return std::make_unique<RoundRobinLoadBalancer>();
        case LoadBalanceStrategy::RANDOM:
            return std::make_unique<RandomLoadBalancer>();
        case LoadBalanceStrategy::LEAST_CONNECTIONS:
            return std::make_unique<LeastConnectionsLoadBalancer>();
        case LoadBalanceStrategy::WEIGHTED_ROUND_ROBIN:
            return std::make_unique<WeightedRoundRobinLoadBalancer>();
        case LoadBalanceStrategy::CONSISTENT_HASH:
            return std::make_unique<ConsistentHashLoadBalancer>();
        case LoadBalanceStrategy::LEAST_RESPONSE_TIME:
            return std::make_unique<LeastResponseTimeLoadBalancer>();
        default:
            return std::make_unique<RoundRobinLoadBalancer>();
    }
}

使用示例

// 创建负载均衡器
auto lb = LoadBalancerFactory::createLoadBalancer(LoadBalanceStrategy::LEAST_CONNECTIONS);

// 更新服务端点列表
lb->updateEndpoints(endpoints);

// 选择端点发送请求
ServiceEndpoint selected = lb->selectEndpoint(endpoints);

// 标记端点健康状态
lb->markEndpointStatus("192.168.1.1:8080", false);  // 标记为不健康
问题
mt19937 是什么

mt19937 = Mersenne Twister 19937

  • Mersenne Twister:梅森旋转算法(一种随机数生成算法)
  • 19937:算法的周期是 2^19937 - 1(一个超级大的数)
┌─────────────────────────────────────────────────────────────────────────────┐
│  mt19937 = 高质量随机数生成器                                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  类比:                                                                     │
│  - rand() 像一个简单的骰子 → 便宜但不够"随机"                                 │
│  - mt19937 像一个超复杂的随机摇奖机 → 更均匀、更难预测                         │
│                                                                             │
│  特点:                                                                     │
│  1. 周期极长:2^19937-1 次才会重复                                           │
│  2. 分布均匀:每个数出现的概率相等                                            │
│  3. 速度快:适合需要大量随机数的场景                                          │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
mutable 关键字的含义

mutable = 即使在 const 成员函数中也可以修改

// 没有 mutable 时
ServiceEndpoint selectEndpoint(...) const {  // const 函数
    gen_();  // ❌ 错误!const 函数不能修改成员变量
}

// 有 mutable 时
ServiceEndpoint selectEndpoint(...) const {
    gen_();  // ✓ OK!mutable 变量可以在 const 函数中修改
}
uniform_int_distribution解释
std::uniform_int_distribution<> dis(0, healthy_endpoints.size() - 1);
return healthy_endpoints[dis(gen_)];

┌─────────────────────────────────────────────────────────────────────────────┐
│  第一行:创建"均匀分布生成器"                                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  std::uniform_int_distribution<> dis(0, healthy_endpoints.size() - 1);     │
│  │                                │  │   │                                     │
│  │                                │  │   └─ 最大值(假设size=3,则最大值=2)    │
│  │                                │  └──── 最小值                             │
│  │                                └──────── 变量名                             │
│  └───────────────────────────────────── 均匀整数分布类型                    │
│                                                                             │
│  假设 healthy_endpoints.size() = 3                                         │
│  dis 就是一个能生成 [0, 1, 2] 范围内整数的分布器                              │
│                                                                             │
│  类比:dis 就像一个"骰子模板",规定了骰子有哪些面(0,1,2)                      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│  第二行:生成随机数并取元素                                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  return healthy_endpoints[dis(gen_)];                                       │
│         │                 │   │                                            │
│         │                 │   └─ gen_ 是随机数引擎(mt19937)               │
│         │                 └───── dis(gen_) 调用分布器,传入引擎              │
│         │                        返回一个 [0, 2] 范围的随机整数              │
│         └───────────────────────── 用随机数作为下标取数组元素                │
│                                                                             │
│  执行过程:                                                                 │
│  1. gen_() 生成一个原始随机数,比如 3847291038474                            │
│  2. dis 把这个大数"映射"[0, 2] 范围,比如得到 1                           │
│  3. 返回 healthy_endpoints[1]                                              │
│                                                                             │
│  类比:                                                                     │
│  - gen_ = 随机数发动机(输出原始随机数)                                      │
│  - dis = 转换器(把原始随机数转成我们想要的范围)                              │
│  - dis(gen_) = 把发动机接到转换器上,得到最终结果                             │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
有状态服务 vs 无状态服务

通俗解释

┌─────────────────────────────────────────────────────────────────────────────┐
│  无状态服务 (Stateless)                                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  类比:快餐店                                                                │
│  - 你去任何一个窗口点餐都一样                                                 │
│  - 服务员不需要"记住"你是谁                                                   │
│  - 每次请求都带上所有信息(我要一个汉堡+可乐)                                  │
│                                                                             │
│  例子:                                                                     │
│  - 计算服务:输入 2+3,返回 5                                                │
│  - 图片压缩服务:输入图片,返回压缩后的图片                                    │
│  - 无登录的API查询                                                          │
│                                                                             │
│  特点:                                                                     │
│  - 请求可以发给任何一台服务器                                                 │
│  - 服务器挂了,换一台继续用                                                   │
│  - 容易扩展:加机器就行                                                      │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│  有状态服务 (Stateful)                                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  类比:私人医生                                                              │
│  - 你必须找你的专属医生                                                       │
│  - 医生有你的病历(历史状态)                                                 │
│  - 换个医生,他不知道你的情况                                                 │
│                                                                             │
│  例子:                                                                     │
│  - 购物车服务:服务器记住你的购物车内容                                        │
│  - 游戏服务器:服务器记住你的角色位置、血量                                    │
│  - WebSocket 长连接:服务器维护连接状态                                       │
│  - 分布式缓存:数据存在特定的服务器上                                          │
│                                                                             │
│  特点:                                                                     │
│  - 必须路由到"正确"的服务器                                                   │
│  - 服务器挂了,状态可能丢失                                                   │
│  - 扩展困难:需要迁移状态                                                     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

为什么有状态服务需要一致性哈希

┌─────────────────────────────────────────────────────────────────────────────┐
│  场景:分布式缓存(有状态)                                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  用户"张三"的数据存在服务器B                                                  │
│                                                                             │
│  如果用轮询:                                                                │
│    请求1 → 服务器A → 没有张三的数据 → 去数据库查                               │
│    请求2 → 服务器B → 有数据 → 直接返回                                        │
│    请求3 → 服务器C → 没有张三的数据 → 又去数据库查                             │
│    → 缓存命中率很低!                                                        │
│                                                                             │
│  如果用一致性哈希:                                                          │
│    张三的请求 → 永远路由到服务器B                                             │
│    请求1 → 服务器B → 有数据                                                  │
│    请求2 → 服务器B → 有数据                                                  │
│    请求3 → 服务器B → 有数据                                                  │
│    → 缓存命中率极高!                                                        │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

总结表格

特性 无状态服务 有状态服务
服务器是否需要记住用户 ❌ 不需要 ✅ 需要
请求可以发给任意服务器 ✅ 可以 ❌ 必须发给特定服务器
服务器挂了 换一台继续 状态可能丢失
扩容难度 简单 困难(需迁移数据)
推荐负载均衡策略 轮询、随机 一致性哈希
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐