Ключевые слова:epoll, linux, select, gcc, (найти похожие документы)
From: Maksud Nurullaev <maksud.nurullaev[DOG]gmail.com>
Date: Mon, 8 Apr 2010 17:02:14 +0000 (UTC)
Subject: Создание сервера для обработки событий с использованием epoll
К моменту написания данной программы информации по epoll практически не было даже на английском, и мною был создан простой проект чата с названием seChat (сокращение от "Simple Epoll Chat") для исследования ее возможностей. В статье хотелось поделится общими впечатлениями о epoll и помочь другим узнать ее на примере простого проекта.
Наша задача - создать чат-проект с простыми программами, а именно:
сервер
прослушивает предопределенный ip адрес и порт, и "регистрирует" всех клиентов на обслуживание;
идентифицирует каждого нового клиента при подключении;
принимает сообщение от любого клиента, и рассылает всем, кроме "отправителя";
клиент может получать и отправлять сообщения одновременно;
тестер - программа для тестирования нагрузки на сервер с большим количеством одновременных подключений.
Оговоримся сразу:
epoll используется для управления событиями о новых сообщениях как на стороне сервера, так и на стороне клиента;
по всем незнакомым командам и функциям не поленитесь обратится к соответствующим руководствам, там все прекрасно описано;
для простоты:
в проекте "нормальных" обработчиков ошибок практически нет и при любых исключениях программа просто завершается с ошибкой (что оказалось очень практичным решением на стадии кодирования и тестирования!). В идеале, программа должна обработать ошибку и постараться вернуться в "рабочий режим", но я старался кодировать "без фанатизма" поддерживая философию "чем проще, тем лучше для усвоения";
я жестко закодировал ip адрес и порт - мне лень каждый раз набивать их в параметрах запуска, а вы можете сделать по другому;
#ifndef _SCHAT_LOCAL_H_
#define _SCHAT_LOCAL_H
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <list>
#include <time.h>
// Default buffer size
#define BUF_SIZE 1024
// Default port
#define SERVER_PORT 44444
// seChat server ip, you should change it to your own server ip address
#define SERVER_HOST "192.168.34.15"
// Default timeout - http://linux.die.net/man/2/epoll_wait
#define EPOLL_RUN_TIMEOUT -1
// Count of connections that we are planning to handle (just hint to kernel)
#define EPOLL_SIZE 10000
// First welcome message from server
#define STR_WELCOME "Welcome to seChat! You ID is: Client #%d"
// Format of message population
#define STR_MESSAGE "Client #%d>> %s"
// Warning message if you alone in server
#define STR_NOONE_CONNECTED "Noone connected to server except you!"
// Commad to exit
#define CMD_EXIT "EXIT"
// Macros - exit in any error (eval < 0) case
#define CHK(eval) if(eval < 0){perror("eval"); exit(-1);}
// Macros - same as above, but save the result(res) of expression(eval)
#define CHK2(res, eval) if((res = eval) < 0){perror("eval"); exit(-1);}
// Preliminary declaration of functions
int setnonblocking(int sockfd);
void debug_epoll_event(epoll_event ev);
int handle_message(int new_fd);
int print_incoming(int fd);
#endif
Кодировать сервер было проще всего. Задача сервера предельна ясна, получать сообщения от клиента и делать массовые рассылки другим( если таковые есть на сервере) или отправить предупреждение отправителю об их отсутствии.
#include "local.h"
#include "utils.h"
using namespace std;
// To store client's socket list
list<int> clients_list;
// for debug mode
int DEBUG_MODE = 0;
int main(int argc, char *argv[])
{
// *** Define debug mode
// any additional parameres on startup
// i.e. like './server f' or './server debug'
// we will switch to switch to debug mode(very simple anmd useful)
if(argc > 1) DEBUG_MODE = 1;
if(DEBUG_MODE){
printf("Debug mode is ON!\n");
printf("MAIN: argc = %d\n", argc);
for(int i=0; i<argc; i++)
printf(" argv[%d] = %s\n", i, argv[i]);
}else printf("Debug mode is OFF!\n");
// *** Define values
// main server listener
int listener;
// define ip & ports for server(addr)
// and incoming client ip & ports(their_addr)
struct sockaddr_in addr, their_addr;
// configure ip & port for listen
addr.sin_family = PF_INET;
addr.sin_port = htons(SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
// size of address
socklen_t socklen;
socklen = sizeof(struct sockaddr_in);
// event template for epoll_ctl(ev)
// storage array for incoming events from epoll_wait(events)
// and maximum events count could be EPOLL_SIZE
static struct epoll_event ev, events[EPOLL_SIZE];
// watch just incoming(EPOLLIN)
// and Edge Trigged(EPOLLET) events
ev.events = EPOLLIN | EPOLLET;
// chat message buffer
char message[BUF_SIZE];
// epoll descriptor to watch events
int epfd;
// to calculate the execution time of a program
clock_t tStart;
// other values:
// new client descriptor(client)
// to keep the results of different functions(res)
// to keep incoming epoll_wait's events count(epoll_events_count)
int client, res, epoll_events_count;
// *** Setup server listener
// create listener with PF_INET(IPv4) and
// SOCK_STREAM(sequenced, reliable, two-way, connection-based byte stream)
CHK2(listener, socket(PF_INET, SOCK_STREAM, 0));
printf("Main listener(fd=%d) created! \n",listener);
// setup nonblocking socket
setnonblocking(listener);
// bind listener to address(addr)
CHK(bind(listener, (struct sockaddr *)&addr, sizeof(addr)));
printf("Listener binded to: %s\n", SERVER_HOST);
// start to listen connections
CHK(listen(listener, 1));
printf("Start to listen: %s!\n", SERVER_HOST);
// *** Setup epoll
// create epoll descriptor
// and backup store for EPOLL_SIZE of socket events
CHK2(epfd,epoll_create(EPOLL_SIZE));
printf("Epoll(fd=%d) created!\n", epfd);
// set listener to event template
ev.data.fd = listener;
// add listener to epoll
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev));
printf("Main listener(%d) added to epoll!\n", epfd);
// *** Main cycle(epoll_wait)
while(1)
{
CHK2(epoll_events_count,epoll_wait(epfd, events, EPOLL_SIZE, EPOLL_RUN_TIMEOUT));
if(DEBUG_MODE) printf("Epoll events count: %d\n", epoll_events_count);
// setup tStart time
tStart = clock();
for(int i = 0; i < epoll_events_count ; i++)
{
if(DEBUG_MODE){
printf("events[%d].data.fd = %d\n", i, events[i].data.fd);
debug_epoll_event(events[i]);
}
// EPOLLIN event for listener(new client connection)
if(events[i].data.fd == listener)
{
CHK2(client,accept(listener, (struct sockaddr *) &their_addr, &socklen));
if(DEBUG_MODE) printf("connection from:%s:%d, socket assigned to:%d \n",
inet_ntoa(their_addr.sin_addr),
ntohs(their_addr.sin_port),
client);
// setup nonblocking socket
setnonblocking(client);
// set new client to event template
ev.data.fd = client;
// add new client to epoll
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev));
// save new descriptor to further use
clients_list.push_back(client); // add new connection to list of clients
if(DEBUG_MODE) printf("Add new client(fd = %d) to epoll and now clients_list.size = %d\n",
client,
clients_list.size());
// send initial welcome message to client
bzero(message, BUF_SIZE);
res = sprintf(message, STR_WELCOME, client);
CHK2(res, send(client, message, BUF_SIZE, 0));
}else { // EPOLLIN event for others(new incoming message from client)
CHK2(res,handle_message(events[i].data.fd));
}
}
// print epoll events handling statistics
printf("Statistics: %d events handled at: %.2f second(s)\n",
epoll_events_count,
(double)(clock() - tStart)/CLOCKS_PER_SEC);
}
close(listener);
close(epfd);
return 0;
}
// *** Handle incoming message from clients
int handle_message(int client)
{
// get row message from client(buf)
// and format message to populate(message)
char buf[BUF_SIZE], message[BUF_SIZE];
bzero(buf, BUF_SIZE);
bzero(message, BUF_SIZE);
// to keep different results
int len;
// try to get new raw message from client
if(DEBUG_MODE) printf("Try to read from fd(%d)\n", client);
CHK2(len,recv(client, buf, BUF_SIZE, 0));
// zero size of len mean the client closed connection
if(len == 0){
CHK(close(client));
clients_list.remove(client);
if(DEBUG_MODE) printf("Client with fd: %d closed! And now clients_list.size = %d\n", client, clients_list.size());
// populate message around the world
}else{
if(clients_list.size() == 1) { // this means that noone connected to server except YOU!
CHK(send(client, STR_NOONE_CONNECTED, strlen(STR_NOONE_CONNECTED), 0));
return len;
}
// format message to populate
sprintf(message, STR_MESSAGE, client, buf);
// populate message around the world ;-)...
list<int>::iterator it;
for(it = clients_list.begin(); it != clients_list.end(); it++){
if(*it != client){ // ... except youself of course
CHK(send(*it, message, BUF_SIZE, 0));
if(DEBUG_MODE) printf("Message '%s' send to client with fd(%d) \n", message, *it);
}
}
if(DEBUG_MODE) printf("Client(%d) received message successfully:'%s', a total of %d bytes data...\n",
client,
buf,
len);
}
return len;
}
Главная проблема клиентской части - одновременно следить за новыми сообщения как от пользователя, так и от сервера. И я решил ее созданием двух процессов (родительского и дочернего, через fork) для того что бы:
дочерний процесс - ожидал ввода сообщения от пользователя;
родительский процесс - ожидал новых сообщений как от сервера, так и дочернего процесса используя все то же epoll.
Связь между дочерним и родительским процессом осуществляется через pipe (в 'man pipe' есть отличный пример как это сделать).
#include "local.h"
#include "utils.h"
using namespace std;
// chat message buffer
char message[BUF_SIZE];
// for debug mode
int DEBUG_MODE = 0;
/*
We use 'fork' to make two process.
Child process:
- waiting for user's input message;
- and sending all users messages to parent process through pipe.
('man pipe' has good example how to do it)
Parent process:
- wating for incoming messages(EPOLLIN):
-- from server(socket) to display;
-- from child process(pipe) to transmit to server(socket)
*/
int main(int argc, char *argv[])
{
// *** Define debug mode
// any additional parameres on startup
// i.e. like './client f' or './client debug'
// we will switch to debug mode(very simple anmd useful)
if(argc > 1) DEBUG_MODE = 1;
if(DEBUG_MODE){
printf("Debug mode is ON!\n");
printf("MAIN: argc = %d\n", argc);
for(int i=0; i<argc; i++)
printf(" argv[%d] = %s\n", i, argv[i]);
}else printf("Debug mode is OFF!\n");
// *** Define values
// socket connection with server(sock)
// process ID(pid)
// pipe between chils & parent processes(pipe_fd)
// epoll descriptor to watch events
int sock, pid, pipe_fd[2], epfd;
// define ip & ports for server(addr)
struct sockaddr_in addr;
addr.sin_family = PF_INET;
addr.sin_port = htons(SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
// event template for epoll_ctl(ev)
// storage array for incoming events from epoll_wait(events)
// and maximum events count could be 2
// 'sock' from server and 'pipe' from parent process(user inputs)
static struct epoll_event ev, events[2]; // Socket(in|out) & Pipe(in)
ev.events = EPOLLIN | EPOLLET;
// if it's zero, we should shoud down client
int continue_to_work = 1;
// *** Setup socket connection with server
CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
// *** Setup pipe to send messages from child process to parent
CHK(pipe(pipe_fd));
if(DEBUG_MODE) printf("Created pipe with pipe_fd[0](read part): %d and pipe_fd[1](write part): % d\n",
pipe_fd[0],
pipe_fd[1]);
// *** Create & configure epoll
CHK2(epfd,epoll_create(EPOLL_SIZE));
if(DEBUG_MODE) printf("Created epoll with fd: %d\n", epfd);
// add server connetion(sock) to epoll to listen incoming messages from server
ev.data.fd = sock;
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev));
if(DEBUG_MODE) printf("Socket connection (fd = %d) added to epoll\n", sock);
// add read part of pipe(pipe_fd[0]) to epoll
// to listen incoming messages from child process
ev.data.fd = pipe_fd[0];
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev));
if(DEBUG_MODE) printf("Pipe[0] (read) with fd(%d) added to epoll\n", pipe_fd[0]);
// Fork
CHK2(pid,fork());
switch(pid){
case 0: // child process
close(pipe_fd[0]); // we dont need read pipe anymore
printf("Enter 'exit' to exit\n");
while(continue_to_work){
bzero(&message, BUF_SIZE);
fgets(message, BUF_SIZE, stdin);
// close while cycle for 'exit' command
if(strncasecmp(message, CMD_EXIT, strlen(CMD_EXIT)) == 0){
continue_to_work = 0;
// send user's message to parent process
}else CHK(write(pipe_fd[1], message, strlen(message) - 1));
}
break;
default: //parent process
close(pipe_fd[1]); // we dont need write pipe anymore
// incoming epoll_wait's events count(epoll_events_count)
// results of different functions(res)
int epoll_events_count, res;
// *** Main cycle(epoll_wait)
while(continue_to_work) {
CHK2(epoll_events_count,epoll_wait(epfd, events, 2, EPOLL_RUN_TIMEOUT));
if(DEBUG_MODE) printf("Epoll events count: %d\n", epoll_events_count);
for(int i = 0; i < epoll_events_count ; i++){
bzero(&message, BUF_SIZE);
// EPOLLIN event from server( new message from server)
if(events[i].data.fd == sock){
if(DEBUG_MODE) printf("Server sends new message!\n");
CHK2(res,recv(sock, message, BUF_SIZE, 0));
// zero size of result means the server closed connection
if(res == 0){
if(DEBUG_MODE) printf("Server closed connection: %d\n", sock);
CHK(close(sock));
continue_to_work = 0;
}else printf("%s\n", message);
// EPOLLIN event from child process(user's input message)
}else{
if(DEBUG_MODE) printf("New pipe event!\n");
CHK2(res, read(events[i].data.fd, message, BUF_SIZE));
// zero size of result means the child process going to exit
if(res == 0) continue_to_work = 0; // exit parent to
// send message to server
else{
CHK(send(sock, message, BUF_SIZE, 0));
}
}
}
}
}
if(pid){
if(DEBUG_MODE) printf("Shutting down parent!\n");
close(pipe_fd[0]);
close(sock);
}else{
if(DEBUG_MODE) printf("Shutting down child!\n");
close(pipe_fd[1]);
}
return 0;
}
открывает одновременно EPOLL_SIZE соединений с сервером (в моем случае EPOLL_SIZE = 10000);
получает по каждому соединению отдельное "приветствие сервера";
закрывает все соединения;
и выводит небольшую статистику о своей работе.
Все просто!
#include "local.h"
#include "utils.h"
using namespace std;
// to keep message from server
char message[BUF_SIZE];
// for debuf mode
int DEBUG_MODE = 0;
// to store client's sockets list
list<int> list_of_clients;
// to keep result of different functions
int res;
// to calculate the execution time of a program
clock_t tStart;
int main(int argc, char *argv[])
{
// *** Define debug mode
// any additional parameres on startup
// i.e. like './server f' or './server debug'
// we will switch to switch to debug mode(very simple anmd useful)
if(argc > 1) DEBUG_MODE = 1;
if(DEBUG_MODE){
printf("Debug mode is ON!\n");
printf("MAIN: argc = %d\n", argc);
for(int i=0; i<argc; i++)
printf(" argv[%d] = %s\n", i, argv[i]);
}else printf("Debug mode is OFF!\n");
// *** Define values
// connetion with server
int sock;
// define address & port of server
struct sockaddr_in addr;
addr.sin_family = PF_INET;
addr.sin_port = htons(SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
// setup start time
tStart = clock();
// create EPOLL_SIZE connections with server
for(int i=0 ; i<EPOLL_SIZE; i++){
// create new socket connection with server
CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
list_of_clients.push_back(sock);
if(DEBUG_MODE) printf("Create new test client with fd: %d\n", sock);
// Get welcome messge from server!
bzero(&message, BUF_SIZE);
CHK2(res,recv(sock, message, BUF_SIZE, 0));
printf("%s\n", message);
}
// close all connections
list<int>::iterator it;
for(it = list_of_clients.begin(); it != list_of_clients.end() ; it++)
close(*it);
// print statistics
printf("Test passed at: %.2f second(s)\n", (double)(clock() - tStart)/CLOCKS_PER_SEC);
printf("Total server connections was: %d\n", EPOLL_SIZE);
return 0;
}
Лучше один раз увидеть, чем сто раз услышать - тут картинка с результатом работы программы-теста.
ИТОГИ:
По результатам тестов на моей машине (виртуальная CentOS 5.2 на Proxmox c оперативкой 1Гб и одним выделенным процессором) сервер обработал около 7000 соединений за четверть секунды. По моему не плохо!
Для удобства добавил все исходники в Google Code открыв новый проект sechat, тестируйте/пользуйтесь кому интересно.
Приветствуются любая критика и замечания - maksud.nurullaev[DOG]gmail.com
P.S.
если при запуске теста, система будет ругаться на большое количество одновременно открытых файлов(дескрипторов), проверьте свои лимиты через ulimit -n и измените на подходящее значение;
тексты программ изобилуют комментариями, но если будут какие то вопросы или пожелания о детализации и переводе, нет проблем, сделаю как только освобожусь в ближайшее время;
я создал проект ТОЛЬКО ЛИШЬ для тестирования epoll, а не для создания "шедеврального" чата, так что заранее прошу извинить за примитивность кода.
К моменту написания данного текста доступна (во всех смыслах) информация типа man epoll. Она включает в себя пример кода (более удобоваримый) и что-то вроде faq, "там все прекрасно описано"(цитата). Вместо солидного куска кода (зачем? каждая лишняя строка уменьшает кол-во дочитавших до конца) Было бы интересней сравнить poll, ppoll,epoll,select,pselect,... Ну, то есть тем,кому лень Стивенсов и маны читать.
У Стивенса нет информации по epoll, хотя все остальные вызовы довольно неплохо описаны. Код и описание на нормальном уровне, а вот вопрос своевременности... И еще момент: может сейчас лучше использовать libevent вместо низкоуровневых системных вызовов? И с переносимостью дела будут лучше обстоять.
>У Стивенса нет информации по epoll, хотя все остальные вызовы довольно неплохо
>описаны. Код и описание на нормальном уровне, а вот вопрос
>своевременности... И еще момент: может сейчас лучше использовать libevent вместо низкоуровневых
>системных вызовов? И с переносимостью дела будут лучше обстоять.
Точно, надо было сразу на libevent писать, как то не сообразил, перепишу как только освобожусь, а эту версию можно оставить для истории. ;-)