From 35c2b3d7be5d279af9da10b0cff2a2223037aac1 Mon Sep 17 00:00:00 2001 From: Dnomd343 Date: Fri, 3 Feb 2023 15:50:22 +0800 Subject: [PATCH] perf: pool submit, boot and sub thread process --- main.c | 21 ++++-- tiny_pool.c | 206 +++++++++++++--------------------------------------- tiny_pool.h | 10 +-- 3 files changed, 66 insertions(+), 171 deletions(-) diff --git a/main.c b/main.c index 6fd3b77..e0a0955 100644 --- a/main.c +++ b/main.c @@ -4,9 +4,12 @@ void demo_fun(void *i) { int num = *(int*)i; - printf("demo func sleep %ds\n", num); - sleep(num); - printf("demo func %d wake up\n", num); + printf("task %d start\n", num); + for (int t = 0; t < num; ++t) { + sleep(1); + printf("task %d running...\n", num); + } + printf("task %d complete\n", num); } int main() { @@ -17,19 +20,21 @@ int main() { tiny_pool_submit(pool, demo_fun, (void*)&dat[0]); tiny_pool_submit(pool, demo_fun, (void*)&dat[1]); - printf("pool booting\n"); + printf("main: pool booting\n"); tiny_pool_boot(pool); - printf("pool running\n"); + printf("main: pool boot complete\n"); - printf("main thread sleep\n"); + printf("main: sleep 5s\n"); sleep(5); - printf("main thread wake up\n"); + printf("main: wake up\n"); tiny_pool_submit(pool, demo_fun, (void*)&dat[2]); tiny_pool_submit(pool, demo_fun, (void*)&dat[3]); tiny_pool_submit(pool, demo_fun, (void*)&dat[4]); - sleep(8); + printf("main: sleep 8s\n"); + sleep(6); + printf("main: wake up\n"); // TODO: tiny pool join diff --git a/tiny_pool.c b/tiny_pool.c index 0083639..9c6c8be 100644 --- a/tiny_pool.c +++ b/tiny_pool.c @@ -6,7 +6,7 @@ pool_t* tiny_pool_create(uint32_t size) { /// thread pool struct create pool_t *pool = (pool_t*)malloc(sizeof(pool_t)); if (pool == NULL) { - return NULL; // malloc pool failed -> stop create + return NULL; // malloc pool failed -> revoke create } /// threads memory initialize @@ -14,7 +14,7 @@ pool_t* tiny_pool_create(uint32_t size) { pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * size); if (pool->threads == NULL) { free(pool); - return NULL; // malloc threads failed -> stop create + return NULL; // malloc threads failed -> revoke create } memset(pool->threads, 0, sizeof(pthread_t) * size); // clean thread ids as zero @@ -25,64 +25,30 @@ pool_t* tiny_pool_create(uint32_t size) { pool->task_queue_rear = NULL; pool->task_queue_front = NULL; - /// thread mutex initialization - - pthread_mutex_init(&pool->mutex, NULL); - -// if (pthread_mutex_init(&pool->status_mutex, NULL)) { -// free(pool->threads); -// free(pool); -// return NULL; // status mutex init error -> stop create -// } -// if (pthread_mutex_init(&pool->task_queue_busy, NULL)) { -// pthread_mutex_destroy(&pool->status_mutex); -// free(pool->threads); -// free(pool); -// return NULL; // queue mutex init error -> stop create -// } -// if (pthread_mutex_init(&pool->busy_thr_num_mutex, NULL)) { -// pthread_mutex_destroy(&pool->task_queue_busy); -// pthread_mutex_destroy(&pool->status_mutex); -// free(pool->threads); -// free(pool); -// return NULL; // busy thread num mutex init error -> stop create -// } - - /// thread condition variable initialization + /// mutex and conditions initialization + if (pthread_mutex_init(&pool->mutex, NULL)) { + free(pool->threads); + free(pool); + return NULL; // global mutex init error -> revoke create + } if (pthread_cond_init(&pool->task_queue_empty, NULL)) { -// pthread_mutex_destroy(&pool->busy_thr_num_mutex); -// pthread_mutex_destroy(&pool->task_queue_busy); -// pthread_mutex_destroy(&pool->status_mutex); - pthread_mutex_destroy(&pool->mutex); - free(pool->threads); free(pool); - return NULL; // pthread cond init error -> stop create + return NULL; // task queue cond init error -> revoke create } if (pthread_cond_init(&pool->task_queue_not_empty, NULL)) { pthread_cond_destroy(&pool->task_queue_empty); -// pthread_mutex_destroy(&pool->busy_thr_num_mutex); -// pthread_mutex_destroy(&pool->task_queue_busy); -// pthread_mutex_destroy(&pool->status_mutex); - pthread_mutex_destroy(&pool->mutex); - free(pool->threads); free(pool); - return NULL; + return NULL; // task queue cond init error -> revoke create } return pool; // tiny thread pool create success } void task_queue_push(pool_t *pool, task_t *task) { - - printf("push one task\n"); - -// pthread_mutex_lock(&pool->task_queue_busy); // lock task queue - - printf("start push process\n"); - + printf("push new task\n"); if (pool->task_queue_rear == NULL) { // task queue is empty pool->task_queue_front = task; pool->task_queue_rear = task; // init task queue with one element @@ -91,188 +57,119 @@ void task_queue_push(pool_t *pool, task_t *task) { pool->task_queue_rear = task; } ++pool->task_queue_size; - printf("push success -> size = %d\n", pool->task_queue_size); - -// bool signal_flag = false; -// if (pool->status > PREPARING) { -// signal_flag = true; -// } - -// pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue - pthread_mutex_unlock(&pool->mutex); // unlock task queue - -// if (pool->status > PREPARING) { // avoid send signal in PREPARING stage - -// if (signal_flag) { -// -// printf("signal -> queue not empty\n"); -// -// pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread -// } } task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait + printf("%lu -> pop one task\n", pthread_self()); - printf("%lu -> try pop one task\n", pthread_self()); - -// pthread_mutex_lock(&pool->task_queue_busy); // lock task queue - pthread_mutex_lock(&pool->mutex); // lock task queue - + /// wait until task queue not empty + pthread_mutex_lock(&pool->mutex); // lock pool struct while (pool->task_queue_front == NULL) { // loop until task queue not empty - printf("%lu -> pop start wait\n", pthread_self()); - - // TODO: at EXITING process may receive active broadcast -> we should stop pop task here - // should we cancel thread here directly, or return NULL for sub thread loop? -// pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); // wait new task added pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added - // TODO: for now, it seems that first one is more suitable - printf("%lu -> pop exit wait\n", pthread_self()); - + if (pool->status == EXITING) { + pthread_exit(NULL); // sub thread exit at EXITING stage + } } - printf("%lu -> pop new task\n", pthread_self()); - bool queue_empty = false; + /// pop first task from queue + bool empty_flag = false; task_t *front = pool->task_queue_front; - if (pool->task_queue_front == pool->task_queue_rear) { // only one element - pool->task_queue_front = NULL; // clear task queue - pool->task_queue_rear = NULL; - queue_empty = true; - } else { - pool->task_queue_front = front->next; // pop first task + pool->task_queue_front = front->next; // pop first task + if (front->next == NULL) { // only one element + pool->task_queue_rear = NULL; // clear task queue + empty_flag = true; } --pool->task_queue_size; - ++pool->busy_thr_num; // task must pop by one ready thread - printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size); - -// pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue pthread_mutex_unlock(&pool->mutex); // unlock task queue - if (queue_empty) { // send signal after mutex unlocked - pthread_cond_signal(&pool->task_queue_empty); // active blocking pool join thread + /// send signal to active blocking thread + if (empty_flag) { // send signal after mutex unlocked + pthread_cond_signal(&pool->task_queue_empty); // active pool join thread } return front; // success pop one task } bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { + /// pre-check to avoid invalid mutex waiting + if (pool->status > RUNNING) { + return false; // allow to add task at PREPARING and RUNNING stage + } - // TODO: pre-check status here: if > RUNNING --> return false - + /// initialize task structure task_t *new_task = (task_t*)malloc(sizeof(task_t)); if (new_task == NULL) { - return false; // malloc new task error -> stop submit + return false; // malloc new task error -> failed submit } new_task->entry = func; // load custom task new_task->arg = arg; new_task->next = NULL; - // TODO: warning -> check dead lock here - -// pthread_mutex_lock(&pool->status_mutex); + /// handle task submit pthread_mutex_lock(&pool->mutex); - - // pthread_mutex_lock(&pool->mutex); // lock task queue - - if (pool->status > RUNNING) { - + if (pool->status > RUNNING) { // pool stage recheck after mutex lock pthread_mutex_unlock(&pool->mutex); - free(new_task); - - return false; // adding task is prohibited after STOPPING - } else { - task_queue_push(pool, new_task); // push into task queue + return false; // adding task is prohibited after RUNNING } - + task_queue_push(pool, new_task); // push into task queue bool signal_flag = false; - if (pool->status > PREPARING) { + if (pool->status > PREPARING) { // only send active signal at RUNNING stage signal_flag = true; } + pthread_mutex_unlock(&pool->mutex); // send signal after mutex unlock -> reduce thread churn -// pthread_mutex_unlock(&pool->status_mutex); - pthread_mutex_unlock(&pool->mutex); - + /// send signal to active blocking thread if (signal_flag) { - printf("signal -> queue not empty\n"); - pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread } - return true; // task push success } void* thread_entry(void *pool_ptr) { // main loop for sub thread pool_t *pool = (pool_t*)pool_ptr; - - printf("start thread %lu\n", pthread_self()); - - while (pool->status != EXITING) { // loop until enter EXITING mode - + printf("%lu -> sub thread begin\n", pthread_self()); + while (pool->status != EXITING) { // loop until enter EXITING stage printf("%lu -> sub thread working\n", pthread_self()); + /// pop a task and execute it task_t *task = task_queue_pop(pool); // pop one task -> blocking function - - -// pthread_mutex_lock(&pool->busy_thr_num_mutex); -// ++pool->busy_thr_num; // change busy thread number -// pthread_mutex_unlock(&pool->busy_thr_num_mutex); - task->entry(task->arg); // start running task function - free(task); // free allocated memory -// pthread_mutex_lock(&pool->busy_thr_num_mutex); + /// mark thread as idle pthread_mutex_lock(&pool->mutex); --pool->busy_thr_num; // change busy thread number pthread_mutex_unlock(&pool->mutex); -// pthread_mutex_unlock(&pool->busy_thr_num_mutex); - } - printf("%lu -> sub thread exit\n", pthread_self()); - - pthread_exit(NULL); - - return NULL; // sub thread exit + pthread_exit(NULL); // sub thread exit } -// TODO: should we return a bool value? void tiny_pool_boot(pool_t *pool) { + /// pre-check to avoid invalid mutex waiting + if (pool->status != PREPARING) { + return; // only allow to boot at PREPARING stage + } - // TODO: avoid booting multi-times - - // TODO: pre-check pool status: if != PREPARING --> return - -// pthread_mutex_lock(&pool->status_mutex); + /// handle pool thread booting pthread_mutex_lock(&pool->mutex); - if (pool->status != PREPARING) { -// pthread_mutex_unlock(&pool->status_mutex); pthread_mutex_unlock(&pool->mutex); - return; + return; // only allow to boot at PREPARING stage } - pool->status = RUNNING; - -// pthread_mutex_lock(&pool->task_queue_busy); - - for (uint32_t i = 0; i < pool->thread_num; ++i) { + for (uint32_t i = 0; i < pool->thread_num; ++i) { // create working threads pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool); } - printf("thread boot complete\n"); - -// pthread_mutex_unlock(&pool->task_queue_busy); -// pthread_mutex_unlock(&pool->status_mutex); - pthread_mutex_unlock(&pool->mutex); - } bool tiny_pool_join(pool_t *pool) { @@ -285,7 +182,6 @@ bool tiny_pool_join(pool_t *pool) { // TODO: signal broadcast -> wait all thread exit - printf("start pool join\n"); // pthread_mutex_lock(&pool->status_mutex); diff --git a/tiny_pool.h b/tiny_pool.h index ceab0ed..826598e 100644 --- a/tiny_pool.h +++ b/tiny_pool.h @@ -45,7 +45,7 @@ /// as a fatal error in the main program). In other cases, it is recommended to use `tiny_pool_join` /// or `tiny_pool_detach` interface. -enum pool_status { +enum pool_stage { PREPARING = 0, RUNNING = 1, STOPPING = 2, @@ -59,22 +59,16 @@ typedef struct task_t { } task_t; typedef struct pool_t { - pthread_mutex_t mutex; // global pool mutex pthread_t *threads; // store thread id uint32_t thread_num; // number of threads - - enum pool_status status; // life cycle state -// pthread_mutex_t status_mutex; // mutex for `status` - uint32_t busy_thr_num; // number of working threads -// pthread_mutex_t busy_thr_num_mutex; // mutex for `busy_thr_num` + enum pool_stage status; // tiny pool life cycle stage task_t *task_queue_front; // head of task queue task_t *task_queue_rear; // end of task queue uint32_t task_queue_size; // size of task queue -// pthread_mutex_t task_queue_busy; // mutex for `task_queue_xxx` pthread_cond_t task_queue_empty; // condition for task queue becomes empty pthread_cond_t task_queue_not_empty; // condition for task queue becomes not empty