diff --git a/src/tiny_pool.c b/src/tiny_pool.c index 955a26b..508654f 100644 --- a/src/tiny_pool.c +++ b/src/tiny_pool.c @@ -2,6 +2,76 @@ #include #include "tiny_pool.h" +void* run_pool_join(void *pool) { // single function for detach + tiny_pool_join((pool_t*)pool); + pthread_exit(NULL); +} + +void tiny_pool_detach(pool_t *pool) { + pthread_t tid; + pthread_create(&tid, NULL, run_pool_join, (void*)pool); + pthread_detach(tid); +} + +void free_tiny_pool(pool_t *pool) { + pthread_cond_destroy(&pool->without_busy_thread); + pthread_cond_destroy(&pool->task_queue_not_empty); + pthread_cond_destroy(&pool->task_queue_empty); + pthread_mutex_destroy(&pool->mutex); + free(pool->threads); + free(pool); +} + +void tiny_pool_kill(pool_t *pool) { + if (pool->status > PREPARING) { + for (uint32_t i = 0; i < pool->thread_num; ++i) { + pthread_cancel(pool->threads[i]); + } + } + free_tiny_pool(pool); // release thread pool +} + +void task_queue_push(pool_t *pool, task_t *task) { + if (pool->task_queue_rear == NULL) { // task queue is empty + pool->task_queue_front = task; + pool->task_queue_rear = task; // init task queue with one element + } else { + pool->task_queue_rear->next = task; // task queue push back + pool->task_queue_rear = task; + } + ++pool->task_queue_size; +} + +task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait + /// wait until task queue not empty + pthread_mutex_lock(&pool->mutex); // lock pool struct + while (pool->task_queue_front == NULL) { // loop until task queue not empty + pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added + if (pool->status == EXITING) { + pthread_mutex_unlock(&pool->mutex); + pthread_exit(NULL); // sub thread exit at EXITING stage + } + } + + /// pop first task from queue + bool empty_flag = false; + task_t *front = pool->task_queue_front; + pool->task_queue_front = front->next; // pop first task + if (front->next == NULL) { // only one element + pool->task_queue_rear = NULL; // clear task queue + empty_flag = true; + } + --pool->task_queue_size; + ++pool->busy_thr_num; // task must pop by one ready thread + pthread_mutex_unlock(&pool->mutex); // unlock task queue + + /// send signal to active blocking thread + if (empty_flag) { // send signal after mutex unlocked + pthread_cond_signal(&pool->task_queue_empty); // active pool join thread + } + return front; // success pop one task +} + pool_t* tiny_pool_create(uint32_t size) { /// thread pool struct create pool_t *pool = (pool_t*)malloc(sizeof(pool_t)); @@ -55,57 +125,6 @@ pool_t* tiny_pool_create(uint32_t size) { return pool; // tiny thread pool create success } -void task_queue_push(pool_t *pool, task_t *task) { -// printf("push new task %d\n", *(int*)task->arg); - if (pool->task_queue_rear == NULL) { // task queue is empty - pool->task_queue_front = task; - pool->task_queue_rear = task; // init task queue with one element - } else { - pool->task_queue_rear->next = task; // task queue push back - pool->task_queue_rear = task; - } - ++pool->task_queue_size; -// printf("push success -> size = %d\n", pool->task_queue_size); -} - -task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait -// printf("%lu -> start pop task\n", pthread_self()); - - /// wait until task queue not empty - pthread_mutex_lock(&pool->mutex); // lock pool struct - while (pool->task_queue_front == NULL) { // loop until task queue not empty -// printf("%lu -> pop wait\n", pthread_self()); - pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added -// printf("%lu -> pop exit wait\n", pthread_self()); - if (pool->status == EXITING) { - pthread_mutex_unlock(&pool->mutex); -// printf("%lu -> sub thread exit from idle\n", pthread_self()); - pthread_exit(NULL); // sub thread exit at EXITING stage - } - } -// printf("%lu -> pop new task %d\n", pthread_self(), *(int*)pool->task_queue_front->arg); - - /// pop first task from queue - bool empty_flag = false; - task_t *front = pool->task_queue_front; - pool->task_queue_front = front->next; // pop first task - if (front->next == NULL) { // only one element - pool->task_queue_rear = NULL; // clear task queue - empty_flag = true; - } - --pool->task_queue_size; - ++pool->busy_thr_num; // task must pop by one ready thread -// printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size); - pthread_mutex_unlock(&pool->mutex); // unlock task queue - - /// send signal to active blocking thread - if (empty_flag) { // send signal after mutex unlocked -// printf("signal -> task queue empty\n"); - pthread_cond_signal(&pool->task_queue_empty); // active pool join thread - } - return front; // success pop one task -} - bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { /// pre-check to avoid invalid mutex waiting if (pool->status > RUNNING) { @@ -137,7 +156,6 @@ bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { /// send signal to active blocking thread if (signal_flag) { -// printf("signal -> queue not empty\n"); pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread } return true; // task push success @@ -145,10 +163,7 @@ bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { void* thread_entry(void *pool_ptr) { // main loop for sub thread pool_t *pool = (pool_t*)pool_ptr; -// printf("%lu -> sub thread begin\n", pthread_self()); while (pool->status != EXITING) { // loop until enter EXITING stage -// printf("%lu -> sub thread working\n", pthread_self()); - /// pop a task and execute it task_t *task = task_queue_pop(pool); // pop one task -> blocking function task->entry(task->arg); // start running task function @@ -163,7 +178,6 @@ void* thread_entry(void *pool_ptr) { // main loop for sub thread pthread_cond_signal(&pool->without_busy_thread); } } -// printf("%lu -> sub thread exit\n", pthread_self()); pthread_exit(NULL); // sub thread exit } @@ -183,31 +197,10 @@ void tiny_pool_boot(pool_t *pool) { for (uint32_t i = 0; i < pool->thread_num; ++i) { // create working threads pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool); } -// printf("thread boot complete\n"); pthread_mutex_unlock(&pool->mutex); } -void free_tiny_pool(pool_t *pool) { - -// printf("start free pool resource\n"); - - pthread_cond_destroy(&pool->without_busy_thread); - pthread_cond_destroy(&pool->task_queue_not_empty); - pthread_cond_destroy(&pool->task_queue_empty); - pthread_mutex_destroy(&pool->mutex); - - free(pool->threads); - free(pool); - -// printf("free pool resource complete\n"); - -} - -#include - bool tiny_pool_join(pool_t *pool) { -// printf("start pool join\n"); - /// pre-check to avoid invalid mutex waiting if (pool->status != RUNNING) { return false; @@ -220,63 +213,18 @@ bool tiny_pool_join(pool_t *pool) { return false; // only allow to join at RUNNING stage } pool->status = STOPPING; -// printf("pool status -> STOPPING\n"); - -// printf("wait task queue\n"); - while (pool->task_queue_front != NULL) { + while (pool->task_queue_front != NULL) { // wait empty task queue pthread_cond_wait(&pool->task_queue_empty, &pool->mutex); } -// printf("task queue empty\n"); - pool->status = EXITING; -// printf("pool status -> EXITING\n"); - -// printf("start wait busy threads -> %d\n", pool->busy_thr_num); - while (pool->busy_thr_num != 0) { + while (pool->busy_thr_num != 0) { // wait all threads exit pthread_cond_wait(&pool->without_busy_thread, &pool->mutex); } -// printf("all thread idle\n"); - pthread_mutex_unlock(&pool->mutex); // prevent other functions blocking waiting - -// printf("start sub threads joining\n"); - for (uint32_t i = 0; i < pool->thread_num; ++i) { pthread_cond_broadcast(&pool->task_queue_not_empty); // trigger idle threads -// printf("start join sub thread %lu\n", pool->threads[i]); pthread_join(pool->threads[i], NULL); -// printf("sub thread %lu joined\n", pool->threads[i]); } - -// printf("sub threads join complete\n"); - - free_tiny_pool(pool); - + free_tiny_pool(pool); // release thread pool return true; } - -void* run_pool_join(void *pool) { -// printf("run pool join from detach\n"); - tiny_pool_join((pool_t*)pool); -// printf("pool join complete\n"); - pthread_exit(NULL); -} - -void tiny_pool_detach(pool_t *pool) { - pthread_t tid; -// printf("run pool detach\n"); - pthread_create(&tid, NULL, run_pool_join, (void*)pool); - pthread_detach(tid); -} - -void tiny_pool_kill(pool_t *pool) { -// printf("run pool kill\n"); - if (pool->status > PREPARING) { -// printf("kill sub threads\n"); - for (uint32_t i = 0; i < pool->thread_num; ++i) { - pthread_cancel(pool->threads[i]); - } -// printf("kill complete\n"); - } - free_tiny_pool(pool); -}