博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
对libevent+多线程服务器模型的C++封装类
阅读量:7093 次
发布时间:2019-06-28

本文共 12133 字,大约阅读时间需要 40 分钟。

hot3.png

最近在看memcached的源码,觉得它那种libevent+多线程的服务器模型真的很不错,我将这个模型封装成一个C++类,根据我的简单测试,这个模型的效率真的很不错,欢迎大家试用。

这个类的使用方法很简单(缺点是不太灵活),只要派生一个类,根据需要重写以下这几个虚函数就行了:

//新建连接成功后,会调用该函数virtual void ConnectionEvent(Conn *conn) { }//读取完数据后,会调用该函数virtual void ReadEvent(Conn *conn) { }//发送完成功后,会调用该函数(因为串包的问题,所以并不是每次发送完数据都会被调用)virtual void WriteEvent(Conn *conn) { }//断开连接(客户自动断开或异常断开)后,会调用该函数virtual void CloseEvent(Conn *conn, short events) { }//发生致命错误(如果创建子线程失败等)后,会调用该函数//该函数的默认操作是输出错误提示,终止程序virtual void ErrorQuit(const char *str);

上代码:

头文件:TcpEventServer.h

//TcpEventServer.h#ifndef TCPEVENTSERVER_H_#define TCPEVENTSERVER_H_#include 
#include
#include
#include
#include
#include
#include
#include
#include
#include
using std::map;#include
#include
#include
#include
#include
#include
class TcpEventServer;class Conn;class ConnQueue;struct LibeventThread;//这个类一个链表的结点类,结点里存储各个连接的信息,//并提供了读写数据的接口class Conn{ //此类只能由TcpBaseServer创建, //并由ConnQueue类管理 friend class ConnQueue; friend class TcpEventServer;private: const int m_fd; //socket的ID evbuffer *m_ReadBuf; //读数据的缓冲区 evbuffer *m_WriteBuf; //写数据的缓冲区 Conn *m_Prev; //前一个结点的指针 Conn *m_Next; //后一个结点的指针 LibeventThread *m_Thread; Conn(int fd=0); ~Conn();public: LibeventThread *GetThread() { return m_Thread; } int GetFd() { return m_fd; } //获取可读数据的长度 int GetReadBufferLen() { return evbuffer_get_length(m_ReadBuf); } //从读缓冲区中取出len个字节的数据,存入buffer中,若不够,则读出所有数据 //返回读出数据的字节数 int GetReadBuffer(char *buffer, int len) { return evbuffer_remove(m_ReadBuf, buffer, len); } //从读缓冲区中复制出len个字节的数据,存入buffer中,若不够,则复制出所有数据 //返回复制出数据的字节数 //执行该操作后,数据还会留在缓冲区中,buffer中的数据只是原数据的副本 int CopyReadBuffer(char *buffer, int len) { return evbuffer_copyout(m_ReadBuf, buffer, len); } //获取可写数据的长度 int GetWriteBufferLen() { return evbuffer_get_length(m_WriteBuf); } //将数据加入写缓冲区,准备发送 int AddToWriteBuffer(char *buffer, int len) { return evbuffer_add(m_WriteBuf, buffer, len); } //将读缓冲区中的数据移动到写缓冲区 void MoveBufferData() { evbuffer_add_buffer(m_WriteBuf, m_ReadBuf); }};//带头尾结点的双链表类,每个结点存储一个连接的数据class ConnQueue{private: Conn *m_head; Conn *m_tail;public: ConnQueue(); ~ConnQueue(); Conn *InsertConn(int fd, LibeventThread *t); void DeleteConn(Conn *c); //void PrintQueue();};//每个子线程的线程信息struct LibeventThread{ pthread_t tid; //线程的ID struct event_base *base; //libevent的事件处理机 struct event notifyEvent; //监听管理的事件机 int notifyReceiveFd; //管理的接收端 int notifySendFd; //管道的发送端 ConnQueue connectQueue; //socket连接的链表 //在libevent的事件处理中要用到很多回调函数,不能使用类隐含的this指针 //所以用这样方式将TcpBaseServer的类指针传过去 TcpEventServer *tcpConnect; //TcpBaseServer类的指针};class TcpEventServer{private: int m_ThreadCount; //子线程数 int m_Port; //监听的端口 LibeventThread *m_MainBase; //主线程的libevent事件处理机 LibeventThread *m_Threads; //存储各个子线程信息的数组 map
m_SignalEvents; //自定义的信号处理public: static const int EXIT_CODE = -1;private: //初始化子线程的数据 void SetupThread(LibeventThread *thread); //子线程的入门函数 static void *WorkerLibevent(void *arg); //(主线程收到请求后),对应子线程的处理函数 static void ThreadProcess(int fd, short which, void *arg); //被libevent回调的各个静态函数 static void ListenerEventCb(evconnlistener *listener, evutil_socket_t fd, sockaddr *sa, int socklen, void *user_data); static void ReadEventCb(struct bufferevent *bev, void *data); static void WriteEventCb(struct bufferevent *bev, void *data); static void CloseEventCb(struct bufferevent *bev, short events, void *data);protected: //这五个虚函数,一般是要被子类继承,并在其中处理具体业务的 //新建连接成功后,会调用该函数 virtual void ConnectionEvent(Conn *conn) { } //读取完数据后,会调用该函数 virtual void ReadEvent(Conn *conn) { } //发送完成功后,会调用该函数(因为串包的问题,所以并不是每次发送完数据都会被调用) virtual void WriteEvent(Conn *conn) { } //断开连接(客户自动断开或异常断开)后,会调用该函数 virtual void CloseEvent(Conn *conn, short events) { } //发生致命错误(如果创建子线程失败等)后,会调用该函数 //该函数的默认操作是输出错误提示,终止程序 virtual void ErrorQuit(const char *str);public: TcpEventServer(int count); ~TcpEventServer(); //设置监听的端口号,如果不需要监听,请将其设置为EXIT_CODE void SetPort(int port) { m_Port = port; } //开始事件循环 bool StartRun(); //在tv时间里结束事件循环 //否tv为空,则立即停止 void StopRun(timeval *tv); //添加和删除信号处理事件 //sig是信号,ptr为要回调的函数 bool AddSignalEvent(int sig, void (*ptr)(int, short, void*)); bool DeleteSignalEvent(int sig); //添加和删除定时事件 //ptr为要回调的函数,tv是间隔时间,once决定是否只执行一次 event *AddTimerEvent(void(*ptr)(int, short, void*), timeval tv, bool once); bool DeleteTImerEvent(event *ev);};#endif

 

