event_control.h
调整监听事件状态
#ifndef __EVENTCONTROL_H__
#define __EVENTCONTROL_H__
struct epoll_event;
class RecordEvent;

/**
 * Helper bound to one epoll instance.
 *
 * It adds, modifies and removes the epoll registration of a RecordEvent's
 * descriptor and keeps the record's own status/events fields in step.
 * The class stores the epoll descriptor but does not create or close it.
 */
class EventControl{
public:
    /// Start with no epoll instance (-1) so an unset descriptor is never
    /// mistaken for a valid one (0 would be stdin).
    EventControl() : epoll_fd(-1) {}

    /// Store the epoll descriptor that later event_add/event_del calls use.
    inline void set_epollfd(const int & epoll_fd){ this->epoll_fd = epoll_fd; }

    /// Return the stored epoll descriptor, or -1 if none was set.
    inline int get_epollfd() const { return epoll_fd; }

    /// Register record_event's descriptor for `events`, or change its
    /// interest set when the record is already registered (status == 1).
    void event_add(const int & events, RecordEvent * record_event);

    /// Remove record_event's descriptor from the epoll instance if registered.
    void event_del(RecordEvent * record_event);

    /// Bind client_fd to record_event, mark it unregistered and stamp the
    /// current time as its last activity.
    void event_set(const int & client_fd, RecordEvent * record_event);

private:
    int epoll_fd;   ///< epoll instance used by event_add/event_del; -1 when unset
};
#endif
event_control.cpp
#include <sys/epoll.h>
#include <time.h>
#include <iostream>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include "record_event.h"
#include "event_control.h"
using std::cout;
using std::endl;
/**
 * Register record_event's descriptor with the epoll instance, or change its
 * interest set when the record is already registered.
 *
 * @param events        epoll event mask to wait for (e.g. EPOLLIN / EPOLLOUT).
 * @param record_event  record whose descriptor is (re)registered; it is also
 *                      stored in epoll_event.data.ptr so the dispatcher can
 *                      recover it from a ready event. Ignored when NULL.
 *
 * The record's events/status fields are updated only after epoll_ctl succeeds,
 * so a failed call leaves the bookkeeping matching what the kernel actually
 * has registered.
 */
void EventControl::event_add(const int & events, RecordEvent * record_event){
    if(record_event == NULL){
        return;
    }

    struct epoll_event epv = {0, {0}};
    epv.data.ptr = record_event;
    epv.events = events;

    // status == 1 means the descriptor is already in the epoll set, so modify
    // the existing registration instead of adding a second one.
    const int option = (record_event->get_status() == 1 ? EPOLL_CTL_MOD : EPOLL_CTL_ADD);

    if(epoll_ctl(epoll_fd, option, record_event->get_fd(), &epv) < 0){
        cout << "event_add failed" << endl;
        return;
    }

    record_event->set_events(events);
    record_event->set_status(1);
}
/**
 * Remove record_event's descriptor from the epoll instance.
 *
 * Does nothing unless the record is currently marked registered (status == 1).
 * The record is marked unregistered afterwards even if epoll_ctl reports an
 * error; the failure is only logged.
 */
void EventControl::event_del(RecordEvent * record_event){
    const bool registered = (record_event->get_status() == 1);
    if(registered){
        // A populated epoll_event is passed even though the operation is a delete.
        struct epoll_event removal_ev = {0, {0}};
        removal_ev.data.ptr = record_event;

        const int rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, record_event->get_fd(), &removal_ev);
        if(rc < 0){
            cout << "event_del failed" << endl;
        }

        record_event->set_status(0);
    }
}
void EventControl::event_set(const int & client_fd, RecordEvent * record_event){
record_event->set_status(0);
record_event->set_active(time(NULL));
record_event->set_fd(client_fd);
}
record_event.h
扩展事件类,用于业务
#ifndef __RECORDEVENT_H__
#define __RECORDEVENT_H__
class EventControl;

