Linux_libevent_Reactor模式

内容

  1. IO框架库
  2. Reactor模式的IO框架库包含哪些组件
  3. libevent是一个轻量级的I/O框架库。

I/O框架库

I/O框架库以库函数的形式,封装了较为底层的系统调用。
各种I/O框架库的实现原理基本相似,要么以Reactor模式实现,要么以Proactor模式实现,要么同时两种模式实现。

Reactor模式

Reactor模式要求主线程(IO处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即向工作线程(逻辑单元)通知该事件。除此之外,主线程不做其他实质性的工作。即读写数据、接受新的连接、处理客户请求均在工作线程中完成。

工作流程

使用同步I/O模型(以epoll_wait为例)实现的Reactor模式的工作流程是:

  1. 主线程往epoll内核事件表中注册socket上的读就绪事件
  2. 主线程调用epoll_wait等待socket上有数据可读。
  3. socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列
  4. 睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后工作线程往epoll内核事件表中注册该socket上的写就绪事件
  5. 主线程调用epoll_wait等待socket可写。
  6. socket可写时,epoll_wait通知主线程主线程将socket可写事件放入请求队列
  7. 睡眠在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果。

总结:主线程注册读事件,可读时,主线程放入请求队列;工作线程读数据,处理请求,工作线程注册写事件;可写时,主线程放入请求队列;工作线程写数据。

image-20220524104612105

组件框架

基于Reactor模式的I/O框架库包含如下几个组件:

  1. 句柄(Handle)
  2. 事件多路分发器(Event Demultiplexer)
  3. 事件处理器(Event Handler)和具体的事件处理器(Concrete EventHandler)
  4. Reactor。

这些组件的关系如下图所示。

image-20220423142048988

  • 句柄
    • 说白了就是文件描述符,句柄在windows上某个资源的id,因为libevent库是跨平台的,所以叫法容易混用。
  • 事件多路分发器
    • 事件的到来是随机的、异步的。比如我们无法预知程序何时收到一个客户连接请求,又亦或收到一个暂停信号,所以程序需要循环地等待判断有无事件产生,这就是事件循环
    • 在事件循环中,等待事件一般使用I/O复用技术来实现。I/O框架库一般将系统支持的各种I/O复用系统调用封装成统一的接口,称为事件多路分发器。因此事件多路分发器可以理解为封装了IO复用,提供了一个更便于使用的接口。
    • 事件多路分发器的demultiplex方法是等待事件的核心函数,其内部调用的是select、poll、epoll_wait等函数。
    • 事件多路分发器还需要实现register_eventremove_event方法,以供调用者给事件多路分发器中添加事件和从中删除事件。
  • 事件处理器和具体事件处理器
    • 事件处理器执行事件对应的业务逻辑。它通常包含一个或多个handle_event回调函数,这些回调函数在事件循环中被执行。
    • I/O框架库提供的事件处理器通常是一个接口,用户需要继承它来实现自己的事件处理器,即具体事件处理器。因此,事件处理器中的回调函数一般被声明为虚函数,以支持用户的扩展
    • 此外,事件处理器一般还提供一个get_handle方法,它返回与该事件处理器关联的句柄。那么,事件处理器和句柄有什么关系?当事件多路分发器检测到有事件发生时,它是通过句柄来通知应用程序的。因此,我们必须将事件处理器和句柄绑定,才能在事件发生时获取到正确的事件处理器。
  • Reactor是I/O框架库的核心。它提供的几个主要方法是:
    • handle_events,该方法执行事件循环。重复过程:等待事件,然后依次处理所有就绪事件对应的事件处理器。
    • register_handler,该方法调用事件多路分发器的register_event方法来给事件多路分发器中注册一个事件。
    • remove_handler,该方法调用事件多路分发器的remove_event方法来删除事件多路分发器中的一个事件。

libevent

libevent支持的事件类型

1
2
3
4
5
6
#define EV_TIMEOUT		0x01	/* 定时事件 */
#define EV_READ 0x02 /* 可读事件 */
#define EV_WRITE 0x04 /* 可写事件 */
#define EV_SIGNAL 0x08 /* 信号事件 */
#define EV_PERSIST 0x10 /* 永久事件 */
#define EV_ET 0x20 /*边沿触发事件,需要IO复用系统调用支持,如epoll*/

编程流程

  1. 定义、创建框架示例
  2. 向框架示例注册、注销事件:指定具体哪个base、哪个描述符,哪种事件,绑定回调函数参数
    1. 有哪些事件:IO事件(fdEV_READfun_cb)、信号事件(sigEV_SIGNALsig_cb)、定时器事件(-1EV_TIMEOUTtv_cb
  3. 开启事件循环,实际上就是框架底层调用select/poll/epoll
  4. 事件发生之后,调用回调函数如fun_cb

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include<sys/signal.h>	//SIGINT
#include<event.h>
void signal_cb(int fd, short event, void * argc)
{
struct event_base* base = (event_base*)argc;
struct timeval delay = {2, 0};
printf("Caught an interrupt signal; exiting cleanly in 2 seconds...\n");
event_base_loopexit(base, &delay);
}
void timeout_cb(int fd, short event, void * argc)
{
printf("timeout\n");
}
int main()
{
struct event_base* base = event_init();
struct event* signal_event = evsignal_new(base, SIGINT, signal_cb, base);
event_add(signal_event, NULL);

timeval tv = {1, 0};
struct event* timeout_event = evtimer_new(base, timeout_cb, NULL);
event_add(timeout_event, &tv);

event_base_dispatch(base);

event_free(timeout_event);
event_free(signal_event);
event_base_free(base);
}

上面的代码描述了使用Libevent库的主要逻辑:

  1. 调用event_init函数创建event_base对象。一个event_base相当于一个Reactor实例。
  2. 创建具体的事件处理器,并设置他们所从属的Reactor实例。本例中的**evsignal_new用于创建信号事件处理器,evtimer_new**用于创建定时事件处理器,它们是定义在/include/event2/event.h文件中的宏,代码如下。其中evtimer_new的原型event_new的第二个参数默认赋-1,第三个参数默认赋0
1
2
3
4
#define evsignal_new(b, x, cb, arg) \
event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg))
#define evtimer_new(b, cb, arg) \
event_new((b), -1, 0, (cb), (arg))
  1. 回调函数的格式需要统一:void fun_cb(int fd, short event, void* argc)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include<sys/signal.h>	//SIGINT
