The OpenNET Project / Index page

[ новости /+++ | форум | теги | ]

форумы  помощь  поиск  регистрация  майллист  вход/выход  слежка  RSS
"Сервер с 3мя открытыми портами и select()"
Вариант для распечатки  
Пред. тема | След. тема 
Форумы Программирование под UNIX (Public)
Изначальное сообщение [ Отслеживать ]

"Сервер с 3мя открытыми портами и select()"  +/
Сообщение от socket_select_nonblock on 15-Сен-09, 17:23 
Hello, big bro..
Есть проблема..
Делаю сервер с 3 портами, работающими каждый независимо друг от друга...
Использую неблокируюшие сокеты + select. Возникает следующая проблема - после добавления слушающих сокетов в множество селект прекрасно разбирает входящие соединения и разруливает их, нок  примеру если мне необходимо принимать данные от нескольких клиентов (одновременно), после вызова accept только первый клиент посылает данные (на примере эхо-сервера и телнета проверил), а остальные хоть и подключаются, но при вводе строк не получают ответа.. Как только первый клиент отрубается, начинают работать остальные... Как поправить, где пропустил чего?
--
функция принимает 3 порта, функция makeServerSocket принимает порт и возвращает дискриптор слушающего сокета (в ней выполняются последовательно socket, bind, listen).
В этой функции также включен nonblocking mode через fcntl.
Спс :)
P.S. всякие kqueue, epoll и пр. не предлагать плиз взамен этому. Ибо геморрно и разбираться надо :( fork тоже не катит, как и треды...
int startServer(int emulator_port, int controller_port, int system_port)
{
    struct timeval timeout;
    fd_set master_set;
    fd_set working_set;
    char buffer[MAXLINE+1];
    int end_server=0;
    int close_conn;
    int rc,desc_ready;
    int max_sd,new_sd;
    int i,len;
    int emu_sock, contr_sock, sys_sock;
    FD_ZERO(&master_set);
    emu_sock=makeServerSocket(emulator_port);
    FD_SET(emu_sock,&master_set);
    contr_sock=makeServerSocket(controller_port);
    FD_SET(contr_sock,&master_set);
    sys_sock=makeServerSocket(system_port);
    FD_SET(sys_sock,&master_set);
    max_sd=emu_sock+contr_sock+sys_sock;
    do
    {
        memcpy(&working_set,&master_set,sizeof(master_set));
        printf("Waiting on select()...\n");
        rc=select(max_sd+1,&working_set,NULL,NULL,NULL);
        if(rc<0)
        {
            perror("select() failed");
            break;
        }
        if(rc==0)
        {
            printf("select() timed out. End program.\n");
            break;
        }
        desc_ready=rc;
        for(i=0;i<=max_sd && desc_ready>0;++i)
        {
            if(FD_ISSET(i,&working_set))
            {
                desc_ready-=1;
                if(i==emu_sock || i==contr_sock || i==sys_sock)
                {
                    
                    do
                    {
                        if(i==emu_sock)
                        {
                            new_sd=accept(i,NULL,NULL);
                            if(fcntl(i,F_SETFL,O_NONBLOCK)<0)
                            {
                                perror("fcntl failed");
                                return -1;
                            }
                            printf("Emulator socket is readable\n");
                        }
                        else if(i==contr_sock)
                        {
                            new_sd=accept(i,NULL,NULL);
                            if(fcntl(i,F_SETFL,O_NONBLOCK)<0)
                            {
                                perror("fcntl failed");
                                return -1;
                            }
                            printf("Controller socket is readable\n");
                        }
                        else
                        {
                            new_sd=accept(i,NULL,NULL);
                            if(fcntl(i,F_SETFL,O_NONBLOCK)<0)
                            {
                                perror("fcntl failed");
                                return -1;
                            }
                            printf("System socket is readable\n");
                        }
                        if(new_sd<0)
                        {
                            if(errno!=EWOULDBLOCK)
                            {
                                perror("accept() failed");
                                end_server=1;
                            }
                            break;
                        }
                        printf("New incoming connection - %d\n",new_sd);
                        FD_SET(new_sd,&master_set);
                        if(new_sd>max_sd)
                            max_sd=new_sd;
                    } while (new_sd!=-1);
                }
                else
                {
                    printf("Descriptor %d is readable\n",i);
                    close_conn=0;
                    do
                    {
                        rc=recv(i,buffer,sizeof(buffer),0);
                        if(rc<0)
                        {
                            if(errno!=EWOULDBLOCK)
                            {
                                perror("recv() failed");
                                close_conn=1;
                            }
                            break;
                        }
                        if(rc==0)
                        {
                            printf("Connection closed\n");
                            close_conn=1;
                            break;
                        }
                        len=rc;
                        printf("%d bytes received\n",len);
                        rc=send(i,buffer,len,0);
                        if(rc<0)
                        {
                            perror("send() failed");
                            close_conn=1;
                            break;
                        }
                    } while (1);
                    if(close_conn)
                    {
                        close(i);
                        FD_CLR(i,&master_set);
                        if(i==max_sd)
                        {
                            while(FD_ISSET(max_sd,&master_set) == 0)
                                max_sd-=1;
                        }
                    }
                }
            }
        }
    } while(!end_server);
    
    for(i=0;i<=max_sd;++i)
    {
        if(FD_ISSET(i,&master_set))
            close(i);
    }
    
    return 0;
}
Высказать мнение | Ответить | Правка | Cообщить модератору

Оглавление

Сообщения по теме [Сортировка по времени | RSS]


1. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от Andrey (??) on 15-Сен-09, 20:30 
У вас один тред который всё слушает и обрабатывает, значит когда первый клиент начинает слать данные то этот тред занят с этим клиентом и физически не может отвечать на другие запросы, как только с первым клиентом разобрались перешли к следующему в очереди.
По моему всё работает правильно, с одним тредом по другому сделать сложно.
Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

2. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от socket_select_nonblock on 15-Сен-09, 20:47 
>У вас один тред который всё слушает и обрабатывает, значит когда первый
>клиент начинает слать данные то этот тред занят с этим клиентом
>и физически не может отвечать на другие запросы, как только с
>первым клиентом разобрались перешли к следующему в очереди.
>По моему всё работает правильно, с одним тредом по другому сделать сложно.
>

Значит после того, как FD_ISSET просигналил о появлении соединения, нужно создавать отдельный поток и там уже вызывать функцию accept(), правильно я понимаю?

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

3. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от DeadMustdie email(??) on 15-Сен-09, 21:12 
> Значит после того, как FD_ISSET просигналил о появлении соединения, нужно создавать
>отдельный поток и там уже вызывать функцию accept(), правильно я понимаю?

Это один из вариантов. Есть как минимум ещё два стандартных пути:

  - вместо потоков для обработки данных каждого соединения использовать дочерние процессы

  - использовать более совершенные по сравнению с select() механизмы (ну хотя бы вызов poll()) и обеспечить переключение контекста в одном-единственном потоке, с ожиданием активности клиентов и их готовности к приёму данных

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

4. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от elvenic (ok) on 16-Сен-09, 00:24 
>[оверквотинг удален]
>>отдельный поток и там уже вызывать функцию accept(), правильно я понимаю?
>
>Это один из вариантов. Есть как минимум ещё два стандартных пути:
>
>  - вместо потоков для обработки данных каждого соединения использовать дочерние
>процессы
>
>  - использовать более совершенные по сравнению с select() механизмы (ну
>хотя бы вызов poll()) и обеспечить переключение контекста в одном-единственном потоке,
>с ожиданием активности клиентов и их готовности к приёму данных

И есть "самый" стандартный вариант :), который, очевидно и был нужен автору: не нужно цикла do-while вокруг recv()+send(), а вместо этого, нужно:

