From 5076e5a3ce86a30651b9db4950dcfd1a912a5beb Mon Sep 17 00:00:00 2001 From: Dnomd343 Date: Fri, 3 Feb 2023 16:34:40 +0800 Subject: [PATCH] feat: thread pool joining --- main.c | 26 ++++++++++--------- tiny_pool.c | 74 +++++++++++++++++++++++++++++++++++------------------ tiny_pool.h | 1 + 3 files changed, 64 insertions(+), 37 deletions(-) diff --git a/main.c b/main.c index e0a0955..94c022a 100644 --- a/main.c +++ b/main.c @@ -4,12 +4,12 @@ void demo_fun(void *i) { int num = *(int*)i; - printf("task %d start\n", num); + printf(" task %d start\n", num); for (int t = 0; t < num; ++t) { sleep(1); - printf("task %d running...\n", num); + printf(" task %d running...\n", num); } - printf("task %d complete\n", num); + printf(" task %d complete\n", num); } int main() { @@ -20,32 +20,34 @@ int main() { tiny_pool_submit(pool, demo_fun, (void*)&dat[0]); tiny_pool_submit(pool, demo_fun, (void*)&dat[1]); - printf("main: pool booting\n"); + printf("+ main: pool booting\n"); tiny_pool_boot(pool); - printf("main: pool boot complete\n"); + printf("+ main: pool boot complete\n"); - printf("main: sleep 5s\n"); + printf("+ main: sleep 5s\n"); sleep(5); - printf("main: wake up\n"); + printf("+ main: wake up\n"); tiny_pool_submit(pool, demo_fun, (void*)&dat[2]); tiny_pool_submit(pool, demo_fun, (void*)&dat[3]); tiny_pool_submit(pool, demo_fun, (void*)&dat[4]); - printf("main: sleep 8s\n"); + printf("+ main: sleep 8s\n"); sleep(6); - printf("main: wake up\n"); + printf("+ main: wake up\n"); - // TODO: tiny pool join + printf("+ main: pool joining\n"); + tiny_pool_join(pool); + printf("+ main: pool join complete\n"); // printf("pool try exit\n"); // tiny_pool_kill(pool); // TODO: tiny pool destroy - sleep(10); +// sleep(10); - printf("main exit\n"); + printf("+ main exit\n"); return 0; } diff --git a/tiny_pool.c b/tiny_pool.c index 9c6c8be..cb70e41 100644 --- a/tiny_pool.c +++ b/tiny_pool.c @@ -44,6 +44,14 @@ pool_t* tiny_pool_create(uint32_t size) { free(pool); return NULL; // task queue cond init error -> revoke create } + if (pthread_cond_init(&pool->without_busy_thread, NULL)) { + pthread_cond_destroy(&pool->task_queue_not_empty); + pthread_cond_destroy(&pool->task_queue_empty); + pthread_mutex_destroy(&pool->mutex); + free(pool->threads); + free(pool); + return NULL; // busy thread num cond init error -> revoke create + } return pool; // tiny thread pool create success } @@ -70,6 +78,7 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added printf("%lu -> pop exit wait\n", pthread_self()); if (pool->status == EXITING) { + printf("%lu -> sub thread exit from idle\n", pthread_self()); pthread_exit(NULL); // sub thread exit at EXITING stage } } @@ -146,7 +155,11 @@ void* thread_entry(void *pool_ptr) { // main loop for sub thread /// mark thread as idle pthread_mutex_lock(&pool->mutex); --pool->busy_thr_num; // change busy thread number + bool busy_flag = (pool->busy_thr_num == 0); pthread_mutex_unlock(&pool->mutex); + if (busy_flag) { + pthread_cond_signal(&pool->without_busy_thread); + } } printf("%lu -> sub thread exit\n", pthread_self()); pthread_exit(NULL); // sub thread exit @@ -158,7 +171,7 @@ void tiny_pool_boot(pool_t *pool) { return; // only allow to boot at PREPARING stage } - /// handle pool thread booting + /// handle pool booting pthread_mutex_lock(&pool->mutex); if (pool->status != PREPARING) { pthread_mutex_unlock(&pool->mutex); @@ -173,42 +186,53 @@ void tiny_pool_boot(pool_t *pool) { } bool tiny_pool_join(pool_t *pool) { - - // TODO: set status -> JOINING -> avoid submit - - // TODO: wait --until--> queue empty - - // TODO: set status -> EXITING -> some thread may exit - - // TODO: signal broadcast -> wait all thread exit - printf("start pool join\n"); -// pthread_mutex_lock(&pool->status_mutex); - pthread_mutex_lock(&pool->mutex); - + /// pre-check to avoid invalid mutex waiting if (pool->status != RUNNING) { - -// pthread_mutex_unlock(&pool->status_mutex); - pthread_mutex_unlock(&pool->mutex); - return false; + } + /// handle pool threads joining + pthread_mutex_lock(&pool->mutex); + if (pool->status != RUNNING) { + pthread_mutex_unlock(&pool->mutex); + return false; // only allow to join at RUNNING stage } pool->status = STOPPING; + printf("pool status -> STOPPING\n"); + printf("wait task queue\n"); + while (pool->task_queue_front != NULL) { + pthread_cond_wait(&pool->task_queue_empty, &pool->mutex); + } + printf("task queue empty\n"); - pthread_mutex_unlock(&pool->mutex); - // TODO: join process + pool->status = EXITING; + printf("pool status -> EXITING\n"); - return true; -} + printf("start wait busy threads -> %d\n", pool->busy_thr_num); + while (pool->busy_thr_num != 0) { + pthread_cond_wait(&pool->without_busy_thread, &pool->mutex); + } + printf("all thread idle\n"); -//void tiny_pool_wait(pool_t *pool) { + pthread_mutex_unlock(&pool->mutex); // prevent other functions blocking waiting - // TODO: wait all tasks exit - // TODO: check `busy_thread_num` == 0 and queue empty + printf("unlock mutex and send broadcast\n"); +// pthread_cond_broadcast(&pool->task_queue_not_empty); // send broadcast to trigger idle threads -//} + // TODO: signal broadcast and wait all thread exit + + printf("start sub threads joining\n"); + for (uint32_t i = 0; i < pool->thread_num; ++i) { + pthread_cond_broadcast(&pool->task_queue_not_empty); // trigger idle threads + pthread_join(pool->threads[i], NULL); + printf("sub thread %lu joined\n", pool->threads[i]); + } + printf("sub threads join complete\n"); + + return true; +} diff --git a/tiny_pool.h b/tiny_pool.h index 826598e..4ee507c 100644 --- a/tiny_pool.h +++ b/tiny_pool.h @@ -72,6 +72,7 @@ typedef struct pool_t { pthread_cond_t task_queue_empty; // condition for task queue becomes empty pthread_cond_t task_queue_not_empty; // condition for task queue becomes not empty + pthread_cond_t without_busy_thread; // condition for busy thread number becomes zero } pool_t; /// This function create a new thread pool, you need to specify the number of threads,