Ключевые слова:threads, linux, mutex, (найти похожие документы)
Date: Tue, 10 Dec 2002 16:15:22 +0500
From: Maksym Shevchenko <[email protected]>
Newsgroups: ftn.ru.unix.prog
Subject: Реализация multithreaded сервера на C++
Вот реализовал один алгоритм и решил поделится с уважаемым All.
Постановка задачи: Требуется принимать входящие соединения и асинхронно
обрабатывать принимаемые данные и посылать что-то в ответ. Язык - С++,
target platform = Linux RH 7.x.
Что я сделал:
Первое - врапперы для POSIX mutex & pthread (на самом деле остались от
старого проекта):
class CMutex
{
public:
CMutex ();
~CMutex ();
bool lock ();
bool unlock ();
void cond_wait();
void cond_timedwait(long utime);
void cond_broadcast();
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cv;
};
class CThread
{
public:
CThread();
~CThread();
static void* _start(void* lwp);
int start(); //start thread
void join(void *ptr); //wait for thread end
void cancel();
void detach(); //detach thread
void stop(); //join (NULL)
void exit(void *ptr); //terminate thread from itself
void kill();
pid_t get_pid(); // get thread pid
virtual void * Loop ()=0;
private:
static void* _start(void* lwp);
void set_pid(pid_t p) { pid = p; };
pthread_attr_t attr;
pthread_t thread;
pid_t pid;
};
Здесь остановлюсь только на реализации функции CThread::start():
int CThread::start ()
{
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);
pthread_attr_setscope(&attr,PTHREAD_SCOPE_PROCESS);
{
int ret;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE , &ret);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ret);
}
return pthread_create(&thread, NULL, _start, this);
}
void* CThread::_start(void* lwp)
{
((CThread*)lwp)->set_pid(getpid());
p_debug(LOG_DEBUG, "Started thread [%d]", getpid());
return ((CThread*)lwp)->Loop();
}
Скажем, мне такая реализация показалась удобной, но обсуждаемой. ;-)
Далее реализовано несколько классов:
СListener - собствеено говоря он принимает входящие соединения;
СSocket - класс отвечающий за интерфейс с открытым сокетом;
СManager - управляет жзненым циклом CListener и хранит список СSocket.
class CManager : private CMutex, public CThread
{
public:
CManager();
~CManager();
CSocket* GetSocket (const FDS& id);
void KillSocket (const FDS& id);
const FDS AddSocket (CSocket *);
void Init ();
protected:
virtual void OnNewSocket (const FDS& id, const CSocket* socketX)=0;
virtual void OnReceiveData (const FDS& id)=0;
virtual void OnCloseSocket (const FDS& id)=0;
void *Loop ();
private:
CListener* m_pListener;
map<FDS,CSocket*> m_mapSocket;
};
class CListener : public CThread
{
public:
CListener(int iPortNum, const CManager *Manager);
~CListener();
protected:
void* Loop();
int m_fds;
CManager* m_pManager;
};
class CSocket
{
public:
CSocket(int fds, const CManager* pManager );
~CSocket();
void WriteStream (string str);
string ReadStream ();
int GetFDS();
private:
int m_fds;
CManager* m_pManager;
};
Т.е. релизуется две нити: первая - для того того что бы говорить listen и
accept,
а вторая ожидать входящие данные...
и так, реализация первой:
void * CListener::Loop()
{
struct sockaddr their_addr;
int sin_size;
int new_fd;
CSocket *pSocket;
if (::setpgrp () < 0)
{
p_debug (LOG_ERR, "setpgrp(): %s", strerror (errno));
::exit (-1);
}
while ( .... )
{
::listen(m_fds, LISTEN_BACKLOGSIZE);
sin_size = sizeof(struct sockaddr_in);
new_fd = ::accept(m_fds, &their_addr, (socklen_t *)&sin_size);
if (new_fd > 0)
{
pSocket = new CSocket(new_fd, m_pManager);
if ( (new_fd = m_pManager->AddSocket(pSocket)) == 0 )
{
p_debug(LOG_WARNING, "Can't AddSocket(0x%X) fd=0x%X", pSession, new_fd);
delete pSession; /* CSocket shall close new_fd */
}
}
else
{
p_debug(LOG_WARNING, "accept(): %s", strerror(errno));
}
}
return NULL;
}
и релизация второй нити:
void * CManager::Loop()
{
while ( .... )
{
struct pollfd* pfd;
int size = 0;
map<FDS,CSocket*>::iterator itr = m_mapSocket.begin();
if ( (size = m_mapSocket.size() ) > 0 )
{
lock();
pfd = (struct pollfd[])malloc(sizeof(struct pollfd[size]));
bzero(pfd, sizeof(struct pollfd[size]));
for (struct pollfd *ptr = pfd ; itr!=m_mapSocket.end(); itr++)
{
ptr->fd = (int)(itr->first);
ptr->events = POLLIN|POLLOUT|POLLHUP|POLLERR;
ptr++;
}
unlock();
if ((poll (pfd, size, POLLTIME)) < 0)
{
p_debug (LOG_WARNING, "poll(): %s", strerror (errno));
}
for (int i=0 ; i < size; i++)
{
if ((pfd[i].fd) > 0)
{
if (pfd[i].revents & POLLHUP || pfd[i].revents & POLLERR)
/* Socket hung-up */
{
p_debug(LOG_DEBUG, "Loop() got POLLHUP||POLLERR on fds=[%d]",(FDS)pfd[i].fd);
OnKillSocket((FDS)pfd[i].fd);
KillSocket((FDS)pfd[i].fd);
}
if (pfd[i].revents & POLLIN &&
(m_mapSocket.find((FDS)pfd[i].fd) != m_mapSocket.end()))
/* Got data & CSocket exist*/
{
p_debug(LOG_DEBUG, "Loop() got POLLIN on fds=[%d]",(FDS)pfd[i].fd);
OnReceiveData((FDS)pfd[i].fd);
}
}
}
free(pfd);
}
else
{
sleep(1); /* we have no opened sockets... what about using mutex here? */
}
}
}
return NULL;
}
Ранее был реализован вариант с ожиданием изменения состояния сокета через
sigwaitinfo(...) после того как с полученым сокетом (в CSocket::CSocket)
проводились следующие манипуляции:
if (fcntl(m_fds, F_SETOWN, m_pManager->get_pid()) < 0)
{
p_debug (LOG_ERR, "fcntl(F_SETOWN) : %s", ::strerror(errno));
::exit (1);
}
if (fcntl (m_fds, F_SETSIG, SIGRTMIN) < 0)
{
p_debug (LOG_ERR, "fcntl(F_SETSIG) : %s", ::strerror(errno));
::exit (1);
}
но к превеликому сожалению этот вариант имеет несколько недостатков. Во
первых этот вариант работает только тогда, когда pthreads реализованы как в
виде отдельных процессов. Во вторых fcntl(..,F_SETOWN,..) можно сделать
только от root'а. И в третьих, как не прискорбно, тестирование на 10000 и
более входящих соединениях показало что сигнал все таки облодает свойством
терятся.
Теперь смотрите как это используется:
class CServer : public CManager
{
void OnNewSocket (const FDS& id, const CSocket* socketX)
{
cout << "incomming fds=" << id << endl;
}
void OnReceiveData (const FDS& id)
{
GetSocket(id)->WriteStream(GetSocket(id)->ReadStream() + " processed by server.");
}
void OnCloseSocket (const FDS& id)
{
cout << "gone fds=" << id << endl;
}
}
Мне понравилось.
Почему я не рализовал listen в том же poll? Очень просто - представьте что
CManager'ов несколько, а CListener делает round-robin between them. T.е.
CManager::m_pListener становится статик и в CManager добовляется static
set<CManager *> m_setManager. Из конструктора CManager добовляем this в
m_setManger, из деструктора его соответсвенно убиваем. А в CListener::Loop
добавляется set<CManager*>::iterator который говорит куда мы добавляем вновь
созданый сокет.
мде... очень так себе. Зачем тут вообще 2 нити? Можно всё одной делать. да и спецально для линукса(как и для фри, соляры, etc.) есть весьма лучшие версии механизмов пулинга. Передавать в CSocket::WriteStream std::string по значению - вообще ахтунг. После этого дальше не стал копать. Вобщем низачот.