惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

N
Netflix TechBlog - Medium
V
Vulnerabilities – Threatpost
Google Online Security Blog
Google Online Security Blog
Hugging Face - Blog
Hugging Face - Blog
L
LINUX DO - 热门话题
云风的 BLOG
云风的 BLOG
P
Proofpoint News Feed
D
Docker
C
Cyber Attacks, Cyber Crime and Cyber Security
MyScale Blog
MyScale Blog
P
Palo Alto Networks Blog
T
Tenable Blog
P
Privacy International News Feed
Google DeepMind News
Google DeepMind News
小众软件
小众软件
Cisco Talos Blog
Cisco Talos Blog
aimingoo的专栏
aimingoo的专栏
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
A
Arctic Wolf
C
Cybersecurity and Infrastructure Security Agency CISA
C
Cisco Blogs
T
Threat Research - Cisco Blogs
NISL@THU
NISL@THU
The Hacker News
The Hacker News
Project Zero
Project Zero
AWS News Blog
AWS News Blog
Simon Willison's Weblog
Simon Willison's Weblog
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
T
Threatpost
V
Visual Studio Blog
The GitHub Blog
The GitHub Blog
The Cloudflare Blog
Last Week in AI
Last Week in AI
Jina AI
Jina AI
Cyberwarzone
Cyberwarzone
The Register - Security
The Register - Security
C
CXSECURITY Database RSS Feed - CXSecurity.com
Vercel News
Vercel News
D
Darknet – Hacking Tools, Hacker News & Cyber Security
MongoDB | Blog
MongoDB | Blog
U
Unit 42
Scott Helme
Scott Helme
A
About on SuperTechFans
WordPress大学
WordPress大学
F
Fortinet All Blogs
大猫的无限游戏
大猫的无限游戏
G
GRAHAM CLULEY
Latest news
Latest news
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
S
Schneier on Security

博客园 - 南守拥