#include<event.h>
void sig_fun(int fd, short event, void * argc)
{
printf("sig=%d\n", fd);
}
void timeout_fun(int fd, short event, void * argc)
{
if(ev & EV_TIMEOUT)
{
printf("timeout\n");
}
}
int main()
{
struct event_base* base = event_init();
assert(base != NULL);

struct event* sig_ev = evsignal_new(base, SIGINT, sig_fun, NULL);
event_add(sig_ev, NULL);

struct timeval tv = {5, 0};
//定时器不需要fd描述符、也不需要信号代号。
//所以,相应地: evtimer_new对应的timeout_fun回调函数中的fd参数默认赋-1
// evtimer_new对应的event_new函数的信号代号参数默认赋-1
struct event* timeout_ev = evtimer_new(base, timeout_fun, NULL);
event_add(timeout_ev, &tv);

event_base_dispatch(base); //开启事件循环

event_free(sig_ev);
event_free(timeout_ev);
event_base_free(base);
}

ctrl+c终止进程的信号代号是2,但是信号事件并没有fd描述符,而是巧妙地复用了fd,写入信号代号。

编译测试

gcc编译链接命令后需要加后缀-levent

MainServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class TcpServer;
class ThreadPool;
class Reactor;

class MainServer
{
private:
TcpServer * m_server;
ThreadPool * m_pool;
Reactor * m_reactor;
public:
MainServer();
~MainServer();
static void ListenEventCallBack(int fd, short events, void * arg);

};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include"mainServer.h"
#include"tcpServer.h"
MainServer::Mainserver()
{
m_server = new TcpServer;
}
MainServer::~MainServer()
{
m_server = new TcpServer(IpAddressPort{"127.0.0.1", 8000});
m_pool = new ThreadPool(3);
m_reactor = new Reactor();
m_reactor->AddEventAndHandler(m_server->GetLfd, EV_READ | EV_PERSIST, MainServer::ListenEventCallBack)
}
MainServer::ListenEventCallBack(int fd, short events, void * arg)
{

}

对象池

内容

  1. 关联关系
  2. 一级配置器和二级配置器的关系
  3. 类型萃取
  4. allocator
  5. __malloc_alloc_template静态成员的类外初始化,其中函数指针尤为麻烦

关联关系

弱关联

