处理并发之二:libevent的eventbuffer

并发, 事件驱动, 网络库

Posted by Jingh on August 24, 2016
本文阅读量:

bufferevent这个结构体

    struct bufferevent {
    	struct event_base *ev_base;  
    	const struct bufferevent_ops *be_ops;  
    	struct event ev_read;  
    	struct event ev_write;  
    	struct evbuffer *input;  
    	struct evbuffer *output;   		
    	bufferevent_data_cb readcb;   	     
    	bufferevent_data_cb writecb;   
    	bufferevent_event_cb errorcb;  
        }  

可以看出struct bufferevent内置了两个event(读/写)和对应的缓冲区。当有数据被读入(input)的时候,readcb被调用,当output被输出完成的时候,writecb被调用,当网络I/O出现错误,如链接中断,超时或其他错误时,errorcb被调用。 使用bufferevent的过程:

1. 设置sock为非阻塞的

    eg:  evutil_make_socket_nonblocking(fd);  

2. 使用bufferevent_socket_new创建一个structbufferevent *bev,关联该sockfd,托管给event_base

函数原型为:

> struct bufferevent * bufferevent_socket_new(struct event_base
> *base, evutil_socket_t fd,  int options)  
> eg:  struct bufferevent *bev;  
> bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);

3. 设置读写对应的回调函数

函数原型为:

> void bufferevent_setcb(struct bufferevent *bufev,   
> bufferevent_data_cb readcb, bufferevent_data_cb writecb,  
> bufferevent_event_cb eventcb, void *cbarg)   eg. 
> bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);

4. 启用读写事件,其实是调用了event_add将相应读写事件加入事件监听队列poll。正如文档所说,如果相应事件不置为true,bufferevent是不会读写数据的

函数原型:

> int bufferevent_enable(struct bufferevent *bufev, short event)   	eg.
> bufferevent_enable(bev, EV_READ|EV_WRITE);

5. 进入bufferevent_setcb回调函数:

在readcb里面从input中读取数据,处理完毕后填充到output中;

writecb对于服务端程序,只需要readcb就可以了,可以置为NULL;

errorcb用于处理一些错误信息。

针对这些使用过程进入源码进行分析:

1. bufferevent_socket_new

  • 在bufferevent_init_common中调用evbuffer_new()初始化input和output
  • 在event_assign中初始化bufferevent中的ev_read和ev_write事件。
  • 在evbuffer_add_cb中给output添加了一个callback bufferevent_socket_outbuf_cb

2. bufferevent_setcb

该函数的作用主要是赋值,把该函数后面的参数,赋值给第一个参数struct bufferevent *bufev定义的变量

3. bufferevent_enable

调用event_add将读写事件加入到事件监听队列中。

对bufferevent常用的几个函数进行分析:

> char *evbuffer_readln(struct evbuffer*buffer, size_t *n_read_out,enum
> evbuffer_eol_style eol_style);

含义:Read a single line from an evbuffer. 返回值:读到的一行内容


> int evbuffer_add(struct evbuffer *buf,const void *data, size_t
> datlen);

含义:将数据添加到evbuffer的结尾 返回值:成功返回0,失败返回-1


> int evbuffer_remove(struct evbuffer*buf, void *data, size_t datlen);

含义:从evbuffer读取数据到data 返回值:成功返回0,失败返回-1


> size_t evbuffer_get_length(const structevbuffer *buf);

含义:返回evbuffer中存储的字节长度 暂时先分析到这里,下面是代码,客户端发送消息:HTTP/1.0, Client 0 send Message: Request: Hello Server! over,服务端一条消息收完成后,会回复:Response ok! Hello Client! 服务端从bufferevent中取出消息是按行取的。代码可能有不完善的地方,由于才疏学浅,研究时间短(3天),希望高手提出宝贵意见。


