目录
第一节:工作目录创建
第二节:属性定义
第三节:日志宏
第四节: SqliteHelper
第五节: FileHelper
下期预告:
第一节:工作目录创建
在家目录创建一个名为mq的目录,mq里又创建5个目录:mqserver、mqclient、mqtest、mqthird、mqcommon
mqserver:存放服务器相关代码
mqclient:存放客户端相关代码
mqtest:进行单元测试的目录
mqthird:存放项目要用到的库
mqcommon:存放服务器和客户端都要使用的代码,各种辅助代码就存放在此
第二节:属性定义
打开mqcommon目录,创建一个名为mq_msg.proto的文件并打开,先添加以下内容:
syntax = "proto3"; // 声明版本 package zd; // 声明命名空间
声明版本就是使用最新的proto3语法,它的语法是做过优化的;
声明命名空间的意思就是下面的内容都在名字为"zd"的命名空间中定义。
添加的第一个属性是交换机的发布模式:
// 交换机的发布模式 enum ExchangeType { UNKONW_1 = 0;// 因为只能从0开始,使用它占据0 DIRECT = 1; // 直接模式 FANOUT = 2; // 广播模式 TOPIC = 3 ; // 主题模式 };
交换机的发布模式与交换机的消息审核有关:
(1)直接模式:每个消息都有一个"钥匙"数据,每个绑定也有一个"钥匙"数据,钥匙内容完全相同才成功
(2)广播模式:只要与该交换机绑定的队列都成功,与钥匙无关
(3)主题模式:钥匙满足一定匹配规则才成功,例如消息的钥匙:"news.music.pop"与队列绑定的钥匙:"news.music"就可以成功
添加的第二个属性是消息的投递模式:
// 消息的投递模式 enum DeliveryMode { UNKONW_2 = 0;// 因为只能从0开始,使用它占据0 UNDURABLE = 1;// 非持久化投递 DURABLE = 2; // 持久化投递 };
(1)非持久化投递:消息不保存在文件中,只在内存中保存
(2)持久化投递:消息既保存在文件中,也保存在内存中
持久化的作用:如果服务器因为某些原因崩溃了,可以将持久化的消息从文件中重新读取出来。
消息想要持久化保存还有一个前置条件——消息所属队列也是持久化的,否则即使消息持久化了,也没有队列获取它。
添加的第三个属性是消息的属性:
// 消息属性 message BasicProperties { string id = 1; // 唯一识别码 DeliveryMode delivery_mode = 2; // 投递模式 string routing_key = 3; // 钥匙 };
(1)唯一识别码:消息的"身份证"
(2)投递模式:同上
(3)钥匙:与绑定钥匙进行匹配,成功将被发布给队列
最后将添加一个消息类,它保存消息的各种信息:
// 消息 message Message { // 消息载荷 // 在文件中保存的数据 message Payload { // 消息属性 BasicProperties properties = 1; // 消息内容 string body = 2; // 消息的有效性 string valid = 3; }; Payload payload = 1;// 消息载荷 // 在内存中保存的数据,辅助消息的写入和读取 uint32 offset = 2; // 消息存储位置 uint32 length = 3; // 消息长度 };
(1)消息属性:同上
(2)消息内容:真正要发布的内容
(3)消息的有效性:标识文件中持久化的消息是否有效,如果不用标识,而是将无效消息删除的话,还需要把有效消息向前填充,效率就太低了。
(4)消息的存储位置:保存这条消息在文件中相对于开头的偏移量,用于消息的无效化和读取
(5)消息的长度:存储消息时,验证它和存储的内容长度是否一致
保存文件并退出后,就可以使用以下指令生成C++代码了:
protoc --cpp_out=./ mq_msg.proto
然后就会生成两个文件:
上述定义的各种内容就被声明在mq_msg.pb.h中了。
那么为什么要使用proto生成而不自己定义呢?因为proto生成的内容符合protobuf协议,使用该协议的send函数会自动将其序列化并添加应用层报头,解决了"粘包"的问题。
第三节:日志宏
在mqcommon创建名为mq_logger.hpp的文件并打开,先添加一个防止文件重复包含的预编译指令:
#ifndef __M_LOG_H__ #define __M_LOG_H__ #endif
之后的每个头文件我们都需要添加类似的预处理指令。
然后包含需要的头文件:
#include <iostream> #include <time.h>
我们需要的日志要能够打印时间、文件名、行号和打印内容,所以使用struct tm将时间转化成各种格式,再使用__FILE__, __LINE__, ##__VA_ARGS__获得文件名、行号和其他参数:
// 宏不允许换行\n,所以每行结束加\将\n转义 #define LOG(format,...){\ /* 获得当前时间戳 */\ time_t t = time(nullptr);\ /* 时间戳转化成类,类保存了它的各种格式 */\ struct tm* ptm = localtime(&t);\ /* 将时间的时分秒提取出来,保存为字符串 */\ char time_str[32] = {0};\ strftime(time_str,31,"%H:%M:%S",ptm);\ printf("[%s][%s:%d]\t" format "\n", time_str, __FILE__, __LINE__, ##__VA_ARGS__);\ /* __VA_ARGS__是不定参数...,##__VA_ARGS__表示...默认为空 */\ /* 这样做LOG宏的...形参就可以不传参数了 */\ }
完成上述内容后就可以在mqtest中创建文件,进行测试了。
第四节: SqliteHelper
在mqcommon创建一个名为mq_helper.hpp的文件并打开,首先设置防止文件重复包含的预编译指令:
#ifndef __M_HELPER_H__ #define __M_HELPER_H__ #endif
然后包含所需头文件,日志宏也包含在内:
#include "mq_logger.hpp" #include <iostream> #include <string> #include <vector> #include <sqlite3.h>
声明一个命名空间,名字自定,之后所有的代码基本上都包含在这个命名空间中:
namespace zd {};
在命名空间中定义一个名为SqliteHelper的类,它的构造函数需要传入一个不带路径的文件名,这个文件名就是sqlite存放数据的地方:
class SqliteHelper { public: SqliteHelper(const std::string& dbfile): _dbfile(dbfile) {} private: std::string _dbfile; sqlite3* _handler; // 数据库管理句柄,用它才能对sqlite文件进行操作 }
然后我们设计它向外提供的接口。
首先是打开/创建文件的接口:
bool open(int save_level = SQLITE_OPEN_FULLMUTEX) { int ret = sqlite3_open_v2(_dbfile.c_str(),&_handler,SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | save_level,nullptr); if(ret != SQLITE_OK) { LOG("sqlite数据库打开/创建失败:%s",sqlite3_errmsg(_handler)); return false; } return true; }
SQLITE_OPEN_FULLMUTEX是sqlite文件打开的多线程模式,这种模式可以保证线程安全,所以将其设置为缺省值
其次是执行命令的接口:
bool exec(const std::string& sql,SqliteCallback cb,void* arg) { int ret = sqlite3_exec(_handler,sql.c_str(),cb,arg,nullptr); if(ret != SQLITE_OK) { LOG("%s\n语句执行失败:%s",sql.c_str(),sqlite3_errmsg(_handler)); return false; } return true; }
命名一共有五种:创建表、删除表、向指定表增加数据、向指定表删除数据、向指定表修改数据。
不同命令有自己的格式内容,但是它都能执行。
SqliteCallback cb是指令执行完毕后调用的回调函数,它的类型在SqliteHelper中重定义:
typedef int(*SqliteCallback)(void*,int,char**,char**);
而 void* arg 就是这个回调函数的各种参数。
最后是关闭数据库文件的接口:
void close() { if(_handler) sqlite3_close_v2(_handler); }
就在该文件中,定义FileHelper。
第五节: FileHelper
FileHelper的功能是提供一些文件操作接口,便于对文件的使用:
首先包含它所需的头文件:
#include <stdio.h> #include <string> #include <fstream> #include <unistd.h> #include <sys/stat.h>
构造函数的参数是带路径的文件名:
class FileHelper { public: FileHelper(const std::string filename): _filename(filename) {} private: std::string _filename; }
所需的第一个接口是判断文件是否存在的接口:
// 判断文件是否存在 bool exists() { struct stat st; return stat(_filename.c_str(),&st) == 0; }
其次是获取文件大小的接口:
// 获得文件大小 size_t size() { struct stat st; int ret = stat(_filename.c_str(),&st); if(ret < 0) // 文件不存在返回0 return 0; return st.st_size; }
读取文件数据的接口有两个,一个是从文件的起始读取,一个是根据某个偏移量从文件的中间开始读取:
// 读取数据 bool read(std::string& body) { // 根据文件大小调整body的大小 size_t fsize = this->size(); body.resize(fsize); return read(&body[0],0,fsize); } bool read(char* body,size_t offset,size_t len) { // 1.二进制读方式打开文件 std::ifstream ifs(_filename,std::ios::binary | std::ios::in); if(ifs.is_open() == false) { LOG("%s 文件打开失败!",_filename.c_str()); return false; } // 2.读文件指针 跳转到指定位置 ifs.seekg(offset,std::ios::beg); // 3.读取文件内容 ifs.read(body,len); if(ifs.good() == false) // 操作成功会返回true { LOG("%s 文件读取失败!",_filename.c_str()); return false; } // 4.关闭文件 ifs.close(); return true; }
向文件写数据的接口也有两个,一个是从开头读取文件数据,一个是根据偏移量从中间写入数据:
// 写入数据 bool write(const std::string& body) { return write(body.c_str(),0,body.size()); } bool write(const char* body,size_t offset,size_t len) { // 1.二进制写方式打开文件 std::fstream ofs(_filename,std::ios::binary | std::ios::out | std::ios::in); if(ofs.is_open() == false) { LOG("%s 文件打开失败!",_filename.c_str()); return false; } // 2.写文件指针 跳转到指定位置 // 该操作需要文件的读权限 ofs.seekp(offset,std::ios::beg); // 3.写入内容 ofs.write(body,len); if(ofs.good() == false) { LOG("%s 文件写入失败!",_filename.c_str()); return false; } // 4.关闭文件 ofs.close(); return true; }
创建文件的接口也提供两个,一个是根据this的_filename创建文件,一个设置成静态函数,供外部传入文件名创建文件:
// 创建文件 bool createFile() { // 文件不存在就会创建 std::fstream ofs(_filename.c_str(),std::ios::binary | std::ios::out); if(ofs.is_open() == false) { LOG("%s 文件创建失败!",_filename.c_str()); return false; } ofs.close(); return true; } static bool createFile(const std::string& filename) { // 文件不存在就会创建 std::fstream ofs(filename.c_str(),std::ios::binary | std::ios::out); if(ofs.is_open() == false) { LOG("%s 文件创建失败!",filename.c_str()); return false; } ofs.close(); return true; }
需要注意的是创建文件时文件会被截断,即内容被清空,所以文件已经存在就不要调用它了。
删除文件的接口也对应创建文件接口有两个:
bool removeFile() { return (::remove(_filename.c_str()) == 0); } static bool removeFile(const std::string& filename) { return (::remove(filename.c_str()) == 0); }
还需要对文件进行重命名的接口:
// 重命名 bool rename(const std::string new_filename) { return ::rename(_filename.c_str(),new_filename.c_str()) == 0; } static bool rename(const std::string filename,const std::string new_filename) { return ::rename(filename.c_str(),new_filename.c_str()) == 0; }
获取文件父级路径的对外接口:
// 获取一个文件的父级路径 static std::string parentDirectory(const std::string& filename) { size_t pos = filename.find_last_of("/"); if(pos == std::string::npos) // 找不到就是当前目录,例如"test" { return "./"; } return filename.substr(0,pos); }
创建/删除父级路径的对外接口:
// 创建/删除目录 static bool createDirectory(const std::string& path) { std::string PATH = path; if(PATH.size() > 1) { if(PATH.substr(0,2) == "./") PATH = PATH.substr(2); } size_t pos = 0; size_t idx = 0; while(idx < PATH.size()) { pos = PATH.find("/",idx); if(pos == std::string::npos) // 已经是最后一层了 { return mkdir(PATH.c_str(),0775) == 0; // 0775:其他人只允许查看 } std::string subpath = PATH.substr(0,pos); int ret = mkdir(subpath.c_str(),0775); if(ret != 0 && errno != EEXIST) { LOG("%s 目录创建失败!",subpath.c_str()); return false; } idx = pos+1; } return true; } static bool removeDirectory(const std::string path) { std::string cmd = "rm -rf "+path; return system(cmd.c_str()) != -1; }
这样文件管理类也完成了。
下期预告:
完成了日志宏、SqliteHelper、FileHelper之后,之后将完成:随机id生成类、字符串切割类并对上述5个小功能进行单元测试,没有问题后再完成工作线程池的代码。