DB2 sequence 迁移 申请认证 java monitor file java mina unix client asp.net 跑 socket server PAZU 是4Fang 为配合“四方在线”软件于2004年开发的WEB打印控件,适用于各种WEB软件项目的打印。 jbpm表说明 7-1 6-3 6-2 6-1 同步机制摘要 3-3 - 南守拥 - 博客园 3-2 - 南守拥 - 博客园 3-1 2-6 - 南守拥 - 博客园 2-4 - 南守拥 - 博客园 2-3 - 南守拥 - 博客园 2-2 - 南守拥 - 博客园
epoll 多线程 服务器
南守拥 · 2010-12-21 · via 博客园 - 南守拥

  1 // 
  2 // a simple agi server using epoll in linux
  3 // 
  4 // 2010-12-20
  5 // by nsy
  6 //
  7 #include <sys/socket.h>
  8 #include <sys/epoll.h>
  9 #include <netinet/in.h>
 10 #include <arpa/inet.h>
 11 #include <fcntl.h>
 12 #include <unistd.h>
 13 #include <stdio.h>
 14 #include <stdlib.h>
 15 #include <errno.h>
 16 #include <string.h>
 17 #include "CallSvr.h"
 18 #include <pthread.h>
 19 #include "epoll.h"
 20 
 21 //test
 22 #include "msg.h"
 23 
 24 // set event
 25 void EventSet(struct myevent_s *ev, int fd,int status)
 26 {
 27   ev->fd = fd;
 28   ev->status = status;
 29   ev->last_active = time(NULL);
 30   fprintf(stderr,"function=%s,line=%d,fd=%d\n",__func__,__LINE__,fd);
 31 }
 32 // add/mod an event to epoll
 33 void EventAdd(int epollFd, int events,struct myevent_s *ev)
 34 {
 35   struct epoll_event epv = {0, {0}};
 36   int op;
 37   epv.data.ptr = ev;
 38   epv.events = events;
 39   if(ev->status == 1){
 40     op = EPOLL_CTL_MOD;
 41     fprintf(stderr,"mod:function=%s,line=%d,fd=%d,status=%d\n",__func__,__LINE__,ev->fd,ev->status);
 42   }
 43   else{
 44     op = EPOLL_CTL_ADD;
 45     ev->status = 1;
 46     fprintf(stderr,"add:function=%s,line=%d,fd=%d,status=%d\n",__func__,__LINE__,ev->fd,ev->status);
 47   }
 48   if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0)
 49     {
 50       fprintf(stderr,"failed:function=%s,line=%d,fd=%d:errno=%d\n",__func__,__LINE__,ev->fd,errno);
 51     }
 52   else
 53     {
 54       fprintf(stderr,"success:function=%s,line=%d,fd=%d\n",__func__,__LINE__,ev->fd);
 55     }
 56 }
 57 // delete an event from epoll
 58 void EventDel(int epollFd,struct myevent_s *ev)
 59 {
 60   struct epoll_event epv = {0, {0}};
 61   if(ev->status != 1return;
 62   epv.data.ptr = ev;
 63   ev->status = 0;
 64   epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv);
 65   fprintf(stderr,"function=%s,line=%d,fd=%d\n",__func__,__LINE__,ev->fd);
 66 }
 67 
 68 // receive data
 69 void RecvData(struct myevent_s *ev)
 70 {
 71   msg_header header;
 72   int recvbytes;
 73   if ((recvbytes=recv(ev->fd, &header, sizeof(msg_header), 0)) ==-1
 74     {
 75       fprintf(stderr,"RecvHeaderErr:function=%s,line=%d,fd=%d\n",__func__,__LINE__,ev->fd);
 76       goto errret;
 77     }
 78   if(recvbytes == sizeof(msg_header))
 79     {
 80       switch(header.msg_type)
 81     {
 82     case msg_lost:
 83       {
 84         rq_lost rq;
 85         if ((recvbytes=recv(ev->fd, ((char*)(&rq))+sizeof(msg_header), sizeof(rq_lost)-sizeof(msg_header), 0)) ==-1)
 86           {
 87         fprintf(stderr,"RecvAfter:function=%s,line=%d,fd=%d\n",__func__,__LINE__,ev->fd);
 88         goto errret;
 89           }
 90         else if(recvbytes > 0)
 91           {                
 92         printf("recv sucess%d,hope to recv %d\n",recvbytes,sizeof(rq_lost)-sizeof(msg_header));
 93         printf("cardno is '%s'\n",rq.cardno);
 94         printf("password is '%s'\n",rq.password);
 95     
 96         //init ev recv
 97         memcpy(&ev->header,&header,sizeof(msg_header));
 98         memcpy(&ev->recv_buff,&rq,sizeof(rq_lost));
 99         ev->recv_len = recvbytes;
100         fprintf(stderr,"addtologicqueue:function=%s,line=%d,fd=%d\n",__func__,__LINE__,ev->fd);
101         //add to logic queue
102         sem_wait(&bin_sem_logic_data_produce);
103         struct QUEUE_LOGIC_DATA_ITEM *item;
104         item = malloc(sizeof(struct QUEUE_LOGIC_DATA_ITEM));
105         item->ev = ev;
106         pthread_mutex_lock(&queue_logic_data_mutex);
107         TAILQ_INSERT_TAIL(&queue_logic_data_header,item,logic_data_entries);
108         pthread_mutex_unlock(&queue_logic_data_mutex);
109         sem_post(&bin_sem_logic_data_consume);
110         fprintf(stderr,"addtologicqueue--end:function=%s,line=%d,fd=%d\n",__func__,__LINE__,ev->fd);
111           }else
112           {
113         fprintf(stderr,"RecvAfter:function=%s,line=%d,fd=%d:errno=%d\n",__func__,__LINE__,ev->fd,errno);
114         goto errret;
115           }
116         break;
117       }
118     }
119       return;//switch end function end
120     }else
121     {
122       fprintf(stderr,"function=%s,line=%d,fd=%d\n",__func__,__LINE__,ev->fd);
123       goto errret;
124     }
125  errret:
126   EventDel(g_epollFd, ev);
127   close(ev->fd);
128 }
129 // send data
130 void SendData(struct myevent_s *ev)
131 {
132   fprintf(stderr,"JustIn:function=%s,line=%d,fd=%d\n",__func__,__LINE__,ev->fd);
133   int len;
134   // send data
135   len = send(ev->fd, ev->send_buff, ev->send_len, 0);
136   ev->send_len = 0;
137   fprintf(stderr,"sendlen=%d:function=%s,line=%d,fd=%d\n",len,__func__,__LINE__,ev->fd);
138   if(len < 0)   
139     {
140       close(ev->fd);
141       fprintf(stderr,"err=%d:function=%s,line=%d,fd=%d\n",errno,__func__,__LINE__,ev->fd);
142     }else
143     {
144       //let system known can recv
145       EventAdd(g_epollFd, EPOLLIN|EPOLLET,ev);
146     }
147 }
148 
149 void *accept_thread_work(void *arg)
150 {
151   while(1)
152     {
153       int* plistenFd = (int*)arg;
154       fprintf(stderr,"function=%s,line=%d,listenfd=%d\n",__func__,__LINE__,*plistenFd);
155       struct sockaddr_in sin;
156       socklen_t len = sizeof(struct sockaddr_in);
157       int nfd, i;
158       // accept
159       if((nfd = accept(*plistenFd, (struct sockaddr*)&sin, &len)) == -1)
160     {
161       if(errno != EAGAIN && errno != EINTR)
162         {
163           fprintf(stderr,"%s: bad accept", __func__);
164         }
165       continue;
166     }
167       do
168     {
169       for(i = 0; i < MAX_EVENTS; i++)
170         {
171           if(g_Events[i].status == 0)
172         {
173           fprintf(stderr,"function=%s,line=%d,listenfd=%d,currentindex=%d\n",__func__,__LINE__,*plistenFd,i);
174           break;
175         }
176         }
177       if(i == MAX_EVENTS)
178         {
179           fprintf(stderr,"max events:function=%s,line=%d,listenFd=%d\n",__func__,__LINE__,*plistenFd);
180           break;
181         }
182       // set nonblocking
183       fprintf(stderr,"set nonblocking:function=%s,line=%d,listenfd=%d\n",__func__,__LINE__,*plistenFd);
184       if(fcntl(nfd, F_SETFL, O_NONBLOCK) < 0break;
185       // add a read event for receive data
186       EventSet(&g_Events[i], nfd,0);
187       EventAdd(g_epollFd, EPOLLIN|EPOLLET, &g_Events[i]);
188       fprintf(stderr,"new conn[%s:%d][time:%d]\n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port),(int) g_Events[i].last_active);
189     }while(0);
190     }
191   return NULL;
192 }
193 
194 void *epoll_wait_thread_work(void *arg)
195 {
196   fprintf(stderr,"justin:function=%s,line=%d\n",__func__,__LINE__);
197   // event loop
198   struct epoll_event events[MAX_EVENTS];
199 
200   int checkPos = 0;
201   while(1){
202     // a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
203     long now = time(NULL);
204     int i;
205     for(i = 0; i < 100; i++, checkPos++// doesn't check listen fd
206       {
207     if(checkPos == MAX_EVENTS) checkPos = 0// recycle
208     if(g_Events[checkPos].status != 1continue;
209     long duration = now - g_Events[checkPos].last_active;
210     if(duration >= 60// 60s timeout
211       {
212         close(g_Events[checkPos].fd);
213         fprintf(stderr,"[fd=%d] timeout[%d--%d].\n",(int) g_Events[checkPos].fd,(int) g_Events[checkPos].last_active, (int)now);
214         EventDel(g_epollFd, &g_Events[checkPos]);
215       }
216       }
217     // wait for events to happen
218     int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);
219     if(fds < 0){
220       fprintf(stderr,"epoll_wait error, exit\n");
221       break;
222     }
223     for(i = 0; i < fds; i++){
224       struct myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
225       if(events[i].events&EPOLLIN) // read event
226     {
227       sem_wait(&bin_sem_recv_fd_produce);
228       fprintf(stderr,"readEvent:function=%s,line=%d:fd=%d\n",__func__,__LINE__,ev->fd);
229       //ev->call_back(ev->fd, events[i].events, ev->arg);
230       struct QUEUE_RECV_FD_ITEM *item;
231       item = malloc(sizeof(struct QUEUE_RECV_FD_ITEM));
232       item->ev = ev;
233       pthread_mutex_lock(&queue_recv_fd_mutex);
234       TAILQ_INSERT_TAIL(&queue_recv_fd_header,item,recv_fd_entries);
235       pthread_mutex_unlock(&queue_recv_fd_mutex);
236       sem_post(&bin_sem_recv_fd_consume);
237     }else if(events[i].events&EPOLLOUT) // write event
238     {
239       sem_post(&bin_sem_send_fd_consume);
240       fprintf(stderr,"post send fd consume:function=%s,line=%d:fd=%d\n",__func__,__LINE__,ev->fd);
241     }
242     }
243   }
244   return NULL;
245 }
246 
247 void *recv_data_thread_work(void *arg)
248 {
249   while(1)
250     {
251       sem_wait(&bin_sem_recv_fd_consume);
252       fprintf(stderr,"justin:function=%s,line=%d\n",__func__,__LINE__);
253       int index = (int)arg;
254       fprintf(stderr,"recv thread id is %d\n",index);
255       pthread_mutex_lock(&queue_recv_fd_mutex);
256       struct QUEUE_RECV_FD_ITEM *item;
257       item = TAILQ_FIRST(&queue_recv_fd_header);
258       TAILQ_REMOVE(&queue_recv_fd_header,item,recv_fd_entries); 
259       pthread_mutex_unlock(&queue_recv_fd_mutex);
260       RecvData(item->ev);
261     }
262   return NULL;
263 }
264 
265 void *send_data_thread_work(void *arg)
266 {
267   while(1)
268     {  
269       sem_wait(&bin_sem_send_fd_consume);
270       fprintf(stderr,"justin:function=%s,line=%d\n",__func__,__LINE__);
271       pthread_mutex_lock(&queue_send_fd_mutex);
272       struct QUEUE_SEND_FD_ITEM *item;
273       item = TAILQ_FIRST(&queue_send_fd_header);
274       TAILQ_REMOVE(&queue_send_fd_header,item,send_fd_entries); 
275       pthread_mutex_unlock(&queue_send_fd_mutex);
276       SendData(item->ev);
277     }
278   return NULL;
279 }
280 
281 void *logic_data_thread_work(void *arg)
282 {
283   while(1)
284     {
285       //remove logic queue
286       sem_wait(&bin_sem_logic_data_consume);
287       //for test
288       int index = (int)arg;
289       fprintf(stderr,"logic thread id is %d\n",index);
290 
291       pthread_mutex_lock(&queue_logic_data_mutex);
292       struct QUEUE_LOGIC_DATA_ITEM *item;
293       item = TAILQ_FIRST(&queue_logic_data_header);
294       TAILQ_REMOVE(&queue_logic_data_header,item,logic_data_entries); 
295       pthread_mutex_unlock(&queue_logic_data_mutex);
296       //logic header
297       switch(item->ev->header.msg_type)
298     {
299     case msg_lost:
300       {
301         rq_lost* rq = (rq_lost*)item->ev->recv_buff;
302         
303         rs_lost rs;
304         rs.header.msg_type = msg_lost;
305         rs.header.size = sizeof(rs_lost);
306         rs.header.length = 0;
307 
308         if(strcmp(rq->cardno,"12345")==0)
309           {
310         rs.is_ok = 1;                        
311           }
312         else
313           {
314         rs.is_ok = 0;
315           }
316         memcpy(&item->ev->header,&rs.header,sizeof(msg_header));
317         item->ev->send_len = sizeof(rs);
318         memcpy(item->ev->send_buff,&rs,sizeof(rs));
319         break;
320       }
321     }
322    
323       //add to send fd queue
324       sem_wait(&bin_sem_send_fd_produce);
325       fprintf(stderr,"after wait send fd produce\n");
326       struct QUEUE_SEND_FD_ITEM *sendItem;
327       sendItem = malloc(sizeof(struct QUEUE_SEND_FD_ITEM));
328       sendItem->ev = item->ev;
329       pthread_mutex_lock(&queue_send_fd_mutex);
330       TAILQ_INSERT_TAIL(&queue_send_fd_header,sendItem,send_fd_entries);
331       pthread_mutex_unlock(&queue_send_fd_mutex);
332       //let system known can send
333       EventAdd(g_epollFd, EPOLLOUT|EPOLLET, item->ev);
334     }
335   return NULL;
336 }
337 
338 int main(int argc, char **argv)
339 {
340   int res;
341   //recv fd queue
342   TAILQ_INIT(&queue_recv_fd_header);
343   res = sem_init(&bin_sem_recv_fd_consume,0,0);
344   if(res)
345     {
346       fprintf(stderr,"sem init consume failed\n");
347       exit(EXIT_FAILURE);
348     }
349 
350   res = sem_init(&bin_sem_recv_fd_produce,0,MAX_EVENTS);
351   if(res)
352     {
353       fprintf(stderr,"sem init produce failed\n");
354       exit(EXIT_FAILURE);
355     }
356 
357   res = pthread_mutex_init(&queue_recv_fd_mutex,NULL);
358   if(res!=0)
359     {
360       perror("create mutex for queue recv failed\n");
361       exit(EXIT_FAILURE);
362     }
363   //logic data queue
364   TAILQ_INIT(&queue_logic_data_header);
365   res = sem_init(&bin_sem_logic_data_consume,0,0);
366   if(res)
367     {
368       fprintf(stderr,"sem init logic data consume failed\n");
369       exit(EXIT_FAILURE);
370     }
371 
372   res = sem_init(&bin_sem_logic_data_produce,0,MAX_EVENTS);
373   if(res)
374     {
375       fprintf(stderr,"sem init logic data produce failed\n");
376       exit(EXIT_FAILURE);
377     }
378 
379   res = pthread_mutex_init(&queue_logic_data_mutex,NULL);
380   if(res!=0)
381     {
382       perror("create mutex for queue logic data failed\n");
383       exit(EXIT_FAILURE);
384     }
385 
386   //send fd queue
387   TAILQ_INIT(&queue_send_fd_header);
388   res = sem_init(&bin_sem_send_fd_consume,0,0);
389   if(res)
390     {
391       fprintf(stderr,"sem init send fd consume failed\n");
392       exit(EXIT_FAILURE);
393     }
394 
395   res = sem_init(&bin_sem_send_fd_produce,0,MAX_EVENTS);
396   if(res)
397     {
398       fprintf(stderr,"sem init send fd produce failed\n");
399       exit(EXIT_FAILURE);
400     }
401 
402   res = pthread_mutex_init(&queue_send_fd_mutex,NULL);
403   if(res!=0)
404     {
405       perror("create mutex for queue send fd failed\n");
406       exit(EXIT_FAILURE);
407     }
408 
409   short port = 3342// default port
410   if(argc == 2){
411     port = atoi(argv[1]);
412   }
413   // create epoll
414   g_epollFd = epoll_create(MAX_EVENTS);
415   if(g_epollFd <= 0
416     {
417       fprintf(stderr,"create epoll failed:fd=%d:function=%s,line=%d\n", g_epollFd,__func__,__LINE__);
418       exit(EXIT_FAILURE);
419     }
420   // create & bind listen socket
421   int listenFd = socket(AF_INET, SOCK_STREAM, 0);
422   // bind & listen
423   struct sockaddr_in sin;
424   bzero(&sin, sizeof(sin));
425   sin.sin_family = AF_INET;
426   sin.sin_addr.s_addr = INADDR_ANY;
427   sin.sin_port = htons(port);
428   bind(listenFd, (const struct sockaddr*)&sin, sizeof(sin));
429   listen(listenFd, 5);
430   fprintf(stderr,"server running:port[%d]\n", port);
431   //create accept thread
432 
433   void *thread_result;
434   pthread_t accept_t;  
435   res = pthread_create(&accept_t,NULL,accept_thread_work,(void *)&listenFd);
436   if(res != 0)
437     {
438       perror("accept create failed\n");
439       exit(EXIT_FAILURE);
440     }
441 
442   //create epoll wait thread
443   pthread_t epoll_wait_t;
444   res = pthread_create(&epoll_wait_t,NULL,epoll_wait_thread_work,NULL);
445   if(res != 0)
446     {
447       perror("create epoll wait thread failed\n");
448       exit(EXIT_FAILURE);
449     }
450   //create two recv data thread
451   pthread_t recv_data_t;
452   res = pthread_create(&recv_data_t,NULL,recv_data_thread_work,(void*)1);
453   if(res!=0)
454     {
455       perror("create recv data thread failed\n");
456       exit(EXIT_FAILURE);
457     }
458 
459   pthread_t recv_data_t_1;
460   res = pthread_create(&recv_data_t_1,NULL,recv_data_thread_work,(void*)2);
461   if(res!=0)
462     {
463       perror("create recv data thread failed\n");
464       exit(EXIT_FAILURE);
465     }
466   //create two send data thread
467   pthread_t send_data_t;
468   res = pthread_create(&send_data_t,NULL,send_data_thread_work,(void*)1);
469   if(res!=0)
470     {
471       perror("create send data thread failed\n");
472       exit(EXIT_FAILURE);
473     }
474 
475   pthread_t send_data_t_1;
476   res = pthread_create(&send_data_t_1,NULL,send_data_thread_work,(void*)2);
477   if(res!=0)
478     {
479       perror("create send data thread failed\n");
480       exit(EXIT_FAILURE);
481     }
482 
483   //create two logic work thread
484   pthread_t logic_work_t;
485   res = pthread_create(&logic_work_t,NULL,logic_data_thread_work,(void*)1);
486   if(res!=0)
487     {
488       perror("create logic work thread failed\n");
489       exit(EXIT_FAILURE);
490     }
491 
492  pthread_t logic_work_t_1;
493   res = pthread_create(&logic_work_t_1,NULL,logic_data_thread_work,(void*)2);
494   if(res!=0)
495     {
496       perror("create logic work thread failed\n");
497       exit(EXIT_FAILURE);
498     }
499 
500   //wait child thread
501   res = pthread_join(accept_t,&thread_result);
502   if(res!=0)
503     {
504       perror("accept thread join failed\n");
505       exit(EXIT_FAILURE);
506     }
507 
508   //wait child thread
509   res = pthread_join(epoll_wait_t,&thread_result);
510   if(res!=0)
511     {
512       perror("epoll wait thread join failed\n");
513       exit(EXIT_FAILURE);
514     }
515 
516   //wait child thread
517   res = pthread_join(recv_data_t,&thread_result);
518   if(res!=0)
519     {
520       perror("recv data thread join failed\n");
521       exit(EXIT_FAILURE);      
522     }
523   //wait child thread
524   res = pthread_join(recv_data_t_1,&thread_result);
525   if(res!=0)
526     {
527       perror("recv data thread join failed\n");
528       exit(EXIT_FAILURE);      
529     }
530 
531   //wait child thread
532   res = pthread_join(send_data_t,&thread_result);
533   if(res!=0)
534     {
535       perror("send data thread join failed\n");
536       exit(EXIT_FAILURE);      
537     }
538   //wait child thread
539   res = pthread_join(send_data_t_1,&thread_result);
540   if(res!=0)
541     {
542       perror("send data thread join failed\n");
543       exit(EXIT_FAILURE);      
544     }
545   //wait child thread
546   res = pthread_join(logic_work_t,&thread_result);
547   if(res!=0)
548     {
549       perror("logic work thread join failed\n");
550       exit(EXIT_FAILURE);      
551     }
552   //wait child thread
553   res = pthread_join(logic_work_t_1,&thread_result);
554   if(res!=0)
555     {
556       perror("logic work thread join failed\n");
557       exit(EXIT_FAILURE);      
558     }
559   // free resource
560   close(g_epollFd);
561   sem_destroy(&bin_sem_recv_fd_consume);
562   sem_destroy(&bin_sem_recv_fd_produce);
563   pthread_mutex_destroy(&queue_recv_fd_mutex);
564 
565   sem_destroy(&bin_sem_logic_data_consume);
566   sem_destroy(&bin_sem_logic_data_produce);
567   pthread_mutex_destroy(&queue_logic_data_mutex);
568 
569   sem_destroy(&bin_sem_send_fd_consume);
570   sem_destroy(&bin_sem_send_fd_produce);
571   pthread_mutex_destroy(&queue_send_fd_mutex);
572   return 0;
573

服务器头

 1 #ifndef _epoll_h_
 2 #define _epoll_h_
 3 
 4 #include "sys/queue.h"
 5 #include <semaphore.h>
 6 #include "msg.h"
 7 
 8 #define MAX_EVENTS 500
 9 
10 int g_epollFd;
11 
12 void *accept_thread_work(void *arg);
13 void *epoll_wait_thread_work(void *arg);
14 void *recv_data_thread_work(void *arg);
15 void *send_data_thread_work(void *arg);
16 void *logic_data_thread_work(void *arg);
17 
18 struct myevent_s
19 {
20   int fd;
21   int status; // 1: in epoll wait list, 0 not in
22   msg_header header;
23   char recv_buff[256]; // recv data buffer
24   int recv_len;
25   char send_buff[256];//send data buffer
26   int send_len;
27   long last_active; // last active time
28 };
29 
30 struct myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd
31 
32 //recv fd queue
33 struct QUEUE_RECV_FD_ITEM{
34   struct myevent_s* ev;
35   TAILQ_ENTRY(QUEUE_RECV_FD_ITEM) recv_fd_entries;
36 };
37 
38 TAILQ_HEAD(,QUEUE_RECV_FD_ITEM) queue_recv_fd_header;
39 
40 sem_t bin_sem_recv_fd_produce;
41 sem_t bin_sem_recv_fd_consume;
42 
43 pthread_mutex_t queue_recv_fd_mutex;
44 
45 //send fd queue
46 struct QUEUE_SEND_FD_ITEM{
47   struct myevent_s* ev;
48   TAILQ_ENTRY(QUEUE_SEND_FD_ITEM) send_fd_entries;
49 };
50 
51 TAILQ_HEAD(,QUEUE_SEND_FD_ITEM) queue_send_fd_header;
52 
53 sem_t bin_sem_send_fd_produce;
54 sem_t bin_sem_send_fd_consume;
55 
56 pthread_mutex_t queue_send_fd_mutex;
57 
58 //logic data buff
59 struct QUEUE_LOGIC_DATA_ITEM{
60   struct myevent_s* ev;
61   TAILQ_ENTRY(QUEUE_LOGIC_DATA_ITEM) logic_data_entries;
62 };
63 
64 TAILQ_HEAD(,QUEUE_LOGIC_DATA_ITEM) queue_logic_data_header;
65 
66 sem_t bin_sem_logic_data_produce;
67 sem_t bin_sem_logic_data_consume;
68 
69 pthread_mutex_t queue_logic_data_mutex;
70 
71 #endif
72 

 }