Browse Source

update: using global struct mutex

master
Dnomd343 1 year ago
parent
commit
43e0a55ee5
  1. 173
      tiny_pool.c
  2. 11
      tiny_pool.h

173
tiny_pool.c

@ -26,39 +26,48 @@ pool_t* tiny_pool_create(uint32_t size) {
pool->task_queue_front = NULL; pool->task_queue_front = NULL;
/// thread mutex initialization /// thread mutex initialization
if (pthread_mutex_init(&pool->status_mutex, NULL)) {
free(pool->threads); pthread_mutex_init(&pool->mutex, NULL);
free(pool);
return NULL; // status mutex init error -> stop create // if (pthread_mutex_init(&pool->status_mutex, NULL)) {
} // free(pool->threads);
if (pthread_mutex_init(&pool->task_queue_busy, NULL)) { // free(pool);
pthread_mutex_destroy(&pool->status_mutex); // return NULL; // status mutex init error -> stop create
free(pool->threads); // }
free(pool); // if (pthread_mutex_init(&pool->task_queue_busy, NULL)) {
return NULL; // queue mutex init error -> stop create // pthread_mutex_destroy(&pool->status_mutex);
} // free(pool->threads);
if (pthread_mutex_init(&pool->busy_thr_num_mutex, NULL)) { // free(pool);
pthread_mutex_destroy(&pool->task_queue_busy); // return NULL; // queue mutex init error -> stop create
pthread_mutex_destroy(&pool->status_mutex); // }
free(pool->threads); // if (pthread_mutex_init(&pool->busy_thr_num_mutex, NULL)) {
free(pool); // pthread_mutex_destroy(&pool->task_queue_busy);
return NULL; // busy thread num mutex init error -> stop create // pthread_mutex_destroy(&pool->status_mutex);
} // free(pool->threads);
// free(pool);
// return NULL; // busy thread num mutex init error -> stop create
// }
/// thread condition variable initialization /// thread condition variable initialization
if (pthread_cond_init(&pool->task_queue_empty, NULL)) { if (pthread_cond_init(&pool->task_queue_empty, NULL)) {
pthread_mutex_destroy(&pool->busy_thr_num_mutex); // pthread_mutex_destroy(&pool->busy_thr_num_mutex);
pthread_mutex_destroy(&pool->task_queue_busy); // pthread_mutex_destroy(&pool->task_queue_busy);
pthread_mutex_destroy(&pool->status_mutex); // pthread_mutex_destroy(&pool->status_mutex);
pthread_mutex_destroy(&pool->mutex);
free(pool->threads); free(pool->threads);
free(pool); free(pool);
return NULL; // pthread cond init error -> stop create return NULL; // pthread cond init error -> stop create
} }
if (pthread_cond_init(&pool->task_queue_not_empty, NULL)) { if (pthread_cond_init(&pool->task_queue_not_empty, NULL)) {
pthread_cond_destroy(&pool->task_queue_empty); pthread_cond_destroy(&pool->task_queue_empty);
pthread_mutex_destroy(&pool->busy_thr_num_mutex); // pthread_mutex_destroy(&pool->busy_thr_num_mutex);
pthread_mutex_destroy(&pool->task_queue_busy); // pthread_mutex_destroy(&pool->task_queue_busy);
pthread_mutex_destroy(&pool->status_mutex); // pthread_mutex_destroy(&pool->status_mutex);
pthread_mutex_destroy(&pool->mutex);
free(pool->threads); free(pool->threads);
free(pool); free(pool);
return NULL; return NULL;
@ -70,7 +79,10 @@ void task_queue_push(pool_t *pool, task_t *task) {
printf("push one task\n"); printf("push one task\n");
pthread_mutex_lock(&pool->task_queue_busy); // lock task queue // pthread_mutex_lock(&pool->task_queue_busy); // lock task queue
printf("start push process\n");
if (pool->task_queue_rear == NULL) { // task queue is empty if (pool->task_queue_rear == NULL) { // task queue is empty
pool->task_queue_front = task; pool->task_queue_front = task;
pool->task_queue_rear = task; // init task queue with one element pool->task_queue_rear = task; // init task queue with one element
@ -82,27 +94,40 @@ void task_queue_push(pool_t *pool, task_t *task) {
printf("push success -> size = %d\n", pool->task_queue_size); printf("push success -> size = %d\n", pool->task_queue_size);
pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue // bool signal_flag = false;
if (pool->status > PREPARING) { // avoid send signal in PREPARING stage // if (pool->status > PREPARING) {
// signal_flag = true;
// }
printf("signal -> queue not empty\n"); // pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue
pthread_mutex_unlock(&pool->mutex); // unlock task queue
pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread // if (pool->status > PREPARING) { // avoid send signal in PREPARING stage
}
// if (signal_flag) {
//
// printf("signal -> queue not empty\n");
//
// pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread
// }
} }
task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
printf("%lu -> try pop one task\n", pthread_self()); printf("%lu -> try pop one task\n", pthread_self());
pthread_mutex_lock(&pool->task_queue_busy); // lock task queue // pthread_mutex_lock(&pool->task_queue_busy); // lock task queue
pthread_mutex_lock(&pool->mutex); // lock task queue
while (pool->task_queue_front == NULL) { // loop until task queue not empty while (pool->task_queue_front == NULL) { // loop until task queue not empty
printf("%lu -> pop start wait\n", pthread_self()); printf("%lu -> pop start wait\n", pthread_self());
// TODO: at EXITING process may receive active broadcast -> we should stop pop task here // TODO: at EXITING process may receive active broadcast -> we should stop pop task here
// should we cancel thread here directly, or return NULL for sub thread loop? // should we cancel thread here directly, or return NULL for sub thread loop?
pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); // wait new task added // pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); // wait new task added
pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added
// TODO: for now, it seems that first one is more suitable
printf("%lu -> pop exit wait\n", pthread_self()); printf("%lu -> pop exit wait\n", pthread_self());
@ -121,9 +146,12 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
} }
--pool->task_queue_size; --pool->task_queue_size;
++pool->busy_thr_num; // task must pop by one ready thread
printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size); printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size);
pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue // pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue
pthread_mutex_unlock(&pool->mutex); // unlock task queue
if (queue_empty) { // send signal after mutex unlocked if (queue_empty) { // send signal after mutex unlocked
pthread_cond_signal(&pool->task_queue_empty); // active blocking pool join thread pthread_cond_signal(&pool->task_queue_empty); // active blocking pool join thread
@ -132,23 +160,50 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
} }
bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
// TODO: pre-check status here: if > RUNNING --> return false
task_t *new_task = (task_t*)malloc(sizeof(task_t)); task_t *new_task = (task_t*)malloc(sizeof(task_t));
if (new_task == NULL) { if (new_task == NULL) {
return false; // malloc new task error -> stop submit return false; // malloc new task error -> stop submit
} }
new_task->func = func; // load custom task new_task->entry = func; // load custom task
new_task->arg = arg; new_task->arg = arg;
new_task->next = NULL; new_task->next = NULL;
// TODO: warning -> check dead lock here // TODO: warning -> check dead lock here
pthread_mutex_lock(&pool->status_mutex);
// pthread_mutex_lock(&pool->status_mutex);
pthread_mutex_lock(&pool->mutex);
// pthread_mutex_lock(&pool->mutex); // lock task queue
if (pool->status > RUNNING) { if (pool->status > RUNNING) {
pthread_mutex_unlock(&pool->mutex);
free(new_task); free(new_task);
return false; // adding task is prohibited after STOPPING return false; // adding task is prohibited after STOPPING
} else { } else {
task_queue_push(pool, new_task); // push into task queue task_queue_push(pool, new_task); // push into task queue
} }
pthread_mutex_unlock(&pool->status_mutex);
bool signal_flag = false;
if (pool->status > PREPARING) {
signal_flag = true;
}
// pthread_mutex_unlock(&pool->status_mutex);
pthread_mutex_unlock(&pool->mutex);
if (signal_flag) {
printf("signal -> queue not empty\n");
pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread
}
return true; // task push success return true; // task push success
} }
@ -163,21 +218,27 @@ void* thread_entry(void *pool_ptr) { // main loop for sub thread
task_t *task = task_queue_pop(pool); // pop one task -> blocking function task_t *task = task_queue_pop(pool); // pop one task -> blocking function
pthread_mutex_lock(&pool->busy_thr_num_mutex);
++pool->busy_thr_num; // change busy thread number
pthread_mutex_unlock(&pool->busy_thr_num_mutex);
task->func(task->arg); // start running task function // pthread_mutex_lock(&pool->busy_thr_num_mutex);
// ++pool->busy_thr_num; // change busy thread number
// pthread_mutex_unlock(&pool->busy_thr_num_mutex);
pthread_mutex_lock(&pool->busy_thr_num_mutex); task->entry(task->arg); // start running task function
--pool->busy_thr_num; // change busy thread number
pthread_mutex_unlock(&pool->busy_thr_num_mutex);
free(task); // free allocated memory free(task); // free allocated memory
// pthread_mutex_lock(&pool->busy_thr_num_mutex);
pthread_mutex_lock(&pool->mutex);
--pool->busy_thr_num; // change busy thread number
pthread_mutex_unlock(&pool->mutex);
// pthread_mutex_unlock(&pool->busy_thr_num_mutex);
} }
printf("%lu -> sub thread exit\n", pthread_self()); printf("%lu -> sub thread exit\n", pthread_self());
pthread_exit(NULL);
return NULL; // sub thread exit return NULL; // sub thread exit
} }
@ -186,14 +247,20 @@ void tiny_pool_boot(pool_t *pool) {
// TODO: avoid booting multi-times // TODO: avoid booting multi-times
pthread_mutex_lock(&pool->status_mutex); // TODO: pre-check pool status: if != PREPARING --> return
// pthread_mutex_lock(&pool->status_mutex);
pthread_mutex_lock(&pool->mutex);
if (pool->status != PREPARING) { if (pool->status != PREPARING) {
pthread_mutex_unlock(&pool->status_mutex); // pthread_mutex_unlock(&pool->status_mutex);
pthread_mutex_unlock(&pool->mutex);
return; return;
} }
pthread_mutex_lock(&pool->task_queue_busy); pool->status = RUNNING;
// pthread_mutex_lock(&pool->task_queue_busy);
for (uint32_t i = 0; i < pool->thread_num; ++i) { for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool); pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool);
@ -201,10 +268,10 @@ void tiny_pool_boot(pool_t *pool) {
printf("thread boot complete\n"); printf("thread boot complete\n");
pool->status = RUNNING; // pthread_mutex_unlock(&pool->task_queue_busy);
// pthread_mutex_unlock(&pool->status_mutex);
pthread_mutex_unlock(&pool->task_queue_busy); pthread_mutex_unlock(&pool->mutex);
pthread_mutex_unlock(&pool->status_mutex);
} }
@ -221,11 +288,13 @@ bool tiny_pool_join(pool_t *pool) {
printf("start pool join\n"); printf("start pool join\n");
pthread_mutex_lock(&pool->status_mutex); // pthread_mutex_lock(&pool->status_mutex);
pthread_mutex_lock(&pool->mutex);
if (pool->status != RUNNING) { if (pool->status != RUNNING) {
pthread_mutex_unlock(&pool->status_mutex); // pthread_mutex_unlock(&pool->status_mutex);
pthread_mutex_unlock(&pool->mutex);
return false; return false;
@ -233,6 +302,8 @@ bool tiny_pool_join(pool_t *pool) {
pool->status = STOPPING; pool->status = STOPPING;
pthread_mutex_unlock(&pool->mutex);
// TODO: join process // TODO: join process
return true; return true;

11
tiny_pool.h

@ -53,25 +53,28 @@ enum pool_status {
}; };
typedef struct task_t { typedef struct task_t {
void (*func)(void*); // function pointer of the task void (*entry)(void*); // function pointer of the task
void *arg; // argument of task function void *arg; // argument of task function
struct task_t *next; // the next task in the queue struct task_t *next; // the next task in the queue
} task_t; } task_t;
typedef struct pool_t { typedef struct pool_t {
pthread_mutex_t mutex; // global pool mutex
pthread_t *threads; // store thread id pthread_t *threads; // store thread id
uint32_t thread_num; // number of threads uint32_t thread_num; // number of threads
enum pool_status status; // life cycle state enum pool_status status; // life cycle state
pthread_mutex_t status_mutex; // mutex for `status` // pthread_mutex_t status_mutex; // mutex for `status`
uint32_t busy_thr_num; // number of working threads uint32_t busy_thr_num; // number of working threads
pthread_mutex_t busy_thr_num_mutex; // mutex for `busy_thr_num` // pthread_mutex_t busy_thr_num_mutex; // mutex for `busy_thr_num`
task_t *task_queue_front; // head of task queue task_t *task_queue_front; // head of task queue
task_t *task_queue_rear; // end of task queue task_t *task_queue_rear; // end of task queue
uint32_t task_queue_size; // size of task queue uint32_t task_queue_size; // size of task queue
pthread_mutex_t task_queue_busy; // mutex for `task_queue_xxx` // pthread_mutex_t task_queue_busy; // mutex for `task_queue_xxx`
pthread_cond_t task_queue_empty; // condition for task queue becomes empty pthread_cond_t task_queue_empty; // condition for task queue becomes empty
pthread_cond_t task_queue_not_empty; // condition for task queue becomes not empty pthread_cond_t task_queue_not_empty; // condition for task queue becomes not empty

Loading…
Cancel
Save