epoll

epoll是一种事件驱动的io模式,epoll不仅可以应用在网络io上,时钟也是适用,得益于linux/Unix的一切皆为文件理论。这也意味着系统的所有资源都可以用文件来表示,这些资源都用一个具有read(),write(), close()接口的文件描述符进行管理。

Linux也提供了文件描述符的监听事件功能,比如在一个目录下创建一个文件时钟超时的事件。

epoll的触发模式有2种:1、水平触发(电平触发-LT);2、边沿触发(ET)

EPOLL 在有些情况下是不建议使用的 如:已连接套接字不多,并且这些套接字非常活跃;都是活跃,那就不用等什么事件了,直接操作就完事了。

  • 水平触发

Write EPOLLOUT EVENT: 高电平: writebuf内核有空闲空间,我们就说他处于高电平状态,也就是一直处于活跃状态。如果应用没有及时写入数据,此时可能会产生busy waitting loop 低电平: 当writebuf内核没有空闲空间,我们就说他处于低电平状态,没有激活。

或者这样解析: 内核中的socket接收缓冲区为空 =》 低电平 内核中的socket接收缓冲区不为空 =》 高电平

Read EPOLLIN EVENT:

高电平:ReadBuf内核有数据可读,如果应用没有及时把这些数据读取掉,那么也可能产生busy waitting loop的情况 低电平:ReadBuf内核没有数据可读,处于低电平状态,没有被激活。

或者这样解析: 内核中的socket发送缓冲区不满 =》 高电平 内核中的socket发送缓冲区满 =》 低电平

总之,应用在使用epoll是尽量写满内核写缓冲区和读空内核读缓存区,避免空闲等待.

  • 边缘触发

在该模式下,一开始我们就要关注EPOLLIN事件和EPOLLOUT事件 , 此时writbuf一直处于高电平,不会触发EPOLLOUT事件.而EPOLLIN可能会产生,因为开始Readbuf是空的,如果在 epoll_wait前,readbuf有数据了,那么就有unreadablr--->readable, 也就是产生了EPOLLLIN事件,这也是为什么监听socket 能够接收到外来请求的原因。 关注EPOLLINEPOLLOUT事件后,我们也没必要取消他们的关注,只有到断开他们的socketfd时 , 我们才需要取消。

区别:LT事件不会丢弃,而是只要读buffer里面有数据可以让用户读取,则不断的通知你。而ET则只在事件发生之时通知。

timerfd

link: timerfd-source

一般文件描述的使用步骤都是调用一个初始化函数来创建一个文件描述符,也就是句柄。如:

1
2
3
4
5
6
#include<sys/timerfd.h>

int fd = timerfd_create(CLOCK_MONOTONIC, TFD_MONOBLOCK | TFD_CLOEXEC); 
if (fd == -1) {
    //handle_error ... 
}

TFD_MONOBLOCK, TFD_CLOEXEC这些参数和MONOBLOCK,CLOEXEC是一样的意思,使用异或来互相操作,默认问0。

timerfd的使用例子

时钟的文件描述符使用timerfd_create来进行创建,创建成功后,调用timerfd_settime()来设置事件超时,时钟描述符主要使用一个struct timerspec的变量来初始化。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 时钟超时间隔
struct timespec it_interval {
    time_t tv_sec 
    time_t tv_nsec
}

// 时钟超时时间
struct timespec it_value {
    time_t tv_sec
    time_t tv_nsec 
}

应用调用read()函数来等待时钟超时事件的到来即可,read()返回一个uint_64的整数,这个整数表示超时的次数,这些次数不会叠加,如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* This program prints "Beep!" every second with timerfd. */
#include <inttypes.h>
#include <stdio.h>
#include <sys/timerfd.h>
#include <unistd.h>

