Libevent для с windows

Libevent для с windows

Использование языка C/C++ в разработке

среда, 8 апреля 2009 г.

Использование libevent в сетевых приложениях

API библиотеки libevent предоставляет удобный механизм callback-функций на появление соответствующего события, связанного с файловым дескриптором или по истечению таймаута, а также callback’и на сигналы или обычные временные интервалы.

На текущий момент, libevent поддерживает механизмы /dev/poll, kqueue, event ports, select, poll, epoll. За счет прозрачного API библиотека может развиваться без необходимости менять что-либо в приложениях, которые используют libevent. Как результат, libevent позволяет разрабатывать кроссплатформенные приложения и использовать наиболее подходящий механизм событий в конкретной операционной системе.
Так же, libevent можно использовать в многопоточных приложениях.
libevent работает на Linux, *BSD, Max OS X, Solaris и Windows.

К примеру, libevent используют такие системы, как Memcached – высокопроизводительная распределенная система кэширования, Tor – анонимайзер, Systrace – песочница для системных вызовов.

Использовать в своем проекте libevent достаточно просто:
Включить заголовочный файл:
и указать линковщику флаг -levent.
Перед использованием функций из этой библиотеки, необходимо выполнить инициализацию вызовом функции
После этого можно регистрировать callback’и на любые дескрипторы или таймауты.
Вот пример простейшего сетевого эхо-сервера:

В libevent есть простой асинхронный HTTP сервер, который можно встраивать в свое
приложение, а так же фреймворк для написания приложений, использующие RPC.

Записки программиста

Асинхронная работа с сокетами на C/C++ с libevent

Помните, как когда-то мы писали простой TCP-сервер на C, а потом разбирали типичные ошибки? Описанный в этих статьях подход прекрасно работает, но только до тех пор, пока количество одновременно обслуживаемых соединений невелико — условно, пара сотен. Если же вам нужно обслуживать 10 или 50 тысяч соединений (так называемая проблема C10K), программу нужно писать совершенно иначе. Давайте разберемся, почему так, и как же нужно писать.

Суть проблемы

Почему же не работает просто держать 10 000 сокетов и обслуживать каждый из них в отдельном треде? Дело в том, что треды не бесплатны — операционная система выделяет под треды память, а также распределяет между ними время CPU. Переключение контекста конкретного ядра CPU с одного треда на другой — операция тоже не бесплатная. Это не проблема, когда тредов всего лишь несколько сотен. Но когда счет идет на тысячи тредов, больше времени начинает тратиться на переключение контекстов, чем на выполнение полезной работы.

Можно ли уменьшить число тредов и при этом продолжить обслуживать 10 000 соединений? Оказывается, что можно. Простейшее решение заключается в использовании системного вызова select(), который позволяет по списку файловых дескрипторов определить, какие из них стали готовы для заданного класса операций ввода-вывода. Возможности select(), правда, не безграничны — хорошо работает он только в случае, если файловых дескрипторов не больше сотни. Поэтому для решения той же задачи в современных операционных системах есть лишенные этого недостатка системные вызовы: в Linux это epoll, во FreeBSD — kqueue, в Windows — IOCP.

Поскольку на разных платформах системные вызовы разные, было разработано несколько библиотек, предоставляющих соответствующий слой абстракции. В качестве примеров можно привести библиотеки libevent, libev и libuv. Эти библиотеки имеют немного разные компромиссы. К примеру, libev не поддерживает Windows, потому что, действительно, кто же использует Windows на серверах, да еще и обслуживающих столько соединений?

В рамках этой заметки мы сосредоточим свое внимание на библиотеке libevent. Помимо прочего, libevent интересен тем, что имеет отличную документацию на Doxygen, кучу примеров, и даже небольшую бесплатную книжку. Библиотека используется в множестве известных проектов, таких, как Chromium, Memcached, Tor, Transmission и PgBouncer. Кроме того, у libevent есть ряд интересных фичей, например, встроенных асинхронный HTTP-сервер.

Читайте также:  Не отображается справка windows

Использование libevent

Для примера напишем на базе libevent простенький чат. Клиенты цепляются к нему по TCP обычным telnet’ом и отправляют строчки. Строчку, отправленную одним клиентом, получают все остальные клиенты.

У меня код получился сравнительно большим, поэтому рассмотрим его по частям:

#define READ_BUFF_SIZE 128
#define WRITE_BUFF_SIZE ((READ_BUFF_SIZE)*8)

typedef struct connection_ctx_t <
struct connection_ctx_t * next ;
struct connection_ctx_t * prev ;