buffer_server.c

    
    #include <netinet/in.h>  
    #include <sys/socket.h>  
    #include <fcntl.h>   		  
    #include <event2/event.h>  
    #include <event2/buffer.h>  
    #include <event2/bufferevent.h>      		  
    #include <assert.h>  
    #include <unistd.h>  
    #include <string.h>  
    #include <stdlib.h>  
    #include <stdio.h>  
    #include <errno.h>  
   	void do_read(evutil_socket_t fd, short events, void *arg);  
   	//struct bufferevent内建了两个event(read/write)和对应的缓冲区(struct evbuffer *input, *output),并提供相应的函数用来操作缓冲区(或者直接操作bufferevent)   
   	
   	//接收到数据后,判断是不一样一条消息的结束,结束标志为"over"字符

    void readcb(struct bufferevent *bev, void *ctx)  
    {	 
        printf("called readcb!\n");   		
        struct evbuffer *input, *output;  
        char *request_line;  
        size_t len;  
        //其实就是取出bufferevent中的input  
        input = bufferevent_get_input(bev);
        //其实就是取出bufferevent中的output 
        output = bufferevent_get_output(bev);						  
        size_t input_len = evbuffer_get_length(input);  								    
        printf("input_len: %d\n", input_len);  
        size_t output_len = evbuffer_get_length(output);  										   
        printf("output_len: %d\n", output_len);  											  											   
        while(1){														    														        
        	//从evbuffer前面取出一行,用一个新分配的空字符结束		
        	//的字符串返回这一行,EVBUFFER_EOL_CRLF表示行尾是一个可选的回车,后随一个换行符
        	request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
        	if(NULL == request_line){ 		
    			printf("The first line has not arrived yet.\n");  																										           
    			free(request_line);//之所以要进行free是因为 line = mm_malloc(n_to_copy+1)),在这里进行了malloc  																													
    			break;  																																      
        	} else {																				  																				   
        			printf("Get one line date: %s\n", request_line); 																				         
        			//用于判断是不是一条消息的结束  																										             
    				if(strstr(request_line, "over") != NULL)					{																														    																														     
    					char *response = "Response ok! Hello Client!\r\n";  																																				                 
    					//Adds data to an event buffer  
    					evbuffer_add(output, response, strlen(response));
    																																							                 
    					printf("服务端接收一条数据完成,回复客户端一条消息: %s\n", response); 																																												                
    					free(request_line);  																																														   
    					break;  																																																		             
    					}  																										       
        			}  																          
        	free(request_line);  																		     
        	}  												 
    		size_t input_len1 = evbuffer_get_length(input);  													   
    		printf("input_len1: %d\n", input_len1);  														   
    		size_t output_len1 = evbuffer_get_length(output);  															    
    		printf("output_len1: %d\n\n", output_len1);  
    }         
    
    void errorcb(struct bufferevent *bev, short error, void *ctx)  
    {  
    	if (error & BEV_EVENT_EOF) { 
    		/* connection has been closed, do any clean up here */   	 
    		printf("connection closed\n");  
    		}else if (error & BEV_EVENT_ERROR){					    
    			/* check errno to see what error occurred */  
    			printf("some other error\n");  
    			} else if (error & BEV_EVENT_TIMEOUT) 
    				/* must be a timeout event handle, handle it */  
    				printf("Timed out\n");  
    	      		}  
    	bufferevent_free(bev);   
    }
    
    void do_accept(evutil_socket_t listener, short event, void *arg)  
    {
    	struct event_base *base = arg;  
    	struct sockaddr_storage ss;  
    	socklen_t slen = sizeof(ss);  
    	int fd = accept(listener, (struct sockaddr*)&ss, &slen);  
    	if (fd < 0){
    		perror("accept");  
    	}else if (fd > FD_SETSIZE){  
    		close(fd);  
    	 }else {
    		 struct bufferevent *bev;
    		 evutil_make_socket_nonblocking(fd);  													    														         
    		 //使用bufferevent_socket_new创建一个struct bufferevent*bev,
    		 //关联该sockfd,托管给event_base  													         
    		 //BEV_OPT_CLOSE_ON_FREE表示释放bufferevent时关闭底层传输端口。
    		 //这将关闭底层套接字,释放底层bufferevent等。  														         
    		 bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);  														    																         
    		 //设置读写对应的回调函数  
    		 bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);  
    		 bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);  																    																		        
    		 //启用读写事件,其实是调用了event_add将相应读写事件加入事件监听队列poll。
    		 //正如文档所说,如果相应事件不置为true,bufferevent是不会读写数据的  																			         
    		 bufferevent_enable(bev, EV_READ|EV_WRITE);  																		
    	 }  
    }     
    	
    void run(void){	  
    	evutil_socket_t listener;   		
    	struct sockaddr_in sin;  
    	struct event_base *base;  
    	struct event *listener_event;  			  
    	base = event_base_new();  
    	if (!base)  
    	return; 
    	 sin.sin_family = AF_INET;  
    	 sin.sin_addr.s_addr = 0;  			
    	 sin.sin_port = htons(8000); 								  									
    	 listener = socket(AF_INET, SOCK_STREAM, 0);  
    	 evutil_make_socket_nonblocking(listener);  		
    	 
    	#ifndef WIN32{    				

										 														        
	int one = 1;  																      
	setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, 
	&one, sizeof(one));  
    }    
	#endif  
	if(bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0){																   															         
		perror("bind");  																		         
		return;  																				    
	}  														
	if (listen(listener, 16)<0){																    																         
		perror("listen");    																    																          
		return;     
	}  										  
												  
	listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
	event_add(listener_event, NULL);  																									     													   
	event_base_dispatch(base);  
    }  
    int main(int argc, char **argv){    	  
    	setvbuf(stdout, NULL, _IONBF, 0); 	  
    	run();  
    	return 0; 
    }