# 高并发内存池项目 **Repository Path**: eucalyptus-globulusyang/High-concurrent-memory-pool-project ## Basic Information - **Project Name**: 高并发内存池项目 - **Description**: 当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称 Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存 分配相关的函数(malloc、free)。 - **Primary Language**: C++ - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 5 - **Forks**: 1 - **Created**: 2023-05-22 - **Last Updated**: 2025-03-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 高并发内存池项目 # 项目介绍 ## 1.这个项目做的是什么? 当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。 我们这个项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习tcmalloc的精华,这种方式有点类似我之前学习STL容器的方式。但是相比STL容器部分,tcmalloc的代码量和复杂度上升了很多。 另一方面tcmalloc是全球大厂google开源的,可以认为当时顶尖的C++高手写出来的,他的知名度也是非常高的,不少公司都在用它,Go语言直接用它做了自己内存分配器。 ## 2.这个项目的要求的知识储备 这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。 开发环境:win11、VS2019 > 注:此项目不涉及网络、数据库。 # 什么是内存池 ## 1. 池化技术 所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,因此不如提前申请好了,这样使用时就会更为快捷,大大提高程序运行效率。 在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池,线程池,对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让他们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的需求,当处理完这个请求后,线程又进入睡眠状态。 ## 2. 内存池 内存池是指程序预先从操作系统申请一块足够大的内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回给内存池,当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。 ## 3. 内存池主要解决的问题 1. 内存池主要解决的是效率上的问题 2. 其次如果作为系统的内存分配器的角度,还会解决内存碎片的问题。 **什么是内存碎片呢?** ![image-20230522204352730](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305222043808.png) 当执行程序也就是存在进程时,会对应一个进程地址空间,而对于vector、map、mysql、list等容器,扩容或者申请时就开辟在这块进程地址空间的堆空间中,如上图在进程地址空间的堆上,vector申请256Byte,map申请256Byte,mysql申请512Byte,list申请128Byte。当vector和list对象销毁时,进程地址空间就会将vector和list在堆上占用的空间进行回收,也就是回收了256Byte+128Byte也就是384Byte,而如果我们仍然申请超过256Byte的空间,此时不会申请到,因为我们知道在堆上申请的是连续空间,而vector和list回收后的空间并不连续,所以这种不连续的内存被称为内存碎片。 *** 需要补充说明的是,内存碎片分为内碎片和外碎片,上面讲的是外碎片问题。外碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。而内碎片是由于一些对齐的需求(比如结构体成员对齐,位段),导致分配出去的空间中一些内存无法被利用。内碎片问题,后面项目中会看到,到时会进行更准确的理解。 ## 4. malloc 在C/C++中,我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,这是因为malloc就是一个内存池。malloc()相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的VS系列用的是微软自己写的一套,linux gcc用的glibc中的ptmalloc。 ![image-20230522211213541](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305222112611.png) 下面有几篇关于这块的文章,大概可以去简单看看了解一下,关于ptmalloc,学完我们的项目以后,有兴趣大家可以去看看他的实现细节。 [一文了解,Linux内存管理,malloc、free 实现原理 (zhihu.com)](https://zhuanlan.zhihu.com/p/384022573) [malloc()背后的实现原理——内存池 ](http://azhao.net/index.php/archives/81/) [malloc的底层实现 (ptmalloc)](https://blog.csdn.net/z_ryan/article/details/79950737) 可以看出,malloc也是作为 内存池设计的,可以有效的防止内存碎片的产生,那此时,我们所说的tcmalloc,也就是高并发内存池为什么还被设计出来呢?因为tcmalloc除了解决内存碎片的问题,其效率比malloc(包括ptmalloc)更快,更强。 # 开胃菜--先设计一个定长的内存池 ## 1. 定长内存池原理 作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用, **但是什么场景下都可以用就意味着什么场景下都不会有很高的性能** ,下面我们就先来设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以学习他目的有两层,先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。 ![image-20230522214141943](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305222141979.png) *** **定长内存池:** 解决指定大小的空间这种纯粹的需求 **固定大小的内存池的设计特点:** 1. 性能达到极致 2. 不考虑内存碎片问题 ![image-20230522214233987](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305222142020.png) 设计内存池时,不仅要考虑从内存池中申请空间,还要考虑回收空间时怎么办,为了解决回收空间的问题,出现了__freeList(自由链表)将还回的内存块挂起来,再要用的时候又可以还给你。 ## 2. 实现定长内存池 > 创建项目名字:ConcurrentMemoryPool 在建立类时,我们可以用非类型模板参数`template`的形式固定每一块内存池的大小,表示申请大小都为N。但也可以用另外一种方式:模板参数`template`,这是由于每次获取的都是T对象,T的大小一旦固定,也符合我们对定长内存池的预期,下面就来看看`ObjectPool`其结构以及内部成员。 `ObjectPool.h` ```cpp #pragma once #include using std::cout;//避免命名污染 using std::endl; ////定长内存池 //template//定长为N: 即申请大小都为N //class ObjectPool //{}; template //T大小 class ObjectPool { public: private: char* _memory = nullptr;//内存池头指针 void* _freeList = nullptr;//自由链表 }; ``` 对于`_memory`,作为内存池头指针,其功能如图: ![image-20230522221345986](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305222213022.png) 如果切下来的50块使用后还回去,此时就用到自由链表`_freeList`,自由链表本身结点就代表被切下来的块,把要回收的下一块的地址用上一块的头四个字节(32位,64位就需要8个字节)存储起来,就可以将被回收的块进行链接起来,从而很好的进行管理。这样,最重要的两个成员变量`_memory`和`_freeList`就定义好了,并通过C++11中的缺省值初始化成nullptr,就可以不用在初始化列表进行初始化。 > 对于上图,拿走是形象的说法,即逻辑上的说法,因为我们只是把块的地址拿过去提供使用,所以物理上并没有将其切下来移动块的位置,仍然是连续的空间。 此时新增一个成员函数`New`用于从内存池中获取内存块: ```cpp #pragma once #include using std::cout;//避免命名污染 using std::endl; template //T大小 class ObjectPool { public: T* New() { //第一次进来,memory和list都为空 if (_memory == nullptr) { _memory = (char*)malloc(128 * 1024);//申请指定大小内存池,单位:KB if (_memory == nullptr) { throw std::bad_alloc(); } } T* obj = (T*)_memory; _memory += sizeof(T); return obj; } private: char* _memory = nullptr;//内存池头指针 void* _freeList = nullptr;//自由链表 }; ``` 第一次`_memory`为nullptr,需要申请空间。如果申请失败,则用C++的处理方式:抛异常。(C的方式也可以:printf("错误信息"); exit(1);) ![image-20230523095800681](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305230958746.png) 可以发现,我们通过`obj`的地址拿到可用空间,与此同时`_memory`也在向后移动,但是当内存池用光了也就是`_memory`到内存池的终点位置时,obj就无法继续申请,而`_memory`此时不为nullptr,因为内存池之后本身就不是nullptr,只不过后面的位置不属于内存池而已,因此,为了解决_memory这个临界条件让内存池用光之后不再支持申请,New函数还需完善,我们还需添加新的成员变量。 *** **添加新的成员变量:** `_remainBytes` 成员变量解释:`_remainBytes`表示申请的内存池没被用过的内存大小(单位:Byte),有了新的成员变量`_remainByte`后我们在第一次申请内存池时,就不必把条件设置成`_memory=nullptr`,而是对`_remainBytes`设置条件,这样就可以规避`_memory`在用尽内存池空间时继续申请内存块的情况。而`_remainBytes`,如果条件设置成`if(_remainBytes == 0)`,那么当剩下的空间不足以申请`sizeof(T)`(单位)大小,此方式仍会越界,因此要把条件设置为`if(_remainBytes < sizeof(T))`。 ```cpp #pragma once #include using std::cout;//避免命名污染 using std::endl; #define NUM 128*1024 template //T大小 class ObjectPool { public: T* New() { //剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = NUM; _memory = (char*)malloc(_remainBytes);//KB if (_memory == nullptr) { throw std::bad_alloc(); } } T* obj = (T*)_memory; _memory += sizeof(T); _remainBytes -= sizeof(T); return obj; } private: char* _memory = nullptr;//指向大块内存池的指针 size_t _remainBytes = 0;//大块内存在切分过程中剩余的字节数 void* _freeList = nullptr;//还回来过程中链接的自由链表的头指针 }; ``` *** 申请空间New解决之后,对应的还有释放空间的Delete()函数。(此时就会用到成员变量:`_freeList`) > 回顾多态:在学习多态时,存在一个虚表指针,在类中的前4/8个字节,如果有n个虚函数,则将这个n个虚函数依次链接起来,链接的过程中,地址存在前4/8个字节。 因此对于`_freeList`,未开始回收内存块时,其为nullptr,若有内存块需要回收,则需要将这个内存块链接起来,并将前4/8个字节填充下一个结点(也就是内存块或者nullptr)的地址。以`_freeList`第一次链接为例,其链接的内存块的头4/8个字节需要指向nullptr,也就是赋值为nullptr,那么如何确定一个指针是几个字节呢?有两种方式,其一是在某一个位置`size_t n = sizeof(void*)`,将返回的值判断是4还是8,通过if语句分别编写代码,但是这样看起来有点麻烦,并且有代码冗余,因此有一种更好的方式去解决判断指针的大小。 **回顾二级指针:** 在学习指针时,我们知道对于`int*`,指向的内容为int大小,而对于`int**`,指向的就是`int*`的大小,因此,如果我们想用一个指针大小的空间存储地址,那么就需要得到这个指针的大小,所以对于obj指针,我们通过强转为二级指针的方式,其指向的空间大小就为一个指针大小,这样就可以有效的处理4或者8字节的问题,如果指针(地址)大小是4字节,那么`int**`指向的空间就为4;如果是8字节,`int**`指向的空间就为8。对于二级指针,其指向的是一级指针的大小,所以无论是什么类型,都没有区别,下面就用void类型。 > 注:强转之后值不变,只不过看待方式变成了二级指针,解引用时存储内容的空间变成了一级指针的大小而已。 ```cpp #pragma once #include using std::cout;//避免命名污染 using std::endl; #define NUM 128*1024 template //T大小 class ObjectPool { public: T* New() { //剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = NUM; _memory = (char*)malloc(_remainBytes);//KB if (_memory == nullptr) { throw std::bad_alloc(); } } T* obj = (T*)_memory; _memory += sizeof(T); _remainBytes -= sizeof(T); return obj; } void Delete(T* obj) { if (_freeList == nullptr) { _freeList = obj; *(void**)obj = nullptr; } else { //头插 *(void**)obj = _freeList; _freeList = obj; } } private: char* _memory = nullptr;//指向大块内存池的指针 size_t _remainBytes = 0;//大块内存在切分过程中剩余的字节数 void* _freeList = nullptr;//还回来过程中链接的自由链表的头指针 }; ``` 对于第一次插入,`_freeList`为空,因此要将`_freeList`赋值为obj,并让obj的前4/8个字节指向nullptr,通过`*(void**)obj = nullptr;`将obj强转为二级指针从而确定存储下一个内存块空间的地址的大小,在解引用为一级指针将下一块地址赋值给obj,上述代码就完成了。 ![image-20230523112432471](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305231124510.png) 可以发现,当一个结点都没有时,采用头插的方式是一模一样的,所以上述代码可以简化一下: ```cpp void Delete(T* obj) { //头插 *(void**)obj = _freeList; _freeList = obj; } ``` 回过头来分析,在切分内存池的过程中,有内存块还回来链接到`_freeList`中,若要继续申请,还回来的`_freeList`的内存块还可以继续使用,就没必要继续切割内存池了。 ![image-20230523134647287](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305231346342.png) 所以`New()`中可以补充: ```cpp T* New() { T* obj = nullptr; //优先把还回来内存块对象,再次重复利用 if (_freeList) { void* next = *((void**)_freeList); obj = (T*)_freeList; _freeList = next; return obj; } else { //剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = NUM; _memory = (char*)malloc(_remainBytes);//KB if (_memory == nullptr) { throw std::bad_alloc(); } } obj = (T*)_memory; _memory += sizeof(T); _remainBytes -= sizeof(T); return obj; } } ``` *** **存在的问题:** **1、特殊场景的空间问题** 这段代码在一定场景下存在保存地址空间不够的问题。也就是每一块空间至少需要足够存下一块的地址,如果T为char或者int类型并且在64位环境下,那么每一块的空间就小于存放下一块地址的空间,因此在赋值时出现这种情况我们就不能只给sizeof(T)的大小,而是至少给sizeof(void*)的大小。 ```cpp T* New() { T* obj = nullptr; //优先把还回来内存块对象,再次重复利用 if (_freeList) { void* next = *((void**)_freeList); obj = (T*)_freeList; _freeList = next; } else { //剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = NUM; _memory = (char*)malloc(_remainBytes);//KB if (_memory == nullptr) { throw std::bad_alloc(); } } obj = (T*)_memory; size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); _memory += objSize; _remainBytes -= objSize; } return obj; } ``` **2、定位new** 如果T是自定义类型,我们需要调用其构造函数进行初始化,这时候为了在指定的空间进行构造,可以采用定位new的方式进行初始化,同时显示调用析构函数清理对象。 定位new的知识总结:[C++定位new]([(102条消息) 【C++修炼之路】定位new(项目记录)_每天都要进步呀~的博客-CSDN博客](https://blog.csdn.net/NEFUT/article/details/130825919?spm=1001.2014.3001.5501)) ```cpp #pragma once #include using std::cout;//避免命名污染 using std::endl; #define NUM 128*1024 template //T大小 class ObjectPool { public: T* New() { T* obj = nullptr; //优先把还回来内存块对象,再次重复利用 if (_freeList) { void* next = *((void**)_freeList); obj = (T*)_freeList; _freeList = next; } else { //剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = NUM; _memory = (char*)malloc(_remainBytes);//KB if (_memory == nullptr) { throw std::bad_alloc(); } } obj = (T*)_memory; size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); _memory += objSize; _remainBytes -= objSize; } // 定位new, 显示调用T的构造函数初始化 new(obj)T; return obj; } void Delete(T* obj) { //显示调用析构函数清理对象 obj->~T(); //头插 *(void**)obj = _freeList; _freeList = obj; } private: char* _memory = nullptr;//指向大块内存池的指针 size_t _remainBytes = 0;//大块内存在切分过程中剩余的字节数 void* _freeList = nullptr;//还回来过程中链接的自由链表的头指针 }; ``` ## 3. 定长内存池效率测试 `ObjectPool.h` ```cpp #pragma once #include #include #include using std::cout;//避免命名污染 using std::endl; ////定长内存池 //template//定长为N: 即申请大小都为N //class ObjectPool //{}; #define NUM 128*1024 template //T大小 class ObjectPool { public: T* New() { T* obj = nullptr; //优先把还回来内存块对象,再次重复利用 if (_freeList) { void* next = *((void**)_freeList); obj = (T*)_freeList;//obj需要强转一下,否则编译错误,强转是因为后续会解引用 _freeList = next; } else { //剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = NUM; _memory = (char*)malloc(_remainBytes);//KB if (_memory == nullptr) { throw std::bad_alloc(); } } obj = (T*)_memory; size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); _memory += objSize; _remainBytes -= objSize; } // 定位new, 显示调用T的构造函数初始化 new(obj)T; return obj; } void Delete(T* obj) { //显示调用析构函数清理对象 obj->~T(); //头插 *(void**)obj = _freeList; _freeList = obj; } private: char* _memory = nullptr;//指向大块内存池的指针 size_t _remainBytes = 0;//大块内存在切分过程中剩余的字节数 void* _freeList = nullptr;//还回来过程中链接的自由链表的头指针 }; struct TreeNode { int _val; TreeNode* _left; TreeNode* _right; TreeNode() :_val(0) , _left(nullptr) , _right(nullptr) {} }; void TestObjectPool() { // 申请释放的轮次 const size_t Rounds = 3; // 每轮申请释放多少次 const size_t N = 1000000; std::vector v1; v1.reserve(N); size_t begin1 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v1.push_back(new TreeNode); } for (int i = 0; i < N; ++i) { delete v1[i]; } v1.clear(); } size_t end1 = clock(); std::vector v2; v2.reserve(N); ObjectPool TNPool; size_t begin2 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v2.push_back(TNPool.New()); } for (int i = 0; i < 100000; ++i) { TNPool.Delete(v2[i]); } v2.clear(); } size_t end2 = clock(); cout << "new cost time:" << end1 - begin1 << endl; cout << "object pool cost time:" << end2 - begin2 << endl; } ``` `Test.cpp` ```cpp #include"ObjectPool.h" int main() { TestObjectPool(); return 0; } ``` 通过改变每轮申请释放的次数并在debug和release下面分别测试,可以看出我们设计的定长内存池效率远远高于new,new的底层也是malloc。 ![image-20230523145923780](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305231459825.png) *** 但需要注意的是malloc也实际上也是在操作系统层面之上进行的封装,那我们亦可以跳过malloc,直接调用操作系统的接口,而Windows和Linux在接口上也有不同: [Windows下的VirtualAlloc](https://baike.baidu.com/item/VirtualAlloc/1606859?fr=aladdin) [Linux下的brk和mmap](https://www.cnblogs.com/vinozly/p/5489138.html) ![image-20230522211213541](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305222112611.png) 由于我采用的环境是Windows11,所以在这测试一下Windows的接口 ```cpp #pragma once #include #include #include using std::cout;//避免命名污染 using std::endl; #ifdef _WIN32 #include #else // #endif //直接去堆上按页申请空间 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; } template //T大小 class ObjectPool { public: T* New() { T* obj = nullptr; //优先把还回来内存块对象,再次重复利用 if (_freeList) { void* next = *((void**)_freeList); obj = (T*)_freeList;//obj需要强转一下,否则编译错误,强转是因为后续会解引用 _freeList = next; } else { //剩余内存不够一个对象大小时,则重新开大块空间 if (_remainBytes < sizeof(T)) { _remainBytes = 128 * 1024; //_memory = (char*)malloc(_remainBytes); _memory = (char*)SystemAlloc(_remainBytes >> 13);//K if (_memory == nullptr) { throw std::bad_alloc(); } } obj = (T*)_memory; size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); _memory += objSize; _remainBytes -= objSize; } // 定位new, 显示调用T的构造函数初始化 new(obj)T; return obj; } void Delete(T* obj) { //显示调用析构函数清理对象 obj->~T(); //头插 *(void**)obj = _freeList; _freeList = obj; } private: char* _memory = nullptr;//指向大块内存池的指针 size_t _remainBytes = 0;//大块内存在切分过程中剩余的字节数 void* _freeList = nullptr;//还回来过程中链接的自由链表的头指针 }; struct TreeNode { int _val; TreeNode* _left; TreeNode* _right; TreeNode() :_val(0) , _left(nullptr) , _right(nullptr) {} }; void TestObjectPool() { // 申请释放的轮次 const size_t Rounds = 3; // 每轮申请释放多少次 const size_t N = 100000; std::vector v1; v1.reserve(N); size_t begin1 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v1.push_back(new TreeNode); } for (int i = 0; i < N; ++i) { delete v1[i]; } v1.clear(); } size_t end1 = clock(); std::vector v2; v2.reserve(N); ObjectPool TNPool; size_t begin2 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v2.push_back(TNPool.New()); } for (int i = 0; i < 100000; ++i) { TNPool.Delete(v2[i]); } v2.clear(); } size_t end2 = clock(); cout << "new cost time:" << end1 - begin1 << endl; cout << "object pool cost time:" << end2 - begin2 << endl; } ``` ![image-20230523152436428](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305231524473.png) 这样就完成了定长内存池完全脱离了malloc,此接口不能自动回收资源,但也无关紧要,因为进程结束之后,其对应的进程地址空间全部会被销毁。 # 高并发内存池整体框架设计 单CPU单线程达到极速,相比多核多线程的速度也是自惭形秽。多线程可以在单个CPU并行的运行,使得开发效率达到最大化。现代很多的开发环境都是多核多线程,在申请内存的场景下,线程之间必然存在激烈的锁竞争问题。malloc本身已经很优秀,即malloc作为内存池也可以解决内存碎片问题。但是在并发情况下,malloc虽然可以正常的运行,但却需要加众多的锁,影响性能。而我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次实现的内存池需要考虑以下几个方面的问题。 1. 性能问题(malloc很快,但tcmalloc更快) 2. 多线程环境下,锁竞争问题(tcmallo相比malloc而言最突出的优势) 3. 内存碎片问题(malloc作为内存池也可以处理,没什么区别) concurrent memory pool主要由以下三个部分构成: 1. **thread cache:** 线程缓存是每个线程独有的,进程里面有几个线程就会创建几个线程缓存,用于小于256KB(20多万字节)的内存分配, **线程从这里申请内存不用加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。**(malloc上来就要加锁,而tcmalloc存在线程缓存则不用) 2. **central cache:** 中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象需要加锁。central内部采用哈希桶的数据结构,这也就意味着只要不同的线程不是同时向central cache中的同一个哈希桶获取内存,就不需要加锁。此外也只有在不同线程的thread cache同时没有内存对象时才会找central cache时进行加锁。有了这两方面的处理,锁的竞争就不会很激烈。 3. **page cache:** 页缓存是在central cache缓存上面的一层,存储的内存是以页为单位存储以及分配的,central cache没有内存对象时,从page cache分配出一定数量的Page,并切割成定长大小的小块内存,分配给central cache。并且会有一个名为span的行内标签,当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页从而组成更大的页,缓解内存碎片的问题。 可以看出,central cache在其中产生了一个居中调度的角色,thread cache内存多了,还给central cache,thread cache没有内存,central cache给其内存。page cache同样如此,但如果page cache没有内存了,就需要去操作系统中申请内存,直接去进程地址空间中申请堆空间,在Windows和Linux有所区别,上述提到过。 ![image-20230523171712321](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202305231717392.png) 将这三部分分别探讨: # 高并发内存池—thread cache 对于上面定长内存池的实现,采用了一个自由链表,这个自由链表的结点大小是固定的,即T类型大小(比T大就为void*的大小),但对于真正的tcmalloc,他并不是定长的内存池,最重要的是申请的结点的大小也不一定相同,比如`_freeList`每个结点大小为8字节,而此时我们却需要申请4字节,16字节,64字节等等大小,如果仍采用定长内存池的方案,用一个`_freeList`自由链表管理切好的小块的内存,是不可能实现这种情况的,因此我们可以考虑多个自由链表分别对应相应的结点大小。 在上一标题:高并发内存池整体框架设计的thread cache描述中,小于256KB大小的内存分配都会用到thread cache,此时就需要`256*1024`个字节,如果每个大小都一一对应相应的自由链表的话,理论上虽然可行,但是非常不现实。此时为了能够像这个思路靠拢,在设计时不必设置和字节数量相同的自由链表,而将大小根据一定标准分成若干个层次,也就对应若干个自由链表,这就意味着若干个一定是可以实现出来的。即8Byte,16Byte,24Byte...`256*1024`Byte。 * 当申请空间x<=8Byte时,采用第一个自由链表。(8为最小是考虑了64位环境指针的大小) * 当申请空间8 #include #include #include using std::cout;//避免命名污染 using std::endl; // C++不建议宏,这个用来assert static const size_t MAX_BYTES = 256 * 1024; void*& NextObj(void* obj)//取头上4/8字节的值 { return *(void**)obj; } //管理切分好的小对象的自由链表 class FreeList { public: void Push(void* obj) { assert(obj); //头插 //*(void**)obj = _freeList; NextObj(obj) = _freeList; _freeList = obj; } void* Pop() { assert(_freeList); //头删 void* obj = _freeList; _freeList = NextObj(obj);//下一个结点 return obj; } private: void* _freeList; }; ``` 可以看出,对于头插头删,与之前的定长内存池是一样的,所以就不详细解释了,这里的NextObj是将获取下一个结点的方法提取出来,方便观察。 *** 对于`ThreadCache.h`,首先要搭一个架子,即先创建一个ThreadCache类,在里面有申请和释放空间的方法,并且还有哈希表的成员变量,每个桶挂着的都是一个FreeList: ```cpp #pragma once #include"Common.h" class ThreadCache { public: //申请和释放内存对象 void* Allocate(size_t size); void Deallocate(void* ptr, size_t size);//后面这个size是为了后续的哈希映射 private: FreeList _freeLists[]; }; ``` 以_freeLists[]数组的形式,就可以模拟出哈希桶的映射结构,`[]`中的大小,就需要根据我们的需求来决定了。 **threadcache哈希桶映射对齐规则** 若以上述每8字节分档创建自由链表,则需要创建256*1024/8=32768个自由链表,也就是说上面的[]内的数字为32768,虽然可以,但是觉得还是有一点多,因为这是仿照tcmalloc设计的简化版本,所以在这个数量上不必过多,大致映射思路相同就可以了,所以做出以下设计: * 内存池使用范围在[1, 128]byte时,采用标准的8byte为标准对齐。 freelist[0,16) ==128/8=16个桶== * 内存池使用范围在[128+1, 1024]byte时,采用16byte为标准对齐。freelist[16,72) ==(1024-128)/16 + 16 = 72个桶== * 内存池使用范围在[1024+1, 8*1024]byte时,采用128byte为标准对齐。freelist[72,128) * 内存池使用范围在[8*1024+1,64\*1024]byte时,采用1024byte为标准对齐。freelist[128,184) * 内存池使用范围在[64*1024+1,256\*1024]byte时,采用8\*1024byte为标准对齐。freelist[184,208) 举个例子:如果使用到了第129个字节,并且我只需要申请一个字节的空间的话,那么根据现有标准就需要至少申请16字节,此时就浪费了15个字节,15/(129+16)= 0.103448275862069,大约浪费10% > 通过例子可以看出,随着使用基数的增加,我们将对齐标准也进行增加,这样即便对齐标准增加,内碎片浪费的百分比也不会很大,整体控制在最多10%左右的内碎片浪费 。相比32768个自由链表,208个自由链表则简化很多。需要注意的是,最开始时对齐标准不敢太大,因为最开始时基数小,内碎片浪费的百分比会很大,但对齐标准至少是8字节大小。 *** **代码设计** 上述的五种情况,就是我们所要设计的,最终得到的是对应的对齐数,因此可以在`Common.h`中新增一个类并进行进行如下设计: ```cpp //计算对象大小的对齐映射规则 class SizeClass { public: // 整体控制在最多10%左右的内碎片浪费 // [1,128] 8byte对齐    freelist[0,16) // [128+1,1024] 16byte对齐  freelist[16,72) // [1024+1,8*1024] 128byte对齐  freelist[72,128) // [8*1024+1,64*1024] 1024byte对齐   freelist[128,184) // [64*1024+1,256*1024] 8*1024byte对齐  freelist[184,208) static inline size_t _RoundUp(size_t size, size_t AlignNum) { size_t alignSize; if (size % AlignNum != 0) { alignSize = (size / AlignNum + 1) * AlignNum;//得到对齐数 } else//等于0就不需要处理 { alignSize = AlignNum; //得到对齐数 } return AlignNum;//返回对齐数 } static inline size_t RoundUp(size_t size) { if (size <= 128) { return _RoundUp(size, 8); } else if (size <= 1024) { return _RoundUp(size, 16); } else if (size <= 1024 * 8) { return _RoundUp(size, 128); } else if (size <= 1024 * 64) { return _RoundUp(size, 1024); } else if (size <= 1024 * 256) { return _RoundUp(size, 1024*8); } else { //超过1024*256时就不能用ThreadCache分配 assert(false); return -1; } } private: }; ``` 对于`_RoundUp()`,也可以学习大佬的写法: ```cpp static inline size_t _RoundUp(size_t bytes, size_t align) { return (((bytes)+align - 1) & ~(align - 1)); } ``` 由于这个类里面没有什么成员,完全是为了对齐规则而制定的,因此通过static,让其成员函数不必通过对象调用,并搞成一个内联减少栈帧的开销。 所以说,我们根据内存池使用容量的多少,最终实现对应的映射关系: ![image-20230613131227253](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306131312327.png) 继续在`ThreadCache.cpp`中完善函数功能 ```cpp #include"ThreadCache.h" void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size);//确定对齐数 } ``` 确定对齐数之后,根据我们所划分的规则,共有208个桶,因此我们还需要确定是哪一个桶,方法同样放在SizeClass中,因为读都是确定映射的规则,比较合适一些。 ![image-20230613164107852](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306131641940.png) 首先是自己想到的代码风格: ```cpp static inline size_t _Index(size_t bytes, size_t alignNum) { if (bytes % alignNum == 0) { return bytes / alignNum - 1; } else { return bytes / alignNum; } } ``` 但位运算会更快,所以下方的映射运算通过位运算实现,和上面自己写的代码的结果是一样的。 `Common.h` ```cpp #pragma once #include #include #include #include #include using std::cout; using std::endl; static const size_t MAX_BYTES = 256 * 1024; static const size_t NFREELIST = 208; static void*& NextObj(void* obj) { return *(void**)obj; } // 管理切分好的小对象的自由链表 class FreeList { public: void Push(void* obj) { assert(obj); // 头插 //*(void**)obj = _freeList; NextObj(obj) = _freeList; _freeList = obj; } void* Pop() { assert(_freeList); // 头删 void* obj = _freeList; _freeList = NextObj(obj); return obj; } bool Empty() { return _freeList == nullptr; } private: void* _freeList = nullptr; }; // 计算对象大小的对齐映射规则 class SizeClass { public: // 整体控制在最多10%左右的内碎片浪费 // [1,128] 8byte对齐 freelist[0,16) // [128+1,1024] 16byte对齐 freelist[16,72) // [1024+1,8*1024] 128byte对齐 freelist[72,128) // [8*1024+1,64*1024] 1024byte对齐 freelist[128,184) // [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) /*size_t _RoundUp(size_t size, size_t alignNum) { size_t alignSize; if (size % alignNum != 0) { alignSize = (size / alignNum + 1)*alignNum; } else { alignSize = size; } return alignSize; }*/ // 1-8 static inline size_t _RoundUp(size_t bytes, size_t alignNum) { return ((bytes + alignNum - 1) & ~(alignNum - 1)); } static inline size_t RoundUp(size_t size) { if (size <= 128) { return _RoundUp(size, 8); } else if (size <= 1024) { return _RoundUp(size, 16); } else if (size <= 8 * 1024) { return _RoundUp(size, 128); } else if (size <= 64 * 1024) { return _RoundUp(size, 1024); } else if (size <= 256 * 1024) { return _RoundUp(size, 8 * 1024); } else { assert(false); return -1; } } /*size_t _Index(size_t bytes, size_t alignNum) { if (bytes % alignNum == 0) { return bytes / alignNum - 1; } else { return bytes / alignNum; } }*/ // 1 + 7 8 // 2 9 // ... // 8 15 // 9 + 7 16 // 10 // ... // 16 23 static inline size_t _Index(size_t bytes, size_t align_shift) { return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; } // 计算映射的哪一个自由链表桶 static inline size_t Index(size_t bytes) { assert(bytes <= MAX_BYTES); // 每个区间有多少个链 static int group_array[4] = { 16, 56, 56, 56 }; if (bytes <= 128) { return _Index(bytes, 3); } else if (bytes <= 1024) { return _Index(bytes - 128, 4) + group_array[0]; } else if (bytes <= 8 * 1024) { return _Index(bytes - 1024, 7) + group_array[1] + group_array[0]; } else if (bytes <= 64 * 1024) { return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0]; } else if (bytes <= 256 * 1024) { return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0]; } else { assert(false); } return -1; } }; //字符编码修正:安装Force UTF-8(NO BOM) ``` 同时将桶的数量在`Common.h`中写死,此时记得将之前的_freeList[]中的[]里加上下面写死的NFREELIST: ![image-20230613164911389](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306131649452.png) ![image-20230613165320396](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306131653461.png) 接下来,`ThreadCache.cpp`中可以完成桶的选择,如果桶后有内存,即节点资源,直接就可以拿出来,并在这个所在自由链表的头删掉,因为被使用了;如果没有内存,那么就需要从中心缓存要内存了,即`CentralCache`,这就可以看出,在`ThreadCache.h`中的ThreadCache类中,还需要加上一个能够从中心缓存获取对象的成员函数: ```cpp #pragma once #include"Common.h" class ThreadCache { public: //申请和释放内存对象 ——from 1 to 8 -> sent to 8 void* Allocate(size_t size); void Deallocate(void* ptr, size_t size);//后面这个size是为了后续的哈希映射 //从中心缓存获取对象 void* FetchFromCentralCache(size_t index, size_t size); private: FreeList _freeLists[NFREELIST]; }; ``` 所以`ThreadCache.cpp`要如下编写代码: ```cpp #include"ThreadCache.h" void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size);//确定对齐数 size_t index = SizeClass::Index(size);//确定桶 if (!_freeLists[index].Empty()) { return _freeLists[index].Pop(); } else { return FetchFromCentralCache(index, alignSize);//从中心缓存获取,并且不需要考虑对齐的问题。因为已经处理完了 } } ``` 此时回过头来,Empty还没有实现,因此在Common.h中的FreeList类中,将Empty()实现出来。 ```cpp //管理切分好的小对象的自由链表 class FreeList { public: void Push(void* obj) { assert(obj); //头插 //*(void**)obj = _freeList; NextObj(obj) = _freeList; _freeList = obj; } void* Pop() { assert(_freeList); //头删 void* obj = _freeList; _freeList = NextObj(obj);//下一个结点 return obj; } bool Empty() { return _freeList == nullptr; } private: void* _freeList; }; ``` 这样看来,对齐数也就完成了。 **小结:** 通过将内存浪费与时间消耗尽可能的互相调节,实现了这样的一个对齐方式。实际上对于这样8/16/128的对齐阶梯性质的规则,也可以统一为某个数字,不过这样在最开始消耗的内存百分比就会相对较大,此外也会发现没有采用自己的对齐函数,而是通过借鉴大佬的位运算的风格,因为对齐规则会频繁的调用,而所设计高并发内存池的目标就是极致的速度,所以采用大佬的位运算风格。而对于刚才向中心缓存CentralCache要内存,会在 **Central-cache** 标题中实现。 # threadcacheTLS无锁访问 之前提到过,对于threadcache,每个线程都有一个无锁的threadcache,threadcache里面是哈希桶,每个桶里面都是切好的小对象freeList自由链表。来了一个值以后,就需要向内存池申请空间大小,在此之前会先去freeList中索取内存,而一旦`_freeLists`哈希桶中都有对象,那么效率就会非常高。 > 那么如何获取到threadcache资源? > > 在线程的学习中,我们知道每一个进程都可以有多个线程,线程之间有属于自己的栈、寄存器等等……;但也有很多公共资源,如全局变量、数据段、代码段等等……。但每一个线程都有一个对应的threadcache,**这个时候就需要弄清楚threadcache是如何被线程创建出来的,什么时候被创建出来的** 这些都是需要思考的问题。 > > > > 如果将threadcache变成全局或者静态全局,线程之间就需要对threadcache的获取进行加锁,但此时的需求不让threadcache加锁访问,此时就需要一个新的知识:**tls** (不是网络的tls加密) **TLS--thread local storage:** [windows下tls](https://www.cnblogs.com/liu6666/p/12729014.html) [linux gcc下tls](https://zhuanlan.zhihu.com/p/142418922) 由于采用的环境是win,所以直接用win下的静态TLS对应的`_declspec(thread)`就可以了。 `ThreadCache.h` ```cpp #pragma once #include"Common.h" class ThreadCache { public: //申请和释放内存对象 ——from 1 to 8 -> sent to 8 void* Allocate(size_t size); void Deallocate(void* ptr, size_t size);//后面这个size是为了后续的哈希映射 //从中心缓存获取对象 void* FetchFromCentralCache(size_t index, size_t size); private: FreeList _freeLists[NFREELIST]; }; // TLS thread local storage _declspec(thread) ThreadCache* pTLSThreadCache = nullptr; ``` 通过`_declspec(thread)` 修饰的ThreadCache指针,在每个线程中就都有一个这样的指针。为了验证一下,新建`ConcurrentAlloc.h`,在其中,仍需要对应上面代码的allocate再创建申请和释放内存的函数`ConcurrentAlloc`和`ConcurrentFree`,其功能和ThreadCache类中的两个申请释放函数是一样的,不过是每个线程独有的。 由于多个文件都会引入`ConcurrentAlloc.h`,为了不让这里面的两个函数在全局不发生冲突,采用static修饰。 `ConcurrentAlloc.h` ```cpp #pragma once #include"Common.h" #include"ThreadCache.h" static void* ConcurrentAlloc(size_t size) { if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } return pTLSThreadCache->Allocate(size); } static void* ConcurrentFree(void* ptr) { } ``` > 为了做单元测试(每写一个部分,就对一部分进行测试),将之前的源文件Test.cpp更名为UnitTest.cpp会更好理解,因为之后还会有性能测试。 由于需要做单元测试,会采用C++11中的线程库的方式[thread - C++ Reference (cplusplus.com)](https://legacy.cplusplus.com/reference/thread/thread/?kw=thread), 因次在Common.h中引入thread库。 ![image-20230614134554289](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306141345378.png) 此外,为了让其能够正常运行,之前未处理过的centralCache也需要粗略的编写一下,目的不是使用他,而是让单元测试正常运行。 ![image-20230614140146056](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306141401111.png) 下面运行一下,运行结果: ![image-20230614140600165](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306141406230.png) 这是由于NextObj和ThreadCache被多个源文件使用,从而造成符号表产生时发生冲突,所以还需将这些都进行static修饰。 ![image-20230614140738672](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306141407735.png) ![image-20230614140804989](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306141408057.png) 再次生成解决方案: ![image-20230614140825996](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306141408054.png) ![image-20230614141017855](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202306141410921.png) 打开并行监视,查看: ![image-20230703104732256](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307031047354.png) 运行时,每一个线程变量对应的地址是不变的,各个线程之间的地址不同,互不影响。 > 注:vs2019并行监视存在问题,此为通过vs2013观察。 此外还可以通过在ConcurrentAlloc.h的ConcurrentAlloc函数打印线程ID验证: ```cpp #pragma once #include"Common.h" #include"ThreadCache.h" static void* ConcurrentAlloc(size_t size) { if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;//打印验证 return pTLSThreadCache->Allocate(size); } static void* ConcurrentFree(void* ptr) { } ``` ![image-20230703105311522](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307031053606.png) 可见,13722对应的pTLSThreadCache的地址一直都是000002E5FAFF7D50,43064对应的pTLSThreadCache的地址一直都是000002E5AFF8410。但是可以看出两个线程是交错的运行,即并行运行,改正UnitTest.cpp中t1.join()先执行再创建t2就会串行执行。 ```cpp #include"ObjectPool.h" #include"ConcurrentAlloc.h" void Alloc1() { for (size_t i = 0; i < 5; i++) { void* ptr = ConcurrentAlloc(6);//申请6字节 } } void Alloc2() { for (size_t i = 0; i < 5; i++) { void* ptr = ConcurrentAlloc(7);//申请7字节 } } void TLSTest() { std::thread t1(Alloc1); t1.join(); std::thread t2(Alloc2); t2.join(); } int main() { //TestObjectPool(); TLSTest(); return 0; } ``` ![image-20230703105750552](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307031057623.png) ![image-20230703105820971](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307031058052.png) 通过这个实验就可以看出,通过tls的方式,每个线程就可以无锁的获取自己专属的ThreadCache对象。所以,如果Thread映射的size中的桶存在对象,在Pop时效率就会非常高,因为通过TLS技术,可以无锁的进行并行访问!(上面的join再都放到最后面,变成并行) *** ConcurrentAlloc.h中还存在着 `static void ConcurrentFree(void* ptr,size_t size)`函数,即释放一个size对应的对象,也很简单,具体分为一下步骤: 1. 先断言一下,只有每个线程第一次ConcurrentAlloc的时候才为空,因此在释放的时候不会为空。 2. 通过线程对应的地址ptr以及size大小找到对应的对象,调用之前的Deallocate(ptr, size)方法,从而释放对应的对象。(注:目前size是需要加上的,后续连在一起的时候就可以将这个size去掉了) 3. 编写ThreadCache.cpp中的void ThreadCache::Deallocate(void* ptr, size_t size)函数:需要断言ptr不为空以及范围不超过MAX_BYTES,然后回收对象到对应的链表桶(因为之前还未编写具体的代码) `ConcurrentAlloc.h` ```cpp #pragma once #include"Common.h" #include"ThreadCache.h" static void* ConcurrentAlloc(size_t size) { //通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } //编写:先断言,再调用 static void* ConcurrentFree(void* ptr, size_t size) { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } ``` `ThreadCache.cpp` ```cpp #include"ThreadCache.h" void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { // ... return nullptr; } void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size);//确定对齐数 size_t index = SizeClass::Index(size);//确定桶 if (!_freeLists[index].Empty()) { return _freeLists[index].Pop(); } else { return FetchFromCentralCache(index, alignSize);//从中心缓存获取,并且不需要考虑对齐的问题。因为已经处理完了 } } void ThreadCache::Deallocate(void* ptr, size_t size) { assert(ptr); assert(size <= MAX_BYTES); //找出对应的链表桶,并将对象插入进去 size_t index = SizeClass::Index(size);//算出是第几个桶 _freeLists[index].Push(ptr); } ``` ![image-20230703111948126](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307031119279.png) 具体细节仍需优化,比如size可以去掉,不过这都是后话。 # 高并发内存池—central cache整体设计 > 回顾之前的内容,此项目分为三层,分别为thread cache、central cache、page cache。整体框架里说过,在thread cache中,自由链表一旦没有对象,就需要向central cache中获取对象,central cache整体设计也由此而来。 central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂的是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。 ![image-20230703141640491](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307031416597.png) 与thread cache的区别: 1. central cache存在锁的竞争问题,即thread cache会向同一个central申请资源,但两个线程访问不同的桶就无须加锁,因此桶锁的竞争不会很激烈。 2. central cache挂的是span(跨度),如果是8Byte,那么就将span以8Byte分割成若干块,当thread cache申请对象时,central cache往往会给超出thread cache所需的对象大小。 **申请内存:** 1. 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。 2. central cache映射的spanlist中所有span都没有内存以后,或者当指定桶申请时,其映射的span没有内存,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。 3. central cache中挂的span中的use_count记录了分配了多少个对象出去,分配一个对象给thread cache,就++use_count。 **释放内存:** 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时--use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。 **价值体现:** 1. 第一个价值:如果thread cache从central cache申请多了,就需要将他还回去,这时候会有这种场景:thread cache1 还回去,还回去的内存对象可以供其他线程使用,此时thread cache2 再申请就可以申请thread cache1还回来的内存,这就是**均衡调度**。 2. 第二个价值:还回内存时,会--use_count。假设一块span是一页,切出了1024个8Byte,最开始central cache一个都没分时use_count=0,每分一个就+1,当thread cache拿多了还回去,use_count就会++,当use_count为0,所有的内存都还给了central cache,还回去时,要尽可能的将内存合并,缓解内存碎片的问题。 再次总结一下,central cache的价值体现的是一个承上启下的作用,thread cache没有内存找central cache,并且存在锁竞争;central cache没有内存找page cache,page cache没有内存找系统。 # 高并发内存池—central cache结构设计 > 根据central cache整体设计来实现代码 新增一个`CentralCache.h`用来编写central cache的结构代码。 首先,需要在Common.h中设计一个struct Span类,其结构为带头双向链表,因为不只是`central cache`在用,`page cache`也在用,所以在`Common.h`中设计,里面的成员包括: 1. 页号(_pageId) 在进程地址空间中,有2位和64位之分。32位下,假设每页有8G,即2^13^,那么页数=2^32^ / 2 ^13^ = 2^19^ 页;64位下,假设每页有8G,即2^13^ , 那么页数=2^64^ / 2^13^ =2^51^页,这个数量超过了int以及size_t所能表示的范围,但64位可以用unsigned long long 表示(范围为2^64^ -1),32位下如果也用此类型就会产生浪费,所以选择采用条件编译确定是32位还是64位。 ![image-20230703191428193](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307031914309.png) 可以发现,明明是x64环境,但是条件编译亮的还是32位,这是因为: > 1. WIN32/_WIN32可以用来判断是否Windows系统(对于跨平台程序) > > 2. _WIN64用来判断编译环境是x86(32位)还是x64(64位) > > 在Win32配置下,_WIN32有定义,\_WIN64没有定义。 > > 在x64配置下,_WIN32和\_WIN64都有定义。 所以这个时候都有,那还是走_WIN32,所以调换一下位置就可以解决问题: ![image-20230706211915268](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307062119370.png) ![image-20230706212430981](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307062124052.png) 解决了范围问题,就可以继续对Span添加成员。 由于需要设计双向循环链表的类,所以再定义一个SpanList类,其应包括带头双向链表的增删查改。 比如插入: ![image-20230706214724717](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307062147774.png) 而删除时,并不需要delete掉,因为我们需要将内存块还给page cache: ![image-20230707150854095](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307071508173.png) `Common.h`新增如下: ```cpp // 管理多个连续页大块内存跨度结构 struct Span { PAGE_ID _pageId = 0; // 大块内存起始页的页号 size_t _n = 0; // 页的数量 Span* _next = nullptr; // 双向链表的结构 Span* _prev = nullptr; size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数 void* _freeList = nullptr; // 切好的小块内存的自由链表 }; // 带头双向循环链表 class SpanList { public: SpanList() { _head = new Span; _head->_next = _head; _head->_prev = _head; } void Insert(Span* pos, Span* newSpan) { assert(pos); assert(newSpan); Span* prev = pos->_prev; // prev newspan pos prev->_next = newSpan; newSpan->_prev = prev; newSpan->_next = pos; pos->_prev = newSpan; } void Erase(Span* pos) { assert(pos); assert(pos != _head); Span* prev = pos->_prev; Span* next = pos->_next; prev->_next = next; next->_prev = prev; } private: Span* _head; }; ``` 此时的Common.h将其与CentralCache.h联动起来: centralcache的映射规则和threadcache的映射规则是一样的,即threadcache是208个桶,threadcache也是208个桶。 > 如果在threadcache中找到的是24字节,发现是2号桶,如果有内存就直接在threadcache中获取,如果没有,则就会去centralcache对应的2号桶,并且向span中获取。 `CentralCache.h` ```cpp #pragma once #include"Common.h" class CentralCache { public: private: SpanList _spanLists[NFREELIST]; }; ``` 所以上述的_spanLists[NFREELIST]与\_freeLists[NFREELIST]是对应的。而threadcache是无锁的,但centralcache是有锁的,即桶锁,有多少个桶就有多少个锁。所以不同的是centralcache需要在每个桶都各自设计一把锁,在竞争的时候进行加锁。 头文件:`#include`添加在common.h中。在SpanList类中加上桶锁。 ![image-20230707152059499](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307071520588.png) `Common.h` ```cpp #pragma once #include #include #include #include #include #include using std::cout; using std::endl; static const size_t MAX_BYTES = 256 * 1024; static const size_t NFREELIST = 208; #ifdef _WIN64 typedef unsigned long long PAGE_ID; #elif _WIN32 typedef size_t PAGE_ID; #else //Linux #endif // _WIN32 static void*& NextObj(void* obj) { return *(void**)obj; } // 管理切分好的小对象的自由链表 class FreeList { public: void Push(void* obj) { assert(obj); // 头插 //*(void**)obj = _freeList; NextObj(obj) = _freeList; _freeList = obj; } void* Pop() { assert(_freeList); // 头删 void* obj = _freeList; _freeList = NextObj(obj); return obj; } bool Empty() { return _freeList == nullptr; } private: void* _freeList = nullptr; }; // 计算对象大小的对齐映射规则 class SizeClass { public: // 整体控制在最多10%左右的内碎片浪费 // [1,128] 8byte对齐 freelist[0,16) // [128+1,1024] 16byte对齐 freelist[16,72) // [1024+1,8*1024] 128byte对齐 freelist[72,128) // [8*1024+1,64*1024] 1024byte对齐 freelist[128,184) // [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) /*size_t _RoundUp(size_t size, size_t alignNum) { size_t alignSize; if (size % alignNum != 0) { alignSize = (size / alignNum + 1)*alignNum; } else { alignSize = size; } return alignSize; }*/ // 1-8 static inline size_t _RoundUp(size_t bytes, size_t alignNum) { return ((bytes + alignNum - 1) & ~(alignNum - 1)); } static inline size_t RoundUp(size_t size) { if (size <= 128) { return _RoundUp(size, 8); } else if (size <= 1024) { return _RoundUp(size, 16); } else if (size <= 8 * 1024) { return _RoundUp(size, 128); } else if (size <= 64 * 1024) { return _RoundUp(size, 1024); } else if (size <= 256 * 1024) { return _RoundUp(size, 8 * 1024); } else { assert(false); return -1; } } /*size_t _Index(size_t bytes, size_t alignNum) { if (bytes % alignNum == 0) { return bytes / alignNum - 1; } else { return bytes / alignNum; } }*/ // 1 + 7 8 // 2 9 // ... // 8 15 // 9 + 7 16 // 10 // ... // 16 23 static inline size_t _Index(size_t bytes, size_t align_shift) { return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1; } // 计算映射的哪一个自由链表桶 static inline size_t Index(size_t bytes) { assert(bytes <= MAX_BYTES); // 每个区间有多少个链 static int group_array[4] = { 16, 56, 56, 56 }; if (bytes <= 128) { return _Index(bytes, 3); } else if (bytes <= 1024) { return _Index(bytes - 128, 4) + group_array[0]; } else if (bytes <= 8 * 1024) { return _Index(bytes - 1024, 7) + group_array[1] + group_array[0]; } else if (bytes <= 64 * 1024) { return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0]; } else if (bytes <= 256 * 1024) { return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0]; } else { assert(false); } return -1; } }; //字符编码修正:安装Force UTF-8(NO BOM) // 管理多个连续页大块内存跨度结构 struct Span { PAGE_ID _pageId = 0; // 大块内存起始页的页号 size_t _n = 0; // 页的数量 Span* _next = nullptr; // 双向链表的结构 Span* _prev = nullptr; size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数 void* _freeList = nullptr; // 切好的小块内存的自由链表 }; // 带头双向循环链表 class SpanList { public: SpanList() { _head = new Span; _head->_next = _head; _head->_prev = _head; } void Insert(Span* pos, Span* newSpan) { assert(pos); assert(newSpan); Span* prev = pos->_prev; // prev newspan pos prev->_next = newSpan; newSpan->_prev = prev; newSpan->_next = pos; pos->_prev = newSpan; } void Erase(Span* pos) { assert(pos); assert(pos != _head); Span* prev = pos->_prev; Span* next = pos->_next; prev->_next = next; next->_prev = prev; } private: Span* _head; std::mutex _mtx;//桶锁。 }; ``` centralcache的基础结构就设计到这里,他的核心逻辑:即与thread cache的联动 还需继续设计。 > ps: 此开源项目源码中也大量涉及到assert,宁愿直接终止程序,也不能存在细节错误。 # 高并发内存池—central cache核心实现 > 上几节说到,threadcache通过tls技术可以获得线程独享的资源且不需要加锁,而centralcache则需要对每个桶都进行加锁,向threadcache对应的桶申请内存,如果没有内存,由于centralcache与threadcache的映射规则是一样的,就需要向centralcache对应的桶中获取,而centralcache没内存的情况暂且不谈。所以,目前需要解决的是,如何向centralcache中获取内存对象呢? ## 1. 单例模式创建 每个进程中只有一个central cache,多个thread cache,因此此时central cache可以设计成单例模式来获取对象。单例模式分为饿汉模式和懒汉模式,懒汉模式略复杂,所以此时设计成饿汉模式。 * 将central cache在CentralCache.h中设计成单例模式中的饿汉模式。 `CentralCache.h` ```cpp #pragma once #include"Common.h" //单例模式 class CentralCache { public: static CentralCache* GetInstance() { return &_sInst;//返回指针 } private: SpanList _spanLists[NFREELIST]; private: CentralCache() {} CentralCache(const CentralCache&) = delete; static CentralCache _sInst; }; ``` 类中的_sInst只是声明,还需要定义。\_sInst不应该在`CentralCache.h`中定义,否则引用此头文件时\_sInst会在多个.cpp文件里面包含。所以\_sInst在CentralCache.cpp中定义,此时也是第一次编辑CentralCache.cpp。 `CentralCache.cpp` ```cpp #include"CentralCache.h" CentralCache CentralCache::_sInst; ``` 对于ThreadCache.cpp,存在当对应的链表为空时,就会向CentralCache中心缓存中申请对象空间: ![image-20230711150914187](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307111509289.png) 为了缓解桶锁的竞争问题,每次申请空间时会多给一些,但是由于每个桶的对象对应的大小不同,那么给多了难免会造成空间浪费。基于防止这种占着茅坑不拉屎的行为,在`void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)`中采用**慢开始反馈调节算法**进行处理。 ## 2. 慢开始反馈调节算法 **什么是慢开始反馈调节算法:** 慢开始反馈调节算法用于计算central cache应该分配给thread cache多少个内存块对象,当thread cache向central cache申请内存时,如果申请的是较小的对象,那么可以多给一点,但如果申请的是较大的对象,就可以少给一点。 ![image-20230711152625767](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307111526861.png) 算法设计步骤: 1. 传入对应size,表示对应内存块的大小。 2. 通过num确定数量。 3. 调节num大小,num较小,则多给些;num较大则分批给。 将此算法添加到Common.h中的SizeClass类中: ```cpp // 一次thread cache从中心缓存获取多少个 static size_t NumMoveSize(size_t size) { assert(size > 0); //对象越大,计算出的上限越低 //对象越小,计算出的上限越高 int num = MAX_BYTES / size; if (num < 2) num = 2; if (num > 512) num = 512; return num; } ``` 此时,ThreadCache.cpp中的`void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)`就可以使用该算法。 就算申请的是小对象,一次性给出512个也是比较多的,基于这个原因,我们在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1,并且提供一个公有成员函数用于获取这个变量。也就是说,现在thread cache中的每个自由链表都会有一个自己的\_maxSize。 ![image-20230711165026926](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307111650009.png) ```cpp // 管理切分好的小对象的自由链表 class FreeList { public: void Push(void* obj) { assert(obj); // 头插 //*(void**)obj = _freeList; NextObj(obj) = _freeList; _freeList = obj; } void* Pop() { assert(_freeList); // 头删 void* obj = _freeList; _freeList = NextObj(obj); return obj; } bool Empty() { return _freeList == nullptr; } size_t& MaxSize() { return _maxSize; } private: void* _freeList = nullptr; size_t _maxSize = 1; }; ``` 那么此时,通过NumMoveSize方法获取到的值与这个_maxSize进行比较,取出最小值赋值给batchNum作为本次申请对象的个数,如果\_freeLists[index].MaxSize() == batchNum,就说明此时的\_maxSize是最小的,此时给\_maxSize+1,就会起到慢增长的作用。 因此,thread cache第一次向central cache申请某大小的对象时,申请到的都是一个,但下一次thread cache再向central cache申请同样大小的对象时,因为该自由链表中的_maxSize增加了,最终就会申请到两个。直到该自由链表中\_maxSize的值,增长到超过计算出的值后就不会继续增长了,此后申请到的对象个数就是计算出的个数。 > 若+1比较慢,可以将其改成+n,n为任意正整数。 ```cpp void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { //慢开始反馈调节算法 //1、最开始不会一次向central cache批量要太多,因为要太多了可能用不完 //2、如果不断有size大小内存的需求,就会不断增长,直到上限 //3、size越大,一次向central cache要的batchNum就越小 //4、size越小,一次向central cache要的batchNum就越大(这个越大是慢慢增长的) size_t batchNum = std::min(_freeLists[index].MaxSize(),SizeClass::NumMoveSize(size));//表示获取数量 if (_freeLists[index].MaxSize() == batchNum) { _freeLists[index].MaxSize() += 1; } return nullptr; } ``` **慢开始反馈调节算法** 1、最开始不会一次向central cache批量要太多,因为要太多了可能用不完 2、如果不断有size大小内存的需求,就会不断增长,直到上限 3、size越大,一次向central cache要的batchNum就越小 4、size越小,一次向central cache要的batchNum就越大(这个越大是慢慢增长的) ## 3. 向central cache申请内存 上述的FetchFromCentralCache中,只存在慢开始反馈调节算法,但是还没有向central cache申请内存的代码,因此,还需要在此函数中继续编辑申请centralcache内存的代码。 **方法步骤:** 1. 在此ThreadCache.cpp中引入CentralCache.h头文件。 2. 引入单例对象GetInstance(),获取资源。 3. 在CentralCache.h中添加对应的获取资源的方法供单例对象使用。 4. 对FetchFromCentralCache添加相应的获取单例对象资源的操作。 按照上述步骤,代码相应的,也应该添加对应方法: 1. 在CentralCache.h中的CentralCache添加方法:FetchRangeObj ```cpp #pragma once #include"Common.h" //单例模式 class CentralCache { public: static CentralCache* GetInstance() { return &_sInst;//返回指针 } // 从中心缓存获取一定数量的对象给thread cache size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size); private: SpanList _spanLists[NFREELIST]; private: CentralCache() {} CentralCache(const CentralCache&) = delete; static CentralCache _sInst; }; ``` 参数说明:(start、end均作为输出型参数) * start:对应内存链表的头。 * end:对应内存链表的尾。 * batchNum:对象数量。 * size:对象大小。 2. 完善FetchFromCentralCache:添加申请单例对象的内存。 如果要十块内存,而Span只有5块,那么就只能根据实际给出对应的5块内存大小。此外结点的申请至少一个,因此避免错误采用assert的方式。 ```cpp void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { //慢开始反馈调节算法 //1、最开始不会一次向central cache批量要太多,因为要太多了可能用不完 //2、如果不断有size大小内存的需求,就会不断增长,直到上限 //3、size越大,一次向central cache要的batchNum就越小 //4、size越小,一次向central cache要的batchNum就越大(这个越大是慢慢增长的) size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));//表示获取数量 if (_freeLists[index].MaxSize() == batchNum) { _freeLists[index].MaxSize() += 1; } void* start = nullptr; void* end = nullptr; size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size); assert(actualNum > 1);//至少为1,否则new失败抛异常 if (actualNum == 1) { assert(start == end); return start; } else { _freeLists[index].PushRange(NextObj(start), end); return start; } } ``` 3. 添加FreeList中范围插入的方法:PushRange 不管自由链表有没有结点,直接头插就可以了:让end指向第一个 ![image-20230711183918177](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307111839290.png) 4. 完善CentralCache中的FetchRangeObj方法,在CentralCache.cpp实现: ![image-20230711184408119](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307111844202.png) 实现步骤: * 先映射对应桶,找到对应位置(有Span就直接用,Span为空则需要向page cache申请),那么此时就需要再CentralCache类中再添加一个能够向page cache获取内存的GetOneSpan。 ![image-20230711185025688](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307111850769.png) * 加锁。由于多个threadCache有可能会竞争同一个桶。注:将锁设成public。 * 取走链表【start, end】之间的结点。若batchNum大于Span的结点数量,也就是结点不够,那么取的时候一定会error,所以需要继续判断结点的终点,通过while循环就可以顺便判断这个情况。 ```cpp #include"CentralCache.h" CentralCache CentralCache::_sInst; //获取一个非空的Span Span* CentralCache::GetOneSpan(SpanList& list, size_t size) { //逻辑暂未实现 return nullptr; } // 从中心缓存获取一定数量的对象给thread cache size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) { size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); Span* span = GetOneSpan(_spanLists[index], size); //从span中获取batchNum个对象 //如果不够batchNum个,有多少拿多少 start = span->_freeList; end = start; size_t i = 0; size_t actualNum = 1; while (i < batchNum - 1 && NextObj(end) != nullptr) { end = NextObj(end); ++i; ++actualNum; } span->_freeList = NextObj(end); NextObj(end) = nullptr; _spanLists[index]._mtx.unlock(); return actualNum; } ``` 此时也可以编译成功,不过GetOneSpan还没有完成。 ![image-20230711203916209](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307112039308.png) # 高并发内存池—page cache整体设计 新增`PageCache.h`,`PageCache.cpp` 。 pagecache作为最后一层需要进行设计的结构。无一例外,仍然是申请内存,释放内存,与上一层centralcache建立内存申请上的连接关系。 **申请内存:** 1. 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页pagespan分裂为一个4页page span和一个6页page span。 ![image-20230714150738525](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307141507732.png) 2. 如果找到_spanList[128]都没有合适的span,则向系统(堆)使用mmap、brk或者是VirtualAlloc等方式 申请128页page span挂在自由链表中,再重复1中的过程。 3. 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。 因此,由于映射方式不同,在Common.h中可以继续增加一个静态变量来控制映射下标,把这个变量称为NPAGES。 > 单个对象最多内存为256KB,至于page cache当中究竟有多少个桶,这就要看你最大想挂几页的span了,这里我们就最大挂128页的span,假设一页我们用的是8K,128*8k=1024KB,那么就可以切四段给centralcache。此时以及为了让桶号与页号对应起来,我们可以将第0号桶空出来不用,因此我们需要将哈希桶的个数设置为129。 ```cpp static const size_t NPAGES = 129; ``` 相比centralcache的桶锁,此时的pagecache不能使用桶锁,而是需要一把锁住整体的锁对pagecache的资源进行管理。 **为什么不能用桶锁?** PageCache中的Span是按序号映射的,若此时centralcache需要2page的span,而PageCache对应2page的span为空,我们知道pagecache的下一层就是堆,直接向堆申请内存,由于要的内存不是很大,这就会造成内存碎片问题,因此除了向堆要内存,还可以采用另外一种方式:继续向下找其他的npage,n大于当前需要的数。比如2page后的span为nullptr,那就继续向下找3page,将3page分割成2page和1page,2page被centralcache拿去使用,剩下的1page挂到序号为1的桶下,即1page的后面。 ![image-20230714163349898](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307141633978.png) 因此,这就与centralcache不同,pagecache的桶是不确定的,所以不能采用桶锁,而是用一把大锁将整个pagecache都锁住。这样的好处是当还回内存对象给pagecache时,可以将多个page进行合并成一个大的page,这样的空间就更加的连续。我们知道,一开始pagecache是不挂span的,因此当centralcache向pagecache申请内存时,就可以直接申请128KB的内存挂在pagecache的128号位,然后当centralcache进行申请内存时,为空就一直向下,直到找到对应的128号位,再通过分割的方式获取内存,这样就避免了内存碎片的问题。 此时,span的usecount变量就管理着span块的数量。如果central cache中的span usecount等于0,说明切分给thread cache的小块内存都还回来了,则centralcache把这个span还给page cache,page cache通过页号,查看前后相邻页是否空闲,是的话就合并,合并出更大的页,解决内存碎片问题。 **PageCache类如何设计?** ![image-20230714154444109](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307141544175.png) 每个线程找centralcache,唯一的单例对象centralcache又去找pagecache,pagecache也是唯一的,故pagecache同样也需要设计成单例模式(饿汉)。类内声明,PageCache.cpp即类外定义。 `PageCache.h` ```cpp #pragma once #include"Common.h" class PageCache { public: static PageCache* GetInstance() { return &_sInst; } //获取一个K页的span Span* NewSpan(size_t k); private: SpanList _spanLists[NPAGES]; std::mutex _pageMtx; PageCache() {} PageCache(const PageCache&) = delete; static PageCache _sInst; }; ``` `PageCache.cpp` ```cpp #include"PageCache.h" PageCache PageCache::_sInst; ``` 然后,就需要central cache向page cache中获取span,将二者之间联系起来。 # 高并发内存池—page cache中获取span ## 1. CentralCache向page cache获取span 此时,已经完成了thread cache与central cache之间的联系,而现在要做的就是central cache与page cache之间的联系。在centralcache还剩下一个连接central cache和page cache的函数: ![image-20230714172150947](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307141721065.png) 由于spanList是双向带头循环链表,即如下结构: ![image-20230715143350429](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307151433541.png) 所以,为了方便控制,在spanList中添加对应的双向链表的头和尾的接口,以及后续需要头插: ```cpp // 带头双向循环链表 class SpanList { public: Span* Begin() { return _head->_next; } Span* End() { return _head; } void PushFront(Span* span)//头插 { Insert(Begin(), span); } private: Span* _head; public: std::mutex _mtx; }; ``` 提供了Begin()和End(),就方便在GetOneSpan中遍历了。 GetOneSpan完成的功能:centralcache获取一个非空的Span 1. 先查看当前的spanlist中是否含有未分配对象的span,若没有,则需执行后面的步骤,否则不需执行。 2. 没有空闲的span了,只能找page要。 3. 计算span的大块内存的起始地址和大块内存的大小(字节数) 4. 把大块内存切成自由链表链接起来 > 当central cache申请到若干页的span后,还需要将这个span切成一个个对应大小的对象挂到该span的自由链表当中。 > >   如何找到一个span所管理的内存块呢?首先需要计算出该span的起始地址,我们可以用这个span的起始页号乘以一页的大小即可得到这个span的起始地址,然后用这个span的页数乘以一页的大小就可以得到这个span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置。 > >   明确了这块内存的起始和结束位置后,我们就可以进行切分了。根据所需对象的大小,每次从大块内存切出一块固定大小的内存块尾插到span的自由链表中即可。 > >   为什么是尾插呢?因为我们如果是将切好的对象尾插到自由链表,这些对象看起来是按照链式结构链接起来的,而实际它们在物理上是连续的,这时当我们把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存利用率。 ```cpp //获取一个非空的Span Span* CentralCache::GetOneSpan(SpanList& list, size_t size) { //先查看当前的spanlist中是否含有未分配对象的span Span* it = list.Begin(); while (it != list.End()) { //判断span是否合适 if (it->_freeList != nullptr) { return it; } else { it = it->_next; } } //走到这里说明没有空闲的span了,只能找page要 Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size)); //计算span的大块内存的起始地址和大块内存的大小(字节数) char* start = (char*)(span->_pageId << PAGE_SHIFT); size_t bytes = span->_n << PAGE_SHIFT; char* end = start + bytes; //把大块内存切成自由链表链接起来 // 1、先切下来一块做头,方便尾插 span->_freeList = start; start += size; void* tail = span->_freeList; while (start < end) { NextObj(tail) = start; tail = NextObj(tail);// tail = start start += size; } list.PushFront(span);//头插进去 return span; return nullptr; } ``` 此时就完成了centralcache获取span的步骤。其中有SizeClass::NumMovePage(size)我们尚未实现,其功能是向page要size大小的内存。 **NumMovePage()原理:** 那具体是向page cache申请多大的内存块呢?我们可以根据具体所需对象的大小来决定,就像之前我们根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,现在我们是根据对象的大小计算出,central cache一次应该向page cache申请几页的内存块。   我们可以先根据对象的大小计算出,thread cache一次向central cache申请对象的个数上限,然后将这个上限值乘以单个对象的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数,如果转换后不够一页,那么我们就申请一页,否则转换出来是几页就申请几页。也就是说,central cache向page cache申请内存时,要求申请到的内存尽量能够满足thread cache向central cache申请时的上限。 ```cpp //管理对齐和映射等关系 class SizeClass { public: //central cache一次向page cache获取多少页 static size_t NumMovePage(size_t size) { size_t num = NumMoveSize(size); //计算出thread cache一次向central cache申请对象的个数上限 size_t nPage = num*size; //num个size大小的对象所需的字节数 nPage >>= PAGE_SHIFT; //将字节数转换为页数 if (nPage == 0) //至少给一页 nPage = 1; return nPage; } }; ``` 代码中的`PAGE_SHIFT`代表页大小转换偏移,我们这里以页的大小为8K为例,`PAGE_SHIFT`的值就是13。 ```cpp //页大小转换偏移,即一页定义为2^13,也就是8KB static const size_t PAGE_SHIFT = 13; ``` ## 2. page向系统获取K页的span 在pagecache整体设计中,pagecache获取span的步骤: 1. 通过对应的序号桶找到对应位置 2. 在这个映射下寻找非空的span 3. 若有,则用这个span;若没有,步骤如下: 4. 找到更大的非空span进行分割,若没有则向系统申请内存。最开始的pagecache就没有任何span。 5. 若向系统申请内存了,则需要递归再调用一次,才能返回给central cache。 若在第四步找到更大的非空span,则将其切分成一个k页的span和一个n-k页的span;k页的span返回给central cache;n-k页的span挂到第n-k号桶中去。 `PageCache.h` ```cpp #include"PageCache.h" PageCache PageCache::_sInst; //获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); if (!_spanLists[k].Empty()) { return _spanLists->PopFront(); } //检查一下后面的桶里面有没有span,如果有可以把他进行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nspan的头部切一个k页下来 //k页的span返回 //nSpan再挂到对应映射的位置 kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //挂上去 _spanLists[nSpan->_n].PushFront(nSpan); return kSpan; } } //到了这里说明pagecache没有大页的span,则需要向系统申请 //这时就去找堆要一个128页的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); return NewSpan(k); } ``` 回顾一下三层结构: * thread cache:无锁 * central cache:桶锁(单例模式) * page cache:整个锁(单例模式) **锁的问题** 需要注意的是,在FetchRangeObj中,存在将central cache加上桶锁,如果在这个过程中,central cache没有span并且需要向page cache申请,那么FetchRangeObj的锁就需要解锁,虽然在申请内存上无伤大雅,只需要等待,但对于释放内存来说,一旦不解开central cache的桶锁,其他线程释放内存对象回来就会产生阻塞,因此为了避免这种情况的发生,就需要在GetOneSpan函数中向page cache申请内存之前将桶锁解锁。 ![image-20230715200509794](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307152005926.png) 此外对于page cache的锁,因为其递归的方式不能直接对其加锁,因此对于这种情况,可以使用专门的递归互斥锁:recursive_mutex,也可以分离一个子函数_NewSpan来避免递归。 为了让PageCache加锁更加方便,将_mtx进行public化更加方便。(封一个接口也可以)直接在pageCache要之前就加上锁,这样就可以避免递归锁的问题。 ![image-20230715200759070](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307152007213.png) 获取的span不需要加锁,因为没有此时的切分不会出现线程之间的竞争,由于没有挂到list上,故其他的线程访问不到span。 但是当插入剩下的n-k的span,需要挂在list上,此时就需要加锁: ![image-20230715205231846](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307152052982.png) **思考:** 1.为什么不在从系统申请128字节时直接切分,反而要递归? 这实际上是一个代码复用的好习惯,直接切分成2字节和126字节固然很好,但是这样就不能通过递归来复用代码,即便递归调用会慢一点点,但是这点东西对于计算机来说不算什么,反而复用更重要。 2.申请流程已经完成了,此时还差释放内存的过程,以及这三层的申请内存过程的联合调用。 # 申请内存过程连调 ## 1. 生成解决方案—检查错误 ![image-20230716134956514](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307161349641.png) 目前编译是失败的,故存在一些细节错误。 由于包了windows.h,windows.h中存在着min的宏,而在ThreadCache.cpp中存在std::min,由于前者的优先级高,因此作用域std会产生语法错误,所以采用将std命名空间去掉,即不用algorithm中的min,而是采用windows.h中的宏min。 ![image-20230716135608655](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307161356812.png) ## 2. UnitTest.cpp单元测试 在将thread cache、central cache以及page cache的申请流程写通了之后,我们就可以向外提供一个TestConcurrentAlloc函数,用于申请内存块。每个线程第一次调用该函数时会通过TLS获取到自己专属的thread cache对象,然后每个线程就可以通过自己对应的thread cache申请对象了。这是一开始就在ConcurrentAlloc.h中就提供的接口。 ```cpp static void* ConcurrentAlloc(size_t size) { //通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } ``` > 申请内存过程联调测试一 由于在多线程场景下调试观察起来非常麻烦,这里就先不考虑多线程场景,看看在单线程场景下代码的执行逻辑是否符合我们的预期,其次,我们这里就只简单观察在一个桶当中的内存申请就行了。   下面该线程进行了五次内存申请,这五次内存申请的字节数最终都对齐到了8,此时当线程申请内存时就只会访问到thread cache的第0号桶。 ```cpp void TestConcurrentAlloc() { //都映射在一号桶 void* p1 = ConcurrentAlloc(6); void* p2 = ConcurrentAlloc(8); void* p3 = ConcurrentAlloc(1); void* p4 = ConcurrentAlloc(7); void* p5 = ConcurrentAlloc(8); } int main() { TestConcurrentAlloc(); return 0; } ``` ![image-20230716142048436](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307161420525.png) 当线程第一次申请内存时,该线程需要通过TLS获取到自己专属的thread cache对象,然后通过这个thread cache对象进行内存申请。 ![image-20230716151408734](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307161514805.png) 在申请内存时通过计算索引到了thread cache的第0号桶,但此时thread cache的第0号桶中是没有对象的,因此thread cache需要向central cache申请内存块。 ![image-20230716151544051](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307161515104.png) ![image-20230716151653402](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307161516451.png)  在向central cache申请内存块前,首先通过NumMoveSize函数计算得出,thread cache一次最多可向central cache申请8字节大小对象的个数是512,但由于我们采用的是慢开始算法,因此还需要将上限值与对应自由链表的_maxSize的值进行比较,而此时对应自由链表_maxSize的值是1,所以最终得出本次thread cache向central cache申请8字节对象的个数是1个。 并且在此之后会将该自由链表中_maxSize的值进行自增,下一次thread cache再向central cache申请8字节对象时最终申请对象的个数就会是2个了。 ![image-20230716151953072](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307161519128.png) 在thread cache向central cache申请对象之前,需要先将central cache的0号桶的锁加上,然后再从该桶获取一个非空的span。 ![image-20230716152258490](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307161522552.png) 在central cache的第0号桶获取非空span时,先遍历对应的span双链表,看看有没有非空的span,但此时肯定是没有的,因此在这个过程中我们无法找到一个非空的span。 那么此时central cache就需要向page cache申请内存了,但在此之前需要先把central cache第0号桶的锁解掉,然后再将page cache的大锁给加上,之后才能向page cache申请内存。 ![image-20230716153811784](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307161538838.png)  在向page cache申请内存时,由于central cache一次给thread cache8字节对象的上限是512,对应就需要4096字节,所需字节数不足一页就按一页算,所以这里central cache就需要向page cache申请一页的内存块。 ![image-20230716212014843](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307162120920.png) 但此时page cache的第1个桶以及之后的桶当中都是没有span的,因此page cache需要直接向堆申请一个128页的span。 ![image-20230716212733128](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307162127184.png) 这里通过监视窗口可以看到,用于管理申请到的128页内存的span信息。 ![image-20230716214747150](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307162147211.png) 我们可以顺便验证一下,按页向堆申请的内存块的起始地址和页号之间是可以相互转换的。 ![image-20230716214902161](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307162149247.png) 现在将申请到的128页的span插入到page cache的第128号桶当中,然后再调用一次NewSpan,在这次调用的时候,虽然在1号桶当中没有span,但是在往后找的过程中就一定会在第128号桶找到一个span。 ![image-20230716215044909](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307162150973.png)  此时我们就可以把这个128页的span拿出来,切分成1页的span和127页的span,将1页的span返回给central cache,而把127页的span挂到page cache的第127号桶即可。 ![image-20230716215128931](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307162151009.png) 从page cache返回后,就可以把page cache的大锁解掉了,但紧接着还要将获取到的1页的span进行切分,因此这里没有立刻重新加上central cache对应的桶锁。 ![image-20230716215252105](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307162152154.png)  在进行切分的时候,先通过该span的起始页号得到该span的起始地址,然后通过该span的页数得到该span所管理内存块的总的字节数。 ![image-20230716215438080](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307162154127.png) 在确定内存块的开始和结束后,就可以将其切分成一个个8字节大小的对象挂到该span的自由链表中了。在调试过程中通过内存监视窗口可以看到,切分出来的每个8字节大小的对象的前四个字节存储的都是下一个8字节对象的起始地址。 ![image-20230716220335924](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202307162203968.png)  当切分结束后再获取central cache第0号桶的桶锁,然后将这个切好的span插入到central cache的第0号桶中,最后再将这个非空的span返回,此时就获取到了一个非空的span。 # threadcache回收内存 对于threadcache来说,还回来的内存挂在自由链表上,但挂的太长无疑也是一种资源的占用。因此,有必要当链表的长度大于中心缓存一次给的长度时,就将这条自由链表内存块还给中心缓存。 因此,我们将这个功能提供给ThreadCache.h中的ThreadCache类,将其命名为: `ListToolLong` ```cpp #pragma once #include"Common.h" class ThreadCache { public: //申请和释放内存对象 ——from 1 to 8 -> sent to 8 void* Allocate(size_t size); void Deallocate(void* ptr, size_t size);//后面这个size是为了后续的哈希映射 //从中心缓存获取对象 void* FetchFromCentralCache(size_t index, size_t size); //释放对象时,链表过长时,回收内存回到中心缓存 void ListToolLong(FreeList& list, size_t size); private: FreeList _freeLists[NFREELIST]; }; // TLS thread local storage static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr; ``` * `ListToolLong`功能:当threadcache释放对象时,一旦链表长度大于中心缓存的一次批量,就将链表内存块中这个批量原封不动的批量回收到中心缓存。当然不一定将这个链表内存一次拿完,只是将中心缓存的批量拿走。 因此下面就接着设计ThreadCache回收内存: * 在ThreadCache类的Deallocate就新增一个回收到中心缓存的功能。 * ListToolLong的实现。 * 对应的中心缓存也需要有对应的接口,用来将threadcache回收的内存对象释放到span跨度才能方便对接,这个接口称之为ReleaseListToSpans ```cpp void ThreadCache::Deallocate(void* ptr, size_t size) { assert(ptr); assert(size <= MAX_BYTES); //找出对应的链表桶,并将对象插入进去 size_t index = SizeClass::Index(size);//算出是第几个桶 _freeLists[index].Push(ptr); //如果链表长度大于中心缓存一次批量给的长度,就回收一段list给中心缓存 if (_freeLists[index].Size() >= _freeLists->MaxSize()) { ThreadCache::ListToolLong(_freeLists[index], size); } } void ThreadCache::ListToolLong(FreeList& list, size_t size) { //取一批内存出来 void* start = nullptr; void* end = nullptr; list.PopRange(start, end, list.MaxSize()); //还给对应的span跨度 CentralCache::GetInstance()->ReleaseListToSpans(start, size); } ``` ```cpp //单例模式 class CentralCache { public: //将一定数量的对象释放到span跨度 void ReleaseListToSpans(void* start, size_t byte_size); }; ``` 这个方法也是我们后面中心缓存回收内存要实现的。 ```cpp void CentralCache::ReleaseListToSpans(void* start, size_t byte_size) { //... } ``` * 下面,就将自由链表继续完善一下,增加记录自由链表上记录对象个数的成员变量。 * 提供一个size接口,返回_size。 * 还有PopRange,表示Pop一段范围的对象,在回收对象时使用,通过输出型参数的方式将start和end对应到位置记录下来。 ![image-20230807150928163](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308071509257.png) # centralcache回收内存 当thread cache中某个自由链表太长时,会将自由链表当中的这些对象还给central cache中的span。   但是需要注意的是,还给central cache的这些对象不一定都是属于同一个span的。central cache中的每个哈希桶当中可能都不止一个span,因此当我们计算出还回来的对象应该还给central cache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个span。 > 如何根据对象的地址得到对象所在的页号? 首先我们必须理解的是,某个页当中的所有地址除以页的大小都等该页的页号。比如我们这里假设一页的大小是100,那么地址0\~99都属于第0页,它们除以100都等于0,而地址100~199都属于第1页,它们除以100都等于1。 *** > 如何找到一个对象对应的span? 为了解决这个问题,我们可以建立页号和span之间的映射。由于这个映射关系在page cache进行span的合并时也需要用到,因此我们直接将其存放到page cache里面。这时我们就需要在PageCache类当中添加一个映射关系了,这里可以用C++当中unordered_map进行实现,并且添加一个函数接口,用于让central cache获取这里的映射关系。(下面代码中只展示了PageCache类当中新增的成员) ```cpp //单例模式 class PageCache { public: //获取从对象到span的映射 Span* MapObjectToSpan(void* obj); private: std::unordered_map _idSpanMap; }; ``` 每当page cache分配span给central cache时,都需要记录一下页号和span之间的映射关系。此后当thread cache还对象给central cache时,才知道应该具体还给哪一个span。 因此当central cache在调用NewSpan接口向page cache申请k页的span时,page cache在返回这个k页的span给central cache之前,应该建立这k个页号与该span之间的映射关系。 ```cpp //获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); //先检查第k个桶里面有没有span if (!_spanLists[k].Empty()) { Span* kSpan = _spanLists[k].PopFront(); //建立页号与span的映射,方便central cache回收小块内存时查找对应的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //检查一下后面的桶里面有没有span,如果有可以将其进行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的头部切k页下来 kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //将剩下的挂到对应映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); //建立页号与span的映射,方便central cache回收小块内存时查找对应的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //尽量避免代码重复,递归调用自己 return NewSpan(k); } ``` 此时我们就可以通过对象的地址找到该对象对应的span了,直接将该对象的地址除以页的大小得到页号,然后在unordered_map当中找到其对应的span即可。 ```cpp //获取从对象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号 auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else//不可能找不到 { assert(false); return nullptr; } } ``` 注意一下,当我们要通过某个页号查找其对应的span时,该页号与其span之间的映射一定是建立过的,如果此时我们没unordered_map当中找到,则说明我们之前的代码逻辑有问题,因此当没有找到对应的span时可以直接用断言结束程序,以表明程序逻辑出错。 > central cache回收内存  这时当thread cache还对象给central cache时,就可以依次遍历这些对象,将这些对象插入到其对应span的自由链表当中,并且及时更新该span的_useCount计数即可。   在thread cache还对象给central cache的过程中,如果central cache中某个span的_useCount减到0时,说明这个span分配出去的对象全部都还回来了,那么此时就可以将这个span再进一步还给page cache。 ```cpp //将一定数量的对象还给对应的span void CentralCache::ReleaseListToSpans(void* start, size_t size) { size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); //加锁 while (start) { void* next = NextObj(start); //记录下一个 Span* span = PageCache::GetInstance()->MapObjectToSpan(start); //将对象头插到span的自由链表 NextObj(start) = span->_freeList; span->_freeList = start; span->_useCount--; //更新被分配给thread cache的计数 if (span->_useCount == 0) //说明这个span分配出去的对象全部都回来了 { //此时这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并 _spanLists[index].Erase(span); span->_freeList = nullptr; //自由链表置空 span->_next = nullptr; span->_prev = nullptr; //释放span给page cache时,使用page cache的锁就可以了,这时把桶锁解掉 _spanLists[index]._mtx.unlock(); //解桶锁 PageCache::GetInstance()->_pageMtx.lock(); //加大锁 PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); //解大锁 _spanLists[index]._mtx.lock(); //加桶锁 } start = next; } _spanLists[index]._mtx.unlock(); //解锁 } ``` 需要注意,如果要把某个span还给page cache,我们需要先将这个span从central cache对应的双链表中移除,然后再将该span的自由链表置空,因为page cache中的span是不需要切分成一个个的小对象的,以及该span的前后指针也都应该置空,因为之后要将其插入到page cache对应的双链表中。但span当中记录的起始页号以及它管理的页数是不能清除的,否则对应内存块就找不到了。   并且在central cache还span给page cache时也存在锁的问题,此时需要先将central cache中对应的桶锁解掉,然后再加上page cache的大锁之后才能进入page cache进行相关操作,当处理完毕回到central cache时,除了将page cache的大锁解掉,还需要立刻获得central cache对应的桶锁,然后将还未还完对象继续还给central cache中对应的span。 # pagecache回收内存 如果central cache中有某个span的_useCount减到0了,那么central cache就需要将这个span还给page cache了。 这个过程看似是非常简单的,page cache只需将还回来的span挂到对应的哈希桶上就行了。但实际为了缓解内存碎片的问题,page cache还需要尝试将还回来的span与其他空闲的span进行合并。 *** * page cache进行前后页的合并 合并的过程可以分为向前合并和向后合并。如果还回来的span的起始页号是num,该span所管理的页数是n。那么在向前合并时,就需要判断第num-1页对应span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止。而在向后合并时,就需要判断第num+n页对应的span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止。 ​ 因此page cache在合并span时,是需要通过页号获取到对应的span的,这就是我们要把页号与span之间的映射关系存储到page cache的原因。 ​ 但需要注意的是,当我们通过页号找到其对应的span时,这个span此时可能挂在page cache,也可能挂在central cache。而在合并时我们只能合并挂在page cache的span,因为挂在central cache的span当中的对象正在被其他线程使用。 ​ 可是我们不能通过span结构当中的_useCount成员,来判断某个span到底是在central cache还是在page cache。因为当central cache刚向page cache申请到一个span时,这个span的\_useCount就是等于0的,这时可能当我们正在对该span进行切分的时候,page cache就把这个span拿去进行合并了,这显然是不合理的。 ​ 鉴于此,我们可以在span结构中再增加一个_isUse成员,用于标记这个span是否正在被使用,而当一个span结构被创建时我们默认该span是没有被使用的。 ```cpp //管理以页为单位的大块内存 struct Span { PAGE_ID _pageId = 0; //大块内存起始页的页号 size_t _n = 0; //页的数量 Span* _next = nullptr; //双链表结构 Span* _prev = nullptr; size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数 void* _freeList = nullptr; //切好的小块内存的自由链表 bool _isUse = false; //是否在被使用 }; ``` 当central cache向page cache申请到一个span时,需要立即将该span的_isUse改为true。 ```cpp span->_isUse = true; ``` 当central cache将某个span还给page cache时,也就需要将该span的_isUse改成false。 ```cpp span->_isUse = false; ``` 由于在合并page cache当中的span时,需要通过页号找到其对应的span,而一个span是在被分配给central cache时,才建立的各个页号与span之间的映射关系,因此page cache当中的span也需要建立页号与span之间的映射关系。   与central cache中的span不同的是,在page cache中,只需建立一个span的首尾页号与该span之间的映射关系。因为当一个span在尝试进行合并时,如果是往前合并,那么只需要通过一个span的尾页找到这个span,如果是向后合并,那么只需要通过一个span的首页找到这个span。也就是说,在进行合并时我们只需要用到span与其首尾页之间的映射关系就够了。   因此当我们申请k页的span时,如果是将n页的span切成了一个k页的span和一个n-k页的span,我们除了需要建立k页span中每个页与该span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系。 ```cpp //获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); //先检查第k个桶里面有没有span if (!_spanLists[k].Empty()) { Span* kSpan = _spanLists[k].PopFront(); //建立页号与span的映射,方便central cache回收小块内存时查找对应的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //检查一下后面的桶里面有没有span,如果有可以将其进行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的头部切k页下来 kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //将剩下的挂到对应映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); //存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; //建立页号与span的映射,方便central cache回收小块内存时查找对应的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //尽量避免代码重复,递归调用自己 return NewSpan(k); } ``` 此时page cache当中的span就都与其首尾页之间建立了映射关系,现在我们就可以进行span的合并了,其合并逻辑如下: ```cpp //释放空闲的span回到PageCache,并合并相邻的span void PageCache::ReleaseSpanToPageCache(Span* span) { //对span的前后页,尝试进行合并,缓解内存碎片问题 //1、向前合并 while (1) { PAGE_ID prevId = span->_pageId - 1; auto ret = _idSpanMap.find(prevId); //前面的页号没有(还未向系统申请),停止向前合并 if (ret == _idSpanMap.end()) { break; } //前面的页号对应的span正在被使用,停止向前合并 Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; } //合并出超过128页的span无法进行管理,停止向前合并 if (prevSpan->_n + span->_n > NPAGES - 1) { break; } //进行向前合并 span->_pageId = prevSpan->_pageId; span->_n += prevSpan->_n; //将prevSpan从对应的双链表中移除 _spanLists[prevSpan->_n].Erase(prevSpan); delete prevSpan; } //2、向后合并 while (1) { PAGE_ID nextId = span->_pageId + span->_n; auto ret = _idSpanMap.find(nextId); //后面的页号没有(还未向系统申请),停止向后合并 if (ret == _idSpanMap.end()) { break; } //后面的页号对应的span正在被使用,停止向后合并 Span* nextSpan = ret->second; if (nextSpan->_isUse == true) { break; } //合并出超过128页的span无法进行管理,停止向后合并 if (nextSpan->_n + span->_n > NPAGES - 1) { break; } //进行向后合并 span->_n += nextSpan->_n; //将nextSpan从对应的双链表中移除 _spanLists[nextSpan->_n].Erase(nextSpan); delete nextSpan; } //将合并后的span挂到对应的双链表当中 _spanLists[span->_n].PushFront(span); //建立该span与其首尾页的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; //将该span设置为未被使用的状态 span->_isUse = false; } ``` 需要注意的是,在向前或向后进行合并的过程中: 1. 如果没有通过页号获取到其对应的span,说明对应到该页的内存块还未申请,此时需要停止合并。 2. 如果通过页号获取到了其对应的span,但该span处于被使用的状态,那我们也必须停止合并。 3. 如果合并后大于128页则不能进行本次合并,因为page cache无法对大于128页的span进行管理。 4. 在合并span时,由于这个span是在page cache的某个哈希桶的双链表当中的,因此在合并后需要将其从对应的双链表中移除,然后再将这个被合并了的span结构进行delete。   除此之外,在合并结束后,除了将合并后的span挂到page cache对应哈希桶的双链表当中,还需要建立该span与其首位页之间的映射关系,便于此后合并出更大的span。 # 释放内存过程连调 > ConcurrentFree函数 上面将thread cache,central cache以及page cache的释放流程都处理完了并结合在一起,用于释放内存块,释放内存块时每个线程通过自己的thread cache对象,调用thread cache释放内存的接口。 ```cpp static void ConcurrentFree(void* ptr, size_t size) { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } ``` > 释放内存过程联调测试  现在我们将这三个对象再进行释放,看看这其中的释放流程是如何进行的。 ```cpp void TestConcurrentAlloc1() { void* p1 = ConcurrentAlloc(6); void* p2 = ConcurrentAlloc(8); void* p3 = ConcurrentAlloc(1); cout << p1 << endl; cout << p2 << endl; cout << p3 << endl; ConcurrentFree(p1, 6); ConcurrentFree(p2, 8); ConcurrentFree(p3, 1); } ```  首先,这3次申请和释放的对象大小进行对齐后都是8字节,因此对应操作的就是thread cache和central cache的第0号桶,以及page cache的第1号桶。  由于第三次对象申请时,刚好将thread cache第0号桶当中仅剩的一个对象拿走了,因此在三次对象申请后thread cache的第0号桶当中是没有对象的。 通过监视窗口可以看到,此时thread cache第0号桶中自由链表的_maxSize已经慢增长到了3,而当我们释放完第一个对象后,该自由链表当中对象的个数只有一个,因此不会将该自由链表当中的对象进一步还给central cache。 ![image-20230809131120682](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091311752.png)  当第二个对象释放给thread cache的第0号桶后,该桶对应自由链表当中对象的个数变成了2,也是不会进行ListTooLong操作的。 ![image-20230809131217176](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091312284.png) 直到第3个对象释放给thread cache的第0号桶时,此时该自由链表的\_size的值变为3,与_maxSize的值相等,现在thread cache就需要将对象给central cache了。 ![image-20230809131319292](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091313368.png)  thread cache先是将第0号桶当中的对象弹出maxSize个,在这里实际上就是全部弹出,此时该自由链表_size的值变为0,然后继续调用central cache当中的ReleaseListToSpans函数,将这三个对象还给central cache当中对应的span。 ![image-20230809131424281](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091314369.png)  在进入central cache的第0号桶还对象之前,先把第0号桶对应的桶锁加上,然后通过查page cache中的映射表找到其对应的span,最后将这个对象头插到该span的自由链表中,并将该span的\_useCount进行--。当第一个对象还给其对应的span时,可以看到该span的_useCount减到了2。 ![image-20230809131554258](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091315400.png) 而由于我们只进行了三次对象申请,并且这些对象大小对齐后大小都是8字节,因此我们申请的这三个对象实际都是同一个span切分出来的。当我们将这三个对象都还给这个span时,该span的_useCount就减为了0。 ![image-20230809131640766](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091316858.png) 现在central cache就需要将这个span进一步还给page cache,而在将该span交给page cache之前,会将该span的自由链表以及前后指针都置空。并且在进入page cache之前会先将central cache第0号桶的桶锁解掉,然后再加上page cache的大锁,之后才能进入page cache进行相关操作。 ![image-20230809132842011](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091328145.png) 由于这个一页的span是从128页的span的头部切下来的,在向前合并时由于前面的页还未向系统申请,因此在查映射关系时是无法找到的,此时直接停止了向前合并。 > (说明一下:由于下面是重新另外进行的一次调试,因此监视窗口显示的span的起始页号与之前的不同,实际应该与上面一致) ![image-20230809132947168](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091329251.png)   而在向后合并时,由于page cache没有将该页后面的页分配给central cache,因此在向后合并时肯定能够找到一个127页的span进行合并。合并后就变成了一个128页的span,这时我们将原来127页的span从第127号桶删除,然后还需要将该127页的span结构进行delete,因为它管理的127页已经与1页的span进行合并了,不再需要它来管理了。 ![image-20230809133025882](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091330954.png)   紧接着将这个128页的span插入到第128号桶,然后建立该span与其首尾页的映射,便于下次被用于合并,最后再将该span的状态设置为未被使用的状态即可。 ![image-20230809133110605](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091331666.png)   当从page cache回来后,除了将page cache的大锁解掉,还需要立刻加上central cache中对应的桶锁,然后继续将对象还给central cache中的span,但此时实际上是还完了,因此再将central cache的桶锁解掉就行了。 ![image-20230809133142185](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308091331254.png)   至此我们便完成了这三个对象的申请和释放流程。 # 大于256KB的大块内存申请问题 > 申请过程 之前说到,每个线程的thread cache是用于申请小于等于256KB的内存的,而对于大于256KB的内存,我们可以考虑直接向page cache申请,但page cache中最大的页也就只有128页,因此如果是大于128页的内存申请,就只能直接向堆申请了。 | 申请内存的大小 | 申请方式 | | ------------------------------------------------------------ | ------------------ | | x≤256KB(32页) | 向thread cache申请 | | 32页 MAX_BYTES) //大于256KB的内存申请 { //计算出对齐后需要申请的页数 size_t alignSize = SizeClass::RoundUp(size); size_t kPage = alignSize >> PAGE_SHIFT; //向page cache申请kPage页的span PageCache::GetInstance()->_pageMtx.lock(); Span* span = PageCache::GetInstance()->NewSpan(kPage); PageCache::GetInstance()->_pageMtx.unlock(); void* ptr = (void*)(span->_pageId << PAGE_SHIFT); return ptr; } else { //通过TLS,每个线程无锁的获取自己专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } } ``` 也就是说,申请大于256KB的内存时,会直接调用page cache当中的NewSpan函数进行申请,因此这里我们需要再对NewSpan函数进行改造,当需要申请的内存页数大于128页时,就直接向堆申请对应页数的内存块。而如果申请的内存页数是小于128页的,那就在page cache中进行申请,因此当申请大于256KB的内存调用NewSpan函数时也是需要加锁的,因为我们可能是在page cache中进行申请的。 ```cpp //获取一个k页的span Span* PageCache::NewSpan(size_t k) { assert(k > 0); if (k > NPAGES - 1) //大于128页直接找堆申请 { void* ptr = SystemAlloc(k); Span* span = new Span; span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; span->_n = k; //建立页号与span之间的映射 _idSpanMap[span->_pageId] = span; return span; } //先检查第k个桶里面有没有span if (!_spanLists[k].Empty()) { Span* kSpan = _spanLists[k].PopFront(); //建立页号与span的映射,方便central cache回收小块内存时查找对应的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //检查一下后面的桶里面有没有span,如果有可以将其进行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的头部切k页下来 kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //将剩下的挂到对应映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); //存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; //建立页号与span的映射,方便central cache回收小块内存时查找对应的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //尽量避免代码重复,递归调用自己 return NewSpan(k); } ``` > 释放过程 当释放对象时,我们需要判断释放对象的大小: | 释放内存的大小 | 释放方式 | | -------------- | ------------------ | | x≤256KB(32页) | 释放给thread cache | | 32页 MAX_BYTES) //大于256KB的内存释放 { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } } ``` 因此page cache在回收span时也需要进行判断,如果该span的大小是小于等于128页的,那么直接还给page cache进行了,page cache会尝试对其进行合并。而如果该span的大小是大于128页的,那么说明该span是直接向堆申请的,我们直接将这块内存释放给堆,然后将这个span结构进行delete就行了。 ```cpp //释放空闲的span回到PageCache,并合并相邻的span void PageCache::ReleaseSpanToPageCache(Span* span) { if (span->_n > NPAGES - 1) //大于128页直接释放给堆 { void* ptr = (void*)(span->_pageId << PAGE_SHIFT); SystemFree(ptr); delete span; return; } //对span的前后页,尝试进行合并,缓解内存碎片问题 //1、向前合并 while (1) { PAGE_ID prevId = span->_pageId - 1; auto ret = _idSpanMap.find(prevId); //前面的页号没有(还未向系统申请),停止向前合并 if (ret == _idSpanMap.end()) { break; } //前面的页号对应的span正在被使用,停止向前合并 Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; } //合并出超过128页的span无法进行管理,停止向前合并 if (prevSpan->_n + span->_n > NPAGES - 1) { break; } //进行向前合并 span->_pageId = prevSpan->_pageId; span->_n += prevSpan->_n; //将prevSpan从对应的双链表中移除 _spanLists[prevSpan->_n].Erase(prevSpan); delete prevSpan; } //2、向后合并 while (1) { PAGE_ID nextId = span->_pageId + span->_n; auto ret = _idSpanMap.find(nextId); //后面的页号没有(还未向系统申请),停止向后合并 if (ret == _idSpanMap.end()) { break; } //后面的页号对应的span正在被使用,停止向后合并 Span* nextSpan = ret->second; if (nextSpan->_isUse == true) { break; } //合并出超过128页的span无法进行管理,停止向后合并 if (nextSpan->_n + span->_n > NPAGES - 1) { break; } //进行向后合并 span->_n += nextSpan->_n; //将nextSpan从对应的双链表中移除 _spanLists[nextSpan->_n].Erase(nextSpan); delete nextSpan; } //将合并后的span挂到对应的双链表当中 _spanLists[span->_n].PushFront(span); //建立该span与其首尾页的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; //将该span设置为未被使用的状态 span->_isUse = false; } ``` 说明一下,直接向堆申请内存时我们调用的接口是VirtualAlloc,与之对应的将内存释放给堆的接口叫做VirtualFree,而Linux下的brk和mmap对应的释放接口叫做sbrk和unmmap。此时我们也可以将这些释放接口封装成一个叫做SystemFree的接口,当我们需要将内存释放给堆时直接调用SystemFree即可。 ```cpp //直接将内存还给堆 inline static void SystemFree(void* ptr) { #ifdef _WIN32 VirtualFree(ptr, 0, MEM_RELEASE); #else //linux下sbrk unmmap等 #endif } ``` > 简单测试 下面我们对大于256KB的申请释放流程进行简单的测试: ```cpp //找page cache申请 void* p1 = ConcurrentAlloc(257 * 1024); //257KB ConcurrentFree(p1, 257 * 1024); //找堆申请 void* p2 = ConcurrentAlloc(129 * 8 * 1024); //129页 ConcurrentFree(p2, 129 * 8 * 1024); ```  当申请257KB的内存时,由于257KB的内存按页向上对齐后是33页,并没有大于128页,因此不会直接向堆进行申请,会向page cache申请内存,但此时page cache当中实际是没有内存的,最终page cache就会向堆申请一个128页的span,将其切分成33页的span和95页的span,并将33页的span进行返回。 ![image-20230813142632232](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308131426474.png) 而在释放内存时,由于该对象的大小大于了256KB,因此不会将其还给thread cache,而是直接调用的page cache当中的释放接口。 ![image-20230813142742823](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308131427899.png) 由于该对象的大小是33页,不大于128页,因此page cache也不会直接将该对象还给堆,而是尝试对其进行合并,最终就会把这个33页的span和之前剩下的95页的span进行合并,最终将合并后的128页的span挂到第128号桶中。 ![image-20230813142904914](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308131429989.png) 当申请129页的内存时,由于是大于256KB的,于是还是调用的page cache对应的申请接口,但此时申请的内存同时也大于128页,因此会直接向堆申请。在申请后还会建立该span与其起始页号之间的映射,便于释放时可以通过页号找到该span。 ![image-20230813143018159](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308131430235.png) 在释放内存时,通过对象的地址找到其对应的span,从span结构中得知释放内存的大小大于128页,于是会将该内存直接还给堆。 ![image-20230813143123649](https://newcfy.oss-cn-beijing.aliyuncs.com/img2/202308131431760.png) # 使用定长内存池配合脱离使用new tcmalloc是要在高并发场景下替代malloc进行内存申请的,因此tcmalloc在实现的时,其内部是不能调用malloc函数的,我们当前的代码中存在通过new获取到的内存,而new在底层实际上就是封装了malloc。   为了完全脱离掉malloc函数,此时我们之前实现的定长内存池就起作用了,代码中使用new时基本都是为Span结构的对象申请空间,而span对象基本都是在page cache层创建的,因此我们可以在PageCache类当中定义一个_spanPool,用于span对象的申请和释放。 ```cpp //单例模式 class PageCache { public: //... private: ObjectPool _spanPool; }; ``` 然后将代码中使用new的地方替换为调用定长内存池当中的New函数,将代码中使用delete的地方替换为调用定长内存池当中的Delete函数。 ```cpp //申请span对象 Span* span = _spanPool.New(); //释放span对象 _spanPool.Delete(span); ``` 注意,当使用定长内存池当中的New函数申请Span对象时,New函数通过定位new也是对Span对象进行了初始化的。 ***  此外,每个线程第一次申请内存时都会创建其专属的thread cache,而这个thread cache目前也是new出来的,我们也需要对其进行替换。 ```cpp //通过TLS,每个线程无锁的获取自己专属的ThreadCache对象 if (pTLSThreadCache == nullptr) { static std::mutex tcMtx; static ObjectPool tcPool; tcMtx.lock(); pTLSThreadCache = tcPool.New(); tcMtx.unlock(); } ```  这里我们将用于申请ThreadCache类对象的定长内存池定义为静态的,保持全局只有一个,让所有线程创建自己的thread cache时,都在个定长内存池中申请内存就行了。   但注意在从该定长内存池中申请内存时需要加锁,防止多个线程同时申请自己的ThreadCache对象而导致线程安全问题。 *** 最后在SpanList的构造函数中也用到了new,因为SpanList是带头循环双向链表,所以在构造期间我们需要申请一个span对象作为双链表的头结点。 ```cpp //带头双向循环链表 class SpanList { public: SpanList() { _head = _spanPool.New(); _head->_next = _head; _head->_prev = _head; } private: Span* _head; static ObjectPool _spanPool; }; ``` 由于每个span双链表只需要一个头结点,因此将这个定长内存池定义为静态的,保持全局只有一个,让所有span双链表在申请头结点时,都在一个定长内存池中申请内存就行了。 *** # 释放对象时优化为不传对象大小  当我们使用malloc函数申请内存时,需要指明申请内存的大小;而当我们使用free函数释放内存时,只需要传入指向这块内存的指针即可。   而我们目前实现的内存池,在释放对象时除了需要传入指向该对象的指针,还需要传入该对象的大小。 **原因如下:** - 如果释放的是大于256KB的对象,需要根据对象的大小来判断这块内存到底应该还给page cache,还是应该直接还给堆。 - 如果释放的是小于等于256KB的对象,需要根据对象的大小计算出应该还给thread cache的哪一个哈希桶。 如果我们也想做到,在释放对象时不用传入对象的大小,那么我们就需要建立对象地址与对象大小之间的映射。由于现在可以通过对象的地址找到其对应的span,而span的自由链表中挂的都是相同大小的对象。  因此我们可以在Span结构中再增加一个_objSize成员,该成员代表着这个span管理的内存块被切成的一个个对象的大小。 ```cpp //管理以页为单位的大块内存 struct Span { PAGE_ID _pageId = 0; //大块内存起始页的页号 size_t _n = 0; //页的数量 Span* _next = nullptr; //双链表结构 Span* _prev = nullptr; size_t _objSize = 0; //切好的小对象的大小 size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数 void* _freeList = nullptr; //切好的小块内存的自由链表 bool _isUse = false; //是否在被使用 }; ```   而所有的span都是从page cache中拿出来的,因此每当我们调用NewSpan获取到一个k页的span时,就应该将这个span的_objSize保存下来。 ```cpp Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size)); span->_objSize = size; ``` 代码中有两处,一处是在central cache中获取非空span时,如果central cache对应的桶中没有非空的span,此时会调用NewSpan获取一个k页的span;另一处是当申请大于256KB内存时,会直接调用NewSpan获取一个k页的span。  此时当我们释放对象时,就可以直接从对象的span中获取到该对象的大小,准确来说获取到的是对齐以后的大小。 ```cpp static void ConcurrentFree(void* ptr) { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); size_t size = span->_objSize; if (size > MAX_BYTES) //大于256KB的内存释放 { PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } } ``` > 读取映射关系时的加锁问题 我们将页号与span之间的映射关系是存储在PageCache类当中的,当我们访问这个映射关系时是需要加锁的,因为STL容器是不保证线程安全的。   对于当前代码来说,如果我们此时正在page cache进行相关操作,那么访问这个映射关系是安全的,因为当进入page cache之前是需要加锁的,因此可以保证此时只有一个线程在进行访问。   但如果我们是在central cache访问这个映射关系,或是在调用ConcurrentFree函数释放内存时访问这个映射关系,那么就存在线程安全的问题。因为此时可能其他线程正在page cache当中进行某些操作,并且该线程此时可能也在访问这个映射关系,因此当我们在page cache外部访问这个映射关系时是需要加锁的。   实际就是在调用page cache对外提供访问映射关系的函数时需要加锁,这里我们可以考虑使用C++当中的unique_lock,当然你也可以用普通的锁。 ```cpp //获取从对象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号 std::unique_lock lock(_pageMtx); //构造时加锁,析构时自动解锁 auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else { assert(false); return nullptr; } } ``` # 多线程环境下对比malloc测试 之前我们只是对代码进行了一些基础的单元测试,下面我们在多线程场景下对比malloc进行测试。 ```cpp void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector vthread(nworks); std::atomic malloc_costtime = 0; std::atomic free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&, k]() { std::vector v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { v.push_back(malloc(16)); //v.push_back(malloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { free(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n", nworks, rounds, ntimes, malloc_costtime); printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n", nworks, rounds, ntimes, free_costtime); printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n", nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime); } void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector vthread(nworks); std::atomic malloc_costtime = 0; std::atomic free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&]() { std::vector v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { v.push_back(ConcurrentAlloc(16)); //v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { ConcurrentFree(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n", nworks, rounds, ntimes, malloc_costtime); printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n", nworks, rounds, ntimes, free_costtime); printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n", nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime); } int main() { size_t n = 10000; cout << "==========================================================" << endl; BenchmarkConcurrentMalloc(n, 4, 10); cout << endl << endl; BenchmarkMalloc(n, 4, 10); cout << "==========================================================" << endl; return 0; } ``` 其中测试函数各个参数的含义如下: - ntimes:单轮次申请和释放内存的次数。 - nworks:线程数。 - rounds:轮次。  在测试函数中,我们通过clock函数分别获取到每轮次申请和释放所花费的时间,然后将其对应累加到malloc_costtime和free_costtime上。最后我们就得到了,nworks个线程跑rounds轮,每轮申请和释放ntimes次,这个过程申请所消耗的时间、释放所消耗的时间、申请和释放总共消耗的时间。   注意,我们创建线程时让线程执行的是lambda表达式,而我们这里在使用lambda表达式时,以值传递的方式捕捉了变量k,以引用传递的方式捕捉了其他父作用域中的变量,因此我们可以将各个线程消耗的时间累加到一起。   我们将所有线程申请内存消耗的时间都累加到malloc_costtime上, 将释放内存消耗的时间都累加到free_costtime上,此时malloc_costtime和free_costtime可能被多个线程同时进行累加操作的,所以存在线程安全的问题。鉴于此,我们在定义这两个变量时使用了atomic类模板,这时对它们的操作就是原子操作了。 *** > 上一段以及后部分借鉴如下博客: [高并发内存池_](https://blog.csdn.net/chenlong_cxy/article/details/122819562)