evutil_socket_t fd ;
struct event_base * base ;
struct event * read_event ;
struct event * write_event ;

uint8_t read_buff [ READ_BUFF_SIZE ] ;
uint8_t write_buff [ WRITE_BUFF_SIZE ] ;

ssize_t read_buff_used ;
ssize_t write_buff_used ;
> connection_ctx_t ;

Эта структура представляет собой соединение пользователя. Все соединения объединяются в двусвязный список (поля next, prev). Каждое соединение хранит соответствующий ему сокет (fd), буфер принятых байт (read_buff, read_buff_used), а также буфер с байтами, готовыми к отправке (write_buff, write_buff_used). Есть также поля base, read_event и write_event, смысл которых станет понятен чуть ниже.

void run ( char * host , int port ) <
// allocate memory for a connection ctx (used as linked list head)
connection_ctx_t * head_ctx = ( connection_ctx_t * ) malloc (
sizeof ( connection_ctx_t ) ) ;
if ( ! head_ctx )
error ( «malloc() failed» ) ;

head_ctx -> next = head_ctx ;
head_ctx -> prev = head_ctx ;
head_ctx -> write_event = NULL ;
head_ctx -> read_buff_used = 0 ;
head_ctx -> write_buff_used = 0 ;

// create a socket
head_ctx -> fd = socket ( AF_INET , SOCK_STREAM , 0 ) ;
if ( head_ctx -> fd 0 )
error ( «socket() failed» ) ;

// make it nonblocking
if ( evutil_make_socket_nonblocking ( head_ctx -> fd ) 0 )
error ( «evutil_make_socket_nonblocking() failed» ) ;

// bind and listen
struct sockaddr_in sin ;
sin . sin_family = AF_INET ;
sin . sin_port = htons ( port ) ;
sin . sin_addr . s_addr = inet_addr ( host ) ;
if ( bind ( head_ctx -> fd , ( struct sockaddr * ) & sin , sizeof ( sin ) ) 0 )
error ( «bind() failed» ) ;

if ( listen ( head_ctx -> fd , 1000 ) 0 )
error ( «listen() failed» ) ;

// create an event base
struct event_base * base = event_base_new ( ) ;
if ( ! base )
error ( «event_base_new() failed» ) ;

// create a new event
struct event * accept_event = event_new ( base , head_ctx -> fd ,
EV_READ | EV_PERSIST , on_accept , ( void * ) head_ctx ) ;
if ( ! accept_event )
error ( «event_new() failed» ) ;

head_ctx -> base = base ;
head_ctx -> read_event = accept_event ;

// schedule the execution of accept_event
if ( event_add ( accept_event , NULL ) 0 )
error ( «event_add() failed» ) ;

// run the event dispatching loop
if ( event_base_dispatch ( base ) 0 )
error ( «event_base_dispatch() failed» ) ;

// free allocated resources
on_close ( head_ctx ) ;
event_base_free ( base ) ;
>

Помимо обычной последовательности вызовов socket, bind, listen, созданный сокет здесь делается неблокирующим при помощи соответствующей процедуры из библиотеки libevent. Создается event_base, главный «класс» в libevent, благодаря которому осуществляется работа всего остального. Также создается событие с именем accept_event, выстреливающее, когда сокет становится доступен на чтение, то есть, в данном случае — готов принять новое соединение. Событие «включается» с помощью процедуры event_add, после чего выполнение программы уходит в недры процедуры event_base_dispatch, откуда при нормальном выполнении программы управление уже не вернется. Другими словами, этот код говорит «забиндить порт, и когда на него кто-то постучится, вызвать процедуру on_accept».

void on_accept ( evutil_socket_t listen_sock , short flags , void * arg ) <
connection_ctx_t * head_ctx = ( connection_ctx_t * ) arg ;
evutil_socket_t fd = accept ( listen_sock , 0 , 0 ) ;

if ( fd 0 )
error ( «accept() failed» ) ;

// make in nonblocking
if ( evutil_make_socket_nonblocking ( fd ) 0 )
error ( «evutil_make_socket_nonblocking() failed» ) ;

connection_ctx_t * ctx = ( connection_ctx_t * ) malloc (
sizeof ( connection_ctx_t ) ) ;
if ( ! ctx )
error ( «malloc() failed» ) ;

// add ctx to the linked list
ctx -> prev = head_ctx ;
ctx -> next = head_ctx -> next ;
head_ctx -> next -> prev = ctx ;
head_ctx -> next = ctx ;

ctx -> base = head_ctx -> base ;

ctx -> read_buff_used = 0 ;
ctx -> write_buff_used = 0 ;

