# chatserver **Repository Path**: wzihalo/chatserver ## Basic Information - **Project Name**: chatserver - **Description**: 基于muduo网络库实现的ChatServer服务器 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-08-06 - **Last Updated**: 2025-09-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: muduo, MySQL, cmake, Redis, Nginx ## README # chatserver #### 介绍 基于muduo网络库实现的ChatServer服务器 #### 软件架构 软件架构说明 # C++实现集群聊天服务器 [TOC] # 技术栈 - Json序列化和反序列化 - muduo网络库开发 - nginx源码编译安装和环境部署 - nginx的tcp负载均衡器配置 - redis缓存服务器编程实践 - 基于发布-订阅的服务器中间件redis消息队列编程实践 - MySQL数据库编程 - CMake构建编译环境 - Github托管项目 # 项目需求 1. 客户端新用户注册 2. 客户端用户登录 3. 添加好友和添加群组 4. 好友聊天 5. 群组聊天 6. 离线消息 7. nginx配置tcp负载均衡 8. 集群聊天系统支持客户端跨服务器通信 # 项目目标 1. 掌握服务器的网络I/O模块,业务模块,数据模块分层的设计思想 2. 掌握C++ muduo网络库的编程以及实现原理 3. 掌握Json的编程应用 4. 掌握nginx配置部署tcp负载均衡器的应用以及原理 5. 掌握服务器中间件的应用场景和基于发布-订阅的redis编程实践以及应用原理 6. 掌握CMake构建自动化编译环境 7. 掌握Github管理项目 # 开发环境 1. ubuntu linux环境 2. 安装Json开发库 资源中有json.cpp文件直接包含即可 3. 安装boost + muduo网络库开发环境 boots安装教程:[Linux下boost库环境搭建_linux 安装boost-CSDN博客](https://blog.csdn.net/x_fengmo/article/details/137648322) boost下载:[Boost Getting Started on Unix Variants](https://www.boost.org/doc/libs/1_82_0/more/getting_started/unix-variants.html#the-boost-distribution) 解压:`tar --bzip2 -xf /boost_1_82_0.tar.bz2` 编译:`./bootstrap.sh` 运行b2: `./b2` `sudo cp -r /home/tony/file/boost_1_82_0 /usr/local` `sudo cp /home/tony/file/boost_1_82_0/stage/lib/* /usr/lib` `sudo apt-get install python-dev` 运行:`./b2 install` 不成功就加上 `sudo` muduo安装:[Linux平台下muduo网络库源码编译安装与测试,包含boost库的安装与测试](https://blog.csdn.net/m0_73537205/article/details/138353805) 4. 安装redis环境: [在Ubuntu上部署Redis并设置密码以及允许外网访问 ](https://www.cnblogs.com/fanggm/p/17981693) 查看当前Redis的运行状态:`systemctl status redis` 启动redis:`sudo systemctl start redis` 5. 安装mysql数据库环境 : [在Ubuntu 22.04 LTS 上安装 MySQL两种方式:在线方式和离线方式_ubuntu离线安装mysql-CSDN博客](https://blog.csdn.net/weixin_45626288/article/details/133220238) 启动mysql:`sudo systemctl start mysql` 设置为开机自启动:`sudo systemctl enable mysql` 检查MySQL状态:`sudo systemctl status mysql` 登录mysql,在默认安装时如果没有让我们设置密码,则直接回车就能登录成功。`sudo mysql -uroot -p` 设置密码:`ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '新密码';` sudo netstat -tanp mysql -u root -p123456 6. 安装nginx: 7. 安装CMake环境: 查询ip ` ifconfig -a` **查看当前系统所有 TCP 网络连接、监听端口以及对应进程** ```ini sudo netstat -tanp ``` # Json Json是一种轻量级的数据交换格式(也叫数据序列化方式)。Json采用完全独立于编程语言的文本格式 来存储和表示数据。简洁和清晰的层次结构使得 Json 成为理想的数据交换语言。 易于人阅读和编 写,同时也易于机器解析和生成,并有效地提升网络传输效率。 protobuf更加高效,-> rpc项目 ## 一个优秀的Json三方库 JSON for Modern C++ 是一个由德国大牛 nlohmann 编写的在 C++ 下使用的 JSON 库。 具有以下特点 - 直观的语法 - 整个代码由一个头文件组成 json.hpp,没有子项目,没有依赖关系,没有复杂的构建系统,使用 起来非常方便 - 使用 C++ 11 标准编写 - 使用 json 像使用 STL 容器一样 - STL 和 json 容器之间可以相互转换 - 严谨的测试:所有类都经过严格的单元测试,覆盖了 100% 的代码,包括所有特殊的行为。此 外,还检查了 Valgrind 是否有内存泄漏。为了保持高质量,该项目遵循核心基础设施倡议(CII) 的最佳实践 ## 包含json头文件 在网络中,常用的数据传输序列化格式有XML,Json,ProtoBuf,在公司级别的项目中,大量的在使用 ProtoBuf作为数据序列化的方式,以其数据压缩编码传输,占用带宽小,同样的数据信息,是Json的 1/10,XML的1/20,但是使用起来比Json稍复杂一些,所以项目中我们选择常用的Json格式来打包传输 数据。 下面列举一些项目中用到的有关Json数据的序列化和反序列化代码,仅供参考!JSON for Modern C++这个三方库的使用非常简单,如下所示: ```cpp #include "json.hpp" using json = nlohmann::json; ``` ## Json数据序列化 就是把我们想要打包的数据,或者对象,直接处理成Json字符串。 1. 普通数据序列化 ```cpp void func1(){ json js; js["msg_type"] = 2; js["from"] = "zhangsan"; js["to"] = "Lisi"; js["msg"] = "hello, what is your name?"; cout << js < vec; vec.push_back(1); vec.push_back(2); vec.push_back(5); js["list"] = vec; // 直接序列化一个map容器 map m; m.insert({1, "黄山"}); m.insert({2, "华山"}); m.insert({3, "泰山"}); js["path"] = m; string sendBuf = js.dump(); // json数据对象 =》序列化 json字符串 cout << sendBuf << endl; } ``` 强大到直接把C++ STL中的容器内容可以直接序列化成Json字符串 ## Json数据反序列化 ```cpp string func1(){ json js; js["msg_type"] = 2; js["from"] = "zhangsan"; js["to"] = "Lisi"; js["msg"] = "hello, what is your name?"; string sendBuf = js.dump(); return sendBuf; } int main(){ string recvBuf = func1(); // 数据的反序列化 json字符串 =》反序列化 数据对象(看作容器,方便访问) json jsbuf = json::parse(recvBuf); cout << jsbuf["msg"] << endl; return 0; } ``` # muduo网络库编程 ## muduo源码编译安装 muduo库源码编译安装和环境搭建 muduo库的使用需要链接 /usr/lib /usr/local/lib libmuduo_base.so libmuduo_net.so libpthread.so 链接的时候需要写: -lmuduo_net -lmuduo_base -lpthread ## 基于muduo的客户端服务器编程 基于muduo的客户端服务器编程 muduo网络库的编程很容易,要实现基于muduo网络库的服务器和客户端程序,只需要简单的组合 TcpServer和TcpClient就可以,代码实现如下: ```cpp /* 服务器类,基于muduo库开发 */ /* muduo网络库给用户提供了两个主要的类 TcpServer :用于编写服务器程序的 TcpClient :用于编写客户端程序的 封装 :epoll + 线程池 好处: 能够把 网络I/O的代码 和 业务代码 区分开 业务代码:用户的连接和断开 用户的可读写事件 */ #include #include #include #include #include // 绑定器 using namespace std; using namespace muduo; using namespace muduo::net; using namespace placeholders; // 参数占位符 // 编译: g++ -o server muduo_server.cpp -lmuduo_net -lmuduo_base -lpthread /* 基于muduo网络库开发服务器程序 1. 定义TcpServer对象 2. 创建EventLoop事件循环对象的指针 3. 明确TcpServer构造函数需要什么参数,输出chatserver的构造函数 4. 在当前服务器类的构造函数当中,注册处理连接的回调函数 和 处理读写事件的回调函数 5. 设置合适的服务器线程数量, muduo库会自己分配I/O线程和worker线程 */ class ChatServer { public: ChatServer(EventLoop *loop, // 事件循环 const InetAddress &listenAddr, // IP + Port const string &nameArg) // 服务器的名字 : _server(loop, listenAddr, nameArg), _loop(loop) { // 给服务器注册用户连接的创建和断开回调 当监听到 就调用ChatServer::onConnection _server.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1)); // 给服务器注册用户读写事件回调 _server.setMessageCallback(std::bind(&ChatServer::onMassage, this, _1, _2, _3)); // 设置服务器端的线程数量 1个I/O线程(新用户的连接) 3个worker线程(处理已连接用户的读写时间) _server.setThreadNum(4); } // 开启事件循环 void start() { _server.start(); } private: // 专门处理用户的连接创建和断开 自己编写epolls时 从listenfd 上accept void onConnection(const TcpConnectionPtr &conn) { if (conn->connected()) { cout << conn->peerAddress().toIpPort() << "->" << conn->localAddress().toIpPort() << " state: online" << endl; } else // 连接断开 { cout << conn->peerAddress().toIpPort() << "->" << conn->localAddress().toIpPort() << " state: offline" << endl; conn->shutdown(); // 相当于close(fd) 释放socket // _loop->quit(); } } // 专门处理用户的读写事件 void onMassage(const TcpConnectionPtr &conn, // 连接 Buffer *buffer, // 缓冲区 Timestamp time) // 接收到数据的时间信息 { string buf = buffer->retrieveAllAsString(); cout << "recv data: " << buf << " time: " << time.toString() << endl; conn->send(buf); // 回声服务器, 收到什么发什么 } TcpServer _server; // #1 定义TcpServer对象 EventLoop *_loop; // #2 创建EventLoop事件循环对象的指针 epoll }; int main() { EventLoop loop; // epoll InetAddress addr("127.0.0.1", 6000); ChatServer server(&loop, addr, "ChatServer"); server.start(); // 将listenfd 通过epoll_ctl 添加到epoll上 loop.loop(); // epoll_wait 以阻塞方式等待新用户连接,已连接用户的读写事件等 return 0; } ``` 执行指令进行编译: ```ini g++ -o server muduo_server.cpp -lmuduo_net -lmuduo_base -lpthread ``` 使用CMakeLists.txt 简化编译指令, 防止依赖过多指令太长 ```cmake cmake_minimum_required(VERSION 3.0) project(main) # 配置编译选项 set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -g) # 配置头文件的搜索路径 # include_directories() # 配置库文件的搜索路径 # link_directories() #设置需要编译的源文件列表 set(SRC_LIST ./muduo_server.cpp) #把.指定路径下的所有源文件名字放入变量名SRC_LIST里面 # aux_source_directory(. SRC_LISH) # 生成可执行文件server, 由后面的源文件名称或者变量定义的源文件编译而来 add_executable(目标文件, 依赖文件1,...) # add_executable(server muduo-server.c) add_executable(server ${SRC_LIST}) # 表示server 需要链接后面的库文件 target_link_libraries(server muduo_net muduo_base pthread) ``` 执行: `cmake .` 执行:`make` 文件组成: - bin:生成的可执行文件 - lib:生成的中间库文件 - include:头文件 - src:源文件 - build:编译过程中产生的中间文件 - example:示例代码 - thirdparty:第三方文件 - CMakeLists.txt - autobuild.sh - readme CMakeLists中添加:使生成的可执行文件保存至/bin中 ```ini # 设置可执行文件最终存储的路径 set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin) ``` ```cpp /* 客户端实现,基于C++ muduo网络库 */ class ChatClient { public: ChatClient(muduo::net::EventLoop *loop, const muduo::net::InetAddress &addr) :_client(loop, addr, "ChatClient"){ // 设置客户端TCP连接回调接口 _client.setConnectionCallback(bind(&ChatClient::onConnection, this, _1)); // 设置客户端接收数据回调接口 _client.setMessageCallback(bind(&ChatClient::onMessage, this, _1, _2, _3)); } // 连接服务器 void connect(){ _client.connect(); } private: // TcpClient绑定回调函数,当连接或者断开服务器时调用 void onConnection(const muduo::net::TcpConnectionPtr &con); // TcpClient绑定回调函数,当有数据接收时调用 void onMessage(const muduo::net::TcpConnectionPtr &con, muduo::net::Buffer *buf, muduo::Timestamp time); muduo::net::TcpClient _client; }; ``` ## 用muduo中的线程池做计算任务 采用muduo进行服务器编程,如果遇到需要开辟多线程单独来处理复杂的计算任务或者其它阻塞任务 等,不需要直接调用pthread_create来创建线程,muduo库提供的ThreadPool线程池管理类已经把Linux 的线程创建完全封装起来了,如果想研究源码,可以剖析muduo中ThreadPool.cc和Thread.cc。 ThreadPool使用示例: ```cpp // 客户端输入界面,在单独的线程中接收用户输入进行发送操作 void userClient(const muduo::net::TcpConnectionPtr &con); muduo::ThreadPool _pool; ``` ```cpp /* 连接服务器成功后,开启和服务器的交互通信功能 */ if (con->connected()) // 和服务器连接成功 { LOG_INFO << "connect server success!"; // 启动线程专门处理用户的输入操作 _pool.run(bind(&ChatClient::userClient, this, con)); } else // 和服务器连接失败 {} ``` ## muduo的日志系统 在开发软件产品过程中,日志的输出非常重要,可以记录很多软件运行过程中的信息,方便定位调试问 题,跟踪统计信息等等,muduo库提供的日志级别有: ```cpp #define LOG_TRACE if (muduo::Logger::logLevel() <= muduo::Logger::TRACE) \ muduo::Logger(__FILE__, __LINE__, muduo::Logger::TRACE, __func__).stream() #define LOG_DEBUG if (muduo::Logger::logLevel() <= muduo::Logger::DEBUG) \ muduo::Logger(__FILE__, __LINE__, muduo::Logger::DEBUG, __func__).stream() #define LOG_INFO if (muduo::Logger::logLevel() <= muduo::Logger::INFO) \ muduo::Logger(__FILE__, __LINE__).stream() #define LOG_WARN muduo::Logger(__FILE__, __LINE__, muduo::Logger::WARN).stream() #define LOG_ERROR muduo::Logger(__FILE__, __LINE__,muduo::Logger::ERROR).stream() #define LOG_FATAL muduo::Logger(__FILE__, __LINE__,muduo::Logger::FATAL).stream() #define LOG_SYSERR muduo::Logger(__FILE__, __LINE__, false).stream() #define LOG_SYSFATAL muduo::Logger(__FILE__, __LINE__, true).stream() // 示例: LOG_INFO << "记录相应级别的日志信息"; ``` # Muduo网络库的多线程模型 ## 网络服务器编程常用模型 **【方案1】 : accept + read/write** 不是并发服务器 **【方案2】 : accept + fork - process-pre-connection** 适合并发连接数不大,计算任务工作量大于fork的开销 **【方案3】 :accept + thread thread-pre-connection** 比方案2的开销小了一点,但是并发造成线程堆积过多 **【方案4】: muduo的网络设计:reactors in threads - one loop per thread** 方案的特点是one loop per thread,有一个main reactor负载accept连接,然后把连接分发到某个sub reactor(采用round-robin的方式来选择sub reactor),该连接的所用操作都在那个sub reactor所处的线程中完成。多个连接可能被分派到多个线程中,以充分利用CPU。 Reactor poll的大小是固定的,根据CPU的数目确定。 ```cpp // 设置EventLoop的线程个数,底层通过EventLoopThreadPool线程池管理线程类EventLoopThread _server.setThreadNum(10); ``` 一个Base IO thread负责accept新的连接,接收到新的连接以后,使用轮询的方式在reactor pool中找 到合适的sub reactor将这个连接挂载上去,这个连接上的所有任务都在这个sub reactor上完成。 如果有过多的耗费CPU I/O的计算任务,可以提交到创建的ThreadPool线程池中专门处理耗时的计算任 务。 **【方案5】 : reactors in process - one loop pre process** nginx服务器的网络模块设计,基于进程设计,采用多个Reactors充当I/O进程和工作进程,通过一把 accept锁,完美解决多个Reactors的“惊群现象”。 ## muduo中的reactor模型 reactor模型是什么?先看一下维基百科的标准解释 > The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers. 从上面的描述,可以看出如下关键点: 1. 事件驱动(event handling) 2. 可以处理一个或多个输入源(one or more inputs) 3. 通过Service Handler同步的将输入事件(Event)采用多路复用分发给相应的Request Handler(多个)处理 # 服务器集群 ## 负载均衡器 - 一致性哈希算法 单台服务器受限于硬件资源,其性能是有上限的,当单台服务器不能满足应用场景的并发需求量时,就 需要考虑部署多个服务器共同处理客户端的并发请求,但是客户端怎么知道去连接具体哪台服务器呢? 此时就需要一台负载均衡器,通过预设的负载算法,指导客户端连接服务器。 负载均衡器有基于客户端的负载均衡和服务器的负载均衡。 普通的基于哈希的负载算法,并不能满足负载均衡所要求的单调性和平衡性,但一致性哈希算法非常好 的保持了这两种特性,所以经常用在需要设计负载算法的应用场景当中。 ## nginx下载配置tcp负载均衡 在服务器快速集群环境搭建中,都迫切需要一个能拿来即用的负载均衡器,nginx在1.9版本之前,只支 持http协议web服务器的负载均衡,从1.9版本开始以后,nginx开始支持tcp的长连接负载均衡,但是 nginx默认并没有编译tcp负载均衡模块,编写它时,需要加入--with-stream参数来激活这个模块。 > nginx编译加入--with-stream参数激活tcp负载均衡模块 nginx编译安装需要先安装pcre、openssl、zlib等库,也可以直接编译执行下面的configure命令,根据错误提示信息,安装相应缺少的库。 ```shell $ wget https://nginx.org/download/nginx-1.26.2.tar.gz $ tar -xzf nginx-1.26.2.tar.gz $ cd nginx-1.26.2 $ ./configure --with-stream $ make $ sudo make install ``` 编译完成后,默认安装在了/usr/local/nginx目录。 ```shell $ cd /usr/local/nginx/ $ ls conf html logs sbin ``` 可执行文件在sbin目录里面,配置文件在conf目录里面。 #### nginx配置tcp负载均衡 主要在conf目录里面配置nginx.conf文件,配置如下: ``` events { worker_connections 1024; } # nginx tcp loadbalance config stream { upstream MyServer { server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s; server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s; } server { proxy_connect_timeout 1s; #proxy_timeout 3s; listen 8000; proxy_pass MyServer; tcp_nodelay on; } } http { include mime.types; default_type application/octet-stream; ``` 配置完成后 ```shell nginx -s reload # 重新加载配置文件启动 ,平滑重启(不需要关闭服务直接更新配置文件) nginx -s stop # 停止nginx服务 --- 相关指令 ``` # 服务器中间件-基于发布-订阅的Redis ## 集群服务器之间的通信设计 集群部署的服务器之间进行通信,最好的方式就是引入中间件消息队列,解耦各个服务器,使整个系统 松耦合,提高服务器的响应能力,节省服务器的带宽资源。 在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广 泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。限于我们的项目业务 类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是-**基于发布-订阅模式的redis**。 ### redis环境安装和配置 ```shell $ sudo apt-get install redis-server # ubuntu命令安装redis服务 ``` ubuntu通过上面命令安装完redis,会自动启动redis服务,通过ps命令确认: ```shell $ ps -ef | grep redis redis 2717 1 0 13:24 ? 00:00:00 /usr/bin/redis-server 127.0.0.1:6379 ``` 可以看到redis默认工作在本地主机的6379端口上。 ### redis发布-订阅相关命令 redis首先是一个强大的缓存服务器,比memcache强大很多,不仅仅支持多种数据结构(不像memcache 只能存储字符串)如字符串、list列表、set集合、map映射表等结构,还可以支持数据的持久化存储 (memcache只支持内存存储),经常被应用到高并发的服务器环境设计之中。 启动redis-cli客户端,连接redis server体验一下数据缓存功能,如下: redis存储普通key-value: ```shell $ redis-cli 127.0.0.1:6379> set 1 "zhang san" # 设置key-value OK 127.0.0.1:6379> get 1 "zhang san" 127.0.0.1:6379> set num 1 OK 127.0.0.1:6379> INCR num # redis本身支持事务处理,多线程对key自增自减是线程安全的 (integer) 2 127.0.0.1:6379> INCR num (integer) 3 ``` ```shell $ redis-cli 127.0.0.1:6379> subscribe 22 Reading messages... (press Ctrl-c to quit) 1) "subscribe" 2)"22" 3)(interger) 1 1) "message" 2) "22" 3) "hello" # 另一个客户端 $ redis-cli 127.0.0.1:6379> publish 22 "hello" ``` ## redis发布-订阅的客户端编程 redis支持多种不同的客户端编程语言,例如Java对应jedis、php对应phpredis、C++对应的则是 hiredis。 下面是安装hiredis的步骤: ```shell # 从github上下载hiredis客户端,进行源码 编译安装 $ git clone https://github.com/redis/hiredis 正克隆到 'hiredis'... remote: Enumerating objects: 3261, done. ^C收对象中: 83% (2707/3261), 876.01 KiB | 59.00 KiB/s $ cd hiredis $ make cc -std=c99 -c -O3 -fPIC -Wall -Wextra -Wstrict-prototypes -Wwrite-strings -Wno-missing-field-initializers -Werror -g -ggdb -pedantic alloc.c cc -std=c99 -c -O3 -fPIC -Wall -Wextra -Wstrict-prototypes -Wwrite-strings -Wno-missing-field-initializers -Werror -g -ggdb -pedantic net.c cc -std=c99 -c -O3 -fPIC -Wall -Wextra -Wstrict-prototypes -Wwrite-strings -Wno-missing-field-initializers -Werror -g -ggdb -pedantic hiredis.c cc -std=c99 -c -O3 -fPIC -Wall -Wextra -Wstrict-prototypes -Wwrite-strings -Wno-missing-field-initializers -Werror -g -ggdb -pedantic sds.c cc -std=c99 -c -O3 -fPIC -Wall -Wextra -Wstrict-prototypes -Wwrite-strings -Wno-missing-field-initializers -Werror -g -ggdb -pedantic async.c cc -std=c99 -c -O3 -fPIC -Wall -Wextra -Wstrict-prototypes -Wwrite-strings -Wno-missing-field-initializers -Werror -g -ggdb -pedantic read.c cc -std=c99 -c -O3 -fPIC -Wall -Wextra -Wstrict-prototypes -Wwrite-strings -Wno-missing-field-initializers -Werror -g -ggdb -pedantic sockcompat.c cc -shared -Wl,-soname,libhiredis.so.1.3.0 -o libhiredis.so alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o ar rcs libhiredis.a alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o cc -std=c99 -c -O3 -fPIC -Wall -Wextra -Wstrict-prototypes -Wwrite-strings -Wno-missing-field-initializers -Werror -g -ggdb -pedantic test.c cc -o hiredis-test -O3 -fPIC -Wall -Wextra -Wstrict-prototypes -Wwrite-strings -Wno-missing-field-initializers -Werror -g -ggdb -pedantic -I. test.o libhiredis.a Generating hiredis.pc for pkgconfig... $ sudo make install mkdir -p /usr/local/include/hiredis /usr/local/include/hiredis/adapters /usr/local/lib cp -pPR hiredis.h async.h read.h sds.h alloc.h sockcompat.h /usr/local/include/hiredis cp -pPR adapters/*.h /usr/local/include/hiredis/adapters cp -pPR libhiredis.so /usr/local/lib/libhiredis.so.1.3.0 cd /usr/local/lib && ln -sf libhiredis.so.1.3.0 libhiredis.so && ln -sf libhiredis.so.1.3.0 libhiredis.so.1 cp -pPR libhiredis.a /usr/local/lib mkdir -p /usr/local/lib/pkgconfig cp -pPR hiredis.pc /usr/local/lib/pkgconfig # 拷贝生成的动态库到/usr/local/lib目录下! $ sudo ldconfig /user/local/lib ``` # 数据库设计 public.hpp定义了 ```cpp #ifndef PUBLIC_H #define PUBLIC_H /* server和client的公共文件 */ enum EnMsgType { LOGIN_MSG = 1, // 登陆消息 LOGIN_MSG_ACK, // 登录响应消息 REG_MSG, // 注册消息 REG_MSG_ACK, // 注册响应消息 ONE_CHAT_MSG, // 聊天消息 GROUP_CHAT_MSG // }; // 如果没有显式指定值,会按照前一个成员的值自动递增 1。 #endif ``` ```telnet {"msgid":3,"name":"lisi","password":"111111"} // 注册 {"msgid":3,"name":"zhangsan","password":"123456"} {"msgid":1,"id":22,"password":"123456"} // 登录 {"msgid":1,"id":23,"password":"111111"} {"msgid":5,"id":22,"from":"zhangsan","to":23,"msg":"hello!"} // 发送消息 在线conn,不在线存入离线消息表 {"msgid":5,"id":23,"from":"lisi","to":22,"msg":"hi!"} // 添加好友 {"msgid":6,"id":22,"friendid":23} friend和user select a.id,a.name,a.state from user a inner join friend b on b.friendid=a.id where b.userid = %d; ``` ```mysql DROP database if EXISTS mychat; CREATE database mychat; use mychat; ``` ## 表设计 +--------------------------+ | Tables_in_chat | +--------------------------+ | `allgroup` | | `friend` | | `groupuser` | | `offlinemessage` | | `user ` | +--------------------------+ **User表** | 字段名称 | 字段类型 | 字段说明 | 约束 | | -------- | ------------------------- | ------------ | --------------------------- | | id | INT | 用户id | PRIMARY KEY、AUTO_INCREMENT | | name | VARCHAR(50) | 用户名 | NOT NULL, UNIQUE | | password | VARCHAR(50) | 用户密码 | NOT NULL | | state | ENUM('online', 'offline') | 当前登录状态 | DEFAULT 'offline' | ```mysql DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(50) DEFAULT NULL, `password` varchar(50) DEFAULT NULL, `state` enum('online','offline') CHARACTER SET latin1 DEFAULT 'offline', PRIMARY KEY (`id`), UNIQUE KEY `name` (`name`) ) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8; insert into user values(1,"zhangsan","123456") ``` **Friend表** | 字段名称 | 字段类型 | 字段说明 | 约束 | | -------- | -------- | -------- | ------------------ | | userid | INT | 用户id | NOT NULL、联合主键 | | friendid | INT | 好友id | NOT NULL、联合主键 | ```mysql DROP TABLE IF EXISTS `friend`; CREATE TABLE `friend` ( `userid` int(11) NOT NULL, `friendid` int(11) NOT NULL, KEY `userid` (`userid`,`friendid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; ``` **AllGroup表** | 字段名称 | 字段类型 | 字段说明 | 约束 | | --------- | ------------ | ---------- | --------------------------- | | id | INT | 组id | PRIMARY KEY、AUTO_INCREMENT | | groupname | VARCHAR(50) | 组名称 | NOT NULL, UNIQUE | | groupdesc | VARCHAR(200) | 组功能描述 | DEFAULT '' | ```mysql DROP TABLE IF EXISTS `allgroup`; CREATE TABLE `allgroup` ( `id` int(11) NOT NULL AUTO_INCREMENT, `groupname` varchar(50) CHARACTER SET latin1 NOT NULL, `groupdesc` varchar(200) CHARACTER SET latin1 DEFAULT '', PRIMARY KEY (`id`), UNIQUE KEY `groupname` (`groupname`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; ``` **GroupUser表** | 字段名称 | 字段类型 | 字段说明 | 约束 | | --------- | ------------------------- | -------- | ------------------ | | groupid | INT | 组id | NOT NULL、联合主键 | | userid | INT | 组员id | NOT NULL、联合主键 | | grouprole | ENUM('creator', 'normal') | 组内角色 | DEFAULT 'normal' | ```mysql DROP TABLE IF EXISTS `groupuser`; CREATE TABLE `groupuser` ( `groupid` int(11) NOT NULL, `userid` int(11) NOT NULL, `grouprole` enum('creator','normal') CHARACTER SET latin1 DEFAULT NULL, KEY `groupid` (`groupid`,`userid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; ``` **OfflineMessage表** | 字段名称 | 字段类型 | 字段说明 | 约束 | | -------- | ----------- | -------------------------- | -------- | | userid | INT | 用户id | NOT NULL | | message | VARCHAR(50) | 离线消息(存储Json字符串) | NOT NULL | ```mysql DROP TABLE IF EXISTS `offlinemessage`; CREATE TABLE `offlinemessage` ( `userid` int(11) NOT NULL, `message` varchar(500) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1; ``` # MySQL数据库环境搭建和编程 ## MySQL环境安装设置 ubuntu环境安装mysql-server和mysql开发包,包括mysql头文件和动态库文件,命令如下: ```shell sudo apt-get install mysql-server =》 安装最新版MySQL服务器 sudo apt-get install libmysqlclient-dev =》 安装开发包 ``` ubuntu默认安装最新的mysql,但是初始的用户名和密码是自动生成的,按下面步骤修改mysql的root用 户密码为123456 ```shell :~$ sudo cat /etc/mysql/debian.cnf [client] host = localhost user = debian-sys-maint 《============== 初始的用户名 password = Kk3TbShbFNvjvhpM 《=============== 初始的密码 socket = /var/run/mysqld/mysqld.sock # 用上面初始的用户名和密码,登录mysql server,修改root用户的密码,命令如下: :~$ mysql -u debian-sys-maint -pKk3TbShbFNvjvhpM # 命令解释: -u后面是上面查看的用户名 -p后面紧跟上面查看的密码 mysql> update mysql.user set authentication_string=password('123456') where user='root' and host='localhost'; mysql> update mysql.user set plugin="mysql_native_password"; mysql> flush privileges; Query OK, 0 rows affected (0.01 sec) mysql> exit Bye ``` 重新用root和123456登录mysql-server ```shell :~$ mysql -u root -p123456 mysql> ``` 设置MySQL字符编码utf-8,可以支持中文操作 ```shell mysql> show variables like "char%"; # 先查看MySQL默认的字符编码 +--------------------------+----------------------------+ | Variable_name | Value | +--------------------------+----------------------------+ | character_set_client | utf8 | | character_set_connection | utf8 | | character_set_database | latin1 | | character_set_filesystem | binary | | character_set_results | utf8 | | character_set_server | latin1 | 《============不支持中文!!! | character_set_system | utf8 | | character_sets_dir | /usr/share/mysql/charsets/ | +--------------------------+----------------------------+ 8 rows in set (0.06 sec) mysql> set character_set_server=utf8; Query OK, 0 rows affected (0.00 sec) ``` 修改表的字符编码:alter table user default character set utf8; 修改属性的字符编码:alter table user modify column name varchar(50) character set utf8; ## MySQL数据库编程 ```cpp // 数据库配置信息 static string server = "127.0.0.1"; static string user = "root"; static string password = "123456"; static string dbname = "chat"; // 数据库操作类 class MySQL{ public: // 初始化数据库连接 MySQL(){ _conn = mysql_init(nullptr);} // 释放数据库连接资源 ~MySQL(){ if (_conn != nullptr) mysql_close(_conn); } // 连接数据库 bool connect(){ MYSQL *p = mysql_real_connect(_conn, server.c_str(), user.c_str(), password.c_str(), dbname.c_str(), 3306, nullptr, 0); if (p != nullptr){ mysql_query(_conn, "set names gbk"); } return p; } // 更新操作 bool update(string sql){ if (mysql_query(_conn, sql.c_str())){ LOG_INFO << __FILE__ << ":" << __LINE__ << ":" << sql << "更新失败!"; return false; } return true; } // 查询操作 MYSQL_RES* query(string sql){ if (mysql_query(_conn, sql.c_str())){ LOG_INFO << __FILE__ << ":" << __LINE__ << ":" << sql << "查询失败!"; return nullptr; } return mysql_use_result(_conn); } private: MYSQL *_conn; }; ``` 这里用UserModel示例,通过UserModel如何对业务层封装底层数据库的操作。代码示例如下: ```cpp class UserModel { public: // 重写add接口方法,实现增加用户操作 bool add(UserDO &user) { // 组织sql语句 char sql[1024] = { 0 }; sprintf(sql, "insert into user(name,password,state) values('%s', '%s','%s')", user.getName().c_str(), user.getPwd().c_str(), user.getState().c_str()) MySQL mysql; if (mysql.connect()) { if (mysql.update(sql)) { LOG_INFO << "add User success => sql:" << sql; return true; } } LOG_INFO << "add User error => sql:" << sql; return false; } } ``` ## 构建项目 创建数据库 ```ini # 在项目目录中进行 # 连接MySQL mysql -u root -ppassward # 创建数据库 create database chat; # 执行数据库脚本创建表 source chat.sql ``` 执行脚本构建项目 ```ini sudo apt install libhiredis-dev sudo apt install libmysqlclient-dev bash build.sh ``` ## 执行生成文件 ```ini # 在终端 # 启动服务端 cd ./bin ./ChatServer 6000 ``` ```ini # 启动客户端 cd ./bin ./ChatClient 127.0.0.1 6000 ``` ### 业务 ```cpp // chatservice.hpp #ifndef CHATSERVICE_H #define CHATSERVICE_H #include #include #include // 绑定器 bind #include //互斥锁 // #include // 智能指针 using namespace std; using namespace muduo; using namespace muduo::net; #include "usermodel.hpp" #include "offlinemessagemodel.hpp" #include "friendmodel.hpp" #include "groupmodel.hpp" #include "json.hpp" using json = nlohmann::json; // 表示处理消息的事件回调方法类型 using MsgHandler = std::function; // 聊天服务器业务类 class ChatService { public: static ChatService *instance(); // 获取单例对象的接口函数 void login(const TcpConnectionPtr &conn, json js, Timestamp time); // 处理登录业务 void reg(const TcpConnectionPtr &conn, json js, Timestamp time); // 处理注册业务 void oneChat(const TcpConnectionPtr &conn, json js, Timestamp time); // 一对一聊天业务 void addFriend(const TcpConnectionPtr &conn, json js, Timestamp time); // 添加好友业务 MsgHandler getHandler(int msgid); // 获取消息对应的处理器 void reset(); // 服务器异常业务重置方法 void clientCloseException(const TcpConnectionPtr &conn); // 处理客户端异常退出 void createGroup(const TcpConnectionPtr &conn, json js, Timestamp time); // 创建群组业务 void addGroup(const TcpConnectionPtr &conn, json js, Timestamp time); // 加入群组业务 void groupChat(const TcpConnectionPtr &conn, json js, Timestamp time); // 群组聊天业务 private: ChatService(); // 存储消息id和其对应的业务处理方法 unordered_map _msgHandlerMap; // 存储在线用户的通信连接 // 会多线程调用, 需要注意线程安全问题 unordered_map _userConnMap; // 定义互斥锁,保证_usercornMap的线程安全 mutex _connMutex; // 数据操作类对象 UserModel _userMdoel; OfflineMsgModel _offlineMsgModel; FriendModel _friendModel; GroupModel _groupModel; }; #endif ``` 用户注册 用户登录 添加好友 服务器异常业务重置方法 客户端异常退出 用户P2P 聊天 离线消息 创建群 加群 群组聊天 # 中间调试 ```shell $ gdb ChatServer (gdb) break chatservice.cpp:41 # 打断点 (gdb) run (gdb) n # 下一步 (gdb) quit # 另一个终端 $ telnet 127.0.0.1 6000 ``` ```shell $ gdb ./ChatServer 127.0.0.1 6000 (gdb) run (gdb) quit # 另一个终端 $ ./ChatClient 127.0.0.1 8000 ``` ```shell $ ps -u $ gdb attach 程序的PID (gdb) info threads # 查看线程数量 ``` # 出现过的问题 1. chatserver.hpp中 EventLoop *_loop 定义顺序修改: ```cpp EventLoop *_loop; // 指向事件循环对象的指针 TcpServer _server; // 组合的muduo库,实现服务器功能的类对象 ``` 2. offlinemessagemodel.cpp中query() ```cpp // 把userid用户的所有离线消息放入vec中返回 MYSQL_ROW row; while((row = mysql_fetch_row(res)) != nullptr) { vec.push_back(row[0]); } mysql_free_result(res); ``` 3. 发送离线消息发送一条之后,在进行发送的时候就错误 数据库表结构问题 `offlinemessage` 表结构中,`userid` 和 `message` 一起作为联合主键(`PRI` 标识): ```sql | Field | Type | Null | Key | Default | Extra | |---------|--------------|------|-----|---------|-------| | userid | int(11) | NO | PRI | NULL | | | message | varchar(500) | NO | | NULL | | ``` 当执行 `insert` 操作时,如果 **同一 `userid` 再次插入新的 `message`**,由于主键的唯一性约束(`userid`),数据库会拒绝插入,报主键冲突错误,导致 “更新错误” 假象(实际是插入失败)。 解决办法: - 业务需求是给同一个用户存多条离线消息,需要调整表结构: - 去掉 `userid` 的主键属性: ```sql DROP TABLE IF EXISTS `offlinemessage`; CREATE TABLE `offlinemessage` ( `userid` int(11) NOT NULL, `message` varchar(500) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1; ``` ```sql | Field | Type | Null | Key | Default | Extra | +---------+--------------+------+-----+---------+-------+ | userid | int | NO | | NULL | | | message | varchar(500) | NO | | NULL | | ``` 4. 客户端还未断开,服务器端意外停止,再次运行服务器的时候出现 ```shell 20250804 13:38:22.099922Z 52914 FATAL Address already in use (errno=98) sockets::bindOrDie - SocketsOps.cc:102 已放弃 (核心已转储) ``` 这个错误 `Address already in use (errno=98)` 表示**服务器要绑定的端口(如 6000)被占用**,导致重新启动服务器时无法绑定端口而崩溃。 **问题原因** 当服务器意外断开(如崩溃、强制杀死进程)时,操作系统会保留端口一段时间(处于 `TIME_WAIT` 状态),防止残留的网络数据包干扰新连接。此时重新启动服务器,会因端口被占用而绑定失败。 **解决办法:** 手动释放被占用的端口(推荐) ```bash # 查找占用6000端口的进程 sudo netstat -tulpn | grep 6000 ``` 输出: ```plaintext tcp 0 0 127.0.0.1:6000 0.0.0.0:* LISTEN 52401/./ChatServer ``` 杀死进程: ```shell kill -9 52401 ``` 之后重新启动服务器即可。 5. 加入redis 发布-订阅消息队列进行集群之后 进行登录就发生错误,显示json数据格式问题 ```cpp // 独立线程中接收订阅通道的消息 void Redis::observer_channel_message() { redisReply *reply = nullptr; while (REDIS_OK == redisGetReply(subcribe_context_, (void **)&reply)) { // reply里面是返回的数据有三个,0. message , 1.通道号,2.消息 if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) { // 给业务层上报通道上发送的消息 _notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str); // notify_message_handler_(atoi(reply->element[1]->str), reply->element[2]->str); } freeReplyObject(reply); } cerr << "----------------------- oberver_channel_message quit--------------------------" << endl; } ``` **数组越界访问**:如果返回的不是 **3 个元素**(如 `subscribe`, `unsubscribe`, `message` 之外的情况),就会 **越界访问**。 在进行消息订阅的时候`redisGetReply()` 这次返回的是 **一条简单字符串**(`REDIS_REPLY_STRING`),而你的代码假设它一定是 **数组** (`REDIS_REPLY_ARRAY`),导致 **类型不匹配**。 ```cpp if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) // 改为 if (reply != nullptr && reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3 && reply->element[0] != nullptr && reply->element[0]->str != nullptr && strcmp(reply->element[0]->str, "message") == 0 && reply->element[1] != nullptr && reply->element[1]->str != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) ``` # 集群 一台32位的linux聊天服务器大概的并发量 1024 - 20000;只能支持2W人左右同时聊天 想要提高服务器的并发能力,就要进行集群或者分布式部署,这里选择使用集群。在水平方向上有多台主机服务器进行提供服务。 两个问题 **1.ChatServer集群后怎么引入负载均衡器** 负载均衡器(反向代理设备) - 把client的请求按照负载算法分发到具体的业务服务器ChatServer上面 - 能够和ChatServer保持心跳机制,监测ChatServer故障 - 能够发现新添加的ChatServer设备,方便扩展服务器数量 选择nginx的tcp负载均衡模块 - 如何进行nginx源码编译,包含tcp负载均衡模块 - nginx.conf配置文件中如何配置负载均衡 - nginx的平滑加载配置文件启动 如果想要更大的并发服务时, 对于负载均衡器也可以进行集群 + 前端LVS **2.如何解决跨服务器通信问题** 引入中间消息队列,通过向消息队列发布、订阅消息,以及接收队列推送的消息来解决跨服务器聊天的问题 # 编译 ```shell cd build cmake .. make cd .. cd bin/ ls ``` 网络服务器的项目 四个模块: 网络模块- muduo 性能非常不错的开源网络库来设计的,好处解耦网络模块和业务模块代码,能够让开发者更专注于业务的开发 服务层 C++11的技术, map, bind绑定器,消息id和处理业务的回调函数绑定 数据存储层-mysql 单机服务下并发有限,考虑快速提升并发能力,支持多级扩展,使用nginx的tcp负载均衡做长连接,跨服务器的通信,引入redis作为消息队列的功能,利用其发布订阅来实现跨服务器通信 服务器中间件:MQ消息队列 redis基于内存的访问快、低延时, 非核心业务,流量不是非常大的场景 kafka 分布式的消息队列中间件,非常的稳定,高并发,高可用,可扩张性强 | 组件 | 描述 | | ----------------------- | ------------------------------------------------------------ | | **Producer** | 生产者,负责将消息发送到 Kafka 的某个主题(Topic) | | **Consumer** | 消费者,订阅并消费某个 Topic 的消息 | | **Broker** | Kafka 服务器节点,负责接收、存储和转发消息 | | **Topic** | 类似于消息队列的名字或频道,是消息的分类方式 | | **Partition** | Topic 中的分区,每个分区是有序且可持久化的日志结构 | | **Zookeeper**(旧架构) | Kafka 使用 Zookeeper 管理集群状态、Broker 元数据等(目前推荐使用 KRaft 模式,取代 Zookeeper) | zeromq rabbitmq rocketmq redis允许不稳定,挂掉怎么办??? 消息的消费不可靠,当进行发布的时候,如果通道没有被订阅,会出现消息的丢失 redis消息积累过快, 消费消息过慢,会导致崩溃 # 问题: ### 单机聊天服务器的瓶颈: 1.受限于硬件资源,聊天服务器所能承受的用户的并发量 2.任意模块的修改,都会导致整个项目代码重新编译、部署 3.系统中,有些模块是属于CPU密集型的,有些模块是属于I/O密集型的,造成各模块对于硬件资源的需求是不一样的 ### 集群聊天服务器 优点:用户的并发量提升了 缺点: - 项目代码还是需要整体重新编译,而且需要进行多次部署 - 有些业务不需要高并发