实现文件:TcpEventServer.cpp

//TcpEventServer.cpp#include "TcpEventServer.h"Conn::Conn(int fd) : m_fd(fd){	m_Prev = NULL;	m_Next = NULL;}Conn::~Conn(){}ConnQueue::ConnQueue(){	//建立头尾结点,并调整其指针	m_head = new Conn(0);	m_tail = new Conn(0);	m_head->m_Prev = m_tail->m_Next = NULL;	m_head->m_Next = m_tail;	m_tail->m_Prev = m_head;}ConnQueue::~ConnQueue(){	Conn *tcur, *tnext;	tcur = m_head;	//循环删除链表中的各个结点	while( tcur != NULL )	{		tnext = tcur->m_Next;		delete tcur;		tcur = tnext;	}}Conn *ConnQueue::InsertConn(int fd, LibeventThread *t){	Conn *c = new Conn(fd);	c->m_Thread = t;	Conn *next = m_head->m_Next;	c->m_Prev = m_head;	c->m_Next = m_head->m_Next;	m_head->m_Next = c;	next->m_Prev = c;	return c;}void ConnQueue::DeleteConn(Conn *c){	c->m_Prev->m_Next = c->m_Next;	c->m_Next->m_Prev = c->m_Prev;	delete c;}/*void ConnQueue::PrintQueue(){	Conn *cur = m_head->m_Next;	while( cur->m_Next != NULL )	{		printf("%d ", cur->m_fd);		cur = cur->m_Next;	}	printf("\n");}*/TcpEventServer::TcpEventServer(int count){	//初始化各项数据	m_ThreadCount = count;	m_Port = -1;	m_MainBase = new LibeventThread;	m_Threads = new LibeventThread[m_ThreadCount];	m_MainBase->tid = pthread_self();	m_MainBase->base = event_base_new();	//初始化各个子线程的结构体	for(int i=0; i
base); for(int i=0; i
tcpConnect = this; me->base = event_base_new(); if( NULL == me->base ) ErrorQuit("event base new error"); //在主线程和子线程之间建立管道 int fds[2]; if( pipe(fds) ) ErrorQuit("create pipe error"); me->notifyReceiveFd = fds[0]; me->notifySendFd = fds[1]; //让子线程的状态机监听管道 event_set( &me->notifyEvent, me->notifyReceiveFd, EV_READ | EV_PERSIST, ThreadProcess, me ); event_base_set(me->base, &me->notifyEvent); if ( event_add(&me->notifyEvent, 0) == -1 ) ErrorQuit("Can't monitor libevent notify pipe\n");}void *TcpEventServer::WorkerLibevent(void *arg){ //开启libevent的事件循环,准备处理业务 LibeventThread *me = (LibeventThread*)arg; //printf("thread %u started\n", (unsigned int)me->tid); event_base_dispatch(me->base); //printf("subthread done\n");}bool TcpEventServer::StartRun(){ evconnlistener *listener; //如果端口号不是EXIT_CODE,就监听该端口号 if( m_Port != EXIT_CODE ) { sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(m_Port); listener = evconnlistener_new_bind(m_MainBase->base, ListenerEventCb, (void*)this, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (sockaddr*)&sin, sizeof(sockaddr_in)); if( NULL == listener ) ErrorQuit("TCP listen error"); } //开启各个子线程 for(int i=0; i
base); //事件循环结果,释放监听者的内存 if( m_Port != EXIT_CODE ) { //printf("free listen\n"); evconnlistener_free(listener); }}void TcpEventServer::StopRun(timeval *tv){ int contant = EXIT_CODE; //向各个子线程的管理中写入EXIT_CODE,通知它们退出 for(int i=0; i
base, tv);}void TcpEventServer::ListenerEventCb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data){ TcpEventServer *server = (TcpEventServer*)user_data; //随机选择一个子线程,通过管道向其传递socket描述符 int num = rand() % server->m_ThreadCount; int sendfd = server->m_Threads[num].notifySendFd; write(sendfd, &fd, sizeof(evutil_socket_t));}void TcpEventServer::ThreadProcess(int fd, short which, void *arg){ LibeventThread *me = (LibeventThread*)arg; //从管道中读取数据(socket的描述符或操作码) int pipefd = me->notifyReceiveFd; evutil_socket_t confd; read(pipefd, &confd, sizeof(evutil_socket_t)); //如果操作码是EXIT_CODE,则终于事件循环 if( EXIT_CODE == confd ) { event_base_loopbreak(me->base); return; } //新建连接 struct bufferevent *bev; bev = bufferevent_socket_new(me->base, confd, BEV_OPT_CLOSE_ON_FREE); if (!bev) { fprintf(stderr, "Error constructing bufferevent!"); event_base_loopbreak(me->base); return; } //将该链接放入队列 Conn *conn = me->connectQueue.InsertConn(confd, me); //准备从socket中读写数据 bufferevent_setcb(bev, ReadEventCb, WriteEventCb, CloseEventCb, conn); bufferevent_enable(bev, EV_WRITE); bufferevent_enable(bev, EV_READ); //调用用户自定义的连接事件处理函数 me->tcpConnect->ConnectionEvent(conn);}void TcpEventServer::ReadEventCb(struct bufferevent *bev, void *data){ Conn *conn = (Conn*)data; conn->m_ReadBuf = bufferevent_get_input(bev); conn->m_WriteBuf = bufferevent_get_output(bev); //调用用户自定义的读取事件处理函数 conn->m_Thread->tcpConnect->ReadEvent(conn);} void TcpEventServer::WriteEventCb(struct bufferevent *bev, void *data){ Conn *conn = (Conn*)data; conn->m_ReadBuf = bufferevent_get_input(bev); conn->m_WriteBuf = bufferevent_get_output(bev); //调用用户自定义的写入事件处理函数 conn->m_Thread->tcpConnect->WriteEvent(conn);}void TcpEventServer::CloseEventCb(struct bufferevent *bev, short events, void *data){ Conn *conn = (Conn*)data; //调用用户自定义的断开事件处理函数 conn->m_Thread->tcpConnect->CloseEvent(conn, events); conn->GetThread()->connectQueue.DeleteConn(conn); bufferevent_free(bev);}bool TcpEventServer::AddSignalEvent(int sig, void (*ptr)(int, short, void*)){ //新建一个信号事件 event *ev = evsignal_new(m_MainBase->base, sig, ptr, (void*)this); if ( !ev || event_add(ev, NULL) < 0 ) { event_del(ev); return false; } //删除旧的信号事件(同一个信号只能有一个信号事件) DeleteSignalEvent(sig); m_SignalEvents[sig] = ev; return true;}bool TcpEventServer::DeleteSignalEvent(int sig){ map
::iterator iter = m_SignalEvents.find(sig); if( iter == m_SignalEvents.end() ) return false; event_del(iter->second); m_SignalEvents.erase(iter); return true;}event *TcpEventServer::AddTimerEvent(void (*ptr)(int, short, void *), timeval tv, bool once){ int flag = 0; if( !once ) flag = EV_PERSIST; //新建定时器信号事件 event *ev = new event; event_assign(ev, m_MainBase->base, -1, flag, ptr, (void*)this); if( event_add(ev, &tv) < 0 ) { event_del(ev); return NULL; } return ev;}bool TcpEventServer::DeleteTImerEvent(event *ev){ int res = event_del(ev); return (0 == res);}

 

测试文件:test.cpp

/*这是一个测试用的服务器,只有两个功能:1:对于每个已连接客户端,每10秒向其发送一句hello, world2:若客户端向服务器发送数据,服务器收到后,再将数据回发给客户端*///test.cpp#include "TcpEventServer.h"#include 
#include
using namespace std;//测试示例class TestServer : public TcpEventServer{private: vector
vec;protected: //重载各个处理业务的虚函数 void ReadEvent(Conn *conn); void WriteEvent(Conn *conn); void ConnectionEvent(Conn *conn); void CloseEvent(Conn *conn, short events);public: TestServer(int count) : TcpEventServer(count) { } ~TestServer() { } //退出事件,响应Ctrl+C static void QuitCb(int sig, short events, void *data); //定时器事件,每10秒向所有客户端发一句hello, world static void TimeOutCb(int id, int short events, void *data);};void TestServer::ReadEvent(Conn *conn){ conn->MoveBufferData();}void TestServer::WriteEvent(Conn *conn){}void TestServer::ConnectionEvent(Conn *conn){ TestServer *me = (TestServer*)conn->GetThread()->tcpConnect; printf("new connection: %d\n", conn->GetFd()); me->vec.push_back(conn);}void TestServer::CloseEvent(Conn *conn, short events){ printf("connection closed: %d\n", conn->GetFd());}void TestServer::QuitCb(int sig, short events, void *data){ printf("Catch the SIGINT signal, quit in one second\n"); TestServer *me = (TestServer*)data; timeval tv = {
1, 0}; me->StopRun(&tv);}void TestServer::TimeOutCb(int id, short events, void *data){ TestServer *me = (TestServer*)data; char temp[33] = "hello, world\n"; for(int i=0; i
vec.size(); i++) me->vec[i]->AddToWriteBuffer(temp, strlen(temp));}int main(){ printf("pid: %d\n", getpid()); TestServer server(3); server.AddSignalEvent(SIGINT, TestServer::QuitCb); timeval tv = {
10, 0}; server.AddTimerEvent(TestServer::TimeOutCb, tv, false); server.SetPort(2111); server.StartRun(); printf("done\n"); return 0;}

 