а) создавать для каждого сокета свой буффер и запоминать сколько уже данных записано в каждый буффер (текущая позиция в буфере, начальное значение 0),
б) для каждого сокета готового к чтению, прочитать (recv()) (без цикла - один раз!) столько, сколько сокет отдаст, в буффер этого сокета начиная с текущей позиции, и потом увеличить текущую позицию на количество байт реально прочитанных в этот раз (значение возвращаемое recv()), чтобы в следующий раз дописать в буфер новые данные, а не затереть уже прочитанные;
в) когда все ожидаемые данные прочитаны из конкретного сокета (или по длинне данных, или по какому-то спец. символу, например, NewLine), заносить дескрипторы таких сокетов во второй FD_SET и подавать этот FD_SET в 3-й параметер select() (see man select). Ну и, понятно, проверять в цикле готовность сокетов из этого FD_SET к записи;
г) для сокетов готовых к записи, так же создавать буферы с исходящими данными, с запоминанием текущей позиции для каждого буфера,
д) и точно так же как и при чтении сокетов, для каждого сокета готового к записи, записать (send()) (без цикла - один раз!) столько, сколько сокет возьмет, из его исходящего буфера, начиная с его текущей позиции, и увеличить эту текущую позицию на количество реально в этот раз записанных байт (значение возвращаемое send()), чтобы в следующий раз записать в сокет следующую порцию данных.