int main(void)
{
    int fd;
    struct itimerspec timer = {
        .it_interval = {1, 0},  /* 1 second */
        .it_value    = {1, 0},
    };
    uint64_t count;

    /* No error checking! */
    fd = timerfd_create(CLOCK_MONOTONIC, 0);
    timerfd_settime(fd, 0, &timer, NULL);
    int i = 0; 
    for (;;) {
        sleep(i);
        i++;
        read(fd, &count, sizeof(count));
        printf("Beep!\n");
        printf("count: %d, i:%d\n", count, i);
    }

    return 0;
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Beep!
count: 1, i: 1
Beep!
count: 1, i: 2
Beep!
count: 2, i: 3
Beep!
count: 3, i: 4
Beep!
count: 4, i: 5
Beep!
count: 5, i: 6

signalfd

也可以使用时钟信号的方式来实现定时事件……, 这个此处省略…. 当然,其他信号也可以使用epoll来监听

inotify

这也不说了….

timerfd, signalfd使用epoll处理

epoll的触发模式有:水平触发(电平触发) , 边缘触发。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/signalfd.h>
#include <unistd.h>

int main(void)
{
    int efd, sfd, tfd;
    struct itimerspec timer = {
        .it_interval = {1, 0},  /* 1 second */
        .it_value    = {1, 0},
    };
    uint64_t count;
    sigset_t sigmask;
    struct signalfd_siginfo siginfo;
#define MAX_EVENTS 2
    struct epoll_event ev, events[MAX_EVENTS];
    ssize_t nr_events;
    size_t i;

    /* No error checking! */
    tfd = timerfd_create(CLOCK_MONOTONIC, 0);
    timerfd_settime(tfd, 0, &timer, NULL);

    sigfillset(&sigmask);
    sigprocmask(SIG_BLOCK, &sigmask, NULL);
    sfd = signalfd(-1, &sigmask, 0);

    // new a epoll instance
    efd = epoll_create1(0);

        // event type ==> read
    ev.events = EPOLLIN;
    // add timer fd
    ev.data.fd = tfd;
    epoll_ctl(efd, EPOLL_CTL_ADD, tfd, &ev);

    // event type => read
    ev.events = EPOLLIN;
    // add signal fd
    ev.data.fd = sfd;
    epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &ev);

    for (;;) {
        nr_events = epoll_wait(efd, events, MAX_EVENTS, -1);
        for (i = 0; i < nr_events; i++) {
            if (events[i].data.fd == tfd) {
                read(tfd, &count, sizeof(count));
                printf("Beep!\n");
            } else if (events[i].data.fd == sfd) {
                read(sfd, &siginfo, sizeof(siginfo));
                printf("Received signal number %d\n", siginfo.ssi_signo);
            }
        }
    }
    return 0;
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
$ ./ep
Beep!
Beep!
Beep!
Beep!
Beep!
Beep!
^CReceived signal number 2
Beep!
Beep!
^CReceived signal number 2
Beep!
Beep!
Beep!

构建一个基于epoll的简单定时任务

首先需要构建一个简单的任务池,为了更好的并发,任务池需要一个任务队列,这个任务队列保存了任务类型和任务数据(其实就是TaskFunc and FuncData).

构建 ThreadPool

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
            |----------|
            |ThreadPool|
            |----------|
                  ↑
                  |
      ------------ ------------|           
      |                        |
      |                        |      
   |------|   ---------     |-------|             
   |Woker |   |  Init |     | free  |  
   | Job  |   |   Job |     | Job   |
   | Queue|   | Qeuue |     | Queue |  
   |------|   |-------|     |-------|  
       ↑                        ↑
       |                        |
       |----->|Job Thread|----->|

这个简单的线程池整体的设计就是这样,线程池对象包括主要包括三部分,worker Job QueueFree Job QueueJob Thread.

  • Worker Job Queue

这是一个任务队列,这些任务队列有任务函数job_cb和数据参数data组成

  • Free Job Queue

这是一个空闲队列,这个队列主要是用来做任务回收(可以说是一个内存管理器), 用户首先从空闲队列中获取一个job_cbdata指正,然后把自身的函数和数据赋给job_cbdata,在把组装好的任务放到worker Job Queue中。

1
2
3
TODO:

把 Init Job Queue去掉,并且把Free Job Queue交给一个内存管理线程进行管理
  • Job Thread

job thread 是线程对象,这些线程对象首先会到worker job queue中获取到一个任务,执行完这个任务时,会把这个人回收到free job queue

source code:

header:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
/**/
#ifndef THREADPOOL_H_
#define THREADPOOL_H_
struct threadpool;
/**
This function creates a newly allocated thread pool
**/

struct threadpool * threadpool_init(int numbers);

int threadpool_add_job(struct threadpool *pool, void (*func )(void *), void *data, int blocking);

void threadpool_free(struct threadpool *pool, int blocking);
#endif /*THREADPOOL_H_*/

source:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
#include "thr_pool.h"

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define LOG_DEBUG(s) printf("line %d,:%s, %s- \r\n", __LINE__, __func__, s); 
#define THREAD_POOL_QUEUE_SIZE 10

// queue iterm 
struct threadpool_job {
    void (*job_cb)(void*);
    void *data;
};  

struct threadpool_queue {
    unsigned int head;
    unsigned int tail;
    
    unsigned int iterm_size;
    
    void *iterms[THREAD_POOL_QUEUE_SIZE];
};

struct threadpool {
    struct threadpool_queue job_queue; // job queue 
    struct threadpool_queue free_job_queue; // free job queue 
    
    struct threadpool_job jobs[THREAD_POOL_QUEUE_SIZE];
    
    pthread_t *thr_arr;
    
    unsigned short thread_size; // thread size 
    volatile unsigned short stop_flag; // stop or start flag 
    
    pthread_mutex_t free_job_mutex; // free job queue 
    pthread_mutex_t mutex;  // 
    pthread_cond_t free_job_cond;
    pthread_cond_t cond;
} ;

/**
* This funtion inits a thread queue 
*
*@param queue The queue structure
*/

static void threadpool_queue_init(struct threadpool_queue *queue){
    int i ; 
    for (i =0; i < THREAD_POOL_QUEUE_SIZE; i ++){
        queue->iterms[i] = NULL;
    }
    
    queue->head = 0;
    queue->tail = 0;
    queue->iterm_size = 0;
}

/**
 * This function adds data to the tail of the queue.
 *
 * @param queue The queue structure.
 * @param data The data to be added to the queue.
 * @return 0 on success (the data was added to the queue) else -1 is returned.
 */

static int threadpool_queue_enqueue(struct threadpool_queue *queue, void *data) {
    if (queue->iterm_size == THREAD_POOL_QUEUE_SIZE) {
        LOG_DEBUG("The queue is full, unable to add data to it.");
        return -1;
    }
    
    if(queue->iterms[queue->tail] != NULL) {
        LOG_DEBUG("A problem was detected in the queque (expected NULL, but found a different value.");
        return -1;
    }
    
    queue->iterms[queue->tail] = data;
    queue->tail++;
    queue->iterm_size ++;
    LOG_DEBUG("Add Job\n");
    if(queue->tail == THREAD_POOL_QUEUE_SIZE) {
        queue->tail = 0;
    }
    return 0;
}

static void *threadpool_queue_dequeue(struct threadpool_queue *queque){
    void *data;
    
    if(queque->iterm_size == 0) {
        LOG_DEBUG("Tried to dequeue from an empty queque");
        return NULL;
    }
    
    data = queque->iterms[queque->head];
    queque->iterms[queque->head] = NULL;
    queque->iterm_size --;
    
    if (queque->iterm_size == 0){
        queque->head = 0;
        queque->tail = 0;
    }else{
        queque->head++;
        if(queque->head == THREAD_POOL_QUEUE_SIZE) {
            queque->head=0;
        }
    }
    
    return data;
}

static int threadpool_queue_is_empty(struct threadpool_queue *q){
    //return q->iterm_size?0:1;
    if (q->iterm_size==0){
        return 1;
    }
    return 0;
}

static int threadpool_queue_size(struct threadpool_queue *queque) {
    return queque->iterm_size;
}

static void threadpool_job_init(struct threadpool_job *job){
    job->data = NULL;
    job->job_cb = NULL;
}

/*
 * Ths function obtains a queued job from the pool and return it. 
 * if no such job is available the operation blocks. 
 *
 * @param pool the thread pool structure. 
 * @return a job or null error (or if thread pool should shut down ) . *
 * */
static struct threadpool_job * threadpool_get_job(struct threadpool *pool){
    struct threadpool_job *job;
    
    // lock threadpool
    if (pthread_mutex_lock(&(pool->mutex))){
        printf("pthread_mutex_lock error.\r\n");
        return NULL;
    }

    while(threadpool_queue_is_empty(&(pool->job_queue))){
            if (pool->stop_flag) {
                if(pthread_cond_broadcast(&(pool->cond))){
                    LOG_DEBUG("thread_cond_broadcast error.\r\n");
                }

                if (pthread_mutex_unlock(&(pool->mutex))){
                    LOG_DEBUG("pthread_mutex_unlock.\r\n");
                    return NULL;
                }
                return NULL;
            }
            // Block until a new job arrives .  
            if (pthread_cond_wait(&(pool->cond), &(pool->mutex))) {
                LOG_DEBUG("pthread_cond_wait.\r\n"); 
                if (pthread_mutex_unlock(&(pool->mutex))) {
                    LOG_DEBUG("pthread_mutex_unlock. \r\n");
                } 
                return NULL;
            }
            LOG_DEBUG("Wait for a job comming");
        }
        
        job = (struct threadpool_job*)threadpool_queue_dequeue(&(pool->job_queue));
    if (job== NULL){
        LOG_DEBUG("Failded to obtain a job from the job queue.");
    }
    
    if (pthread_mutex_unlock(&(pool->mutex))){
        LOG_DEBUG("pthread_mutex_unlock.");
        return NULL;
    }
    
    return job; 
}

static void *job_cb_func(void *data){
    struct threadpool *pool = (struct threadpool*)data;
    struct threadpool_job *job;
    LOG_DEBUG("do in thread ...");
    while(1){
        // take a job 
        LOG_DEBUG("REUEST A JOB");
        job = threadpool_get_job(pool);
        LOG_DEBUG("GOT A JOB");
        if (job == NULL) {
            if (pool->stop_flag) {
                // job thr need to exit (thread pool was shutdown)
                break;
            }else {
                // An error has occurented.
                LOG_DEBUG("a error happend when trying to obtain a work job.");
                LOG_DEBUG("The job thread has exited.");
                break;
            }
        }
        
        if(job->job_cb){
            job->job_cb(job->data);
            
            // release the job by returing it to the free job queue.
            threadpool_job_init(job);
            
            if (pthread_mutex_lock(&(pool->free_job_mutex))) {
                LOG_DEBUG("pthread mutex lock error.");
                LOG_DEBUG("The job thread has exited.");
                break;
            }
            
            // put the idle job into thread pool's free job queue   
            if (threadpool_queue_enqueue(&(pool->free_job_queue), job)) {
                LOG_DEBUG("Failed to enqueue a job to free job queue.");
                if (pthread_mutex_unlock(&(pool->free_job_mutex))) {
                    LOG_DEBUG("pthread mutext unlock error.");
                }
                
                LOG_DEBUG("The Job thread has exited."); 
                break;
            }
            
            if (threadpool_queue_size(&(pool->free_job_queue)) ==1) {
                // Notify all waiting threads that new tasks can added 
                if (pthread_cond_broadcast(&(pool->free_job_cond))) {
                    LOG_DEBUG("pthread cond broadcast . "); 
                    
                    if (pthread_mutex_unlock(&(pool->free_job_mutex))) {
                        LOG_DEBUG("pthread mutex unlock.");
                    }
                    
                    break;
                }
            }
            
            if (pthread_mutex_unlock(&(pool->free_job_mutex))) {
                LOG_DEBUG("The job thread has exited."); 
                break;
            }
        }
    }   

    LOG_DEBUG("Thread Exit ...");
    
    return NULL;
}

static void *stop_job_thr_func_cb(void *ptr) {
    int i ; 
    struct threadpool *pool = (struct threadpool *)ptr;
    
    if (pthread_mutex_lock(&(pool->mutex))) {
        LOG_DEBUG("Warning: Memory was not released .");
        LOG_DEBUG("Warning: Some of the job threads may have failed to exit.");
        return NULL;
    }
    
    pool->stop_flag = 1; 
    
    while(!threadpool_queue_is_empty(&(pool->job_queue))){
        LOG_DEBUG("Wait All Sub thread exit...");
        // Blocking until all job have been executed, safely stop the thread pool .
        if (pthread_cond_wait(&(pool->cond), &(pool->mutex))){
            LOG_DEBUG("pthread cond wait.");
            if (pthread_mutex_unlock(&(pool->mutex))){
                LOG_DEBUG("pthread mutex unlock.");
            }
            return NULL;
        }
    }
    
    LOG_DEBUG("Wait All Sub thread exit...");
    // Wakeup all job threads (broadcast operation). 
    if (pthread_cond_broadcast(&(pool->cond))){
        LOG_DEBUG("Warning: Memory was not released."); 
        LOG_DEBUG("Warning: Some of the job threads my have failed to exit .");
        return NULL;
    }

    if (pthread_mutex_unlock(&(pool->mutex))){
        LOG_DEBUG("pthread mutex unlock error.");
        return NULL;
    }

    // Wait until all job threads are done. 
    for (i = 0; i < pool->thread_size; i ++) {
        LOG_DEBUG("Wait sub thread exit..");
        if (pthread_join(pool->thr_arr[i], NULL)) {
            LOG_DEBUG("pthread join error.");
        }
    }
    
    // Free all allocated memory 
    free(pool->thr_arr);
    free(pool);
    
    return NULL;    
}

struct threadpool * threadpool_init( int threads_size){
    struct threadpool *pool; 
    int i; 
    
    // Create the thread pool struct. 
    pool = malloc(sizeof(struct threadpool));
    if (pool == NULL) {
        LOG_DEBUG("malloc thread pool error.");
        return NULL;
    }
    
    pool->stop_flag = 0; 
    
    // Init the mutex and cond vars 
    if (pthread_mutex_init(&(pool->free_job_mutex), NULL)) {
        LOG_DEBUG("pthread mutex init ."); 
        free(pool);
        return NULL;
    }
    if (pthread_mutex_init(&(pool->mutex), NULL)) {
        LOG_DEBUG("pthread mutex init error"); 
        free(pool);
        return NULL;
    }
    if (pthread_cond_init(&(pool->free_job_cond), NULL)) {
        LOG_DEBUG("pthread cond init error");
        free(pool);
        return NULL;
    }
    if (pthread_cond_init(&(pool->cond), NULL)) {
        LOG_DEBUG("pthread cond init error"); 
        free(pool);
        return NULL;
    }
    
    // Init the jobs queue 
    threadpool_queue_init(&(pool->job_queue)); 
    
    // Init the free job queue . 
    threadpool_queue_init(&(pool->free_job_queue)); 
    
    // Add all the free job to the free job queue 
    for (i = 0; i < THREAD_POOL_QUEUE_SIZE; i++){
        threadpool_job_init((pool->jobs) + i);
        if (threadpool_queue_enqueue(&(pool->free_job_queue), (pool->jobs) +i)) {
            LOG_DEBUG("Failed to a job to the free job queue during initialization."); 
            free(pool);
            return NULL;
        }
    }

    printf("The free Queue is : %d\n", pool->free_job_queue.iterm_size);
    printf("The Worker Queue size is: %d\n", pool->job_queue.iterm_size);

    
    // Create the thr_arr. 
    pool->thr_arr = malloc(sizeof(pthread_t) * threads_size);
    if (pool->thr_arr == NULL) {
        LOG_DEBUG("malloc thread arr error."); 
        free(pool);
        return NULL;
    }
    
    // Start the job threads 
    for (pool->thread_size = 0; pool->thread_size < threads_size; (pool->thread_size ++)) {
        if (pthread_create(&(pool->thr_arr[pool->thread_size]), NULL, job_cb_func, pool)) {
            LOG_DEBUG("pthread create function error."); 
            threadpool_free(pool, 0); 
            return NULL;
        }
    }
    
    return pool;
}

int threadpool_add_job (struct threadpool *pool, void (*job_cb)(void *), void *data, int blocking){
    struct threadpool_job * job ; 
    if (pool == NULL) {
        LOG_DEBUG("The thread pool is null"); 
        return -1;
    }
    
    // lock free job queque
    if (pthread_mutex_lock(&(pool->free_job_mutex))){
        LOG_DEBUG("pthread mutext lock err."); 
        return -1;
    }
    
    // Check if the free job queque is empty 
    while(threadpool_queue_is_empty(&(pool->free_job_queue))) {
        // The job is empty 
        LOG_DEBUG("The Free Queue Size equal zero");
        if(!blocking) {
            // Return immediately if the command is non blocking 
            if (pthread_mutex_unlock(&(pool->free_job_mutex))) {
                LOG_DEBUG("pthread mutex unlock."); 
                return -1;
            }
            return -2;
        }
        
        // blocking is set to 1, wait until free job queue has a job to obtain 
        if (pthread_cond_wait(&(pool->free_job_cond), &(pool->free_job_mutex))) {
            LOG_DEBUG("pthread cond mutex."); 
            if (pthread_mutex_unlock(&(pool->free_job_mutex))) {
                LOG_DEBUG("pthread mutex unlock."); 
            }
            return -1;
        }
    }
    
    // Obtain an empty job. 
    job = (struct threadpool_job*) threadpool_queue_dequeue(&(pool->free_job_queue));
    if (job == NULL) {
        LOG_DEBUG("Failed to obtain an empty job from the free job queue."); 
        if (pthread_mutex_unlock(&(pool->free_job_mutex))) {
            LOG_DEBUG("pthread mutex unlock.");
        }
        
        return -1;
    }
    
    if (pthread_mutex_unlock(&(pool->free_job_mutex))){
        LOG_DEBUG("pthread mutex unlock."); 
        return -1;
    }
    
    job->data = data; 
    job->job_cb = job_cb; 
    
    // Add the job into the job queue 
    if (pthread_mutex_lock(&(pool->mutex))) {
        LOG_DEBUG("pthread mutex lock."); 
        return -1;
    }
    
    if (threadpool_queue_enqueue(&(pool->job_queue), job)){
        LOG_DEBUG("Failed to add new job to the job queue."); 
        if (pthread_mutex_unlock(&(pool->mutex))) {
            LOG_DEBUG("pthread mutex unlock."); 
        }
        return -1;
    }
    
    if (threadpool_queue_size(&(pool->job_queue)) == 1) {
        // Notify all worker threads that there are new jobs 
        if (pthread_cond_broadcast(&(pool->cond))) {
            LOG_DEBUG("pthread cond broadcast error"); 
            if (pthread_mutex_unlock(&(pool->mutex))) {
                LOG_DEBUG("pthread mutex unlock error");
            }
            return -1;
        }
    }
    if (pthread_mutex_unlock(&(pool->mutex))) {
        LOG_DEBUG("pthread mutex unlock error");
        return -1; 
    } 
    return 0;
}

void threadpool_free(struct threadpool *pool, int blocking){
    pthread_t thr; 
    if (blocking) {
        stop_job_thr_func_cb(pool);
    }else{
        if (pthread_create(&thr, NULL, stop_job_thr_func_cb, pool)) {
            LOG_DEBUG("pthread create function error.");
            pool->stop_flag = 1;
        }       
    }
}

test Code:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include "thr_pool.h"
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>

#define ARR_SIZE 1000000

static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER; 
static int count; 

void fast_job1(void *ptr) {
    int *pval = (int*)ptr;
    int i; 
    printf("Do in fast_job1");
    for (i =0; i < 1000; i ++) {
        (*pval) ++;
    }
    
    printf("ptr:%d\r\n", pval);    
    pthread_mutex_lock(&count_mutex);
    count++; 
    pthread_mutex_unlock(&count_mutex);
}

void slow_job(void *ptr) {
    printf("slow job: count value is %d\n", count); 
    
    pthread_mutex_lock(&count_mutex);
    
    count ++; 
    
    pthread_mutex_unlock(&count_mutex);
}


int main (){
    struct threadpool *p; 
    
    int arr[ARR_SIZE], i, ret, failed_count =0; 
    for (i =0; i < ARR_SIZE; i++){
        arr[i] = i;
    }
    
    p = threadpool_init(200) ; 
    if (p==NULL) {
        printf("Error! Failed to create a threadpool struct \n"); 
        exit(EXIT_FAILURE);
    }
        
    
    for (i =0 ; i < ARR_SIZE; i ++) {
        if (i % 2==0){
            ret = threadpool_add_job(p, slow_job, arr +i , 1);
        }else {
            ret = threadpool_add_job(p, fast_job1, arr+i,  0);
        }
        
        if (ret == -1) {
            printf("An error had occurred while adding a job, %d\r\n", ret);
            exit(EXIT_FAILURE);
        }
        if (ret == -2 ){
            failed_count ++;
        }
    }
    threadpool_free(p, 1);
    // stop the pool 
    printf("Example ended.\n");
    printf("%d tasks out of %d have been executed.\n",count,ARR_SIZE);
    printf("%d tasks out of %d did not execute since the pool was overloaded.\n",failed_count,ARR_SIZE);
    printf("All other tasks had not executed yet.");
    return 0;

}

构建一个eventloop对象

好吧,暂时没时间,先这样…….

以后有时间再能吧.

资料来源: - http://www.enodev.fr/posts/notifications-with-file-descriptors.html - http://docs.oracle.com/cd/E19253-01/816-5137/ggedn/index.html