探索brpc:特性、使用场景与同步异步调用与封装示例
brpc 是用 c++语言编写的工业级 RPC 框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统RPC(Remote Procedure Call,远程过程调用)框架指用于在网络中实现进程间通信的技术,使得程序能够调用远程计算机上的程序或服务,就像调用本地程序一样。
·
文章目录
前言
brpc 是用 c++语言编写的工业级 RPC 框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统
RPC(Remote Procedure Call,远程过程调用)框架指用于在网络中实现进程间通信的技术,使得程序能够调用远程计算机上的程序或服务,就像调用本地程序一样。
特性
特性 | 描述 |
---|---|
高性能 | 针对高并发场景优化,支持异步 IO,低延迟和高吞吐量。 |
多种传输协议 | 支持 HTTP/2、gRPC、TCP 等多种协议,灵活选择适合的方案。 |
流式支持 | 支持单向和双向流式 RPC 调用,适合实时数据传输场景。 |
负载均衡 | 内置多种负载均衡策略,支持客户端和服务端负载均衡。 |
服务发现 | 提供内置服务发现机制,支持与外部工具(如 Etcd、Consul)集成。 |
灵活序列化方式 | 默认使用 Protobuf,还支持 JSON 等其他序列化格式。 |
易用性 | 提供简单易懂的 API,快速上手和实现服务。 |
扩展性 | 支持插件机制,可以根据需求扩展功能。 |
使用场景
- 微服务架构:作为微服务间通信的基础框架,支持快速构建和部署微服务应用。
- 高并发处理:适用于需要处理大量并发请求的应用,如在线支付、即时通讯等。
- 实时数据传输:支持实时数据流的场景,例如视频直播、在线游戏等。
- 大规模分布式系统:适合于构建大规模的分布式系统,支持动态扩展和负载均衡。
- 跨语言服务调用:支持多种编程语言的服务调用。
brpc && grpc 对比
下面是 gRPC 与 brpc 的对比表格,涵盖了 主要特性和优缺点:
特性 | gRPC | brpc |
---|---|---|
语言支持 | 多种语言(C++, Java, Python, Go等) | 多种语言(C++, Python, Java等) |
传输协议 | HTTP/2 | 自定义协议,支持多种传输方式 |
序列化方式 | Protobuf | Protobuf、JSON等 |
性能 | 高性能,适合微服务架构 | 较高性能,优化了并发处理 |
流式支持 | 支持单向和双向流式 | 支持双向流式 |
负载均衡 | 需要外部支持或使用 gRPC 的内置功能 | 内置支持多种负载均衡策略 |
服务发现 | 依赖外部服务发现(如 Consul) | 内置服务发现机制 |
生态系统 | 强大的生态系统,广泛应用 | 相对较小但在特定场景下表现出色 |
社区活跃度 | 活跃,广泛使用 | 较小,但有特定用户群体 |
文档与支持 | 丰富的文档和社区支持 | 文档相对较少,社区较小 |
总结
- gRPC 适合需要跨语言和高性能通信的微服务架构,具有强大的生态系统和社区支持。
- brpc 更加专注于高性能的 RPC 通信,并且在某些场景下具有更好的灵活性和效率,适合特定需求的使用者。
相关类与接口
日志输出类与接口
日志输出类 包含头文件: #include <butil/logging.h>
在编写项目时,日志输出完全根据个人以及项目需求以使用不同的日志输出类,这里介绍如何关闭brpc自带的日志输出类:
namespace logging {
// 日志输出目标枚举
enum LoggingDestination {
LOG_TO_NONE = 0 // 不输出日志
};
// 日志设置结构体
struct BUTIL_EXPORT LoggingSettings {
// 构造函数,初始化日志设置
LoggingSettings();
// 日志输出目标,决定日志将被发送到何处
LoggingDestination logging_dest;
};
// 初始化日志系统
// 参数:
// settings - 包含日志设置的 LoggingSettings 对象
// 返回:
// bool - 初始化是否成功
bool InitLogging(const LoggingSettings& settings);
}
// 0. 关闭 brpc 默认日志输出
logging::LoggingSettings settings; // 创建一个 LoggingSettings 对象
settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE; // 设置日志输出为不输出
logging::InitLogging(settings); // 初始化日志系统,应用上述设置
protobuf类与接口
namespace google
{
namespace protobuf
{
// Closure 类用于定义可调用对象的接口
class PROTOBUF_EXPORT Closure
{
public:
// 默认构造函数
Closure() {}
// 虚析构函数,用于确保派生类被正确析构
virtual ~Closure();
// 纯虚函数,派生类需要实现该函数以定义具体的操作
virtual void Run() = 0;
};
// 创建一个新的回调对象
// 参数:
// function - 指向要调用的无参数函数的指针
// 返回:
// Closure* - 指向新创建的 Closure 对象的指针
inline Closure *NewCallback(void (*function)());
// RpcController 类用于控制 RPC 调用的状态
class PROTOBUF_EXPORT RpcController
{
public:
// 检查 RPC 调用是否失败
// 返回:
// bool - 如果调用失败则返回 true,否则返回 false
bool Failed();
// 获取错误信息文本
// 返回:
// std::string - 表示错误的文本信息
std::string ErrorText();
};
}
}
服务端类与接口
namespace brpc
{
// ServerOptions 结构用于配置服务器选项
struct ServerOptions
{
// 空闲超时时间,超过该时间后关闭连接
int idle_timeout_sec; // 默认值: -1(禁用)
// 服务器线程数量,默认值为 CPU 核心数
int num_threads; // 默认值: #cpu-cores
// 其他可能的选项...
};
// ServiceOwnership 枚举定义服务的所有权管理方式
enum ServiceOwnership {
// 当添加服务失败时,服务器负责删除服务对象
SERVER_OWNS_SERVICE,
// 当添加服务失败时,服务器不会删除服务对象
SERVER_DOESNT_OWN_SERVICE
};
// Server 类表示一个 BRPC 服务器
class Server
{
public:
// 添加服务到服务器
// 参数:
// service - 要添加的服务对象
// ownership - 服务的所有权类型
// 返回:
// int - 返回操作结果的状态码
int AddService(google::protobuf::Service *service, ServiceOwnership ownership);
// 启动服务器
// 参数:
// port - 监听的端口号
// opt - 服务器选项
// 返回:
// int - 返回启动状态
int Start(int port, const ServerOptions *opt);
// 停止服务器
// 参数:
// closewait_ms - 等待关闭的时间(不再使用)
// 返回:
// int - 返回停止状态
int Stop(int closewait_ms /*not used anymore*/);
// 等待服务器完成所有任务并退出
// 返回:
// int - 返回加入状态
int Join();
// 运行服务器,直到收到退出请求
void RunUntilAskedToQuit();
};
// ClosureGuard 类用于确保在作用域结束时执行回调
class ClosureGuard
{
public:
explicit ClosureGuard(google::protobuf::Closure *done)
: _done(done) {} // 初始化回调指针
~ClosureGuard()
{
if (_done)
_done->Run(); // 在析构时调用回调
}
private:
google::protobuf::Closure *_done; // 存储回调指针
};
// HttpHeader 类表示 HTTP 请求或响应的头部信息
class HttpHeader
{
public:
// 设置内容类型
void set_content_type(const std::string &type);
// 获取指定键的头部值
const std::string *GetHeader(const std::string &key);
// 设置指定键的头部值
void SetHeader(const std::string &key, const std::string &value);
// 获取 URI
const URI &uri() const { return _uri; }
// 获取 HTTP 方法
HttpMethod method() const { return _method; }
// 设置 HTTP 方法
void set_method(const HttpMethod method);
// 获取状态码
int status_code();
// 设置状态码
void set_status_code(int status_code);
private:
URI _uri; // 存储 URI 信息
HttpMethod _method; // 存储 HTTP 方法
// 其他可能的成员...
};
// Controller 类用于管理 RPC 调用
class Controller : public google::protobuf::RpcController
{
public:
// 设置超时时间
void set_timeout_ms(int64_t timeout_ms);
// 设置最大重试次数
void set_max_retry(int max_retry);
// 获取响应消息
google::protobuf::Message *response();
// 获取 HTTP 响应头
HttpHeader &http_response();
// 获取 HTTP 请求头
HttpHeader &http_request();
// 检查 RPC 调用是否失败
bool Failed();
// 获取错误文本
std::string ErrorText();
// 定义 RPC 响应后的回调函数类型
using AfterRpcRespFnType = std::function<
void(Controller *cntl,
const google::protobuf::Message *req,
const google::protobuf::Message *res)>;
// 设置 RPC 响应后的回调函数
void set_after_rpc_resp_fn(AfterRpcRespFnType &&fn);
private:
// 其他成员...
};
}
客户端类与接口
namespace brpc
{
// ChannelOptions 结构用于配置通道选项
struct ChannelOptions
{
// 请求连接超时时间,单位为毫秒
int32_t connect_timeout_ms; // 默认值: 200 毫秒
// RPC 请求超时时间,单位为毫秒
int32_t timeout_ms; // 默认值: 500 毫秒
// 最大重试次数
int max_retry; // 默认值: 3
// 序列化协议类型
AdaptiveProtocolType protocol; // 默认值: "baidu_std"
// 其他可能的选项...
};
// Channel 类表示一个 RPC 通道,用于和服务器进行通信
class Channel : public ChannelBase
{
public:
// 初始化接口
// 参数:
// server_addr_and_port - 服务器地址和端口
// options - 指向 ChannelOptions 的指针,用于配置通道
// 返回:
// int - 成功返回 0,失败返回错误码
int Init(const char *server_addr_and_port,
const ChannelOptions *options);
};
// 其他相关类和功能...
}
使用
同步调用 & 异步调用
同步调用是指在程序中,当一个函数被调用时,调用者会等待被调用的函数执行完毕并返回结果后,才会继续执行后面的代码。
放在brpc中,同步调用是指客户端在发送请求后,会以阻塞的方式等待服务端的响应;
首先编写proto文件:
syntax = "proto3";
package emp;
option cc_generic_services = true; // 生成通用服务代码 (用于rpc)
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
}
编辑后开始写server代码:
#include <butil/logging.h>
#include <brpc/server.h>
#include "main.pb.h"
class EchoServiceT : public emp::EchoService {
public:
EchoServiceT() {}
~EchoServiceT() {}
// 重写父类回声函数
void Echo(google::protobuf::RpcController* controller,
const ::emp::EchoRequest* request,
::emp::EchoResponse* response,
::google::protobuf::Closure* done)
{
brpc::ClosureGuard done_guard(done); // 自动调用done->Run()
std::cout << "接收到消息: " << request->message() << std::endl;
std::string msg = "响应消息: " + request->message();
response->set_message(msg);
}
};
int main(int argc, char* argv[]) {
// 0. 关闭brpc默认日志输出
logging::LoggingSettings settings;
settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;
logging::InitLogging(settings);
// 1. 初始化服务器对象
brpc::Server server;
// 2. 注册服务
EchoServiceT echo_service;
// SERVER_OWNS_SERVICE 服务器负责管理销毁 该服务
// SERVER_DOESNT_OWN_SERVICE 服务器不负责该服务的生命周期
auto ret = server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);
if (ret != 0) {
std::cout << "添加RPC服务失败。" << std::endl;
return -1;
}
// 3. 启动服务器
brpc::ServerOptions options;
options.idle_timeout_sec = -1; // 设置超时时间 为-1,表示不超时
options.num_threads = 1; // 设置线程数
ret = server.Start(8080, &options);
if (ret != 0) {
std::cout << "启动服务器失败。" << std::endl;
return -1;
}
server.RunUntilAskedToQuit(); // 阻塞等待直到收到退出信号
return 0;
}
客户端代码:
#include <brpc/channel.h>
#include <thread>
#include <iostream>
#include "main.pb.h"
#define SYNC 0
// 异步回调函数
void callback(brpc::Controller *cntl, emp::EchoResponse *resp) {
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<emp::EchoResponse> resp_guard(resp);
if (cntl->Failed()) {
std::cout << "RPC调用失败: " << cntl->ErrorText() << std::endl;
return;
}
std::cout << "收到响应: " << resp->message() << std::endl;
}
int main(int argc, char* argv[]) {
// 1. 创建信道 连接服务器
brpc::ChannelOptions options;
options.protocol = "baidu_std"; // 序列化协议 默认
options.connect_timeout_ms = -1; // 连接超时时间 -1表示永不超时
options.timeout_ms = -1; // 超时时间 -1表示永不超时
options.max_retry = 3; // 最大重试次数
brpc::Channel channel;
if (channel.Init("127.0.0.1:8080", &options) != 0) {
std::cout << "初始化信道失败" << std::endl;
return -1;
}
// 2. 构造EchoService_Stub对象(用于RPC调用)
emp::EchoService_Stub stub(&channel);
// 3. 进行RPC调用
emp::EchoRequest req;
std::cout << "请输入消息: ";
std::string msg;
getline(std::cin, msg);
req.set_message(msg);
// 4. 构造Controller对象(用于控制RPC调用)
brpc::Controller *cntl = new brpc::Controller();
emp::EchoResponse *resp = new emp::EchoResponse();
#if SYNC // 同步调用
stub.Echo(cntl, &req, resp, nullptr); // google::protobuf::Closure *done: 传入nullptr代表同步调用
if(cntl->Failed()) {
std::cout << "RPC调用失败: " << cntl->ErrorText() << std::endl;
return -1;
}
std::cout << "RPC调用成功, 响应信息: " << resp->message() << std::endl;
delete cntl;
delete resp;
#else // 异步调用
auto clusure = google::protobuf::NewCallback(callback, cntl, resp);
stub.Echo(cntl, &req, resp, clusure);
std::cout << "异步调用成功" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
#endif
return 0;
}
封装
封装思想
由于不同的服务调用使用不同的 Stub,其封装意义不大。因此,我们主要封装通信所需的 Channel。当需要进行服务调用时,只需通过服务名称获取对应的 Channel,然后实例化 Stub 进行调用即可。
设计概要
-
Channel 管理类:
- 每个服务可能有多个主机提供服务,因此一个服务可能对应多个 Channel。需要管理这些 Channel,并提供获取指定服务 Channel 的接口。
- 在进行 RPC 调用时,根据 Round Robin(RR)轮转策略选择 Channel。
-
服务声明接口:
- 整体项目中通常会提供多个服务,当前可能并不需要用到所有服务。因此,通过声明来告知模块当前关心的服务,并建立连接进行管理。未声明的服务即使上线也不需要进行连接的建立。
-
服务上线处理接口:
- 提供新增指定服务的 Channel 的接口,以便在服务上线时进行管理。
-
服务下线处理接口:
- 提供删除指定服务下的 Channel 的接口,以便在服务下线时进行管理。
代码
class ServiceChannel
{
public:
using ptr = std::shared_ptr<ServiceChannel>;
using ChannelPtr = std::shared_ptr<brpc::Channel>;
ServiceChannel(const std::string& service_name) :
_index(0), _service_name(service_name) {}
void append(const std::string& host)
{
// 创建信道
auto channel = std::make_shared<brpc::Channel>();
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.timeout_ms = -1;
options.connect_timeout_ms = -1;
options.max_retry = 3;
int ret = channel->Init(host.c_str(), &options);
if (ret != 0)
{
LOG_ERROR("初始化{}-{}信道失败", _service_name, host);
return;
}
std::unique_lock<std::mutex> lock(_mutex);
_hosts.insert({host, channel});
_channels.push_back(channel);
} /* 服务上线一个节点 - 调用append新增信道 */
void remove(const std::string& host)
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _hosts.find(host);
if(it == _hosts.end()) {
LOG_WARN("{}-{}删除时,未找到信道信息", _service_name, host);
return;
}
for(auto vit = _channels.begin(); vit != _channels.end(); ++vit)
{
if(*vit == it->second) {
_channels.erase(vit);
break;
}
}
_hosts.erase(it);
LOG_INFO("{}-{}删除成功", _service_name, host);
} /* 服务下线一个节点 - 调用remove释放信道 */
ChannelPtr getChannel() {
std::unique_lock<std::mutex> lock(_mutex);
if(_channels.empty()) {
LOG_ERROR("当前没有能提供{}服务的节点", _service_name);
return ChannelPtr();
}
int32_t idx = _index++ % _channels.size(); // 轮转索引
return _channels[idx];
}
private:
std::mutex _mutex; // 互斥锁
int32_t _index; // 轮转索引
std::string _service_name; // 服务名称
std::vector<ChannelPtr> _channels; // 服务对应的信道集合
std::unordered_map<std::string, ChannelPtr> _hosts; // // 主机地址到信道映射
};
class ServiceManager
{
public:
using ptr = std::shared_ptr<ServiceManager>;
ServiceManager() {}
/* 获取指定服务的信道节点 */
ServiceChannel::ChannelPtr getChannel(const std::string& service_name) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _services.find(service_name);
if(it == _services.end()) {
LOG_ERROR("当前没有能提供{}服务的节点", service_name);
return ServiceChannel::ChannelPtr();
}
return it->second->getChannel();
}
/* 声明关注的服务 */
void declareTrackService(const std::string& service_name) {
std::unique_lock<std::mutex> lock(_mutex);
_track_services.insert(service_name);
}
/* 服务上线回调 */
void onServiceOnline(const std::string& service_instance, const std::string& host) {
std::string service_name = getServiceName(service_instance);
ServiceChannel::ptr service;
{
std::unique_lock<std::mutex> lock(_mutex);
auto tit = _track_services.find(service_name);
if(tit == _track_services.end()) {
LOG_DEBUG("{}-{}服务上线了 (不在关注列表中)", service_name, host);
return;
}
auto sit = _services.find(service_name);
if(sit == _services.end()) {
service = std::make_shared<ServiceChannel>(service_name);
_services.insert({service_name, service});
} else {
service = sit->second;
}
}
if(!service) {
LOG_ERROR("{}服务新增失败", service_name);
return;
}
service->append(host);
LOG_DEBUG("{}服务新增成功", service_name);
}
/* 服务下线回调 */
void onServiceOffline(const std::string& service_instance, const std::string& host) {
std::string service_name = getServiceName(service_instance);
ServiceChannel::ptr service;
{
std::unique_lock<std::mutex> lock(_mutex);
auto tit = _track_services.find(service_name);
if(tit == _track_services.end()) {
LOG_DEBUG("{}-{}服务下线了 (不在关注列表中)", service_name, host);
return;
}
auto sit = _services.find(service_name);
if(sit == _services.end()) {
LOG_WARN("删除{}服务时,未找到管理对象", service_name);
return;
}
service = sit->second;
}
service->remove(host);
LOG_DEBUG("{}服务删除成功", service_name);
}
private:
std::string getServiceName(const std::string& service_instance) {
auto pos = service_instance.find_last_of('/');
if(pos == std::string::npos) {
return service_instance;
}
return service_instance.substr(0, pos);
}
private:
std::mutex _mutex;
std::unordered_set<std::string> _track_services; // 跟踪的服务集合
std::unordered_map<std::string, ServiceChannel::ptr> _services; // 服务名称到信道集合的映射
};
更多推荐
所有评论(0)