Конечно, возможны варианты разной степени сложности, но основы работы с select() или poll() и иже с ними именно такие.

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

5. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от elvenic (ok) on 16-Сен-09, 01:01 
Прочитал сам свое сообщение, по-моему вышло не совсем понятно. Короче, цикл должен быть один, вокруг select(), и без вложеных циклов. Когда select() сработает, нужно проверять готовность всех сокетов на чтение/запись/ошибку (из FD_SET'ов которые были поданы как 2-й, 3-й и 4-и параметер select()'у. Для каждого активного сокета, должен быть свой, отдельный, буффер с данными, и запомнена текущая позиция в каждом буфере, которая нужна чтобы точно знать сколько мы уже прочитали из сокета (если мы сейчас читаем из этого сокета) или записали (если мы сейчас пишем в этот сокет). Нужен также некий протокол который определяет когда все ожидаемые данные прочитаны из конкретного сокета (напр. NewLine или фиксированная длинна, или, как в HTTP, длинна данных передается в начале самих данных). Сразу после accept(), сокет находится в режиме чтения, когда (по протоколу) все данные прочитаны, сокет переводится в режим записи. Вот, такое вот дополнение к моему сообщению, может, так яснее.

А так как реализовано автором, несмотря на то что сокеты в non-blocking режиме, вложеные циклы фактически реализуют блокировки для каждого сокета который только сказал что у него есть начало входящих данных. И кроме того, так как эти циклы написаны, если сеть решит передать данные в два приема, так что первый recv() прочитает только часть данных (а так вполне может случится, ведь сокет non-blocking, а сети никто не указ), второй recv() затрет в буфере данные прочитанные первым recv()'ом - оба recv()'а пишут с самого начала буфера.

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

6. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от socket_select_nonblock on 16-Сен-09, 06:44 
>[оверквотинг удален]
>>отдельный поток и там уже вызывать функцию accept(), правильно я понимаю?
>
>Это один из вариантов. Есть как минимум ещё два стандартных пути:
>
>  - вместо потоков для обработки данных каждого соединения использовать дочерние
>процессы
>
>  - использовать более совершенные по сравнению с select() механизмы (ну
>хотя бы вызов poll()) и обеспечить переключение контекста в одном-единственном потоке,
>с ожиданием активности клиентов и их готовности к приёму данных

Потоки нужны, ибо планируется обеспечить взаимодействие между клиентами на разных портах, через процессы это вроде тяжело реализовать...

P.S. Может есть у кого примеры серверов с несколькими работающими портами и вызовами типа select\poll\epoll...?

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

7. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от Andrey (??) on 16-Сен-09, 07:14 
>>[оверквотинг удален]
>Потоки нужны, ибо планируется обеспечить взаимодействие между клиентами на разных портах,
> через процессы это вроде тяжело реализовать...

Через процессы можно использовать IPC, но у меня опыта в IPC нету, поэтому тоже кажеться что сложно :-)

>P.S. Может есть у кого примеры серверов с несколькими работающими портами и
>вызовами типа select\poll\epoll...?

