Browse Source

update: enhance submit process and sub thread loop

master
Dnomd343 1 year ago
parent
commit
efe3bef4d1
  1. 70
      tiny_pool.c
  2. 8
      tiny_pool.h

70
tiny_pool.c

@ -83,7 +83,7 @@ void task_queue_push(pool_t *pool, task_t *task) {
printf("push success -> size = %d\n", pool->task_queue_size); printf("push success -> size = %d\n", pool->task_queue_size);
pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue
if (pool->status >= RUNNING) { // avoid send signal in PREPARING stage if (pool->status > PREPARING) { // avoid send signal in PREPARING stage
printf("signal -> queue not empty\n"); printf("signal -> queue not empty\n");
@ -100,6 +100,8 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
printf("pop start wait\n"); printf("pop start wait\n");
// TODO: at EXITING process may receive active broadcast -> we should stop pop task here
// should we cancel thread here directly, or return NULL for sub thread loop?
pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); // wait new task added pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); // wait new task added
printf("pop exit wait\n"); printf("pop exit wait\n");
@ -108,11 +110,12 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
printf("pop new task\n"); printf("pop new task\n");
bool queue_empty = false;
task_t *front = pool->task_queue_front; task_t *front = pool->task_queue_front;
if (pool->task_queue_front == pool->task_queue_rear) { // only one element if (pool->task_queue_front == pool->task_queue_rear) { // only one element
pool->task_queue_front = NULL; // clear task queue pool->task_queue_front = NULL; // clear task queue
pool->task_queue_rear = NULL; pool->task_queue_rear = NULL;
pthread_cond_signal(&pool->task_queue_empty); // active blocking join thread queue_empty = true;
} else { } else {
pool->task_queue_front = front->next; // pop first task pool->task_queue_front = front->next; // pop first task
} }
@ -121,65 +124,55 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
printf("pop success -> size = %d\n", pool->task_queue_size); printf("pop success -> size = %d\n", pool->task_queue_size);
pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue
if (queue_empty) { // send signal after mutex unlocked
pthread_cond_signal(&pool->task_queue_empty); // active blocking pool join thread
}
return front; // success pop one task return front; // success pop one task
} }
bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
if (pool->status > RUNNING) {
// check status -> failed return false; // adding task is prohibited after STOPPING
if (pool->status == EXITING) {
return false;
// TODO: return false here
} }
// TODO: malloc error -> return bool false
task_t *new_task = (task_t*)malloc(sizeof(task_t)); task_t *new_task = (task_t*)malloc(sizeof(task_t));
if (new_task == NULL) {
new_task->func = func; return false; // malloc new task error -> stop submit
}
new_task->func = func; // load custom task
new_task->arg = arg; new_task->arg = arg;
new_task->next = NULL; new_task->next = NULL;
task_queue_push(pool, new_task); // push into task queue
// TODO: new task push into task queue return true; // task push success
task_queue_push(pool, new_task);
// TODO: queue push may failed -> return false
// TODO: return bool true
return true;
} }
void* thread_entry(void *pool_ptr) { void* thread_entry(void *pool_ptr) { // main loop for sub thread
// TODO: main loop for one thread
// TODO: check if thread pool exiting
// TODO: pop one task --failed--> blocking wait
// --success--> start running and then free task_t
pool_t *pool = (pool_t*)pool_ptr; pool_t *pool = (pool_t*)pool_ptr;
while (pool->status != EXITING) { // loop until enter EXITING mode
while (pool->status != EXITING) { printf("sub thread working\n");
printf("thread working\n"); task_t *task = task_queue_pop(pool); // pop one task -> blocking function
task_t *task = task_queue_pop(pool); pthread_mutex_lock(&pool->busy_thr_num_mutex);
++pool->busy_thr_num; // change busy thread number
pthread_mutex_unlock(&pool->busy_thr_num_mutex);
// task working task->func(task->arg); // start running task function
task->func(task->arg);
free(task); pthread_mutex_lock(&pool->busy_thr_num_mutex);
--pool->busy_thr_num; // change busy thread number
pthread_mutex_unlock(&pool->busy_thr_num_mutex);
free(task); // free allocated memory
} }
printf("sub thread exit\n"); printf("sub thread exit\n");
return NULL; return NULL; // sub thread exit
} }
// TODO: should we return a bool value?
void tiny_pool_boot(pool_t *pool) { void tiny_pool_boot(pool_t *pool) {
// TODO: create admin thread // TODO: create admin thread
@ -209,6 +202,7 @@ void tiny_pool_boot(pool_t *pool) {
} }
//void tiny_pool_kill(pool_t *pool) { //void tiny_pool_kill(pool_t *pool) {
// //
// printf("pool enter EXITING status\n"); // printf("pool enter EXITING status\n");

8
tiny_pool.h

@ -46,10 +46,10 @@
/// or `tiny_pool_detach` interface. /// or `tiny_pool_detach` interface.
enum pool_status { enum pool_status {
PREPARING, PREPARING = 0,
RUNNING, RUNNING = 1,
STOPPING, STOPPING = 2,
EXITING, EXITING = 3,
}; };
typedef struct task_t { typedef struct task_t {

Loading…
Cancel
Save