/**
 * Per-connection record used by the epoll server.
 *
 * Holds the client descriptor, the epoll interest mask last recorded for it,
 * a registration flag, a heap buffer for one message and the time of the last
 * activity.
 *
 * NOTE: the class owns `buf` (released in the destructor) but declares no copy
 * constructor or copy assignment, so copying an instance that has a buffer
 * would lead to a double delete. Keep instances in place (array / pointer).
 */
class RecordEvent{
public:
    RecordEvent();
    ~RecordEvent();

    /**
     * (Re)allocate the message buffer.
     *
     * Any previously allocated buffer is released first, so calling this more
     * than once no longer leaks. A non-positive size leaves the record without
     * a buffer and a recorded size of 0. Pending buffered data is discarded.
     */
    inline void set_bufSize(const int & buf_size){
        // Allocate before releasing so the record keeps its old buffer if
        // the allocation throws.
        char * fresh = (buf_size > 0) ? new char [buf_size] : 0;
        delete [] buf;
        buf = fresh;
        this->buf_size = (buf_size > 0) ? buf_size : 0;
        len = 0;
    }
    inline int get_bufSize() const { return buf_size; }

    inline void set_fd(const int & fd){ this->fd = fd; }
    inline int get_fd() const { return fd; }

    inline void set_events(const int & events){ this->events = events; }
    inline int get_events() const { return events; }

    inline void set_status(const int & status){ this->status = status; }
    inline int get_status() const { return status; }

    inline void set_active(const long long & active){ this->active = active; }
    inline long long get_active() const { return active; }

    /// Send the buffered bytes to fd and switch the record back to EPOLLIN.
    /// A false return tells the caller to close the connection.
    bool send_data(EventControl * event_control);

    /// Read from fd into the buffer and switch the record to EPOLLOUT.
    /// A false return tells the caller to close the connection.
    bool recv_data(EventControl * event_control);

private:
    int buf_size;       // capacity of buf in bytes (0 when no buffer)
    int fd;             // client descriptor, -1 when the slot is free
    int events;         // epoll interest mask last recorded by EventControl
    int status;         // 1 while registered with epoll, 0 otherwise
    char * buf;         // owned message buffer, released in the destructor
    int len;            // number of valid bytes currently held in buf
    long long active;   // time() value of the last activity
};
#endif
record_event.cpp
#include <errno.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include "event_control.h"
#include "record_event.h"
/**
 * Build an empty record: no descriptor (-1), no recorded events (-1), not
 * registered, no buffer, nothing buffered, last activity set to "now".
 */
RecordEvent::RecordEvent()
    : buf_size(0),
      fd(-1),
      events(-1),
      status(0),
      buf(NULL),
      len(0),
      active(time(NULL)){
}
/**
 * Release the message buffer, if one was ever allocated.
 */
RecordEvent::~RecordEvent(){
    if(buf != NULL){
        delete [] buf;
    }
}
/**
 * Send the buffered bytes to the client.
 *
 * @param event_control  used to switch the record back to EPOLLIN once the
 *                       whole buffer has been written.
 * @return true when the connection should stay open (nothing to send, data
 *         fully or partly sent, or the socket is temporarily not writable);
 *         false when the caller should close the connection.
 *
 * Fixes over the previous version:
 *  - a short write keeps the unsent tail in the buffer and leaves the record
 *    waiting for EPOLLOUT instead of dropping the remainder;
 *  - EAGAIN / EWOULDBLOCK / EINTR are treated as "try again later" rather
 *    than as a broken connection;
 *  - MSG_NOSIGNAL keeps a write to a reset peer from raising SIGPIPE.
 */