Примеров у меня нету, но могу посоветовать использовать какую-нибудь библиотеку, котороя умеет делать мультплексирование и прочие низкоуровневые штуки.
для C++ можно посмотреть ACE Framework (http://www.cs.wustl.edu/~schmidt/ACE.html) или POCO C++ (http://pocoproject.org/), обе библиотеки имеют примеры.
для C можно использовать glib (http://library.gnome.org/devel/glib/2.20/), я думаю что найдётся много OpenSource прокетов где можно посмотреть примеры использования glib, например pidgin использует glib


Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

8. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от const86 (ok) on 16-Сен-09, 11:51 
>Потоки нужны, ибо планируется обеспечить взаимодействие между клиентами на разных портах, через процессы это вроде тяжело реализовать...

Потоки нужны, если обработка тяжёлая и хочется заюзать несколько процов. Количество потоков определяется количеством процов. А клиентов всех засовывать в select (poll/epoll...) и обрабатывать асинхронно, это никак не помешает никаким взаимодействиям.

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

9. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от socket_select_nonblock on 16-Сен-09, 20:46 
Фуф, вроде справился, правда код вырос в 2 раза, но основную цель выполняет - есть 3 открытых порта, к к-ым может прицепиться множество народу на каждый, и будут они работать все вместе..
И вообщем ушел я от селектов...
Сделал на epoll, вроде он побыстрее будет судя по диаграммкам С10К...
Кого заинтересует - могу выложить код, не жалко.. Сам долго гуглил на более-менее нормальные примеры epoll.
Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

10. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от nikll email on 17-Сен-09, 13:10 
>Фуф, вроде справился, правда код вырос в 2 раза, но основную цель
>выполняет - есть 3 открытых порта, к к-ым может прицепиться множество
>народу на каждый, и будут они работать все вместе..
>И вообщем ушел я от селектов...
>Сделал на epoll, вроде он побыстрее будет судя по диаграммкам С10К...
>Кого заинтересует - могу выложить код, не жалко.. Сам долго гуглил на
>более-менее нормальные примеры epoll.

Выложи плиз :), интересно стало.


Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

11. "Сервер с 3мя открытыми портами и select()"  +/
Сообщение от socket_select_nonblock on 17-Сен-09, 13:51 
>Выложи плиз :), интересно стало.

Держи, не жалко. Думаю, разберешься что куда...
Это не итоговая версия - здесь много сообщений для отслеживания ошибок..

