From 43e0a55ee57996ed0ebe0d3f2d9d232efca828d6 Mon Sep 17 00:00:00 2001 From: Dnomd343 Date: Fri, 3 Feb 2023 14:38:52 +0800 Subject: [PATCH] update: using global struct mutex --- tiny_pool.c | 173 ++++++++++++++++++++++++++++++++++++---------------- tiny_pool.h | 11 ++-- 2 files changed, 129 insertions(+), 55 deletions(-) diff --git a/tiny_pool.c b/tiny_pool.c index a4ae63c..0083639 100644 --- a/tiny_pool.c +++ b/tiny_pool.c @@ -26,39 +26,48 @@ pool_t* tiny_pool_create(uint32_t size) { pool->task_queue_front = NULL; /// thread mutex initialization - if (pthread_mutex_init(&pool->status_mutex, NULL)) { - free(pool->threads); - free(pool); - return NULL; // status mutex init error -> stop create - } - if (pthread_mutex_init(&pool->task_queue_busy, NULL)) { - pthread_mutex_destroy(&pool->status_mutex); - free(pool->threads); - free(pool); - return NULL; // queue mutex init error -> stop create - } - if (pthread_mutex_init(&pool->busy_thr_num_mutex, NULL)) { - pthread_mutex_destroy(&pool->task_queue_busy); - pthread_mutex_destroy(&pool->status_mutex); - free(pool->threads); - free(pool); - return NULL; // busy thread num mutex init error -> stop create - } + + pthread_mutex_init(&pool->mutex, NULL); + +// if (pthread_mutex_init(&pool->status_mutex, NULL)) { +// free(pool->threads); +// free(pool); +// return NULL; // status mutex init error -> stop create +// } +// if (pthread_mutex_init(&pool->task_queue_busy, NULL)) { +// pthread_mutex_destroy(&pool->status_mutex); +// free(pool->threads); +// free(pool); +// return NULL; // queue mutex init error -> stop create +// } +// if (pthread_mutex_init(&pool->busy_thr_num_mutex, NULL)) { +// pthread_mutex_destroy(&pool->task_queue_busy); +// pthread_mutex_destroy(&pool->status_mutex); +// free(pool->threads); +// free(pool); +// return NULL; // busy thread num mutex init error -> stop create +// } /// thread condition variable initialization if (pthread_cond_init(&pool->task_queue_empty, NULL)) { - pthread_mutex_destroy(&pool->busy_thr_num_mutex); - pthread_mutex_destroy(&pool->task_queue_busy); - pthread_mutex_destroy(&pool->status_mutex); +// pthread_mutex_destroy(&pool->busy_thr_num_mutex); +// pthread_mutex_destroy(&pool->task_queue_busy); +// pthread_mutex_destroy(&pool->status_mutex); + + pthread_mutex_destroy(&pool->mutex); + free(pool->threads); free(pool); return NULL; // pthread cond init error -> stop create } if (pthread_cond_init(&pool->task_queue_not_empty, NULL)) { pthread_cond_destroy(&pool->task_queue_empty); - pthread_mutex_destroy(&pool->busy_thr_num_mutex); - pthread_mutex_destroy(&pool->task_queue_busy); - pthread_mutex_destroy(&pool->status_mutex); +// pthread_mutex_destroy(&pool->busy_thr_num_mutex); +// pthread_mutex_destroy(&pool->task_queue_busy); +// pthread_mutex_destroy(&pool->status_mutex); + + pthread_mutex_destroy(&pool->mutex); + free(pool->threads); free(pool); return NULL; @@ -70,7 +79,10 @@ void task_queue_push(pool_t *pool, task_t *task) { printf("push one task\n"); - pthread_mutex_lock(&pool->task_queue_busy); // lock task queue +// pthread_mutex_lock(&pool->task_queue_busy); // lock task queue + + printf("start push process\n"); + if (pool->task_queue_rear == NULL) { // task queue is empty pool->task_queue_front = task; pool->task_queue_rear = task; // init task queue with one element @@ -82,27 +94,40 @@ void task_queue_push(pool_t *pool, task_t *task) { printf("push success -> size = %d\n", pool->task_queue_size); - pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue - if (pool->status > PREPARING) { // avoid send signal in PREPARING stage +// bool signal_flag = false; +// if (pool->status > PREPARING) { +// signal_flag = true; +// } - printf("signal -> queue not empty\n"); +// pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue + pthread_mutex_unlock(&pool->mutex); // unlock task queue - pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread - } +// if (pool->status > PREPARING) { // avoid send signal in PREPARING stage + +// if (signal_flag) { +// +// printf("signal -> queue not empty\n"); +// +// pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread +// } } task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait printf("%lu -> try pop one task\n", pthread_self()); - pthread_mutex_lock(&pool->task_queue_busy); // lock task queue +// pthread_mutex_lock(&pool->task_queue_busy); // lock task queue + pthread_mutex_lock(&pool->mutex); // lock task queue + while (pool->task_queue_front == NULL) { // loop until task queue not empty printf("%lu -> pop start wait\n", pthread_self()); // TODO: at EXITING process may receive active broadcast -> we should stop pop task here // should we cancel thread here directly, or return NULL for sub thread loop? - pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); // wait new task added +// pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); // wait new task added + pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added + // TODO: for now, it seems that first one is more suitable printf("%lu -> pop exit wait\n", pthread_self()); @@ -121,9 +146,12 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait } --pool->task_queue_size; + ++pool->busy_thr_num; // task must pop by one ready thread + printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size); - pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue +// pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue + pthread_mutex_unlock(&pool->mutex); // unlock task queue if (queue_empty) { // send signal after mutex unlocked pthread_cond_signal(&pool->task_queue_empty); // active blocking pool join thread @@ -132,23 +160,50 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait } bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { + + // TODO: pre-check status here: if > RUNNING --> return false + task_t *new_task = (task_t*)malloc(sizeof(task_t)); if (new_task == NULL) { return false; // malloc new task error -> stop submit } - new_task->func = func; // load custom task + new_task->entry = func; // load custom task new_task->arg = arg; new_task->next = NULL; // TODO: warning -> check dead lock here - pthread_mutex_lock(&pool->status_mutex); + +// pthread_mutex_lock(&pool->status_mutex); + pthread_mutex_lock(&pool->mutex); + + // pthread_mutex_lock(&pool->mutex); // lock task queue + if (pool->status > RUNNING) { + + pthread_mutex_unlock(&pool->mutex); + free(new_task); + return false; // adding task is prohibited after STOPPING } else { task_queue_push(pool, new_task); // push into task queue } - pthread_mutex_unlock(&pool->status_mutex); + + bool signal_flag = false; + if (pool->status > PREPARING) { + signal_flag = true; + } + +// pthread_mutex_unlock(&pool->status_mutex); + pthread_mutex_unlock(&pool->mutex); + + if (signal_flag) { + + printf("signal -> queue not empty\n"); + + pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread + } + return true; // task push success } @@ -163,21 +218,27 @@ void* thread_entry(void *pool_ptr) { // main loop for sub thread task_t *task = task_queue_pop(pool); // pop one task -> blocking function - pthread_mutex_lock(&pool->busy_thr_num_mutex); - ++pool->busy_thr_num; // change busy thread number - pthread_mutex_unlock(&pool->busy_thr_num_mutex); - task->func(task->arg); // start running task function +// pthread_mutex_lock(&pool->busy_thr_num_mutex); +// ++pool->busy_thr_num; // change busy thread number +// pthread_mutex_unlock(&pool->busy_thr_num_mutex); - pthread_mutex_lock(&pool->busy_thr_num_mutex); - --pool->busy_thr_num; // change busy thread number - pthread_mutex_unlock(&pool->busy_thr_num_mutex); + task->entry(task->arg); // start running task function free(task); // free allocated memory + +// pthread_mutex_lock(&pool->busy_thr_num_mutex); + pthread_mutex_lock(&pool->mutex); + --pool->busy_thr_num; // change busy thread number + pthread_mutex_unlock(&pool->mutex); +// pthread_mutex_unlock(&pool->busy_thr_num_mutex); + } printf("%lu -> sub thread exit\n", pthread_self()); + pthread_exit(NULL); + return NULL; // sub thread exit } @@ -186,14 +247,20 @@ void tiny_pool_boot(pool_t *pool) { // TODO: avoid booting multi-times - pthread_mutex_lock(&pool->status_mutex); + // TODO: pre-check pool status: if != PREPARING --> return + +// pthread_mutex_lock(&pool->status_mutex); + pthread_mutex_lock(&pool->mutex); if (pool->status != PREPARING) { - pthread_mutex_unlock(&pool->status_mutex); +// pthread_mutex_unlock(&pool->status_mutex); + pthread_mutex_unlock(&pool->mutex); return; } - pthread_mutex_lock(&pool->task_queue_busy); + pool->status = RUNNING; + +// pthread_mutex_lock(&pool->task_queue_busy); for (uint32_t i = 0; i < pool->thread_num; ++i) { pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool); @@ -201,10 +268,10 @@ void tiny_pool_boot(pool_t *pool) { printf("thread boot complete\n"); - pool->status = RUNNING; +// pthread_mutex_unlock(&pool->task_queue_busy); +// pthread_mutex_unlock(&pool->status_mutex); - pthread_mutex_unlock(&pool->task_queue_busy); - pthread_mutex_unlock(&pool->status_mutex); + pthread_mutex_unlock(&pool->mutex); } @@ -221,11 +288,13 @@ bool tiny_pool_join(pool_t *pool) { printf("start pool join\n"); - pthread_mutex_lock(&pool->status_mutex); +// pthread_mutex_lock(&pool->status_mutex); + pthread_mutex_lock(&pool->mutex); if (pool->status != RUNNING) { - pthread_mutex_unlock(&pool->status_mutex); +// pthread_mutex_unlock(&pool->status_mutex); + pthread_mutex_unlock(&pool->mutex); return false; @@ -233,6 +302,8 @@ bool tiny_pool_join(pool_t *pool) { pool->status = STOPPING; + pthread_mutex_unlock(&pool->mutex); + // TODO: join process return true; diff --git a/tiny_pool.h b/tiny_pool.h index a9b5497..ceab0ed 100644 --- a/tiny_pool.h +++ b/tiny_pool.h @@ -53,25 +53,28 @@ enum pool_status { }; typedef struct task_t { - void (*func)(void*); // function pointer of the task + void (*entry)(void*); // function pointer of the task void *arg; // argument of task function struct task_t *next; // the next task in the queue } task_t; typedef struct pool_t { + + pthread_mutex_t mutex; // global pool mutex + pthread_t *threads; // store thread id uint32_t thread_num; // number of threads enum pool_status status; // life cycle state - pthread_mutex_t status_mutex; // mutex for `status` +// pthread_mutex_t status_mutex; // mutex for `status` uint32_t busy_thr_num; // number of working threads - pthread_mutex_t busy_thr_num_mutex; // mutex for `busy_thr_num` +// pthread_mutex_t busy_thr_num_mutex; // mutex for `busy_thr_num` task_t *task_queue_front; // head of task queue task_t *task_queue_rear; // end of task queue uint32_t task_queue_size; // size of task queue - pthread_mutex_t task_queue_busy; // mutex for `task_queue_xxx` +// pthread_mutex_t task_queue_busy; // mutex for `task_queue_xxx` pthread_cond_t task_queue_empty; // condition for task queue becomes empty pthread_cond_t task_queue_not_empty; // condition for task queue becomes not empty