bool RecordEvent::send_data(EventControl * event_control){
    if(len <= 0){
        return true;
    }

    const ssize_t sent = send(fd, buf, len, MSG_NOSIGNAL);
    if(sent < 0){
        // Transient conditions: keep the connection and retry on the next EPOLLOUT.
        return (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
    }
    if(sent == 0){
        return false;
    }

    if(sent < len){
        // Short write: move the unsent tail to the front and stay on EPOLLOUT.
        const int remaining = len - static_cast<int>(sent);
        memmove(buf, buf + sent, remaining);
        len = remaining;
        return true;
    }

    len = 0;
    event_control->event_add(EPOLLIN, this);
    return true;
}
/**
 * Read available bytes from the client into the buffer.
 *
 * @param event_control  used to switch the record to EPOLLOUT once data has
 *                       been received.
 * @return true when the connection should stay open (data received, or the
 *         socket simply had nothing to read yet); false when the peer closed
 *         the connection, a real error occurred, or no usable buffer exists.
 *
 * Fixes over the previous version:
 *  - at most buf_size - 1 bytes are read, so the '\0' terminator written
 *    after the data always stays inside the buffer;
 *  - EAGAIN / EWOULDBLOCK / EINTR no longer count as a disconnect;
 *  - len is never left negative after a failed read.
 */
bool RecordEvent::recv_data(EventControl * event_control){
    // Need room for at least one data byte plus the terminator.
    if(buf == NULL || buf_size < 2){
        len = 0;
        return false;
    }

    const ssize_t received = recv(fd, buf, buf_size - 1, 0);
    if(received > 0){
        len = static_cast<int>(received);
        buf[len] = '\0';
        event_control->event_add(EPOLLOUT, this);
        return true;
    }

    len = 0;
    if(received < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)){
        // Nothing to read right now; keep waiting for EPOLLIN.
        return true;
    }
    return false;
}
epoll_server.h
epoll反应堆服务端框架
#ifndef __EPOLLSERVER_H__
#define __EPOLLSERVER_H__
#include <sys/epoll.h>
#include "select_server.h"
#include "event_control.h"
#include "record_event.h"
// Server built on SelectServer that waits for socket readiness with epoll.
// Each client slot is tracked by a RecordEvent; EventControl changes the
// epoll registration of those records.
class EpollServer : public SelectServer{
public:
EpollServer();
~EpollServer();
// Walks the client records and calls close_client() for those that pass
// its idle-time test.
void check_active();
// Creates the epoll instance, allocates the per-client arrays and registers
// listen_fd for EPOLLIN.
virtual void init_data();
// Main loop: waits for events, periodically runs check_active() and
// dispatches ready events. Does not return once started.
virtual void process_run();
// Calls epoll_wait and stores the number of ready events in request_nums.
virtual void wait_requests();
// Accepts one pending connection into the first free client record.
virtual void accept_clients();
// Marks the record mapped to client_fd as free and closes the descriptor.
virtual void close_client(const int & client_fd);
// Handles every ready event reported by the last wait_requests() call.
virtual void deal_clients_messages();
private:
// Descriptor returned by epoll_create in init_data().
int epoll_fd;
// Array that epoll_wait fills with ready events.
struct epoll_event * socket_events;
// One record per client slot; a slot is free while its fd is -1.
RecordEvent * record_events;
// Helper that adds/modifies epoll registrations for the records.
EventControl * event_control;
};
#endif
epoll_server.cpp
实现源码
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <fcntl.h>
#include <iostream>
#include "epoll_server.h"
// Size in bytes of the message buffer given to each client record in accept_clients().
const int client_bufsize = 4096;
using std::cout;
using std::endl;
/**
 * Start with no epoll instance and no allocated arrays; the real setup
 * happens in init_data(). Announces start-up on stdout.
 */
EpollServer::EpollServer()
    : epoll_fd(0),
      socket_events(NULL),
      record_events(NULL),
      event_control(NULL){
    cout << "epoll server is starting ......" << endl;
}
/**
 * Release everything init_data() set up: close the client descriptors that
 * are still open, close the epoll instance and free the arrays.
 *
 * Nothing is done when init_data() never completed (init_flag is false).
 * The epoll descriptor was previously never closed; it is now released here.
 */
