The OpenNET Project / Index page

[ новости /+++ | форум | теги | ]

форумы  помощь  поиск  регистрация  майллист  вход/выход  слежка  RSS
"stream server с использованием epoll на c"
Вариант для распечатки  
Пред. тема | След. тема 
Форум Программирование под UNIX (C/C++)
Изначальное сообщение [ Отслеживать ]

"stream server с использованием epoll на c"  +/
Сообщение от NEO (ok) on 01-Мрт-13, 11:13 
Добрый день. Пишу программу, которая читает поток текста, потом в соответствии с правилом записывает его в определенное хранилище..
Проблема в том ,что после первого коннекта,второй и третий не всегда срабатывают,нужно отключить первый ,чтоб очередь дошла до них... Но когда коннектишся ,записываешь данные и отключаешся,все работает на ура даже при очень больших количествах клиентов..
Все это реализованн на C и epoll.

Вот код, собственно:
void epoll(){

    int sz = 128;
    int rc = 0;
    char network_msg[sz];

    /* Buffer where events are returned */
    events = calloc(globals.maxcon, sizeof event);

    listenfd = create_and_bind(arguments.listen_host, arguments.listen_port);

    if (0 != setnonblocking(listenfd)) {
        snprintf(network_msg, sz, "Can not make non-blocking socket: - %s", strerror(errno));
        log_debug(network_msg);
        halt();
    }

    if (-1 == listen(listenfd, globals.maxcon)) {
        snprintf(network_msg, sz, "Can not listen on %s: %s", arguments.listen_host, strerror(errno));
        log_debug(network_msg);
        halt();
    }

    snprintf(network_msg, sz, "Listening on %s:%d", arguments.listen_host, arguments.listen_port);
    log_debug(network_msg);

    snprintf(network_msg, sz, "Can serve maximum %d clients", globals.maxcon);
    log_debug(network_msg);

    efd = epoll_create(globals.maxcon);
    if (-1 == efd) {
        snprintf(network_msg, sz, "Error with epoll initialization: - %s", strerror(errno));
        log_debug(network_msg);
        halt();
    }

    event.data.fd = listenfd;
    event.events = EPOLLIN;
    //
    if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &event)) {
        snprintf(network_msg, sz, "Error with epoll_ctl: - %s", strerror(errno));
        log_debug(network_msg);
        halt();
    }

    /* The event loop */
    while (1) {
        int n, i;

        n = epoll_wait(efd, events, globals.maxcon, -1);
        for (i = 0; i < n; i++) {
            if ((events[i].events & EPOLLERR) ||
                    (events[i].events & EPOLLHUP) ||
                    (!(events[i].events & EPOLLIN))) {

                snprintf(network_msg, sz, "Error with epoll_ctl: - %s", strerror(errno));
                log_debug(network_msg);
                close(events[i].data.fd);
                continue;
            } else if (listenfd == events[i].data.fd) {
                /* We have a notification on the listening socket, which
                   means one or more incoming connections. */
                while (1) {
                    in_len = sizeof in_addr;
                    connfd = accept(listenfd, &in_addr, &in_len);
                    if (connfd == -1) {
                        if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                            /* We have processed all incoming
                               connections. */
                            break;
                        } else {
                            snprintf(network_msg, sz, "Can not accept: %s", strerror(errno));
                            log_debug(network_msg);
                            break;
                        }
                    }

                    if (connection_count == globals.maxcon) {
                        snprintf(network_msg, sz, "Connections count limit exceeded [%d]", connection_count);
                        log_debug(network_msg);
                        close(connfd);
                        break;
                    }
                    if (0 == getnameinfo(&in_addr, in_len,
                            ip, sizeof ip,
                            port, sizeof port,
                            NI_NUMERICHOST | NI_NUMERICSERV)) {
                        snprintf(network_msg, sz, "Connection attempt from %s,port %d", ip, atoi(port));
                        log_connection(network_msg);
                    }
                    if (!has_applicable_route(ip)) {
                        Writeline(connfd, NOT_APPLICABLE_ROUTE, strlen(NOT_APPLICABLE_ROUTE));
                        close(connfd);
                        break;
                    }
                    /* route counter */
                    rc = find_proper_router_counter(ip);
                    //
                    if (0 != setnonblocking(connfd)) {
                        snprintf(network_msg, sz, "Can not make non-blocking socket: - %s", strerror(errno));
                        log_debug(network_msg);
                        halt();
                    }

                    event.data.fd = connfd;
                    event.events = EPOLLIN;
                    if (0 > epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event)) {
                        snprintf(network_msg, sz, "Error with epoll_ctl: - %s", strerror(errno));
                        log_debug(network_msg);
                    }
                    //
                    connection_count++;
                    save_active_connection(ip, atoi(port), connfd, &event, NULL, DEF_CON_COUNTER);
                    snprintf(network_msg, sz, "Connection %d established from %s,port %d", connection_count, ip, atoi(port));
                    log_connection(network_msg);
                    snprintf(network_msg, sz, "Connections count = %d\n", connection_count);
                    log_connection(network_msg);
                    //
                    Writeline(connfd, welcome, strlen(welcome));

                    if (TRANSFER_ACTIONS[rc][DEF_CON_COUNTER] == UNINITIALIZED) {
                        init_descriptor(rc, DEF_CON_COUNTER);
                    } else {
                        snprintf(network_msg, sz, "Data channel for router id: [%d] already exist", rc);
                        log_debug(network_msg);
                    }
                    //
                }
                np.cfd = connfd;
                np.lfd = listenfd;
                continue;
            } else {
                int fd = events[i].data.fd;
                int done = 0;

                while (1) {
                    ssize_t count;
                    bzero(inbuff[rc], sizeof inbuff[rc]);
                    if (globals.readline) {
                        count = Readline(fd, inbuff[rc], sizeof inbuff[rc]);
                    } else {
                        count = read(fd, inbuff[rc], sizeof inbuff[rc]);
                    }

                    if (count == -1) {
                        if (errno == ECONNRESET) {
                            snprintf(network_msg, sz, "Connection on descriptor %d [from %s,port %d] aborted", fd, ip, atoi(port));
                            log_connection(network_msg);
                            done = 1;
                        } else if (errno != EAGAIN) {
                            snprintf(network_msg, sz, "Connection on descriptor %d [from %s,port %d] has error: %s", fd, ip, atoi(port), strerror(errno));
                            log_connection(network_msg);
                            done = 1;
                        }

                        break;
                    } else if (count == 0) {
                        /* End of file. The remote has closed the connection. */
                        done = 1;
                        break;
                    }

                    if (inbuff[rc][0] == '\n' || inbuff[rc][0] == '\0' || (inbuff[rc][0] == '\r' && inbuff[rc][1] == '\n'))continue;
                    process_input_data(inbuff[rc], rc, DEF_CON_COUNTER);

                }

                if (done) {
                    if (0 > epoll_ctl(efd, EPOLL_CTL_DEL, fd, &event)) {
                        snprintf(network_msg, sz, "Error with epoll_ctl: - %s", strerror(errno));
                        log_debug(network_msg);
                    }
                    close(fd);
                    deactivate_connection(rc, DEF_CON_COUNTER);
                    connection_count--;
                    snprintf(network_msg, sz, "Connection from %s,port %d closed", ip, atoi(port));
                    log_connection(network_msg);
                    snprintf(network_msg, sz, "Connections count = %d\n", connection_count);
                    log_connection(network_msg);
                }
            }
        }
    }

    free(events);
    close(listenfd);
}
Заранее благодарен!

