event_control.h

调整监听事件状态

#ifndef __EVENTCONTROL_H__

#define __EVENTCONTROL_H__


struct epoll_event;

class RecordEvent;


class EventControl{


public:

inline void set_epollfd(const int & epoll_fd){ this->epoll_fd = epoll_fd; }

inline int get_epollfd(){ return epoll_fd; }

void event_add(const int & events, RecordEvent * record_event);

void event_del(RecordEvent * record_event);

void event_set(const int & client_fd, RecordEvent * record_event);


private:

int epoll_fd;

};


#endif


event_control.cpp

#include <sys/epoll.h>

#include <time.h>

#include <iostream>

#include <errno.h>

#include <unistd.h>

#include <stdlib.h>

#include <stdio.h>


#include "record_event.h"

#include "event_control.h"


using std::cout;

using std::endl;



void EventControl::event_add(const int & events, RecordEvent * record_event){


struct epoll_event epv = {0, {0}};


epv.data.ptr = record_event;

epv.events = events;

record_event->set_events(events);


int option = (record_event->get_status() == 1 ? EPOLL_CTL_MOD : EPOLL_CTL_ADD);

record_event->set_status(1);


if(epoll_ctl(epoll_fd, option, record_event->get_fd(), &epv) < 0){

cout << "event_add failed" << endl;

}


}


void EventControl::event_del(RecordEvent * record_event){


if(record_event->get_status() != 1)return;


struct epoll_event epv = {0, {0}};

epv.data.ptr = record_event;

if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, record_event->get_fd(), &epv) < 0){

cout << "event_del failed" << endl;

}

record_event->set_status(0);


}


void EventControl::event_set(const int & client_fd, RecordEvent * record_event){


record_event->set_status(0);

record_event->set_active(time(NULL));

record_event->set_fd(client_fd);


}


record_event.h

扩展事件类,用于业务

#ifndef __RECORDEVENT_H__

#define __RECORDEVENT_H__


class EventControl;


class RecordEvent{


public:

RecordEvent();

~RecordEvent();


inline void set_bufSize(const int & buf_size){ this->buf_size = buf_size; buf = new char [buf_size]; }

inline int get_bufSize(){ return buf_size; }

inline void set_fd(const int & fd){ this->fd = fd; }

inline int get_fd(){ return fd; } 

inline void set_events(const int & events){ this->events = events; }

inline int get_events(){ return events; }

inline void set_status(const int & status){ this->status = status; }

inline int get_status(){ return status; }

inline void set_active(const long long & active){ this->active = active; }

inline long long get_active(){ return active; }

bool send_data(EventControl * event_control);

bool recv_data(EventControl * event_control);


private:

int buf_size;

int fd;

int events;

int status;

char * buf;

int len;

long long active;

};


#endif 


record_event.cpp

#include <time.h>

#include <sys/socket.h>

#include <sys/types.h>

#include <sys/epoll.h>


#include "event_control.h"

#include "record_event.h"



RecordEvent::RecordEvent(){

buf_size = 0;

fd = -1;

events = -1;

status = 0;

buf = NULL;

len = 0;

active = time(NULL);


}


RecordEvent::~RecordEvent(){

delete [] buf;

}


bool RecordEvent::send_data(EventControl * event_control){


if(len <= 0)return true;

ssize_t flag = send(fd, buf, len, 0);

if(flag > 0){

len = 0;

event_control->event_add(EPOLLIN, this);

return true;

}else{

return false;

}


}


bool RecordEvent::recv_data(EventControl * event_control){


len = recv(fd, buf, buf_size, 0);


if(len > 0){


buf[len] = '\0';

event_control->event_add(EPOLLOUT, this);


return true;

}else{

return false;

}

}


epoll_server.h

epoll反应堆服务端框架


#ifndef __EPOLLSERVER_H__

#define __EPOLLSERVER_H__


#include <sys/epoll.h>


#include "select_server.h"

#include "event_control.h"

#include "record_event.h"



class EpollServer : public SelectServer{


public:

EpollServer();

~EpollServer();


void check_active();


virtual void init_data();

virtual void process_run();

virtual void wait_requests();

virtual void accept_clients();

virtual void close_client(const int & client_fd);

virtual void deal_clients_messages();



private:

int epoll_fd;


struct epoll_event * socket_events;

RecordEvent * record_events;

EventControl * event_control;


};



#endif

epoll_server.cpp

实现源码

#include <sys/types.h>

#include <time.h>

#include <unistd.h>