1
2
3
4
5
6
7
8
class Person
{
private:
Book * pbook;
public:
Student(){}
~Student(){} //不能析构book
}

强关联

1
2
3
4
5
6
7
class Person
{
private:
Book & book;
public:
Student(Book & book) : _book(book){}
}

聚合/组合关系

1
2
3
4
5
6
7
8
9
10
11
class Point
{
private:
float _x;
float _y;
public:
Point(float x = 0.0f, float y = 0.0f) : _x(x), _y(y){}
Point(const Point&) = default;
Point & operator=(const Point&) = default;
~Point(){}
}

new

  1. sizeof
  2. 分配字节空间
  3. 构建 返回地址

delete

  1. 析构
  2. 空间释放

二级配置器

  • 一级配置器的问题
    • 直接的malloc后的数据上下各有一个越界标记。上越界标记之外还有信息,包括申请了多少字节,还有next域、prev域以便连接到链表中进行内存管理。除此之外可能还有有效/失效(是否释放)的标记。
    • 除了包装信息占空间较多之外,还有malloc时花费的时间也多。这样就造成对于比较小的对象数据在malloc时入不敷出。
    • 结论就是:对于较小数据,适用于内存池来进行内存管理。
  • 对于客户端申请较小空间(128bytes)的具体处理流程
    1. 每次配置一大块内存,并维护对应的自由链表(free-list)
    2. 下次如果再有相同大小的内存需求,直接从free-list中取出若干小块。
    3. 如果客户端释放小块内存,就由配置器回收到free-list中。所以配置器除了分配空间,还负责回收空间。
  • 为了方便管理,SGI第二级配置器会主动按任何小块内存的内存需求量计算成8的倍数。(比如客户端要求30bytes,则计算为32bytes。
  • 共维护16个free-lists,各自管理大小分别为8、16、24、32、40、48、56、64、72、80、88、96、104、112、120、128的小额区块。
  • free-lists的单结点结构如下:
1
2
3
4
5
union obj
{
union obj * free_list_link;
char client_data[1]; //the client sees this
}

总览

1
2
3
enum { __ALIGN = 8};
enum { __MAX_BYTES = 128};
enum { __NFREELISTS = __MAX_BYTES / __ALIGN}; // 128 / 8 = 16
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template<bool threads, int inst>	//参数1:是否考虑多线程,参数2:标识
class __default_alloc_template
{
private:
union obj /* 只是类型定义 */
{
union obj * free_list_link; //next
char client_data[1];
};
private:
static obj * volatile free_list[__NFREELISTS];//volatile修饰obj*,即free_list数组中对于的数据内容。
static char* start_free;//每次配置的一大块'内存' 的内存来源的剩余部分的起始
static char* end_free; //每次配置的一大块'内存' 的内存来源的剩余部分的末尾
static size_t heap_size;//上面提到的都是内存来源,而这个内存来源都是从堆区申请的,堆区申请的总空间数要记录在案
static size_t ROUNT_UP(size_t bytes);
static size_t FREELIST_INDEX(size_t bytes); //hash
static char* chunk_alloc(size_t size, int & nobjs);//size - 对象的大小,nobjs对象的个数
static void* refill(size_t size); //重新填充
public: /* 用户调用 */
static void* allocate(size_t size);
static void deallocate(void* p, size_t n);
static void* reallocate(void* p, size_t old_sz, size_t new_sz);
};

成员初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
// 对free_list[__NFREELISTS]进行初始化。
template<bool threads, int inst> // 1: __default_alloc_template是模板类,需要声明
typename __default_alloc_template<threads, inst>::obj * volatile // 2: obj是__default_alloc_template<threads, inst>类内的类,需要加上“哪个类”,并且要加上"typename"以示"obj"是个类名,而非成员。
__default_alloc_template<threads, inst>::free_list[__NFREELISTS] = {}; // 3: free_list[__NFREELISTS]是__default_alloc_template<threads, inst>类内的成员,需要加上“哪个类”,但无需加"typename"。
// 对start_free进行初始化
template<bool threads, int inst>
char* __default_alloc_template<threads, inst>::start_free = nullptr;
// 对end_free进行初始化
template<bool threads, int inst>
char* __default_alloc_template<threads, inst>::end_free = nullptr;
// 对heap_size进行初始化
template<bool threads, int inst>
size_t __default_alloc_template<threads, inst>::heap_size = 0;

函数实现

准备工作

1
using malloc_alloc = __malloc_alloc_template<0>;

allocate

相当于单链表的头删法,返回删了的节点的地址给用户用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void * allocate(size_t size)
{
if(size > (size_t)__MAX_BYTES)
{
return malloc_alloc::allocate(size); //转给一级配置器。
}
obj * result = nullptr;
obj * volatile * my_free_list = nullptr; //是二级指针,要指向obj*
my_free_list = free_list + FREELIST_INDEX(size);
result = *my_free_list;
if(nullptr == result) //说明此下标处无内存块了,需要申请。
{
void * r = refill(ROUND_UP(size));
return r;
}
*my_free_list = result->free_list_link; //把free_list此下标的指针更新到下一个节点。相当于链表的头删法。
return result;
}

refill

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static void * refill(size_t size)
{
int nobjs = 20;
char * chunk = chunk_alloc(size, nobjs);
//chunk_alloc运行之后,nobjs值可能发生改变。
if(1 == nobjs)
{
//如果nobjs是1,则说明内存块正好用完,不用再处理后续内存区域的链接、移动
return chunk;
}

//接下来,把内存块分崩离析,依次串联。
obj * volatile * my_free_list = NULL;
obj * result = (obj*)chunk;
obj * current_obj = NULL, * next_obj = NULL;
my_free_list = free_list + FREELIST_INDEX(size);
*my_free_list = next_obj = (obj*)(chunk+size);
for(int i = 1; ; ++i)
{
current_obj = next_obj;
next_obj = (obj*)((char*)next_obj + size);
if(i == nobjs-1)//最后一块
{
current_obj->free_list_link = NULL;
break;
}
current_obj->free_list_link = next_obj;
}
return result;
}

chunk_alloc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static char* chunk_alloc(size_t size, int & nobjs)
{
char * result = NULL;
size_t total_bytes = size * nobjs;
size_t bytes_left = end_free - start_free;
if(bytes_left >= total_bytes)
{
result = start_free;
start_free = start_free + total_bytes;
return result;
}
else if(bytes_left >= size)
{
nobjs = bytes_left / size;
total = size * nobjs;

result = start_free;
start_free = start_free + total_bytes;
return result;
}
else //内存不足以分配1个对象,把剩余的插入到其他下标(剩8字节插到0, 16字节插到1, ...),然后再另外申请一个大的
{
size_t bytes_to_get = 2 * total_bytes + ROUND_UP(heap_size >> 4);
if(bytes_left > 0)
{
obj * volatile * my_free_list = free_list + FREELIST_INDEX(bytes_left);
//头插
((obj*)start_free)->free_list_link = *my_free_list;
*my_free_list = (obj*)start_free;
}
//剩余的残存空间 处理完毕,接下来申请大块内存
start_free = (char*)malloc(bytes_to_get);
if(NULL == start_free) //堆区居然没有资源了!无奈之计,只能向比他之后的链表找有无剩余块。
{
obj * volatile * my_free_list = NULL;
for(int i = size; i <= __MAX_BYTES; i += __ALIGN) //注意,i从size开始,而不一定从最小的块大小8开始,因为找比你小的没有用处。得找大的。
{
my_free_list = free_list + FREELIST_INDEX(i);
obj * p = NULL;
if(NULL != p) //找到了比他大的的剩余的了!
{
*my_free_list = (*my_free_list)->free_list_link;
start_free = (char*)p;
end_free = start_free + i;
return chunk_alloc(size, nobjs);
}
}
//如果上面的for循环没能使得其找到合适的内存块,则说明真的是“山穷水尽疑无路”了,只能转1级配置器,抛出异常或者通过handler处理。
start_free = malloc_alloc::allocate(bytes_to_get);
}
end_free = start_free + bytes_to_get;
heap_size += bytes_to_get;
return chunk_alloc(size, nobjs); //递归此函数,相当于重新来一遍流程
}
}

ROUND_UP & INDEX

1
2
3
4
5
6
7
8
9
10
static size_t ROUNT_UP(size_t bytes)
{
// 0 -> 0, 1~8 -> 8, 9~16 - > 16, 17~24 -> 24, ...
return (bytes+ __ALIGN-1) & ~(__ALIGN - 1);
}
static size_t FREELIST_INDEX(size_t bytes)
{
// 1~8 -> 0, 9~16 -> 1, 17~24 -> 2, ...
return ((bytes+ __ALIGN-1) / __ALIGN) - 1;
}

测试allocate

1
2
3
4
5
6
#ifdef __USE_MALLOC
typedef __malloc_alloc_template<0> malloc_alloc;
typedef malloc_alloc alloc;
#else
typedef __default_alloc_template<0, 0> alloc;
#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template<class T, class Alloc>
class simple_alloc
{
public:
static T* allocate(size_t n) //申请n个T对象
{
return Alloc::allocate(sizeof(T)*n);
}
static T* allocate()
{
return Alloc::allocate(sizeof(T));
}
void deallocate(T *p, size_t n)
{
if(NULL == p)return;
Alloc::deallocate(p, sizeof(T)*n);
}
void deallocate(T* p)
{
if(NULL == p)return;
Alloc::deallocate(p, sizeof(T));
}
};

引入list进行测试

1
2
3
4
5
6
7
8
9
10
11
12
#include"my_list.h"
using namespace std;
int main()
{
xcg::my_list<char> mylist;

for(int i = 0; i < 23; ++i)
{
mylist.push_back(i + 'a');
}
return 0;
}

list构建流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template<class _Ty, class _A = xcg::alloc>
class my_list
{
public:
typedef _Ty value_type;
typedef _Ty& reference;
typedef _Ty* pointer;
typedef const _Ty& const_reference;
typedef const _Ty* const_pointer;

typedef _A allocator_type;
typedef xcg::simple_alloc<_Node, _A> data_allocate; //simple_alloc中使用的是第二个参数的allocate。在这里即使用_A::allocate,默认参数是xcg::alloc,而根据开关语句,若是使用malloc_alloc则为一级配置器,若为default_alloc则用二级配置器。我们默认使用的是__default_alloc_template<0, 0>。

public:
my_list() : _Head(_Buynode()), _Size(0) {}
protected:
_Nodeptr _Buynode(_Nodeptr _parg = NULL, _Nodeptr _narg = NULL)
{
_Nodeptr _S = data_allocate::allocate();
_Acc::_Prev(_S) = _parg == NULL ? _S : _parg;
_Acc::_Next(_S) = _narg == NULL ? _S : _narg;
return _S;
}
private:
_Nodeptr _Head;
size_t _Size;
};

deallocate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void deallocate(void * p, size_t size)	// n 表示空间大小
{
if(size > (size_t)__MAX_BYTES)
{
malloc_alloc::deallocate(p, size);
return;
}
//归还给配置器
obj * q = (obj*)p;
obj * volatile * my_free_list = free_list + FREELIST_INDEX(size);
//头插
q->free_list_link = *my_free_list;
*my_free_list = q;
return;
}

测试deallocate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
iterator erase(iterator _P)
{
_Nodeptr _S = _P++._Mynode();
_Acc::_Next(_Acc::_Prev(_S)) = _Acc::_Next(_S);
_Acc::_Prev(_Acc::_Next(_S)) = _Acc::_Prev(_S);
destroy(&_Acc::_Value(_S)); //不删除节点,而是释放value域。
_Freenode(_);

return _P;
}
void _Freenode(_Nodeptr _P)
{
data_allocate::deallocate(_P);
}
void pop_back()
{
erase(--end());
}
void pop_front()
{
erase(begin());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Object
{
private:
int value;
public:
Object(int x = 0) : value(x)
{
cout << "create object" << this << endl;
}
~Object()
{
cout << "destroy object" << this << endl;
}
};
int main()
{
xcg::my_list<Object> objlist;
for(int i = 0; i < 10; ++i)
{
objlist.push_back(Object(i));
}
objlist.pop_back();
return 0;
}

reallocate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void* reallocate(void * p, size_t old_sz, size_t new_sz)
{
if(old_sz > (size_t)__MAX_BYTES && new_sz > (size_t)__MAX_BYTES)
{
return malloc_alloc::reallocate(p, old_sz, new_sz);
}
// old_sz == 20, new_sz == 22 -->24, 24
if(ROUND_UP(old_sz) == ROUND_UP(new_sz))
{
return p;
}
// old_sz > 128, new_sz < 128
// old_sz < 128, new_sz < 128
// old_sz < 128, new_sz > 128
size_t sz = old_sz < new_sz ? old_sz : new_sz;
void * s = allocate(new_sz);
memmove(s, p ,sz);
deallocate(p, old, sz);
return s;
}