printf ( «[%p] New connection! fd = %d \n » , ctx , fd ) ;

ctx -> fd = fd ;
ctx -> read_event = event_new ( ctx -> base , fd , EV_READ | EV_PERSIST ,
on_read , ( void * ) ctx ) ;
if ( ! ctx -> read_event )
error ( «event_new(. EV_READ . ) failed» ) ;

Читайте также:  Windows 10 не работает зум

ctx -> write_event = event_new ( ctx -> base , fd , EV_WRITE | EV_PERSIST ,
on_write , ( void * ) ctx ) ;
if ( ! ctx -> write_event )
error ( «event_new(. EV_WRITE . ) failed» ) ;

if ( event_add ( ctx -> read_event , NULL ) 0 )
error ( «event_add(read_event, . ) failed» ) ;
>

Эта процедура принимает входящее соединение с помощью системного вызова accept. Также она выделяет память для структуры connection_ctx_t, полностью заполняет ее и добавляет в двусвязных список всех открытых соединений. В процессе заполнения структуры с помощью процедуры event_new создаются события read_event и write_event по аналогии с тем, как это делалось в рассмотренной выше процедуре run. Из этих двух событий с помощью event_add «включается» только read_event. Другими словами, код говорит «принять соединение, добавить его в список, и когда что-то придет, дернуть процедуру on_read».

void on_read ( evutil_socket_t fd , short flags , void * arg ) <
connection_ctx_t * ctx = arg ;

printf ( «[%p] on_read called, fd = %d \n » , ctx , fd ) ;

ssize_t bytes ;
for ( ;; ) <
bytes = read ( fd , ctx -> read_buff + ctx -> read_buff_used ,
READ_BUFF_SIZE — ctx -> read_buff_used ) ;
if ( bytes == 0 ) <
printf ( «[%p] client disconnected! \n » , ctx ) ;
on_close ( ctx ) ;
return ;
>

if ( bytes 0 ) <
if ( errno == EINTR )
continue ;

printf ( «[%p] read() failed, errno = %d, »
«closing connection. \n » , ctx , errno ) ;
on_close ( ctx ) ;
return ;
>

break ; // read() succeeded
>

ssize_t check = ctx -> read_buff_used ;
ssize_t check_end = ctx -> read_buff_used + bytes ;
ctx -> read_buff_used = check_end ;

while ( check check_end ) <
if ( ctx -> read_buff [ check ] != ‘ \n ‘ ) <
check ++;
continue ;
>

on_string_received ( ( const char * ) ctx -> read_buff , length , ctx ) ;

// shift read_buff (optimize!)
memmove ( ctx -> read_buff , ctx -> read_buff + check ,
check_end — check — 1 ) ;
ctx -> read_buff_used -= check + 1 ;
check_end -= check ;
check = 0 ;
>

if ( ctx -> read_buff_used == READ_BUFF_SIZE ) <
printf ( «[%p] client sent a very long string, »
«closing connection. \n » , ctx ) ;
on_close ( ctx ) ;
>
>

Процедура on_read вызывается, когда в одном из соединений есть данные, которые можно прочитать. Процедура считывает эти данные и помещает в read_buff. Когда в буфере оказывается строка, заканчивающаяся символом \n , он заменяется на \0 , после чего вызывается on_string_received с полученной строкой, ее длиной и информацией о соединении в качестве аргументов. После обработки полученной строки содержимое буфера сдвигается. В случае возникновения ошибок (read_buff полностью заполнился, не удалось считать данные, …) клиентское соединение закрывается.

// called manually
void on_string_received ( const char * str , int len ,
connection_ctx_t * ctx ) <
printf ( «[%p] a complete string received: ‘%s’, length = %d \n » ,
ctx , str , len ) ;

connection_ctx_t * peer = ctx -> next ;
while ( peer != ctx ) <
if ( peer -> write_event == NULL ) < // list head, skipping
peer = peer -> next ;
continue ;
>

printf ( «[%p] sending a message to %p. \n » , ctx , peer ) ;

// check that there is enough space in the write buffer
if ( WRITE_BUFF_SIZE — peer -> write_buff_used len + 1 ) <
// if it’s not, call on_close being careful with
// the links in the linked list
printf ( «[%p] unable to send a message to %p — »
«not enough space in the buffer; »
«closing %p’s connection \n » ,
ctx ,
peer ,
peer ) ;
connection_ctx_t * next = peer -> next ;
on_close ( peer ) ;
peer = next ;
continue ;
>

// append data to the buffer
memcpy ( peer -> write_buff + peer -> write_buff_used , str , len ) ;
peer -> write_buff [ peer -> write_buff_used + len ] = ‘ \n ‘ ;
peer -> write_buff_used += len + 1 ;

