diff --git a/tiny_pool.c b/tiny_pool.c index e716785..a8b3798 100644 --- a/tiny_pool.c +++ b/tiny_pool.c @@ -83,7 +83,7 @@ void task_queue_push(pool_t *pool, task_t *task) { printf("push success -> size = %d\n", pool->task_queue_size); pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue - if (pool->status >= RUNNING) { // avoid send signal in PREPARING stage + if (pool->status > PREPARING) { // avoid send signal in PREPARING stage printf("signal -> queue not empty\n"); @@ -100,6 +100,8 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait printf("pop start wait\n"); + // TODO: at EXITING process may receive active broadcast -> we should stop pop task here + // should we cancel thread here directly, or return NULL for sub thread loop? pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); // wait new task added printf("pop exit wait\n"); @@ -108,11 +110,12 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait printf("pop new task\n"); + bool queue_empty = false; task_t *front = pool->task_queue_front; if (pool->task_queue_front == pool->task_queue_rear) { // only one element pool->task_queue_front = NULL; // clear task queue pool->task_queue_rear = NULL; - pthread_cond_signal(&pool->task_queue_empty); // active blocking join thread + queue_empty = true; } else { pool->task_queue_front = front->next; // pop first task } @@ -121,65 +124,55 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait printf("pop success -> size = %d\n", pool->task_queue_size); pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue + + if (queue_empty) { // send signal after mutex unlocked + pthread_cond_signal(&pool->task_queue_empty); // active blocking pool join thread + } return front; // success pop one task } bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { - - // check status -> failed - if (pool->status == EXITING) { - return false; - // TODO: return false here + if (pool->status > RUNNING) { + return false; // adding task is prohibited after STOPPING } - - // TODO: malloc error -> return bool false task_t *new_task = (task_t*)malloc(sizeof(task_t)); - - new_task->func = func; + if (new_task == NULL) { + return false; // malloc new task error -> stop submit + } + new_task->func = func; // load custom task new_task->arg = arg; new_task->next = NULL; - - // TODO: new task push into task queue - task_queue_push(pool, new_task); - - // TODO: queue push may failed -> return false - - // TODO: return bool true - - return true; + task_queue_push(pool, new_task); // push into task queue + return true; // task push success } -void* thread_entry(void *pool_ptr) { - - // TODO: main loop for one thread - - // TODO: check if thread pool exiting - - // TODO: pop one task --failed--> blocking wait - // --success--> start running and then free task_t - +void* thread_entry(void *pool_ptr) { // main loop for sub thread pool_t *pool = (pool_t*)pool_ptr; + while (pool->status != EXITING) { // loop until enter EXITING mode - while (pool->status != EXITING) { + printf("sub thread working\n"); - printf("thread working\n"); + task_t *task = task_queue_pop(pool); // pop one task -> blocking function - task_t *task = task_queue_pop(pool); + pthread_mutex_lock(&pool->busy_thr_num_mutex); + ++pool->busy_thr_num; // change busy thread number + pthread_mutex_unlock(&pool->busy_thr_num_mutex); - // task working - task->func(task->arg); + task->func(task->arg); // start running task function - free(task); + pthread_mutex_lock(&pool->busy_thr_num_mutex); + --pool->busy_thr_num; // change busy thread number + pthread_mutex_unlock(&pool->busy_thr_num_mutex); + free(task); // free allocated memory } printf("sub thread exit\n"); - return NULL; - + return NULL; // sub thread exit } - +// TODO: should we return a bool value? void tiny_pool_boot(pool_t *pool) { // TODO: create admin thread @@ -209,6 +202,7 @@ void tiny_pool_boot(pool_t *pool) { } + //void tiny_pool_kill(pool_t *pool) { // // printf("pool enter EXITING status\n"); diff --git a/tiny_pool.h b/tiny_pool.h index e0e5e9b..34d643d 100644 --- a/tiny_pool.h +++ b/tiny_pool.h @@ -46,10 +46,10 @@ /// or `tiny_pool_detach` interface. enum pool_status { - PREPARING, - RUNNING, - STOPPING, - EXITING, + PREPARING = 0, + RUNNING = 1, + STOPPING = 2, + EXITING = 3, }; typedef struct task_t {