编译与运行命令:

qch@LinuxMint ~/program/ztemp $ g++ TcpEventServer.cpp test.cpp -o test -levent -lpthreadqch@LinuxMint ~/program/ztemp $ ./testpid: 20264new connection: 22connection closed: 22^CCatch the SIGINT signal, quit in one seconddone

转载于:https://my.oschina.net/elitetao/blog/1933427

你可能感兴趣的文章
高级文件系统管理
查看>>
磁盘阵列RAID的功能作用介绍
查看>>
安装discuz
查看>>
详解:Redis主从技术的应用
查看>>
听JITStack讲解:什么是边缘计算?边缘计算是靠近数据源的计算基础...
查看>>
机器学习的定义与起源,我国机器学习发展现状和出路?
查看>>
maven 笔记,具体配置
查看>>
Shell/Python实现Mysql读txt文本
查看>>
Linux学习笔记<二十二>——计划任务
查看>>
chart 目录结构 - 每天5分钟玩转 Docker 容器技术(164)
查看>>
自动化运维工具之Zabbix宏使用及用户自定义监控(三)
查看>>
每日一道shell 练习(05)——批量打包文件
查看>>
思科3550交换机 12T 12G 两种型号区别
查看>>
grep 技巧
查看>>
4月第一周中国五大顶级域名增3.7万 美国减3457个
查看>>
RedHat6使用Centos6的yum源
查看>>
NoSQL最新现状和趋势:云NoSQL数据库将成重要增长引擎
查看>>
206. Reverse Linked List - LeetCode
查看>>
Linux环境下Maven仓库的搭建(nexus)及Mavan的简单使用
查看>>
批量替换多个文件中的字符
查看>>