EpollServer::~EpollServer(){
    if( !init_flag ){
        return;
    }

    for(int i = 0; i < client_max_num; ++i){
        const int fd = record_events[i].get_fd();
        if(fd != -1){
            net_api.Close(fd);
        }
    }

    // epoll_create may have failed and left -1 here, so only close a valid fd.
    if(epoll_fd >= 0){
        ::close(epoll_fd);
        epoll_fd = -1;
    }

    delete [] client_addrs;
    delete [] socket_events;
    delete [] record_events;
    delete event_control;
}
void EpollServer::init_data(){
if( !client_max_num ) return;
epoll_fd = epoll_create(client_max_num);
if(epoll_fd == -1){
net_api.Perror("epoll_create error");
}
client_addrs = new sockaddr_in [client_max_num];
socket_events = new epoll_event [client_max_num];
record_events = new RecordEvent [client_max_num];
event_control = new EventControl;
event_control->set_epollfd(epoll_fd);
if(fcntl(listen_fd, F_SETFL, O_NONBLOCK) < 0){
net_api.Perror("fcntl listen_fd error");
}
socket_events[0].events = EPOLLIN;
socket_events[0].data.fd = listen_fd;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &socket_events[0]) < 0){
net_api.Perror("epoll_ctl add listen_fd error");
}
init_flag = true;
}
/**
 * Main server loop; returns immediately when init_data() has not completed,
 * otherwise never returns.
 *
 * Each round waits for events, runs check_active() once the round counter has
 * reached 100 (then restarts the counter), and dispatches ready events when
 * the wait reported any.
 */
void EpollServer::process_run(){
    if(init_flag){
        for(int rounds = 0; ; ++rounds){
            wait_requests();

            if(rounds >= 100){
                check_active();
                rounds = 0;
            }

            const bool has_ready_events = (request_nums > 0);
            if(has_ready_events){
                deal_clients_messages();
            }
        }
    }
}
/**
 * Close clients that have been idle for too long.
 *
 * A record is considered only when it holds a descriptor and is registered
 * with epoll. It is closed when at least idle_limit_seconds have passed since
 * its last recorded activity.
 *
 * Fix: the elapsed time was computed as (last activity - now), which is never
 * positive, so no client was ever timed out. It is now (now - last activity),
 * and the current time is kept in a 64-bit value instead of being narrowed
 * to int.
 */
void EpollServer::check_active(){
    const long long idle_limit_seconds = 6000;
    const long long current_time = time(NULL);

    for(int i = 0; i < client_max_num; ++i){
        if(record_events[i].get_fd() == -1 || !record_events[i].get_status())
            continue;

        const long long idle_seconds = current_time - record_events[i].get_active();
        if(idle_seconds >= idle_limit_seconds){
            close_client(record_events[i].get_fd());
        }
    }
}
/**
 * Wait up to 600 ms for ready events and store epoll_wait's return value
 * (the number of ready events, or a negative value on error) in request_nums.
 */
void EpollServer::wait_requests(){
    const int wait_timeout_ms = 600;
    const int ready_count = epoll_wait(epoll_fd, socket_events, client_max_num, wait_timeout_ms);
    request_nums = ready_count;
}
/**
 * Accept one pending connection and start watching it for input.
 *
 * The connection is stored in the first free client record (fd == -1), made
 * non-blocking, given a message buffer and registered for EPOLLIN.
 *
 * Fixes over the previous version:
 *  - when every slot is in use, the pending connection is accepted and closed
 *    right away; leaving it in the backlog kept listen_fd readable, so the
 *    event loop woke up immediately on every round;
 *  - a failed Accept is detected directly instead of surfacing as an fcntl
 *    error;
 *  - the new descriptor is closed when it cannot be made non-blocking (it
 *    used to leak);
 *  - O_NONBLOCK is OR-ed into the existing flags, and the buffer is prepared
 *    before the descriptor is registered with epoll.
 */
