The OpenNET Project / Index page

[ новости /+++ | форум | теги | ]

Реализация multithreaded сервера на C++ (threads linux mutex)


<< Предыдущая ИНДЕКС Поиск в статьях src Установить закладку Перейти на закладку Следующая >>
Ключевые слова: threads, linux, mutex,  (найти похожие документы)
Date: Tue, 10 Dec 2002 16:15:22 +0500 From: Maksym Shevchenko <[email protected]> Newsgroups: ftn.ru.unix.prog Subject: Реализация multithreaded сервера на C++ Вот реализовал один алгоритм и решил поделится с уважаемым All. Постановка задачи: Требуется принимать входящие соединения и асинхронно обрабатывать принимаемые данные и посылать что-то в ответ. Язык - С++, target platform = Linux RH 7.x. Что я сделал: Первое - врапперы для POSIX mutex & pthread (на самом деле остались от старого проекта): class CMutex { public: CMutex (); ~CMutex (); bool lock (); bool unlock (); void cond_wait(); void cond_timedwait(long utime); void cond_broadcast(); private: pthread_mutex_t m_mutex; pthread_cond_t m_cv; }; class CThread { public: CThread(); ~CThread(); static void* _start(void* lwp); int start(); //start thread void join(void *ptr); //wait for thread end void cancel(); void detach(); //detach thread void stop(); //join (NULL) void exit(void *ptr); //terminate thread from itself void kill(); pid_t get_pid(); // get thread pid virtual void * Loop ()=0; private: static void* _start(void* lwp); void set_pid(pid_t p) { pid = p; }; pthread_attr_t attr; pthread_t thread; pid_t pid; }; Здесь остановлюсь только на реализации функции CThread::start(): int CThread::start () { pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE); pthread_attr_setscope(&attr,PTHREAD_SCOPE_PROCESS); { int ret; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE , &ret); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ret); } return pthread_create(&thread, NULL, _start, this); } void* CThread::_start(void* lwp) { ((CThread*)lwp)->set_pid(getpid()); p_debug(LOG_DEBUG, "Started thread [%d]", getpid()); return ((CThread*)lwp)->Loop(); } Скажем, мне такая реализация показалась удобной, но обсуждаемой. ;-) Далее реализовано несколько классов: СListener - собствеено говоря он принимает входящие соединения; СSocket - класс отвечающий за интерфейс с открытым сокетом; СManager - управляет жзненым циклом CListener и хранит список СSocket. class CManager : private CMutex, public CThread { public: CManager(); ~CManager(); CSocket* GetSocket (const FDS& id); void KillSocket (const FDS& id); const FDS AddSocket (CSocket *); void Init (); protected: virtual void OnNewSocket (const FDS& id, const CSocket* socketX)=0; virtual void OnReceiveData (const FDS& id)=0; virtual void OnCloseSocket (const FDS& id)=0; void *Loop (); private: CListener* m_pListener; map<FDS,CSocket*> m_mapSocket; }; class CListener : public CThread { public: CListener(int iPortNum, const CManager *Manager); ~CListener(); protected: void* Loop(); int m_fds; CManager* m_pManager; }; class CSocket { public: CSocket(int fds, const CManager* pManager ); ~CSocket(); void WriteStream (string str); string ReadStream (); int GetFDS(); private: int m_fds; CManager* m_pManager; }; Т.е. релизуется две нити: первая - для того того что бы говорить listen и accept, а вторая ожидать входящие данные... и так, реализация первой: void * CListener::Loop() { struct sockaddr their_addr; int sin_size; int new_fd; CSocket *pSocket; if (::setpgrp () < 0) { p_debug (LOG_ERR, "setpgrp(): %s", strerror (errno)); ::exit (-1); } while ( .... ) { ::listen(m_fds, LISTEN_BACKLOGSIZE); sin_size = sizeof(struct sockaddr_in); new_fd = ::accept(m_fds, &their_addr, (socklen_t *)&sin_size); if (new_fd > 0) { pSocket = new CSocket(new_fd, m_pManager); if ( (new_fd = m_pManager->AddSocket(pSocket)) == 0 ) { p_debug(LOG_WARNING, "Can't AddSocket(0x%X) fd=0x%X", pSession, new_fd); delete pSession; /* CSocket shall close new_fd */ } } else { p_debug(LOG_WARNING, "accept(): %s", strerror(errno)); } } return NULL; } и релизация второй нити: void * CManager::Loop() { while ( .... ) { struct pollfd* pfd; int size = 0; map<FDS,CSocket*>::iterator itr = m_mapSocket.begin(); if ( (size = m_mapSocket.size() ) > 0 ) { lock(); pfd = (struct pollfd[])malloc(sizeof(struct pollfd[size])); bzero(pfd, sizeof(struct pollfd[size])); for (struct pollfd *ptr = pfd ; itr!=m_mapSocket.end(); itr++) { ptr->fd = (int)(itr->first); ptr->events = POLLIN|POLLOUT|POLLHUP|POLLERR; ptr++; } unlock(); if ((poll (pfd, size, POLLTIME)) < 0) { p_debug (LOG_WARNING, "poll(): %s", strerror (errno)); } for (int i=0 ; i < size; i++) { if ((pfd[i].fd) > 0) { if (pfd[i].revents & POLLHUP || pfd[i].revents & POLLERR) /* Socket hung-up */ { p_debug(LOG_DEBUG, "Loop() got POLLHUP||POLLERR on fds=[%d]",(FDS)pfd[i].fd); OnKillSocket((FDS)pfd[i].fd); KillSocket((FDS)pfd[i].fd); } if (pfd[i].revents & POLLIN && (m_mapSocket.find((FDS)pfd[i].fd) != m_mapSocket.end())) /* Got data & CSocket exist*/ { p_debug(LOG_DEBUG, "Loop() got POLLIN on fds=[%d]",(FDS)pfd[i].fd); OnReceiveData((FDS)pfd[i].fd); } } } free(pfd); } else { sleep(1); /* we have no opened sockets... what about using mutex here? */ } } } return NULL; } Ранее был реализован вариант с ожиданием изменения состояния сокета через sigwaitinfo(...) после того как с полученым сокетом (в CSocket::CSocket) проводились следующие манипуляции: if (fcntl(m_fds, F_SETOWN, m_pManager->get_pid()) < 0) { p_debug (LOG_ERR, "fcntl(F_SETOWN) : %s", ::strerror(errno)); ::exit (1); } if (fcntl (m_fds, F_SETSIG, SIGRTMIN) < 0) { p_debug (LOG_ERR, "fcntl(F_SETSIG) : %s", ::strerror(errno)); ::exit (1); } но к превеликому сожалению этот вариант имеет несколько недостатков. Во первых этот вариант работает только тогда, когда pthreads реализованы как в виде отдельных процессов. Во вторых fcntl(..,F_SETOWN,..) можно сделать только от root'а. И в третьих, как не прискорбно, тестирование на 10000 и более входящих соединениях показало что сигнал все таки облодает свойством терятся. Теперь смотрите как это используется: class CServer : public CManager { void OnNewSocket (const FDS& id, const CSocket* socketX) { cout << "incomming fds=" << id << endl; } void OnReceiveData (const FDS& id) { GetSocket(id)->WriteStream(GetSocket(id)->ReadStream() + " processed by server."); } void OnCloseSocket (const FDS& id) { cout << "gone fds=" << id << endl; } } Мне понравилось. Почему я не рализовал listen в том же poll? Очень просто - представьте что CManager'ов несколько, а CListener делает round-robin between them. T.е. CManager::m_pListener становится статик и в CManager добовляется static set<CManager *> m_setManager. Из конструктора CManager добовляем this в m_setManger, из деструктора его соответсвенно убиваем. А в CListener::Loop добавляется set<CManager*>::iterator который говорит куда мы добавляем вновь созданый сокет.

<< Предыдущая ИНДЕКС Поиск в статьях src Установить закладку Перейти на закладку Следующая >>

Обсуждение [ RSS ]
  • 1.1, Evgen (??), 16:04, 16/04/2006 [ответить]  
  • +/
    Конструктор Класса
    CSocket(int fds, const CManager* pManager );

    void * CListener::Loop()
    {.....
    pSocket = new CSocket(new_fd, m_pManager);
    .....}

    Вот не пойму, что можнет сделать объект класса CSocket, зная какой объект CManager им управляет? Чтобы было? :)

     
  • 1.2, R0lanD (?), 17:43, 17/04/2006 [ответить]  
  • +/
    Разрегистрироватся в нем на удалении.
     
  • 1.3, Evgen (??), 01:18, 20/04/2006 [ответить]  
  • +/
    Разве Managet::Loop не занимается всем эти действиями?
    Зачем CSocket::~CSocket() разрегистрироватся в CManeger? Когда такие ситуации могут быть :-/
     
  • 1.4, Maksym (?), 16:48, 21/04/2006 [ответить]  
  • +/
    На всякий пожарный случай. ;o)
     
  • 1.5, Evgen (??), 01:27, 26/04/2006 [ответить]  
  • +/
    Ок. Понял :)
     
  • 1.6, Kriz (??), 12:40, 15/02/2007 [ответить]  
  • +/
    мде... очень так себе. Зачем тут вообще 2 нити? Можно всё одной делать. да и спецально для линукса(как и для фри, соляры, etc.) есть весьма лучшие версии механизмов пулинга. Передавать в CSocket::WriteStream std::string по значению - вообще ахтунг. После этого дальше не стал копать. Вобщем низачот.
     

    игнорирование участников | лог модерирования

     Добавить комментарий
    Имя:
    E-Mail:
    Заголовок:
    Текст:




    Партнёры:
    PostgresPro
    Inferno Solutions
    Hosting by Hoster.ru
    Хостинг:

    Закладки на сайте
    Проследить за страницей
    Created 1996-2024 by Maxim Chirkov
    Добавить, Поддержать, Вебмастеру