Ответить | Правка | Cообщить модератору

Оглавление

Сообщения по теме [Сортировка по времени | RSS]


1. "stream server с использованием epoll на c"  +/
Сообщение от Mr. Mistoffelees email on 01-Мрт-13, 15:29 
Привет,

> Проблема в том ,что после первого коннекта,второй и третий не всегда срабатывают,нужно
> отключить первый ,чтоб очередь дошла до них... Но когда коннектишся ,записываешь
> данные и отключаешся,все работает на ура даже при очень больших количествах
> клиентов..

Если вам нужно обрабатывать несколько одновременных сессий, вам нужно несколько одновременных обработчиков:
- Основной процесс висит, слушает (т.е. от только диспетчер).
- Приходит заявка; основной процес дает ее на обработку дочерному процессу и продолжает слушать
- Приходит вторая заявка, основной процесс отдает ее второму дочерному процессу и т.д.

Насчет создания дочерных процессов читаем либо про fork()/exec(), либо про threads.

Насчет передачи заявки, либо держим пул готовых дочерных процессов (и учимся IPC), либо порождаем процесс как только придет заявка.

WWel,

Ответить | Правка | ^ к родителю #0 | Наверх | Cообщить модератору

2. "stream server с использованием epoll на c"  +/
Сообщение от NEO (ok) on 01-Мрт-13, 15:39 
>[оверквотинг удален]
> обработчиков:
> - Основной процесс висит, слушает (т.е. от только диспетчер).
> - Приходит заявка; основной процес дает ее на обработку дочерному процессу и
> продолжает слушать
> - Приходит вторая заявка, основной процесс отдает ее второму дочерному процессу и
> т.д.
> Насчет создания дочерных процессов читаем либо про fork()/exec(), либо про threads.
> Насчет передачи заявки, либо держим пул готовых дочерных процессов (и учимся IPC),
> либо порождаем процесс как только придет заявка.
> WWel,

Благодарю за ответ

Ответить | Правка | ^ к родителю #1 | Наверх | Cообщить модератору

3. "stream server с использованием epoll на c"  +/
Сообщение от anonymous (??) on 07-Апр-13, 20:28 
>[оверквотинг удален]
>> - Основной процесс висит, слушает (т.е. от только диспетчер).
>> - Приходит заявка; основной процес дает ее на обработку дочерному процессу и
>> продолжает слушать
>> - Приходит вторая заявка, основной процесс отдает ее второму дочерному процессу и
>> т.д.
>> Насчет создания дочерных процессов читаем либо про fork()/exec(), либо про threads.
>> Насчет передачи заявки, либо держим пул готовых дочерных процессов (и учимся IPC),
>> либо порождаем процесс как только придет заявка.
>> WWel,
> Благодарю за ответ

Вы также можете посмотреть соседний тред, http://www.opennet.me/openforum/vsluhforumID9/9642.html пост 12.

Ответить | Правка | ^ к родителю #2 | Наверх | Cообщить модератору

Архив | Удалить

Рекомендовать для помещения в FAQ | Индекс форумов | Темы | Пред. тема | След. тема




Партнёры:
PostgresPro
Inferno Solutions
Hosting by Hoster.ru
Хостинг:

Закладки на сайте
Проследить за страницей
Created 1996-2024 by Maxim Chirkov
Добавить, Поддержать, Вебмастеру