// add writing event (it’s not a problem to call it
// multiple times)
if ( event_add ( peer -> write_event , NULL ) 0 )
error ( «event_add(peer->write_event, . ) failed» ) ;

Читайте также:  Как без биос загрузить windows

Процедура on_string_received берет принятую от одного из клиентов строку и записывает ее в конец write_buff остальных клиентов. Также для этих клиентов происходит «включение» событий write_event с помощью процедуры event_add. Таким образом мы говорим, что готовы что-то послать клиентам, если, конечно, они готовы что-то принять. В случае возникновения ошибки, например, нехватки места в write_buff, соответствующее соединение закрывается.

void on_write ( evutil_socket_t fd , short flags , void * arg ) <
connection_ctx_t * ctx = arg ;
printf ( «[%p] on_write called, fd = %d \n » , ctx , fd ) ;

ssize_t bytes ;
for ( ;; ) <
bytes = write ( fd , ctx -> write_buff , ctx -> write_buff_used ) ;
if ( bytes 0 ) <
if ( errno == EINTR )
continue ;

printf ( «[%p] write() failed, errno = %d, »
«closing connection. \n » , ctx , errno ) ;
on_close ( ctx ) ;
return ;
>

break ; // write() succeeded
>

// shift the write_buffer (optimize!)
memmove ( ctx -> write_buff , ctx -> write_buff + bytes ,
ctx -> write_buff_used — bytes ) ;
ctx -> write_buff_used -= bytes ;

// if there is nothing to send call event_del
if ( ctx -> write_buff_used == 0 ) <
printf ( «[%p] write_buff is empty, »
«calling event_del(write_event) \n » , ctx ) ;
if ( event_del ( ctx -> write_event ) 0 )
error ( «event_del() failed» ) ;
>
>

Процедура on_write вызывается при включенном событии write_event (то есть, когда write_buff не пуст), когда сокет готов к передаче данных. Процедура просто записывает в сокет столько, сколько получится записать, и сдвигает write_buff на соответствующее количество байт. Если буфер при этом становится пустым, событие write_event выключается с помощью процедуры event_del. Если этого не сделать, on_write будет постоянно вызываться с пустым буфером на отправку, лишь зря грея CPU.

// called manually
void on_close ( connection_ctx_t * ctx ) <
printf ( «[%p] on_close called, fd = %d \n » , ctx , ctx -> fd ) ;

// remove ctx from the lined list
ctx -> prev -> next = ctx -> next ;
ctx -> next -> prev = ctx -> prev ;

event_del ( ctx -> read_event ) ;
event_free ( ctx -> read_event ) ;

if ( ctx -> write_event ) <
event_del ( ctx -> write_event ) ;
event_free ( ctx -> write_event ) ;
>

close ( ctx -> fd ) ;
free ( ctx ) ;
>

Наконец, on_close вызывается при закрытии соединения — штатном или в случае той или иной ошибки. Информация о соединении удаляется из двусвязного списка, все выделенные ресурсы освобождаются, сокет закрывается.

Как видите, приведенный код довольно прост, во всяком случае, концептуально. Я протестировал его на Linux и FreeBSD как локально, так и по сети, и не смог выявить каких-либо дефектов. Желающие могут проверить его самостоятельно, в том числе и на Windows, и в случае необходимости прислать мне pull request. Репозиторий, как обычно, вы найдете на GitHub. Помимо приведенного выше кода в репозитории вы также найдете и небольшую тестовую утилиту, написанную на Go.

Заключение

Приведенный материал не претендует на рассмотрение абсолютно всех возможностей libevent. Помимо прочего, за кадром остались такие возможности, как I/O Buffers, фреймворк для построения RPC, функции для асинхронной работы с DNS, а также уже упомянутый асинхронный HTTP-сервер. В качестве домашнего задания вы можете изучить какую-нибудь из этих возможностей. Документацию и примеры вы найдете на официальном сайте libevent’а.

Также вы можете доработать мой пример, сделав из него, к примеру, очередь сообщений типа RabbitMQ — с топиками, подписками на них, и вот этим вот всем. При желании можно даже реализовать протокол AMQP. Вообще, если вы ищите идею для проекта на базе libevent или одной из его альтернатив, можно взять произвольный протокол (HTTP/2, WebSockets, Socks5, SMTP, FTP, IRC, …) и реализовать соответствующий сервер, клиент, или и то, и другое.

А как вы пишиште кроссплатформенные асинхронные сетевые приложения на C и C++?

Оцените статью