#include <fcntl.h>

#include <iostream>


#include "epoll_server.h"


const int client_bufsize = 4096;


using std::cout;

using std::endl;


EpollServer::EpollServer(){


epoll_fd = 0;

socket_events = NULL;

record_events = NULL;

event_control = NULL;


cout << "epoll server is starting ......" << endl;

}


EpollServer::~EpollServer(){


if(init_flag){

for(int i = 0; i < client_max_num; ++i){

if(record_events[i].get_fd() != -1){

net_api.Close(record_events[i].get_fd());

}

}

delete [] client_addrs;

delete [] socket_events;

delete [] record_events;

delete event_control;

}


}


void EpollServer::init_data(){


if( !client_max_num ) return;


epoll_fd = epoll_create(client_max_num);


if(epoll_fd == -1){

net_api.Perror("epoll_create error");

}

client_addrs = new sockaddr_in [client_max_num];

socket_events = new epoll_event [client_max_num];

record_events = new RecordEvent [client_max_num];

event_control = new EventControl;


event_control->set_epollfd(epoll_fd);

if(fcntl(listen_fd, F_SETFL, O_NONBLOCK) < 0){

net_api.Perror("fcntl listen_fd error");

}

socket_events[0].events = EPOLLIN;

socket_events[0].data.fd = listen_fd;


if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &socket_events[0]) < 0){

net_api.Perror("epoll_ctl add listen_fd error");

}


init_flag = true;


}


void EpollServer::process_run(){


if( !init_flag )return;

int check_num = 0;


while(true){


wait_requests();

if(check_num >= 100){


check_active();

check_num = 0;

}


if(request_nums > 0){


deal_clients_messages();

}


++check_num;

}

}


void EpollServer::check_active(){


int current_time = time(NULL);

for(int i = 0; i < client_max_num; ++i){

if(record_events[i].get_fd() == -1 || !record_events[i].get_status())

continue;


if(record_events[i].get_active() - current_time >= 6000){

close_client(record_events[i].get_fd());

}

}


}


void EpollServer::wait_requests(){

request_nums = epoll_wait(epoll_fd, socket_events, client_max_num, 600);


}


void EpollServer::accept_clients(){


int coord = -1;

for(int i = 0; i < client_max_num; ++i){

if(record_events[i].get_fd() == -1){

coord = i;

break;

}

}

if(coord == -1){

cout << "client number out of bound!" << endl;

return;

}


socket_len = sizeof(client_addrs[coord]);

int client_fd = net_api.Accept(listen_fd, (struct sockaddr *) & client_addrs[coord], & socket_len);

if(fcntl(client_fd, F_SETFL, O_NONBLOCK) < 0){

cout << "fcntl nonblocking client_fd failed!" << endl;

return;

}


event_control->event_set(client_fd, &record_events[coord]);

event_control->event_add(EPOLLIN, &record_events[coord]);


if(record_events[coord].get_bufSize() <= 0){

record_events[coord].set_bufSize(client_bufsize);

}


map_coord[client_fd] = coord;


cout << "A client connect................" << endl;

display_socket_addr(coord);

}


void EpollServer::close_client(const int & client_fd){


int coord = map_coord[client_fd];

record_events[coord].set_fd(-1);

cout << "A client disconnect.................." << endl;

display_socket_addr(coord);

net_api.Close(client_fd);

}


void EpollServer::deal_clients_messages(){


for(int i = 0; i < request_nums; ++i){


if(socket_events[i].data.fd == listen_fd && (socket_events[i].events & EPOLLIN)){

accept_clients();

continue;

}


RecordEvent * re_ev = (RecordEvent *) socket_events[i].data.ptr;


if((socket_events[i].events & EPOLLIN) && (re_ev->get_events() & EPOLLIN)){

if(re_ev->recv_data(event_control) == false){

close_client(re_ev->get_fd());

}else{

re_ev->set_active(time(NULL));

}

}

if((socket_events[i].events & EPOLLOUT) && (re_ev->get_events() & EPOLLOUT)){


if(re_ev->send_data(event_control) == false){

close_client(re_ev->get_fd());

}else{

re_ev->set_active(time(NULL));

}

}

}


}


  上述实现了多路I/O转接的基于epoll反应堆模型的高并发服务器的基本框架,其中用到的一些类在上一节中已经给出了原型,基于篇幅就不再展现。在进行异步服务的时候还可以考虑用线程池来进行对客户端的服务,能够极大提高服务器的响应处理效率!