diff --git a/.gitignore b/.gitignore index 4475b96..8cc57cd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +/.idea/ /cmake-build-debug/ /cmake-build-release/ diff --git a/main.c b/main.c index f3faefc..6fd3b77 100644 --- a/main.c +++ b/main.c @@ -1,24 +1,16 @@ #include -#include #include #include "tiny_pool.h" void demo_fun(void *i) { - - int k = *(int*)i; - - printf("demo func sleep %ds\n", k); - - sleep(k); - - printf("demo func %d wake up\n", k); - + int num = *(int*)i; + printf("demo func sleep %ds\n", num); + sleep(num); + printf("demo func %d wake up\n", num); } int main() { -// pthread_t tid; - pool_t *pool = tiny_pool_create(2); int dat[] = {1, 2, 3, 4, 5, 6, 7, 8}; @@ -41,12 +33,12 @@ int main() { // TODO: tiny pool join - printf("pool try exit\n"); - tiny_pool_kill(pool); +// printf("pool try exit\n"); +// tiny_pool_kill(pool); // TODO: tiny pool destroy - sleep(20); + sleep(10); printf("main exit\n"); diff --git a/tiny_pool.c b/tiny_pool.c index e40c4cb..f78e2e9 100644 --- a/tiny_pool.c +++ b/tiny_pool.c @@ -4,16 +4,26 @@ pool_t* tiny_pool_create(uint32_t size) { + // TODO: return NULL when error + printf("create thread pool size -> %d\n", size); pool_t *pool = (pool_t*)malloc(sizeof(pool_t)); -// for (int i = 0; i < 8; ++i) { -// pool->threads[i] = 0; -// } + if (pool == NULL) { + return NULL; + } pool->thread_num = size; pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * size); + + if (pool->threads == NULL) { + + free(pool); + + return NULL; + } + memset(pool->threads, 0, sizeof(pthread_t) * size); pool->status = PREPARING; @@ -27,7 +37,7 @@ pool_t* tiny_pool_create(uint32_t size) { pool->task_queue_size = 0; pthread_mutex_init(&pool->task_queue_busy, NULL); -// pthread_cond_init(&pool->task_queue_empty, NULL); + pthread_cond_init(&pool->task_queue_empty, NULL); pthread_cond_init(&pool->task_queue_not_empty, NULL); return pool; @@ -65,17 +75,25 @@ void task_queue_push(pool_t *pool, task_t *task) { // TODO: send cond signal - printf("send signal -> queue not empty\n"); +// printf("send signal -> queue not empty\n"); +// pthread_cond_signal(&pool->task_queue_not_empty); - pthread_cond_signal(&pool->task_queue_not_empty); + printf("send broadcast -> queue not empty\n"); + pthread_cond_broadcast(&pool->task_queue_not_empty); } - } void tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { + // check status -> failed + if (pool->status == EXITING) { + return; + // TODO: return false here + } + + // TODO: malloc error -> return bool false task_t *new_task = (task_t*)malloc(sizeof(task_t)); new_task->func = func; @@ -85,6 +103,10 @@ void tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { // TODO: new task push into task queue task_queue_push(pool, new_task); + // TODO: queue push may failed -> return false + + // TODO: return bool true + } task_t* task_queue_pop(pool_t *pool) { @@ -101,9 +123,11 @@ task_t* task_queue_pop(pool_t *pool) { // TODO: wait new task added pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); + printf("pop exit wait\n"); + } - printf("pop exit wait\n"); + printf("pop exit loop\n"); task_t *front = pool->task_queue_front; @@ -113,6 +137,9 @@ task_t* task_queue_pop(pool_t *pool) { pool->task_queue_front = NULL; pool->task_queue_rear = NULL; + /// will it cause dead lock? + pthread_cond_signal(&pool->task_queue_empty); + } else { pool->task_queue_front = front->next; @@ -192,15 +219,34 @@ void tiny_pool_boot(pool_t *pool) { } -void tiny_pool_kill(pool_t *pool) { - - printf("pool enter EXITING status\n"); +//void tiny_pool_kill(pool_t *pool) { +// +// printf("pool enter EXITING status\n"); +// +// pthread_mutex_lock(&pool->status_changing); +// +// pool->status = EXITING; +// +// pthread_mutex_unlock(&pool->status_changing); +// +//} - pthread_mutex_lock(&pool->status_changing); +void tiny_pool_wait(pool_t *pool) { - pool->status = EXITING; + // TODO: wait all tasks exit - pthread_mutex_unlock(&pool->status_changing); + // TODO: check `busy_thread_num` == 0 and queue empty } +void tiny_pool_join(pool_t *pool) { + + // TODO: set status -> JOINING -> avoid submit + + // TODO: wait --until--> queue empty + + // TODO: set status -> EXITING -> some thread may exit + + // TODO: signal broadcast -> wait all thread exit + +} diff --git a/tiny_pool.h b/tiny_pool.h index 871d32a..e0e5e9b 100644 --- a/tiny_pool.h +++ b/tiny_pool.h @@ -3,55 +3,107 @@ #include #include - -typedef struct task_t { - void (*func)(void*); - void *arg; - struct task_t *next; -} task_t; - -// TODO: thread pool status -> preparing / running / exiting / exited -enum tiny_pool_status { +#include + +/// This is a lightweight thread pool designed for linux. It is implemented in C language. Its +/// goal is to provide simple and stable services, so it does not provide functions such as priority +/// and dynamic adjustment of the number of threads, and only provides the simplest threads pool +/// implementation. + +/// The thread pool allows users to handle multiple tasks conveniently, and saves the additional +/// overhead of thread creation and destruction, which is very useful in tasks that require a large +/// number of short-term concurrent executions. There are four life cycle phases in this tiny thread +/// pool design: +/// +/// + PREPARING: At this stage, the thread pool is in a ready state, and tasks can be added at +/// this time, but will not be run. +/// +/// + RUNNING: The thread pool runs at this stage, all tasks will be assigned to each thread for +/// execution in sequence, and new tasks can still be added. +/// +/// + STOPPING: The thread pool is about to be closed, and new tasks are no longer allowed at +/// this time, and the remaining tasks will be completed one by one. +/// +/// + EXITING: At this point, there are no pending tasks, only working threads. When all threads +/// complete their tasks, the thread pool will be destroyed. +/// +/// These four life cycles must proceed sequentially: PREPARING -> RUNNING -> STOPPING -> EXITING + +/// When using it, you need to use `tiny_pool_create` to create a thread pool first, and then use +/// the `tiny_pool_submit` function to submit tasks to it. After the preparation is completed, use +/// `tiny_pool_boot` to start the operation of the thread pool, and then you can also add other tasks. + +/// When preparing to exit, you have two options. The first is to use `tiny_pool_join`, which will +/// block and wait for all remaining tasks to be completed, then destroy the threads and free the +/// memory. The second is to use `tiny_pool_detach`, which will detach the thread pool and perform +/// the same behavior as the former after all tasks are completed. It is worth noting that after +/// running these two functions, no tasks can be added to this thread pool, and you should no longer +/// perform any operations on the thread pool. + +/// In addition, there is a `tiny_pool_kill`, this command will directly clear all tasks, kill all +/// threads, all ongoing work will be canceled, it should only be used in emergency situations (such +/// as a fatal error in the main program). In other cases, it is recommended to use `tiny_pool_join` +/// or `tiny_pool_detach` interface. + +enum pool_status { PREPARING, RUNNING, + STOPPING, EXITING, }; -typedef struct { - - pthread_t *threads; - uint32_t thread_num; +typedef struct task_t { + void (*func)(void*); // function pointer of the task + void *arg; // argument of task function + struct task_t *next; // the next task in the queue +} task_t; - enum tiny_pool_status status; - pthread_mutex_t status_changing; +typedef struct pool_t { + pthread_t *threads; // store thread id + uint32_t thread_num; // number of threads - uint32_t busy_thread_num; - pthread_mutex_t busy_thread_num_mutex; + enum pool_status status; // life cycle state + pthread_mutex_t status_mutex; // mutex for `status` - task_t *task_queue_front; - task_t *task_queue_rear; - uint32_t task_queue_size; - pthread_mutex_t task_queue_busy; + uint32_t busy_thr_num; // number of working threads + pthread_mutex_t busy_thr_num_mutex; // mutex for `busy_thr_num` -// pthread_cond_t task_queue_empty; - pthread_cond_t task_queue_not_empty; + task_t *task_queue_front; // head of task queue + task_t *task_queue_rear; // end of task queue + uint32_t task_queue_size; // size of task queue + pthread_mutex_t task_queue_busy; // mutex for `task_queue_xxx` + pthread_cond_t task_queue_empty; // condition for task queue becomes empty + pthread_cond_t task_queue_not_empty; // condition for task queue becomes not empty } pool_t; - +/// This function create a new thread pool, you need to specify the number of threads, +/// it will be in the `PREPARING` state, and return NULL on failure. pool_t* tiny_pool_create(uint32_t size); -void tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg); +/// Submit a task to the specified thread pool, and the task is a function pointer (it +/// receives a void pointer and has no return value) and a parameter. It will return +/// true if the commit was successful, and false otherwise. +bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg); -// TODO: confirm just run once +/// This function will start the specified thread pool and change its state from +/// `PREPARING` to `RUNNING`. If the thread pool is already in non `PREPARING` at runtime, +/// it will have no effect. void tiny_pool_boot(pool_t *pool); -void tiny_pool_kill(pool_t *pool); +/// This function will change the thread pool from `RUNNING` to `STOPPING`, and enter the +/// `EXITING` state when the queue is empty, likewise, if the state is non `RUNNING` when +/// entered, it will also have no effect. All tasks will automatically free resources after +/// completion. Note that it is blocking and may take a considerable amount of time. +void tiny_pool_join(pool_t *pool); -// TODO: destroy method +/// It is basically the same as `tiny_pool_join` in function, the difference is that it is +/// non-blocking, that is, it will automatically handle the remaining tasks and complete +/// resource free process after execution. +void tiny_pool_detach(pool_t *pool); -// pool join -> handle to remain tasks -> return when queue empty and not thread working - -// pool destroy -> only wait current working task -> ignore waiting tasks in queue -> free memory and exit +/// This function will forcibly clear the task queue and reclaim all resources. Note that +/// this will cause the interruption of running tasks and the loss of un-running tasks. +void tiny_pool_kill(pool_t *pool); #endif