void EpollServer::accept_clients(){
    int coord = -1;
    for(int i = 0; i < client_max_num; ++i){
        if(record_events[i].get_fd() == -1){
            coord = i;
            break;
        }
    }

    if(coord == -1){
        // No free slot: drain the connection so it does not keep listen_fd readable.
        struct sockaddr_in rejected_addr;
        socket_len = sizeof(rejected_addr);
        const int rejected_fd = net_api.Accept(listen_fd, (struct sockaddr *) & rejected_addr, & socket_len);
        if(rejected_fd >= 0){
            net_api.Close(rejected_fd);
        }
        cout << "client number out of bound!" << endl;
        return;
    }

    socket_len = sizeof(client_addrs[coord]);
    const int client_fd = net_api.Accept(listen_fd, (struct sockaddr *) & client_addrs[coord], & socket_len);
    if(client_fd < 0){
        cout << "accept client failed!" << endl;
        return;
    }

    const int client_flags = fcntl(client_fd, F_GETFL, 0);
    if(client_flags < 0 || fcntl(client_fd, F_SETFL, client_flags | O_NONBLOCK) < 0){
        cout << "fcntl nonblocking client_fd failed!" << endl;
        net_api.Close(client_fd);
        return;
    }

    // The buffer is allocated once per slot and reused by later clients.
    if(record_events[coord].get_bufSize() <= 0){
        record_events[coord].set_bufSize(client_bufsize);
    }
    event_control->event_set(client_fd, &record_events[coord]);
    event_control->event_add(EPOLLIN, &record_events[coord]);
    map_coord[client_fd] = coord;

    cout << "A client connect................" << endl;
    display_socket_addr(coord);
}
void EpollServer::close_client(const int & client_fd){
int coord = map_coord[client_fd];
record_events[coord].set_fd(-1);
cout << "A client disconnect.................." << endl;
display_socket_addr(coord);
net_api.Close(client_fd);
}
/**
 * Dispatch every ready event reported by the last wait_requests() call.
 *
 * A listening-socket event accepts a new client. Client events are routed to
 * recv_data()/send_data() according to both the reported readiness and the
 * interest recorded on the RecordEvent; a false result closes the client,
 * success refreshes its last-activity time.
 *
 * Fixes over the previous version:
 *  - an event for the listening socket never falls through to the client
 *    path, where its data would be misread as a RecordEvent pointer;
 *  - events whose record has already been closed (fd == -1, e.g. by
 *    check_active() earlier in the same round) are skipped instead of ending
 *    in close_client(-1);
 *  - processing of an event stops once its client has been closed.
 *
 * NOTE(review): the listening socket is registered with data.fd while clients
 * use data.ptr; both share one union, so the listen test below compares the
 * low bytes of a client's pointer with listen_fd. A collision looks unlikely
 * but is not ruled out by this code.
 */
void EpollServer::deal_clients_messages(){
    for(int i = 0; i < request_nums; ++i){
        const unsigned int ready = socket_events[i].events;

        if(socket_events[i].data.fd == listen_fd){
            if(ready & EPOLLIN){
                accept_clients();
            }
            continue;
        }

        RecordEvent * re_ev = (RecordEvent *) socket_events[i].data.ptr;
        if(re_ev == NULL || re_ev->get_fd() == -1){
            continue;   // stale event for a record that is no longer in use
        }

        if((ready & EPOLLIN) && (re_ev->get_events() & EPOLLIN)){
            if(re_ev->recv_data(event_control) == false){
                close_client(re_ev->get_fd());
                continue;
            }
            re_ev->set_active(time(NULL));
        }

        if((ready & EPOLLOUT) && (re_ev->get_events() & EPOLLOUT)){
            if(re_ev->send_data(event_control) == false){
                close_client(re_ev->get_fd());
                continue;
            }
            re_ev->set_active(time(NULL));
        }
    }
}
上述代码实现了基于 epoll 反应堆模型的多路 I/O 转接高并发服务器的基本框架,其中用到的一些类已在上一节中给出了原型,限于篇幅不再重复展示。在进行异步服务时,还可以考虑使用线程池来处理客户端请求,从而大幅提高服务器的响应与处理效率。
Comments