Browse Source

feat: thread pool joining

master
Dnomd343 1 year ago
parent
commit
5076e5a3ce
  1. 26
      main.c
  2. 74
      tiny_pool.c
  3. 1
      tiny_pool.h

26
main.c

@ -4,12 +4,12 @@
void demo_fun(void *i) {
int num = *(int*)i;
printf("task %d start\n", num);
printf(" task %d start\n", num);
for (int t = 0; t < num; ++t) {
sleep(1);
printf("task %d running...\n", num);
printf(" task %d running...\n", num);
}
printf("task %d complete\n", num);
printf(" task %d complete\n", num);
}
int main() {
@ -20,32 +20,34 @@ int main() {
tiny_pool_submit(pool, demo_fun, (void*)&dat[0]);
tiny_pool_submit(pool, demo_fun, (void*)&dat[1]);
printf("main: pool booting\n");
printf("+ main: pool booting\n");
tiny_pool_boot(pool);
printf("main: pool boot complete\n");
printf("+ main: pool boot complete\n");
printf("main: sleep 5s\n");
printf("+ main: sleep 5s\n");
sleep(5);
printf("main: wake up\n");
printf("+ main: wake up\n");
tiny_pool_submit(pool, demo_fun, (void*)&dat[2]);
tiny_pool_submit(pool, demo_fun, (void*)&dat[3]);
tiny_pool_submit(pool, demo_fun, (void*)&dat[4]);
printf("main: sleep 8s\n");
printf("+ main: sleep 8s\n");
sleep(6);
printf("main: wake up\n");
printf("+ main: wake up\n");
// TODO: tiny pool join
printf("+ main: pool joining\n");
tiny_pool_join(pool);
printf("+ main: pool join complete\n");
// printf("pool try exit\n");
// tiny_pool_kill(pool);
// TODO: tiny pool destroy
sleep(10);
// sleep(10);
printf("main exit\n");
printf("+ main exit\n");
return 0;
}

74
tiny_pool.c

@ -44,6 +44,14 @@ pool_t* tiny_pool_create(uint32_t size) {
free(pool);
return NULL; // task queue cond init error -> revoke create
}
if (pthread_cond_init(&pool->without_busy_thread, NULL)) {
pthread_cond_destroy(&pool->task_queue_not_empty);
pthread_cond_destroy(&pool->task_queue_empty);
pthread_mutex_destroy(&pool->mutex);
free(pool->threads);
free(pool);
return NULL; // busy thread num cond init error -> revoke create
}
return pool; // tiny thread pool create success
}
@ -70,6 +78,7 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added
printf("%lu -> pop exit wait\n", pthread_self());
if (pool->status == EXITING) {
printf("%lu -> sub thread exit from idle\n", pthread_self());
pthread_exit(NULL); // sub thread exit at EXITING stage
}
}
@ -146,7 +155,11 @@ void* thread_entry(void *pool_ptr) { // main loop for sub thread
/// mark thread as idle
pthread_mutex_lock(&pool->mutex);
--pool->busy_thr_num; // change busy thread number
bool busy_flag = (pool->busy_thr_num == 0);
pthread_mutex_unlock(&pool->mutex);
if (busy_flag) {
pthread_cond_signal(&pool->without_busy_thread);
}
}
printf("%lu -> sub thread exit\n", pthread_self());
pthread_exit(NULL); // sub thread exit
@ -158,7 +171,7 @@ void tiny_pool_boot(pool_t *pool) {
return; // only allow to boot at PREPARING stage
}
/// handle pool thread booting
/// handle pool booting
pthread_mutex_lock(&pool->mutex);
if (pool->status != PREPARING) {
pthread_mutex_unlock(&pool->mutex);
@ -173,42 +186,53 @@ void tiny_pool_boot(pool_t *pool) {
}
bool tiny_pool_join(pool_t *pool) {
// TODO: set status -> JOINING -> avoid submit
// TODO: wait --until--> queue empty
// TODO: set status -> EXITING -> some thread may exit
// TODO: signal broadcast -> wait all thread exit
printf("start pool join\n");
// pthread_mutex_lock(&pool->status_mutex);
pthread_mutex_lock(&pool->mutex);
/// pre-check to avoid invalid mutex waiting
if (pool->status != RUNNING) {
// pthread_mutex_unlock(&pool->status_mutex);
pthread_mutex_unlock(&pool->mutex);
return false;
}
/// handle pool threads joining
pthread_mutex_lock(&pool->mutex);
if (pool->status != RUNNING) {
pthread_mutex_unlock(&pool->mutex);
return false; // only allow to join at RUNNING stage
}
pool->status = STOPPING;
printf("pool status -> STOPPING\n");
printf("wait task queue\n");
while (pool->task_queue_front != NULL) {
pthread_cond_wait(&pool->task_queue_empty, &pool->mutex);
}
printf("task queue empty\n");
pthread_mutex_unlock(&pool->mutex);
// TODO: join process
pool->status = EXITING;
printf("pool status -> EXITING\n");
return true;
}
printf("start wait busy threads -> %d\n", pool->busy_thr_num);
while (pool->busy_thr_num != 0) {
pthread_cond_wait(&pool->without_busy_thread, &pool->mutex);
}
printf("all thread idle\n");
//void tiny_pool_wait(pool_t *pool) {
pthread_mutex_unlock(&pool->mutex); // prevent other functions blocking waiting
// TODO: wait all tasks exit
// TODO: check `busy_thread_num` == 0 and queue empty
printf("unlock mutex and send broadcast\n");
// pthread_cond_broadcast(&pool->task_queue_not_empty); // send broadcast to trigger idle threads
//}
// TODO: signal broadcast and wait all thread exit
printf("start sub threads joining\n");
for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_cond_broadcast(&pool->task_queue_not_empty); // trigger idle threads
pthread_join(pool->threads[i], NULL);
printf("sub thread %lu joined\n", pool->threads[i]);
}
printf("sub threads join complete\n");
return true;
}

1
tiny_pool.h

@ -72,6 +72,7 @@ typedef struct pool_t {
pthread_cond_t task_queue_empty; // condition for task queue becomes empty
pthread_cond_t task_queue_not_empty; // condition for task queue becomes not empty
pthread_cond_t without_busy_thread; // condition for busy thread number becomes zero
} pool_t;
/// This function create a new thread pool, you need to specify the number of threads,

Loading…
Cancel
Save