int startServer(int emulator_port, int controller_port, int system_port)
{
    int emu_sock=0, contr_sock=0, sys_sock=0;
    int emu_fd, contr_fd, sys_fd;
    int epfd=0;
    struct sockaddr_in my_addr,emu_addr,contr_addr,sys_addr;
    int res=0,res1=0,res2=0;
    int i=0;
    int new_fd,kdpfd,ret;
    static struct epoll_event ev;
    static struct epoll_event events[EPOLL_QUEUE_LEN]; /*When no size, error - Bad address*/
    int len=sizeof(struct sockaddr_in);
    int curfds,nfds;
    int n;
    
    if((emu_sock=makeServerSocket(emulator_port))==-1)
    {
        perror("error emulator listen");
        return -1;
    }
    if((contr_sock=makeServerSocket(controller_port))==-1)
    {
        perror("error controller listen");
        return -1;
    }
    if((sys_sock=makeServerSocket(system_port))==-1)
    {
        perror("error system listen");
        return -1;
    }
    if((epfd=epoll_create(EPOLL_QUEUE_LEN))==-1)
    {
        perror("epoll_create error");
        return -1;
    }
    ev.events= EPOLLIN | EPOLLET;
    ev.data.fd=emu_sock;
    if(epoll_ctl(epfd,EPOLL_CTL_ADD,emu_sock,&ev)<0)
    {
        perror("epoll_ctl error");
        return -1;
    }
    ev.data.fd=contr_sock;
    if(epoll_ctl(epfd,EPOLL_CTL_ADD,contr_sock,&ev)<0)
    {
        perror("epoll_ctl error");
        return -1;
    }
    ev.data.fd=sys_sock;
    if(epoll_ctl(epfd,EPOLL_CTL_ADD,sys_sock,&ev)<0)
    {
        perror("epoll_ctl error");
        return -1;
    }
    curfds=1;
    while(1)
    {
        /*waiting for some operations on socket*/

        nfds=epoll_wait(epfd,events,curfds,-1);
        if(nfds==-1)
        {
            perror("inside error in epoll_wait");
            break;
        }
        for (n = 0; n < nfds; ++n)
        {
            if (events[n].data.fd == emu_sock)
            {
                emu_fd = accept(emu_sock, (struct sockaddr *)&emu_addr,&len);
                
                if (emu_fd < 0)
                {
                    perror("accept from emu_sock");
                    continue;
                }
                else
                    printf("Connection allocated socket is:%d\n", emu_fd);
                setNonBlocking(emu_fd);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = emu_fd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, emu_fd, &ev) < 0)
                {
                    perror("inside error in epoll_wait");
                    fprintf(stderr, "to socket %d to join epoll failed!%s\n",emu_fd, strerror(errno));
                    return -1;
                }
                curfds++;
            }
            else if (events[n].data.fd == contr_sock)
            {
                contr_fd = accept(contr_sock, (struct sockaddr *)&contr_addr,&len);
                
                if (contr_fd < 0)
                {
                    perror("accept from contr_sock");
                    continue;
                }
                else
                    printf("Connection allocated socket is:%d\n", contr_fd);
                setNonBlocking(contr_fd);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = contr_fd;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, contr_fd, &ev) < 0)
                {
                    fprintf(stderr, "to socket %d to join epoll failed!%s\n",contr_fd, strerror(errno));
                    return -1;
                }
                curfds++;
            }  
            else if (events[n].data.fd == sys_sock)
            {
                sys_fd= accept(events[n].data.fd, (struct sockaddr *)&sys_addr,&len);
                
                if (sys_fd < 0)
                {
                    perror("accept from sys_sock");
                    continue;
                }
                else
                    printf("Connection allocated socket is:%d\n", sys_fd);
                setNonBlocking(sys_fd);    
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = sys_fd;
                if ((epoll_ctl(epfd, EPOLL_CTL_ADD, sys_fd, &ev))==-1)
                {
                    fprintf(stderr, "To socket [%d] to join epoll failed! %s\n",sys_fd, strerror(errno));
                    return -1;
                }
                curfds++;
            }
            else if(events[n].data.fd == emu_fd)
            {
                printf("Descriptor readable from EMULATOR accept\n");
                ret = handle_message_emulator(events[n].data.fd);
                if (ret < 1 && errno != 11)
                {
                    epoll_ctl(epfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
                    curfds--;
                }
            }
            else if(events[n].data.fd == contr_fd)
            {
                printf("Descriptor readable from CONTROLLER accept\n");
                ret = handle_message_controller(events[n].data.fd);
                if (ret < 1 && errno != 11)
                {
                    epoll_ctl(epfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
                    curfds--;
                }
            }
            else if(events[n].data.fd == sys_fd)
            {
                printf("Descriptor readable from SYSTEM accept\n");
                ret = handle_message_system(events[n].data.fd);
                if (ret < 1 && errno != 11)
                {
                    epoll_ctl(epfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
                    curfds--;
                }
            }
            else
            {
                printf("UNKNOWN readable descriptor\n");
                ret = handle_message(events[n].data.fd);
                if (ret < 1 && errno != 11)
                {
                    epoll_ctl(epfd, EPOLL_CTL_DEL, events[n].data.fd,&ev);
                    curfds--;
                }
            }
        }
        
    }
    
    return 0;
}
Здесь handle_message_* - свои обработчики для каждого accept'a
Пример обычного обработчика привожу ниже...

int handle_message(int new_fd)
{
    char buf[MAXBUF + 1];
    int rlen;
    bzero(buf, MAXBUF + 1);
    rlen = recv(new_fd, buf, MAXBUF, 0);
    if (rlen > 0)
        printf ("Descriptor [%d] receives the message successfully: %s . Total of %d bytes of data \n", new_fd, buf, rlen);
    else {
        if (rlen < 0)
            printf ("Message reception failed! Error code is %d, error message is %s \n", errno, strerror(errno));
        close(new_fd);
        return -1;
    }
    return rlen;
}

На здоровье ;)

Высказать мнение | Ответить | Правка | Наверх | Cообщить модератору

Архив | Удалить

Индекс форумов | Темы | Пред. тема | След. тема




Партнёры:
PostgresPro
Inferno Solutions
Hosting by Hoster.ru
Хостинг:

Закладки на сайте
Проследить за страницей
Created 1996-2024 by Maxim Chirkov
Добавить, Поддержать, Вебмастеру