MQ消息队列
是数据结构序列化和反序列化框架,以二进制数据传输,数据量小,效率高它是语言、平台无关的,ProtoBuf支持java,C++等多种语言(根据定义的message数据结构,使用protoc编译器生成不同语言的头文件源文件),支持多个平台扩展性好,兼容性好:可以更新数据结构,并且不会影响破坏旧程序关于兼容性,兼容性核心依赖于唯一数字标签,数据序列化时通过标签标识字段,而不是直接用字段名。:如果新版本添
一、MQ
介绍项目:
基于protobuf和muduo的发布订阅式消息队列项目是参考Rabbitmq写的,Rabbitmq是一个消息中间件,也是一个生产者消费者模型,它负责接收,存储并转发消息。
我的项目中主要有几大核心模块:
Producer生产者,向消息队列发送消息
Consumer消费者,从消息队列接收消息
Broker服务器:消息队列服务器,也就是RabbitMQ Server
Connection连接:允许客户端与Broker通信
Channel信道:是连接里的一个虚拟通道,发送和接收消息都是通过信道进行的
Exchange交换机:负责接收生产者发送的消息,根据路由匹配规则将消息路由到一个或多个队列
Queue队列:存储消息直到它们被消费者消费
Binding绑定关系:交换机和队列间的映射关系
技术选型:
使用protobuf定义消息结构并序列化/反序列化,使用muduo库以及自定义应用层协议进行网络通信,使用SQLite数据库(因为本身存储元数据,数据量也不多,数据结构也比较简单,使用mysql有点重了,对于大量的消息数据,采用文件方式存储)等。
为什么用文件存储消息?用数据库存储消息,有效数据占比很低,存储效率差。而文件存储几乎是零开销,存储的就是消息本身,不像数据库带有字段开销、索引开销、事务开销等,能节省大量磁盘成本。
应用场景:
电商秒杀系统中,购买一般流程是用户请求->秒杀逻辑->服务器接收处理->返回结果(整个过程同步阻塞效率低)。而引入rabbitmq后,秒杀API只需快速响应“请求已接收”,后续流程异步处理。使用户体验更平滑。同时消息队列会存储瞬间产生的大量请求,后端的服务器按照自己的最大处理能力,从队列中取出请求进行处理,以免后端系统超负荷运行
作用:异步解耦、流量削峰
为什么使用muduo库?
muduo是陈硕大佬编写的知名C++高性能网络库。
选择muduo作为网络底层主要基于以下几点考虑:
- 首先是性能考量:消息队列是一个需要处理大量长连接和高并发请求的核心中间件,muduo基于Reactor模式和非阻塞IO,其中one loop per thread的线程模型能很好利用多核CPU,能很好地应对高并发场景。
- 其次muduo提供了对TCP连接生命周期(建立、断开、错误)的回调机制,能很好地实现连接管理、异常处理等功能
- 最后,muduo是一个在linux下经过验证的高性能库,虽然不像libevent那样跨平台,但它设计哲学是专注于Linux高性能,而消息队列服务器通常也是部署在Linux下的,使用muduo可以避免重复造轮子。
- 技术选型时,我对比了Muduo、libevent和Boost.Asio
Boost.Asio非常强大,但抽象层次较高,对于这个项目来说有点“重”,比较复杂
libevent可以跨平台,但它的C API没有muduo的现代C++风格让我感到熟悉
最终选择muduo,因为它提供了一个在Linux下极致高性能且易于理解的Reactor框架,它的one loop per thread模型非常契合消息队列的高并发需求并且不用让我重复造轮子。
one loop per thread:
是一种非常典型的高性能网络编程线程模型,字面意思是一个线程运行一个事件循环,是Reactor模式在多线程环境下的主要实现方式之一。核心思想是让线程与事件循环一一绑定,通过线程间的分工协作来处理高并发请求。
线程数通常与CPU核心数匹配,可以最大限度利用多核CPU的计算能力,避免线程上下文切换带来的开销。
是线程安全的,每个连接的生命周期只会在一个IO线程内,其所有事件都由这个线程处理,几乎不需要复杂的锁机制,可以减少并发编程的陷阱。
Reactor:
是一种事件驱动的设计模式,它通过一个事件循环eventLoop来同步监听多个IO事件,一旦事件就绪,就立即分发给对应的回调函数处理。核心优势是非阻塞和高性能,能用很少的线程处理大量并发连接。
两者区别:
这两个不是同一个层次的概念,Reactor是一种设计模式,关注单个事件循环内部是如何工作的。one loop per thread是一种线程模型,它解决的是如何利用多核资源来扩展Reactor模式,核心是运行多个Reactor实例,每个线程一个实例,并通过线程间的分工协作来构建高性能应用。
使用了ProtoBuf
是数据结构序列化和反序列化框架,
-
以二进制数据传输,数据量小,效率高
-
它是语言、平台无关的,ProtoBuf支持java,C++等多种语言(根据定义的message数据结构,使用protoc编译器生成不同语言的头文件源文件),支持多个平台
-
扩展性好,兼容性好:可以更新数据结构,并且不会影响破坏旧程序
关于兼容性,兼容性核心依赖于唯一数字标签,数据序列化时通过标签标识字段,而不是直接用字段名。比如:
旧程序解析新数据:如果新版本添加了字段email=3,旧程序会忽略不认识的标签。
新程序解析旧数据:缺失的字段会被赋予默认值,如数字为0,字符串为""
若要删除字段,要标记字段为reserved,避免未来重用标签号引发冲突。
破坏兼容性的操作:
- 修改字段的标签号(旧数据将无法正确解析)
- 改变字段类型(如int32转到string,除非类型兼容,如int32可转到int64)
- 重用已删除的标签号(如果删除一个字段,未来又复用它的标签号表示其他字段,旧程序可能错误地将新数据解析为旧字段 )
ProtoBuf兼容性法则:不修改字段标签号、不直接修改字段类型、不重用已删除的标签号、新增字段是安全的。
使用流程:
- 编写.proto文件,定义数据结构对象以及属性和内容
- 使用protoc编译器编译.proto文件得到一系列接口代码,存放在新生成的头文件和源文件中
- 将编译生成的头文件包含进代码中,调用生成的接口,实现对.proto文件中定义的字段进行设置和获取,以及对message对象进行序列化和反序列化。
// 必须写在除去注释外的第一行,指定语法为proto3
syntax = "proto3";
// 可选声明符,可以声明其命名空间
package mmq;
enum ExchangeType // 交换机类型
{
UNKNOWTYPE = 0;
DIRECT = 1;
FANOUT = 2;
TOPIC = 3;
};
enum DeliveryMode // 是否持久化
{
UNKNOWMODE = 0;
UNDURABLE = 1;
DURABLE = 2;
};
// message定义结构化对象,网络传输中,需要为传输双方定制协议,说白了就是定义结构化数据,ProtoBuf就是以message的方式来支持我们定制协议字段,后期帮助形成类和方法使用
message BasicProperties
{
// 属性字段格式:
// 字段类型 字段名 = 字段唯一编号
string id = 1;
DeliveryMode delivery_mode = 2;
string routing_key = 3;
};
message Message
{
message Payload
{
BasicProperties properties = 1;
string body = 2;
string valid = 3;
}
Payload payload = 1;
uint32 offset = 2;
uint32 length = 3;
}
如何使用protoc编译器:
protoc [-I IMPORt_PATH] -cpp_out=OUT_DIR path/to/file.protoc
IMPORt_PATH:被编译的proto文件所处目录
OUT_DIR:编译后生成文件的目标路径
path/to/file.protoc:要编译的proto文件
例子:
protoc --cpp_out=. my.proto
关于编译生成的C++代码,对于每个message都会生成一个对应的消息类,类中提供每个字段的设置和获取,以及其他操作字段的方法。对于.h和.cc文件,分别存放类的声明和实现。
消息类的父类MessageLite中,提供了读写消息实例的方法,包括序列化与反序列化方法。
SQLite3
SQLite是一个进程内(不是一个独立的进程,可按应用程序需求进行动态或静态连接)的轻量级数据库,它是无服务器的、零配置的(不需要在系统中配置)、事务性的SQL数据库引擎。
为什么用SQLite:
- 不需要配置
- 不需要一个单独的服务器进程
- 非常轻量,完全配置时磁盘占用空间小于400KiB
- SQLite事务兼容ACID,允许从多个进程或线程安全访问
常用接口:
// 查看当前数据库在编译阶段是否启动了线程安全
int sqlite3_threadsafe(); 0-未启用 1-启用
sqlite有三种安全等级:
1. 非线程安全模式
2. 线程安全模式(不同的连接在不同的线程/进程间是安全的,即一个句柄不能用于多线程间)
3. 串行化模式(可以在不同的线程/进程间使用同一个句柄)
// 创建/打开数据库文件并返回操作句柄,成功返回SQLITE_OK
int sqlite3_open(
const char *filename, // 数据库文件名
sqlite3 **ppDb // 数据库连接句柄
);
int sqlite3_open_v2(
const char *filename, // 数据库文件名
sqlite3 **ppDb, // 数据库连接句柄
int flags, // 控制打开行为的标志位
const char *zVfs // 通常填nullptr
);
flags:
SQLITE_OPEN_READWRITE -- 以可读可写方式打开数据库文件
SQLITE_OPEN_CREATE -- 不存在数据库文件则创建
SQLITE_OPEN_NOMUTEX -- 多线程模式,只要不同的线程使用不同的连接可保证线程安全
SQLITE_OPEN_FULLMUTEX -- 串行化模式
// 执行语句,返回SQLITE_OK表示成功
int sqlite3_exec(
sqlite3* db, // 连接句柄
char *sql, // 要执行的sql语句
int (*callback)(void*, int, char**, char**),
void *arg, // 传递给回调函数的参数(可选)
char **err // 错误信息指针,一般置空
);
callbak:
void* -- 设置的在回调时传入的arg参数
int -- 一行中数据的列数
char** -- 存储一行数据的字符指针数组
char** -- 每一列的字段名称
处理成功返回0,返回非0会触发ABORT退出程序
// 销毁句柄
int sqlite3_close(sqlite3 *db); // 成功返回SQLITE_OK
int sqlite3_close_v2(sqlite3*); // 无论何时都返回SQLITE_OK
// 获取错误信息
const char *sqlite3_errmsg(sqlite3 *db);
GTest
是一个跨平台的C++单元测试框架,由google发布,是为了在不同平台上为编写C++单元测试而生。它提供了丰富的断言,致命和非致命判断,参数化等。
服务端模块
- 交换机模块
Exchange类:
using ptr = std::shared_ptr<Exchange>;
成员变量
std::string _name; // 交换机名称
mmq::ExchangeType _type; // 交换机类型
bool _durable; // 是否持久化
bool _auto_delete; // 是否自动删除
google::protobuf::Map<std::string, std::string> _args; // 其它参数
成员函数:
构造函数
// 解析str_args,将内容存储到_args成员中
void SetArgs(const std::string &str_args);
// 将_args序列化,返回一个字符串
std::string GetArgs();
using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;
ExchangeMapper类:
成员变量:
mmq::SqliteHelper _sql_helper; // 数据库管理句柄
成员函数:
// 构造函数,打开数据库,调用CreateTable()创建表
ExchangeMapper(const std::string &dbfile) : _sql_helper(dbfile)
/* 创建交换机表单,编写sql语句,Exec执行
表结构:
name varchar(32) primary key
type int
durable int
auto_delete int
args varchar(128)
*/
void CreateTable();
// 删除表单,编写sql,执行,若失败abort()退出
void RemoveTable();
// 插入数据,根据交换机指针获取数据,编写sql,执行
bool Insert(Exchange::ptr &exp);
// 删除某个交换机数据,根据交换机名,编写sql,执行
void Remove(const std::string &name);
// 恢复交换机数据,编写sql,执行,进行select查找并调用回调,获取数据
ExchangeMap Recovery();
// select查找设置的回调函数
static int SelectCallback(void *arg, int colnum, char **rows, char **fields)
{
// 定义ExchangeMap *result并承载(将arg强转为ExchangeMap *类型)
// 定义智能指针对象exp,调用make_shared<Exchange>()初始化
// 根据rows[i]获取一行数据中的每一列数据,设置到exp中
// 将exp插入到result中
// 返回0,返回非0会终止查询
}
ExchangeManager类:
using ptr = std::shared_ptr<ExchangeManager>;
成员变量:
std::mutex _mutex;
ExchangeMapper _mapper; // 数据库操作管理句柄
ExchangeMap _exchanges; // 所有的交换机[交换机名:交换机对象指针]
成员函数:
// 构造函数,传递dbfile数据库文件路径,内部调用Recovery获取所有交换机
ExchangeManager(const std::string &dbfile);
// 声明交换机,先加锁,在find判断交换机是否存在,不存在构造并插入Exchange共享指针到,若为持久化还要插入到数据库
bool DeclareExchange(const std::string &qname, mmq::ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string> &args);
// 删除交换机,先加锁,find再查看交换机是否存在,存在则删除(内存中&数据库中)
void DeleteExchange(const std::string &name);
// 获取指定交换机对象,加锁,find判断是否存在,返回
Exchange::ptr SelectExchange(const std::string &name);
// 查看交换机是否存在,加锁,find查找,有返回true,否则返回false
bool Exists(const std::string &name)
// 返回交换机数量,加锁,调用size查看
size_t Size();
// 清理所有交换机,加锁,从内存和数据库(删除表单)中删除
void Clear();
- 队列模块
MsgQueue类:
using ptr = std::shared_ptr<MsgQueue>;
成员变量:
std::string _name; // 队列名
bool _durable; // 是否持久化
bool _exclusive; // 是否独占
bool _auto_delete; // 是否自动删除
google::protobuf::Map<std::string, std::string> _args; // 其它参数
成员函数:
同交换机
using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;
MsgQueueMapper类:
成员变量:
同交换机
成员函数:
同交换机
/* 队列表单:
name varchar(32) primary key
durable int
exclusive int
auto_delete int
args varchar(128)
*/
MsgQueueManager类:
成员变量:
同交换机
成员函数:
同交换机
// 获取所有队列,加锁。线程安全问题不能返回引用
QueueMap AllQueue();
- 绑定信息模块
Binding类:
成员变量:
std::string _exchange_name;
std::string _msgqueue_name;
std::string _binding_key;
成员函数:
构造函数
// [队列名:绑定信息],每个队列都对应一个绑定信息
using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
// [交换机名:[队列名:绑定信息]],包含了所有绑定信息,以交换机为单元划分
using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
BindingMapper类:
成员变量:
同交换机
/* 绑定信息表单:
exchange_name varchar(32)
msgqueue_name varchar(32)
binding_key varchar(128)
*/
// 根据交换机名和队列名删除绑定信息
void Remove(const std::string &ename, const std::string &qname);
// 删除交换机关联的所有绑定信息
void RemoveExchangeBindings(const std::string &ename);
// 删除队列对应的绑定信息,包括不同交换机中的该队列
void RemoveMsgQueueBindings(const std::string &qname);
// select回调函数,先强转为BindingMap*类型,构造Binding共享指针对象,为了防止覆盖历史数据,先根据队列名得到对应的MsgQueueBindingMap,再向其中insert绑定信息
static int SelectCallback(void *arg, int colnum, char **row, char **fields);
BindingManager类:
成员变量:
同交换机
成员函数:
同交换机
// 进行绑定,加锁,根据交换机名查找对应的一个或多个绑定信息,若存在交换机&&存在队列直接返回true(表示已存在绑定信息),否则,构造绑定信息共享指针,绑定信息是否进行持久化取决于交换机和队列是否持久化,这里为了解耦,利用外来的durable参数判断,如持久化则进行Insert,接着根据队列名获取对应的[队列名:绑定信息],{}构造对象插入其中
bool Bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable);
// 解绑,加锁,查看交换机,再查看队列是否存在,若存在则从内存和数据库中删除
void UnBind(const std::string &ename, const std::string &qname)
// 删除交换机对应的所有绑定信息,加锁,从内存和数据库中删除
void RemoveExchangeBindings(const std::string &ename);
// 删除队列对应的绑定信息,加锁,先从数据库中删除,再遍历交换机,删除每个交换机中的目标队列
void RemoveMsgQueueBindings(const std::string &qname);
// 获取交换机对应的所有绑定信息,加锁,判断交换机是否存在,存在返回对应的[队列名:绑定信息]
MsgQueueBindingMap GetExchangeBindings(const std::string &ename);
// 获取指定的一个绑定信息,加锁,先根据ename得到[qname:BindingPtr],再根据qname得到BindingPtr,返回
Binding::ptr GetBinding(const std::string &ename, const std::string &qname);
- 路由模块
Routing类:
成员函数:
// 检查routingkey是否合法,合法字符(a~z A~Z 0~9 . _),不能含有通配符,for循环检查即可
static bool IsLegalRoutingKey(const std::string &routing_key);
// 检查bindingkey是否合法,合法字符(a~z A~Z 0~9 . _ * #),*和#不能连续出现,首先for循环排除非法字符,再以.为分隔符,获取所有字符串,再次for循环查看是否有连续的#或*
static bool IsLegalBindingKey(const std::string &binding_key);
// 路由匹配算法,交换机有三种模式:
DIRECT:直接交换,bindingkey == routingkey则匹配成功
FANOUT:广播模式,直接返回true
TOPIC:主题交换,进行模式匹配,先将两者进行字符串分割,得到各个单词数组,定义标记数组dp(多定义一行一列,初始化dp[0][0]为true,以为一个单词匹配成功需要从左上方继承结果),如果bindingkey以#开头,则将#对应行的第0列置为true,最后使用routingkey和bindingkey中的每个单词匹配(两层for循环),若当前bindingkey的单词为#,则从左,左上,上方继承结果,若为* || 两者单词相同则从左上方继承,否则置为false,最后返回dp右下角的结果
static bool Route(ExchangeType type, const std::string &routing_key, const std::string &binding_key);
- 消息模块
// 用于构建如queue1.mqd存储消息内容的文件
#define DATAFILE_SUBFIX ".mqd"
// 构建临时文件,用于垃圾数据回收时,接受有效数据,删除前者,并与前者文件名交换
#define TMPFILE_SUBFIX ".mqd.tmp"
// Message是.proto文件中自定义的消息结构
using MessagePtr = std::shared_ptr<mmq::Message>;
MessageMapper类:
成员变量:
std::string _qname; // 队列名称
std::string _datafile; // 存储队列消息数据文件
std::string _tmpfile; // 临时文件(用于垃圾回收)
成员函数:
// 构造函数,首先创建文件夹basedir,接着用basedir+qname+DATAFILE_SUBFIX赋值给datafile,替换TMPFILE_SUBFIX赋值给_tmpfile。调用CreateMsgFile创建datafile文件
MessageMapper(std::string &basedir, const std::string &qname);
// 创建消息数据文件,创建datafile文件
bool CreateMsgFile();
// 删除消息数据文件和临时文件
void RemoveMsgFile();
// 把数据写到文件中,内部调用重载函数Insert(_datafile, msg)
bool Insert(MessagePtr &msg);
// 把数据写入文件,首先将消息的payload序列化得到字符串,之后打开datafile,定义偏移量为文件长度,之后在偏移量(也就是文件末尾)处写入sizeof size_t字节的字符串长度信息,之后接着在最后写入指定长度的字符串数据。最后设置消息的偏移量,写入长度
bool Insert(const std::string &filename, MessagePtr &msg)
// 删除一条消息,但并不是从文件中删除,而是把文件中该消息的有效标志置为“0”。首先将msg的payload中的valid属性置为"0",接着对消息payload进行序列化得到字符串,如果得到的字符串的大小与消息中length属性大小不一样,说明这里序列化新得到的字符串与原数据长度不同,返回false。之后将序列化得到字符串写入文件中原有数据所在位置进行覆盖,此时我们有消息在文件中的偏移量,可以定位到原数据位置,进行写入。
bool Remove(MessagePtr &msg);
// 垃圾数据回收,当数据量超过2000,并且一半以上都是无效数据时,进行垃圾回收,返回有效数据。首先调用Load获取所有有效数据,通过FileHelper创建_tmpfile文件。遍历有效消息链表,调用Insert将消息写入到_tmpfile。循环完毕删除datafile源文件,最后替换_tmpfile和datafile的文件名,最后将有效消息链表返回。
std::list<MessagePtr> GC();
// 加载文件中所有有效数据,通过FileHelper工具类获取文件大小,当偏移量小于文件大小时一直while循环,先读取sizeof size_t字节的数据msg_size(代表消息数据的长度),更新偏移量,再读取当前偏移msg_size长度的数据msg_body,更新偏移。定义消息对象共享指针msgp,msgp调用mutable_payload()->ParseFromString(msg_body),进行反序列化。若得到的数据vaild为"1"有效,则push到result中,继续循环。
bool Load(std::list<MessagePtr> &result);
QueueMessage类:
using ptr = std::shared_ptr<QueueMessage>;
成员变量:
std::mutex _mutex;
std::string _qname;
size_t _valid_count; // 有效消息数量
size_t _total_count; // 总持久化消息数量(包括无效的持久化消息)
MessageMapper _mapper; // 消息数据写入管理句柄
std::list<MessagePtr> _msgs; // 待推送消息
std::unordered_map<std::string, MessagePtr> _durable_msgs; // 持久化消息hash
std::unordered_map<std::string, MessagePtr> _waitack_msgs; // 待确认消息hash
成员函数:
// 构造函数
QueueMessage(std::string &basedir, const std::string &qname);
// 恢复历史消息,加锁,调用mapper的GC()获取所有有效消息链表,遍历链表消息id和消息共享指针构成键值对push到持久化消息hash中。循环结束置有效消息数量和总消息数量为持久化消息hash的size。
bool Recovery();
// 首先构造消息对象共享指针msg,设置msg的消息body,接着设置msg的BasicProperties,若bp不为空,持久化由queue_is_durable和bp的持久化策略共同决定(若队列不持久化,则消息为不持久化;若队列持久化,则消息是否持久化取决于bp的持久化策略),设置消息id为bp的id,同样设置routingkey为bp的。若bp为空,消息持久化策略根据队列是否持久化设置,消息的id由uuid工具自动生成,routingkey设置为""。接着加锁,判断消息是否持久化,若为持久化,设置消息valid为"1",调用mapper的Insert,将msg传进去,令有效消息数量加1,总消息数量加1,构建键值对[消息id:msg]插入到持久化消息hash中。如不持久化,只需将msg加入到待推送消息链表中(持久化也要加入)。
bool Insert(const BasicProperties *bp, const std::string &body, bool queue_is_durable);
// 获取并移除待推送消息链表的头结点。若链表中没有消息,返回消息共享指针的临时对象。否则,加锁,获取队头MessagePtr,并移除链表头结点,构建[id:MessagePtr]放入待确认消息hash中,返回MessagePtr。
MessagePtr Front();
// 消息被客户端确认后,从文件和内存中删除该消息,每次删除后,判断是否需要垃圾回收。加锁,从待确认消息hash中根据消息id获取消息,若消息为持久化,mapper调用Remove将文件中消息置为无效,并从持久化消息hash中删除该消息,令有效消息数量减1,最后尝试进行垃圾回收。若非持久化,在待确认消息队列中删除该消息即可(持久化也要删除)。
bool Remove(const std::string &msg_id);
// 获取所有可推送消息数量,加锁,返回待推送消息链表的元素个数。
size_t GetAbleCount();
// 返回总消息数量,加锁
size_t TotalCount();
// 返回持久化消息数量,加锁,返回持久化消息hash的元素个数
size_t DurableCount();
// 返回待确认消息数量,加锁,返回待确认消息hash的元素个数
size_t WaitAckCount();
// 清除消息队列的所有消息。加锁,mapper调用RemoveMsgFile,删除datafile和tmpfile,clear待推送消息链表,clear持久化消息hash,clear待确认消息hash,令有效消息个数和总消息个数置0
void Clear();
// 垃圾回收。先GCCheck()判断是否要垃圾回收,调用mapper的GC()获取所有有效消息,遍历有效消息链表,在持久化消息hash中根据消息id查找,若找不到,将消息添加到待推送消息链表的末尾,同时构建键值对[id:消息]插入到持久化消息hash中。若找到了,则根据当前消息更新持久化消息hash中消息在文件中的偏移和长度。循环结束后,更新有效消息和总消息数量为有效消息链表的元素个数。
void GC();
// 判断是否要进行垃圾回收(当总消息超过2000,并且有效消息不足一半时,进行垃圾回收)
bool GCCheck();
MessageManager类:
using ptr = std::shared_ptr<MessageManager>;
成员变量:
std::mutex _mutex;
std::string _basedir; // 所有队列持久化消息存储文件的目录
std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs; // 对所有消息队列hash
成员函数:
// 构造函数,不直接在构造函数中获取队列信息,因为这需要包队列头文件,设计函数在最后整合时传参来获取
MessageManager(const std::string &basedir);
// 清除消息队列消息。加锁,遍历消息队列hash,每个消息队列都调用它的Clear。循环结束后,消息队列hash也进行clear。
void Clear();
// 初始化消息队列,定义消息队列共享指针qmp,新增{},在{}内加锁,根据队列名在消息队列hash中查找消息队列,找到直接返回,找不到则根据basedir和qname构造qmp,最后插入到消息队列hash中。{}外部调用qmp的Recovery,恢复历史消息。
void InitQueueMessage(const std::string &qname);
// 销毁消息队列。通过队列名,根据消息队列hash找到对应消息队列,同上,在外部定义消息队列,接着在{}内部加锁,获取消息队列并赋值,在消息队列hash中移除该消息队列,最后在{}外部调用消息队列的Clear方法。
void DestroyQueueMessage(const std::string &qname);
// 同上,根据队列名获取对应消息队列,找不到返回false,找到了进行赋值,在{}外部消息队列调用Insert插入。
bool Insert(const std::string &qname, BasicProperties *bp, const std::string &body, bool queue_is_durable)
// 获取第一个待推送消息。同理,获取消息队列调用Front。
MessagePtr Front(const std::string &qname)
// 对消息进行确认。同理获取消息队列对象,调用Remove传递消息id参数。
void Ack(const std::string &qname, const std::string &msg_id);
// 获取有效消息数量,同理
size_t GetAbleCount(const std::string &qname);
// 获取总消息数量,同理
size_t TotalCount(const std::string &qname);
// 获取持久化消息数量,同理
size_t DurableCount(const std::string &qname);
// 获取待确认消息数量,同理
size_t WaitAckCount(const std::string &qname);
- 虚拟主机模块
VirtualHost类:
成员变量:
std::string _hostname; //虚拟机名
ExchangeManager::ptr _emp; // 交换机管理句柄
MsgQueueManager::ptr _mqmp; // 队列管理句柄
BindingManager::ptr _bmp; // 绑定信息管理句柄
MessageManager::ptr _mmp; // 消息管理句柄
成员函数:
// 构造函数,用basedir初始化消息管理句柄。队列管理句柄调用AllQueue获取所有队列hash,遍历所有队列,用每个队列名作为参数给消息管理句柄的InitQueueMessage方法,初始化消息队列信息。
VirtualHost(const std::string &hostname, const std::string &basedir, const std::string &dbfile);
// 声明交换机。调用交换机管理句柄的DeclareExchange
bool DeclareExchange(const std::string &name, mmq::ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map<std::string, std::string> &args)
// 删除交换机。先删除交换机的绑定信息,再删除交换机,直接调用接口。
void DeleteExchange(const std::string &name);
// 判断交换机是否存在。直接调用接口
bool ExistExchange(const std::string &name);
// 获取交换机。直接调用接口
Exchange::ptr SelectExchange(const std::string &name);
// 声明队列。先消息管理句柄调用InitQueueMessage初始化消息队列,再声明队列。直接调用接口
bool DeclareQueue(const std::string &name, bool durable, bool exclusive, bool auto_delete, const google::protobuf::Map<std::string, std::string> &args);
// 删除队列。先消息管理句柄调用销毁消息队列接口,再绑定信息管理句柄调用删除队列绑定信息接口,最后队列管理句柄调用删除队列接口。
void DeleteQueue(const std::string &name);
// 判断队列是否存在。直接调用接口
bool ExistQueue(const std::string &name);
// 获取所有队列。直接调用接口
QueueMap AllQueues();
// 将交换机和队列绑定。先判断交换机和队列是否存在,再根据两者的durable判断绑定是否需要初始化。
bool Bind(const std::string &ename, const std::string &qname, const std::string &key);
// 解绑。直接调用接口
void UnBind(const std::string &ename, const std::string &qname);
// 获取队列的所有绑定信息。直接调用接口
MsgQueueBindingMap ExchangeBindings(const std::string &ename);
// 判断交换机和队列是否绑定。直接调用接口
bool ExistsBind(const std::string &ename, const std::string &qname);
// 向消息队列中添加消息。先判断队列是否存在,消息管理句柄再调用insert接口
bool BasicPublish(const std::string &qname, BasicProperties *bp, const std::string &body);
// 获取消息。直接调用接口Front
MessagePtr BasicConsume(const std::string &qname);
// 对消息进行确认。直接调用接口
void BasicAck(const std::string &qname, const std::string &msg_id);
// 获取消息队列的有效消息个数。直接调用接口
size_t GetQueueAbleMessageCount(const std::string &qname);
// 销毁虚拟机。对交换机、队列、绑定信息、消息管理句柄调用Clear
void Clear();
- 消费者模块
// 消费者的回调函数类型
using ConsumerCallBack = std::function<void(const std::string, const BasicProperties *bp, const std::string)>;
Consumer类:
成员变量:
std::string _tag; // 消费者唯一标识
std::string _qname; // 消费者订阅的队列名称
bool _auto_ack; // 自动确认标志
ConsumerCallBack _callback; // 回调函数
成员函数:
构造函数,对四个成员变量初始化
// 以队列为单元的消费者管理结构
QueueConsumer类:
using ptr = std::shared_ptr<QueueConsumer>;
成员变量:
std::mutex _mutex;
std::string _qname;
uint64_t _rr_seq; // 轮转序号
std::vector<Consumer::ptr> _consumers;
成员函数:
// 构造函数
QueueConsumer(const std::string &qname) : _qname(qname), _rr_seq(0);
// 队列新增消费者。加锁,遍历消费者数组,根据消费者标识判断,没有重复则新增,构建消费者对象共享指针,push到消费者数组中。返回消费者共享指针。
Consumer::ptr Create(const std::string &tag, const std::string &qname, bool ack_flag, const ConsumerCallBack &cb);
// 队列移除消费者。加锁,遍历消费者数组,找到直接erase
void Remove(const std::string &tag);
// 队列获取消费者。加锁,若没有消费者则返回空共享指针对象,获取当前轮转下标(轮转序号 % 消费者数量),之后令轮转序号加1,返回消费者数组轮转下标对应的消费者。
Consumer::ptr Choose();
// 判断队列消费者数量是否为0。加锁,返回消费者数组元素个数
bool Empty();
// 判断指定消费者是否存在。加锁,遍历消费者数组,若存在消费者标识相等则存在。
bool Exists(const std::string &tag);
// 清理所有消费者。加锁,clear消费者数组,将轮转序号置为0
bool Clear();
// 获取消费者数量。加锁,返回数组size
size_t Count();
ConsumerManager类:
using ptr = std::shared_ptr<ConsumerManager>;
成员变量:
std::mutex _mutex;
std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers; //[队列名:消费者队列]
成员函数:
构造函数什么也不干
// 初始化队列消费者。加锁,判断队列hash是否存在该队列,不存在则新增,构造消费者队列,并插入到队列hash中。
void InitQueueConsumer(const std::string &qname);
// 销毁消费者队列。加锁,队列hash直接erase
void DestroyQueueConsumer(const std::string &qname);
// 创建消费者。首先定义消费者队列,接着在{}中加锁,在队列hash中,查找消费者队列,找到则赋值,找不到返回空共享指针。{}外通过消费者队列调用Create创建消费者。
Consumer::ptr Create(const std::string &tag, const std::string &qname, bool ack_flag, const ConsumerCallBack &cb);
// 删除消费者。同理{}内加锁,获取消费者队列,外部调用Remove。
void Remove(const std::string &tag, const std::string &qname);
// 同理
Consumer::ptr Choose(const std::string &qname);
bool Empty(const std::string &qname);
bool Exists(const std::string &tag, const std::string &qname);
size_t ConsumerCount(const std::string &qname);
// 加锁,队列hash进行clear。
void Clear();
- 信道模块
// 对自定义协议类型进行类型别名声明
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
using OpenChannelRequestPtr = std::shared_ptr<OpenChannelRequest>;
using CloseChannelRequestPtr = std::shared_ptr<CloseChannelRequest>;
using DeclareExchangeRequestPtr = std::shared_ptr<DeclareExchangeRequest>;
using DeleteExchangeRequestPtr = std::shared_ptr<DeleteExchangeRequest>;
using DeclareQueueRequestPtr = std::shared_ptr<DeclareQueueRequest>;
using DeleteQueueRequestPtr = std::shared_ptr<DeleteQueueRequest>;
using QueueBindRequestPtr = std::shared_ptr<QueueBindRequest>;
using QueueUnBindRequestPtr = std::shared_ptr<QueueUnBindRequest>;
using BasicPublishRequestPtr = std::shared_ptr<BasicPublishRequest>;
using BasicAckRequestPtr = std::shared_ptr<BasicAckRequest>;
using BasicConsumeRequestPtr = std::shared_ptr<BasicConsumeRequest>;
using BasicCancelRequestPtr = std::shared_ptr<BasicCancelRequest>;
// 一个信道对应一个消费者
Channel类:
using ptr = std::shared_ptr<Channel>;
成员变量:
std::string _id; // 信道id
Consumer::ptr _consumer; // 消费者
muduo::net::TcpConnectionPtr _conn; // tcp连接
ProtobufCodecPtr _codec; // protobuf协议处理器
ConsumerManager::ptr _cmp; // 消费者管理句柄
VirtualHost::ptr _host; // 虚拟机
Threadpool::ptr _pool; // 线程池
成员函数:
构造函数
// 析构函数,如果该信道消费者不为空,用消费者管理句柄删除消费者。
~Channel();
// 针对交换机、队列等声明/删除等请求的基础响应。定义基础响应消息resp,设置响应id、信道id、是否成功处理请求。最后使用code调用send传递conn连接和resp参数。send操作:首先将resp序列化为二进制数据,接着进行编码(为序列化数据添加传输所需的格式信息,如长度,消息类型等),最后通过conn进行网络发送。
void BasicResponse(const std::string &rid, const std::string &cid, bool ok);
// 交换机的声明,从req中获取交换机名、交换机类型、持久化、自动删除、其它参数,交给host调用。并调用基础响应返回结果给客户端。
void DeclareExchange(const DeclareExchangeRequestPtr &req);
// 交换机的删除,从req得到交换机名,host调用接口,再调用基础响应向客户端发送响应
void DeleteExchange(const DeleteExchangeRequestPtr &req);
// 队列的声明,同理调用host创建队列,同时消费者管理句柄还要用队列名初始化消息队列,最后发送响应。
void DeclareQueue(const DeclareQueueRequestPtr &req);
// 队列的删除,要从消费者管理句柄和虚拟机中删除,响应
void DeleteQueue(const DeleteQueueRequestPtr &req);
// 队列的绑定与解绑。调用接口,响应。
void QueueBind(const QueueBindRequestPtr &req);
void QueueUnBind(const QueueUnBindRequestPtr &req);
// 消息发布。首先判断交换机是否存在,获取交换机所有绑定信息,若请求中消息属性不为空,获取消息属性和routingkey。遍历绑定信息,使用bindingkey和routingkey路由匹配,匹配成功虚拟机调用发布消息接口添加消息,若该队列存在消费者则向线程池添加一个消息消费任务(内部使用bind绑定Consume消费函数,以及参数this和队列名。再将bind后的任务push到线程池中)。循环外部响应
void BasicPublish(const BasicPublishRequestPtr &req);
// 消息确认。虚拟机调用,响应
void BasicAck(const BasicAckRequestPtr &req);
// 订阅队列消息。首先判断队列是否存在,不存在响应并返回。使用bind绑定Callback回调函数和this以及三个占位符参数作为消费者的回调函数,调用消费者管理句柄创建消费者并赋值给成员变量消费者,进行基础响应。若存在历史消息或在消费者创建之前发布消息,需要在队列中第一个消费者创建时把对应数量的消息任务抛入线程池,bind绑定Consume消费函数和this和队列名参数,获取消息队列的可推送消息数量,抛入该数量的任务到线程池。(不太妥当,优化策略:处理历史消息时不在这里抛入任务,而是在消费任务函数最后,检测是否存在历史消息,存在则再抛一个任务到线程池)。
void BasicConsume(const BasicConsumeRequestPtr &req);
// 取消订阅。消费者管理句柄调用Remove删除队列中的消费者,响应。
void BasicCancel(const BasicCancelRequestPtr &req);
// 组织推送消息,将消息推送给channel对应的客户端,是消费者保存的回调函数。定义基础消费响应,设置信道id、消费者标识、消息体,如果消息属性不为空,根据消息属性设置响应的消息id、持久化策略、routingkey。最后codec调用send发送给客户端。
void Callback(const std::string tag, const BasicProperties *bp, const std::string body);
// 抛入线程池的任务,指定队列消费消息。首先从队列中获取一个消息,再从队列中获取一个消费者,调用消费者的回调函数,传递消息标识、属性、消息体,实现消息的推送。最后如果该消费者为自动确认,则进行虚拟机调用消息确认。
void Consume(const std::string &qname);
ChannelManager类:
using ptr = std::shared_ptr<ChannelManager>;
成员变量:
std::mutex _mutex;
std::unordered_map<std::string, Channel::ptr> _channels; // 信道hash
成员函数:
构造
// 打开创建信道。加锁,查找信道hash,找不到构造信道共享指针,插入信道hash中。
bool OpenChannel(const std::string &id,
const ConsumerManager::ptr &cmp,
const VirtualHost::ptr &host,
const ProtobufCodecPtr &codec,
const muduo::net::TcpConnectionPtr &conn,
const Threadpool::ptr &pool);
// 关闭信道。加锁,信道hash直接erase
void CloseChannel(const std::string &id);
// 获取信道。加锁,信道hash查找,找到返回。
Channel::ptr GetChannel(const std::string &id);
- 连接模块
Connection类:
成员变量:
muduo::net::TcpConnectionPtr _conn; // tcp连接
ProtobufCodecPtr _codec; // protobuf协议处理器
ConsumerManager::ptr _cmp; // 消费者管理句柄
VirtualHost::ptr _host;
Threadpool::ptr _pool;
ChannelManager::ptr _channels; // 信道管理模块
成员函数:
// 构造函数
Connection(const ConsumerManager::ptr &cmp,
const VirtualHost::ptr &host,
const ProtobufCodecPtr &codec,
const muduo::net::TcpConnectionPtr &conn,
const Threadpool::ptr &pool)
: _conn(conn), _codec(codec), _cmp(cmp), _host(host), _pool(pool),
_channels(std::make_shared<ChannelManager>())
{}
// 打开信道。使用消费者管理句柄创建信道,失败成功都发送基础响应
void OpenChannel(const OpenChannelRequestPtr &req);
// 调用接口,发送响应
void CloseChannel(const CloseChannelRequestPtr &req);
// 调用接口
Channel::ptr GetChannel(const std::string &id);
// 基础响应。定义基础响应,设置响应id,信道id,是否处理成功,codec调用send发送给客户端。
void BasicResponse(const std::string &rid, const std::string &cid, bool ok);
ConnectManager类:
using ptr = std::shared_ptr<ConnectManager>;
成员变量:
std::mutex _mutex;
std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _conns; // 连接hash
成员函数:
构造
// 创建新连接。加锁,连接hash查找,找不到构造并插入
void NewConnection(const ConsumerManager::ptr &cmp,
const VirtualHost::ptr &host,
const ProtobufCodecPtr &codec,
const muduo::net::TcpConnectionPtr &conn,
const Threadpool::ptr &pool);
// 删除连接。加锁,连接hash直接erase
void DelConnection(const muduo::net::TcpConnectionPtr &conn);
// 获取连接。加锁,找到返回
Connection::ptr GetConnection(const muduo::net::TcpConnectionPtr &conn);
- broker模块
// 数据库文件名
#define DBFILE "/meta.db"
// 虚拟机名
#define HOSTNAME "MyVirtualHost"
Server类:
typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
成员变量:
muduo::net::EventLoop _loop; // Reactor事件循环,处理IO任务(单线程)
muduo::net::TcpServer _server; // 服务器对象
ProtobufDispatcher _dispatcher; // 请求分发器对象--要向其中注册请求处理函数
ProtobufCodecPtr _codec; // protobuf协议处理器--针对收到的请求数据进行prorobuf协议处理
VirtualHost::ptr _virutal_host;
ConsumerManager::ptr _consumer_manager;
ConnectManager::ptr _connection_manager;
Threadpool::ptr _threadpool;
成员函数:
// 构造函数,初始化server服务器(loop事件循环,处理IO、InetAddress设置监听地址0.0.0.0和端口、“Server”为服务器名、kReusePort表示多进程支持同一端口),dispatcher请求分发器(注册未知请求的回调函数),codec(编解码器,共享指针初始化,注册回调函数,bind绑定onProtobufMessage回调和参数dispatcher和占位符,这里回调函数功能是根据传来的解析过的protobuf消息,根据消息类型查找对应处理器,找到的话调用具体注册的回调函数,没找到调用默认回调)。内部要先通过虚拟机获取所有队列,用队列名初始化消费者队列,接着注册dispatcher的不同类型消息的回调函数,再注册server的建立连接回调函数,以及连接传来消息时的回调函数(bind绑定protobufcodec类的onMessage函数,和codec原生指针,三个占位符),该回调功能是:从网络消息中解析出完整的protobuf消息,并调用codec注册的onProtobufMessage根据消息类型调用不同回调函数
Server(int port, const std::string &basedir) :
_server(&_loop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::Option::kReusePort),
_dispatcher(std::bind(&Server::OnUnkonwnMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
_virutal_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),
_consumer_manager(std::make_shared<ConsumerManager>()),
_connection_manager(std::make_shared<ConnectManager>()),
_threadpool(std::make_shared<Threadpool>())
// 开启服务器,server调用start,loop调用loop开始事件循环处理IO事件,loop之后主线程就变成了事件循环的专用线程
void Start();
ProtobufCodecPtr的send方法:
如_codec->send(_conn, req);
send内部实现:
void send(const muduo::net::TcpConnectionPtr& conn,
const google::protobuf::Message& message)
{
// FIXME: serialize to TcpConnection::outputBuffer()
muduo::net::Buffer buf;
fillEmptyBuffer(&buf, message);
conn->send(&buf);
}
做了三件事:
- 创建一个临时的Buffer对象
- 使用fillEmptyBuffer方法,将protobuf消息序列化到这个buf中
- 调用TcpConnection的send方法发送数据
跨线程调度:客户端向服务端发送数据是在loop线程内进行的,若当前调用send线程不是loop线程,则send方法会把实际发送操作封装为一个函数对象,通过queueInLoop方法将这个函数对象放入到loop线程的事件队列中等待处理。
muduo的Buffer类:
已读取数据 | 可读取数据 | 可写数据
/// +-------------------+------------------+------------------+
/// | prependable bytes | readable bytes | writable bytes |
/// | | (CONTENT) | |
/// +-------------------+------------------+------------------+
/// | | | |
/// 0 <= readerIndex <= writerIndex <= size
ProtobufCodec会进行proto消息类型的序列化和反序列化
协议格式:
总长度(4B)| 类型名长度(4B)| 类型名(NB)| ProtoBuf数据(MB)| 校验和(4B)
__attribute__ ((__packed__))表示不对结构体进行内存对齐,确保跨平台一致性
// struct ProtobufTransportFormat __attribute__ ((__packed__))
// {
// int32_t len;
// int32_t nameLen;
// char typeName[nameLen];
// char protobufData[len-nameLen-8];
// int32_t checkSum; // adler32 of nameLen, typeName and protobufData
// }
序列化:
void ProtobufCodec::fillEmptyBuffer(Buffer* buf, const google::protobuf::Message& message)
首先断言buffer的可读区域为0,接着获取message对象的类型名存储到string中,接着获取string对象的size+1(要包含'\0'),
将类型长度和类型字符串添加到可读缓冲区中。
之后检查消息是否初始化,若初始化了,往下进行,
接着获取消息大小(实际获取的是计算了序列化后的消息的具体字节数),获取可写缓冲区的起始地址,从起始地址写入序列化后的二进制数据,并获取末尾地址,检查末尾地址-初始地址是否等于消息大小。
之后设置校验和
最后检查可读缓冲区的大小是否正确,获取可读缓冲区大小,将大小设置到可读缓冲区前面(复用已读取数据区),完成序列化。
反序列化:
定义一个空的MessagePtr
首先判断消息长度是否等于校验和,
等于,先获取消息类型(从buffer读取4字节获取消息类型长度,再获取消息类型字符串),根据消息类型创建Message对象
若Message对象不为空,从buffer中读取proto二进制序列,message调用ParseFromArray传递二进制序列参数,进行proto消息反序列化
最后返回message对象。
void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,
Buffer* buf,
Timestamp receiveTime)
从网络缓冲区中读取数据,解析ProtoBuf消息并调用对应回调。
while循环判断可读缓冲区是否存在一个最小消息。
内部获取前4字节得到消息总长度,再根据总长度判断可读缓冲区内是否包含完整消息
接着调用反序列化,得到message对象,接着调用codec初始化的回调函数(dispatcher的onProtobufMessage函数)
ProtobufDispatcher类会根据传递来的消息类型调用不同的注册过的回调函数。
void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,
const MessagePtr& message,
muduo::Timestamp receiveTime) const
首先获取参数message的消息对象描述符descriptor,这是protobuf中标识消息类型的唯一标识符,
根据标识符在回调hash中查找对应的回调并执行。
uuid生成:
根据标准uuid:8-4-4-4-12生成
本项目uuid部分基于随机数,部分基于递增计数器
如:bfd2de3c-22ca-3a1c-0000-000000004e27
前16位由随机数组成,后16位由递增原子变量组成
static std::string uuid()
使用梅森旋转算法,生成8个[0,255]范围内随机数,将随机数转为位宽2位的16进制,空位补0,
加入到字符串末尾,在合适位置添加-
定义static类型atomic<size_t>类型原子类型整数,获取后,令原子变量++,
将获取的64位无符号整型分8次,每次以8位为单位从高位到低位处理
并添加到字符串中,最后返回字符串uuid
FileHelper类:
成员变量:
std::string _filename;
成员函数:
// 文件是否存在
bool Exists()
{
struct stat st;
return (stat(_filename.c_str(), &st) == 0);
}
// 通过struct stat进行获取,它是一个系统数据结构,用于存储一个文件的详细信息
// 调用stat接口时,传递文件名和stat结构体变量地址,操作系统内核会去文件系统查找文件所有信息
// 并填充到结构体变量中,通过这个结构体就能获取到文件的各种属性。
// 文件大小
size_t Size()
{
struct stat st;
int ret = stat(_filename.c_str(), &st);
if(ret < 0) return 0;
return st.st_size;
}
// 读取文件内容
// 使用std::ifstream ifs(_filename.c_str(), std::ios::binary | std::ios::in);
// 以二进制打开,因为默认文本打开可能会对换行进行\r\n处理,导致文件损坏
// 通过is_open检查是否成功打开,
// 通过ifs.seekg(offset, std::ios::beg)从begin位置跳转到之后的offset文件,定位输出流
// 通过ifs.read(body, len),从读指针处读取len字节的数据到body中
// 通过if(ifs.good() == false) 判断上一步读取是否出错
// 最后ifs.close()关闭文件
bool Read(char *body, size_t offset, size_t len)
// 向文件写入
// 使用std::fstream fs(_filename, std::ios::binary | std::ios::in | std::ios::out);
// 不使用ofstream是因为它没有seekg跳转读写位置的句柄,不使用std::ios::trunc截断打开
// is_open检查是否成功打开
// seekp移动写指针
// 通过fs.write(body, len)写入数据
// good检查是否写入成功
// 关闭文件
bool Write(const char *body, size_t offset, size_t len)
// 重命名
// 使用return (::rename(_filename.c_str(), nname.c_str()) == 0)
// C标准库函数rename,分别传递原文件名和新文件名字符串即可
bool Rename(const std::string &nname)
// 创建文件
// 使用std::ofstream ofs(filename, std::ios::binary | std::ios::out)
// is_open检查,成功打开即成功创建文件
// 若文件存在,会清空内容(ofstream的默认模式包含trunc)
// 关闭文件
static bool CreateFile(const std::string &filename)
// 删除文件
// 调用C标准库函数remove,传递文件名字符串即可
// 注意:remove只能删除文件或空目录,对于非空目录会失败。
static bool RemoveFile(const std::string &filename)
// 创建目录
// 若path为aaa/bbb/ccc/ddd,要迭代创建,从第一个目录开始创建
// 循环分割路径,通过不断查找斜杠/,将完整路径分割为逐步增长的子路径
// 对每个子路径调用mkdir,传递路径字符串和权限,
// int ret = mkdir(subpath.c_str(), 0775),0775是八进制数,表示权限标志,rwxrwxr-x
// 如果mkdir失败,且errno不是EEXIST(目录已存在导致的失败),则返回false
static bool CreateDirectory(const std::string &path)
// 删除目录
// 编写指令std::string cmd = "rm -rf " + path
// system(cmd.c_str())调用system函数执行shell指令
static bool RemoveDirectory(const std::string &path)
// 获取上级目录路径
// 查找最后一个斜杠/,若找不到,说明当前目录就是父目录,返回./
// 找到了,截取从开始到最后一个斜杠之前的字符串并返回
static std::string ParentDirectory(const std::string &filename)
日志:
namespace mmq
{
#define DBG_LEVEL 0
#define INF_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DBG_LEVEL
#define LOG(lev_str, level, format, ...){\
if(level >= DEFAULT_LEVEL){\
time_t t = time(nullptr);\
struct tm *ptm = localtime(&t);\
char time_str[32];\
strftime(time_str, 31, "%H:%M:%S", ptm);\
printf("[%s][%s][%s:%d]\t" format "\n", lev_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__);\
}\
}
#define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
#define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
#define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)
}
首先通过time获取当前时间戳,
再使用localtime将时间戳转为本地时间的结构体tm,
再使用strftime将tm结构体格式化为字符串这里格式是"小时:分钟:秒",
最后printf格式化输出:
format是调用者传入的格式字符串(例如 "Value is %d")
##__VA_ARGS__是一个宏的变长参数,对应format所需的参数
##的作用:若是__VA_ARGS__可变参数为空,宏替换后,printf的格式化字符串外面会多一个逗号导致语法错误
##放在__VA_ARGS__之前时,若可变参数为空,则预处理器会删除它前面的逗号
问题:
-
在消费者客户端消费消息时,将其强制退出,再次启动消费者,发现文件中存在消息,但消费者却无法消费:
通过日志定位,发现原因是代码中,若有消费者到来,服务端会根据其订阅的队列的待推送消息个数,抛入对应个数的任务到线程池中,每个任务中会从待推送消息链表中获取一个消息进行推送,但客户端退出后,会造成内存中消息链表中的消息被全部获取但发送失败的情况。再次启动消费者订阅该队列,内存中的待推送消息链表为空,没有消息推送。但消息仍在磁盘中存在,当服务重启时,会将磁盘中的消息加载到待推送消息队列中。
为了解决这个问题,我先在消息send发送之前,检查连接是否有效,若无效,根据消息id,在待确认消息hash中查找消息,并将消息重新push到待推送消息链表中。在获取消费者时若没有消费者,也将消息重新入队。
但在这个过程中我有意识到一个新的问题,服务端成功发送给消费者,但没有收到ack,消费者就异常退出的情况下,也会发生内存中的消息丢失。可以给消息的属性中添加消费者标识字段,当连接断开时,识别断开的消费者标识,在待确认消息hash中查找对应的消息添加到待推送消息链表中。
但待推送消息链表的键值是消息id,要根据消费者标识查找消息的话需要遍历整个hash,我觉得效率太低了,于是新增了一个反向索引,以空间换时间,用hash表维护消费者标识以及其对应的所有消息id,根据消费者标识可直接找到所有对应消息,效率提高很多。
最后我觉得在消息中添加消费者标识字段会增加生产者和消费者之间的耦合性,于是查找资料,发现rabbitmq官方是通过服务端信道和交付标签实现的,当rabbitmq服务器通过某一个信道将一条消息推送给消费者时,它会为这次投递生成一个唯一的,在当前信道内单调递增的ID,这就是交付标签。向消费者推送以及消费者ACK都会传递这个交付标签,若消费者异常退出,会根据channel中交付标签的映射将消息重新推送到队列中。以此解决消费者异常退出导致的内存中消息丢失问题。
落实到我的项目中,就是禁用proto文件里ACK请求结构中的消息id字符串,并新增整型的交付标签。并在channel中维护一个uint64类型的计数器以及交付标签与未确认的消息的映射(还要处理多线程并发问题)。这样不仅解决了消费者异常退出导致的内存中消息丢失和之前的生产者消费者耦合问题,用整形替换字符串还减少了网络传输的数据量,提升了效率。 -
在测试我的仿rabbitmq项目时,消费者声明了一个队列,但这个队列名字拼错了,之后我想清理掉这个队列。当时犯了一个错误,我为了省事,绕过我写的删除队列的API,跑到数据库里执行delete语句,把那个队列记录删除了。
结果就出现问题了:之后生产者发布消息时,我的服务器就开始报错,我查日志发现,问题出在路由部分,交换机会查找binding表,找到所有绑定的队列,而那个被删除的队列的绑定关系还留在表里,导致系统一直试图往一个不存在的队列发消息,所以就报错了。
从这件事我学到了:不能图省事直接操作数据库,必须通过封装好的接口来操作资源,因为接口内包含了完整的业务逻辑,比如删除队列时,会先删除绑定信息,再删除队列本身。
如何解决:再声明了一个同名队列,然后调用删除接口,确保删除所有队列关联数据。或给数据库加上外键约束。 -
之前使用json进行网络数据传输,后来觉得json这样的文本协议太大了,于是改用protobuf协议进行二进制传输,大大减少了网络带宽的占用(为什么protobuf更高效:json作为文本格式,所有数据都要转为字符形式,如数字42要转为4和2两个字符,而protobuf使用计算机原生二进制表示,同样一个数字只用1字节;元数据开销更小:json需要存储大量结构分隔符如大括号、冒号、逗号,而protobuf使用预定义的数字标识符代替字段名,同时省略了所有结构分隔符;对常见数据类型做了优化编码)
-
设计应用层协议比较麻烦,之后查看muduo库中的样例,了解了muduo和protobuf之间应用层协议的设计实现、序列化反序列化,以及根据protobuf消息类型调用对应回调等方法。
-
消费者将队列中的所有消息消费完毕后,查看消息队列文件大小发现文件大小不为零:
之后检查发现,垃圾数据回收机制要求消息量超过2000,并且一半以上都是无效消息时才触发。所以此时文件内都是无效消息,重启服务器后,之前的文件大小就变为零了,因为消息队列初始化时不管有多少消息,会直接调用GC进行回收。 -
生产者发布消息时,使用主题交换,但交换机中所有队列都收到了消息:
初次声明交换机时,使用了广播交换,并写入数据库,后来改用主题交换后没有更新数据库中交换机的类型,导致依旧使用广播交换。但不能修改已存在交换机的类型,因为这样会导致已绑定的路由行为混乱,以前路由和现在路由的消息混合到一起了。最好的方法是创建新的交换机进行使用。当生产者消费者客户端声明交换机时,若交换机名相同,类型不同,则直接返回错误。 -
性能优化点:
垃圾回收机制,在总消息超过2000条且无效数据超过一般触发,避免频繁回收浪费性能。
消息队列文件为空时,生产者发布消息,之后消费者消费有时会阻塞,有时能正常消费,使用gdb调试启动时,就不会阻塞。网络问题
生产者发布消息时,强制退出服务端,生产者会永久阻塞,因为WaitResponse中是无条件阻塞等待响应的。可以为WaitResponse添加超时机制使用wait_for替换wait
消费者则可能会一直循环,但如果断开连接时,消费者正在调用BasicConsume方法,则会在内部的WaitResponse永久阻塞
互斥锁应用场景:服务器线程池中抛入的任务中,会从队列中获取一个消费者,但同时可能会向队列中新增或删除消费者,新增还好,若是删除的话,会导致向已删除的消费者发送消息。同理从队列中获取消息、进行ACK确认时,也需要使用互斥锁
直接ctrl + c退出消费者客户端会发生什么?
服务端资源能正确释放。客户端退出,服务端连接超时后关闭连接,触发onConnection回调,调用delConnection,随着connection的删除,它的成员变量会被回收,导致channel共享指针引用计数变为0,调用channel的析构函数,并删除客户端注册的消费者信息。
场景:运行消费者,先退出server,再启动server,问题?
服务端异常退出后,客户端原有连接会失效,若此时客户端发送数据,会检测错误并触发回调,可以在回调中发起重连。
更多推荐
所有评论(0)