Browse Source

perf: pool submit, boot and sub thread process

master
Dnomd343 1 year ago
parent
commit
35c2b3d7be
  1. 21
      main.c
  2. 206
      tiny_pool.c
  3. 10
      tiny_pool.h

21
main.c

@ -4,9 +4,12 @@
void demo_fun(void *i) { void demo_fun(void *i) {
int num = *(int*)i; int num = *(int*)i;
printf("demo func sleep %ds\n", num); printf("task %d start\n", num);
sleep(num); for (int t = 0; t < num; ++t) {
printf("demo func %d wake up\n", num); sleep(1);
printf("task %d running...\n", num);
}
printf("task %d complete\n", num);
} }
int main() { int main() {
@ -17,19 +20,21 @@ int main() {
tiny_pool_submit(pool, demo_fun, (void*)&dat[0]); tiny_pool_submit(pool, demo_fun, (void*)&dat[0]);
tiny_pool_submit(pool, demo_fun, (void*)&dat[1]); tiny_pool_submit(pool, demo_fun, (void*)&dat[1]);
printf("pool booting\n"); printf("main: pool booting\n");
tiny_pool_boot(pool); tiny_pool_boot(pool);
printf("pool running\n"); printf("main: pool boot complete\n");
printf("main thread sleep\n"); printf("main: sleep 5s\n");
sleep(5); sleep(5);
printf("main thread wake up\n"); printf("main: wake up\n");
tiny_pool_submit(pool, demo_fun, (void*)&dat[2]); tiny_pool_submit(pool, demo_fun, (void*)&dat[2]);
tiny_pool_submit(pool, demo_fun, (void*)&dat[3]); tiny_pool_submit(pool, demo_fun, (void*)&dat[3]);
tiny_pool_submit(pool, demo_fun, (void*)&dat[4]); tiny_pool_submit(pool, demo_fun, (void*)&dat[4]);
sleep(8); printf("main: sleep 8s\n");
sleep(6);
printf("main: wake up\n");
// TODO: tiny pool join // TODO: tiny pool join

206
tiny_pool.c

@ -6,7 +6,7 @@ pool_t* tiny_pool_create(uint32_t size) {
/// thread pool struct create /// thread pool struct create
pool_t *pool = (pool_t*)malloc(sizeof(pool_t)); pool_t *pool = (pool_t*)malloc(sizeof(pool_t));
if (pool == NULL) { if (pool == NULL) {
return NULL; // malloc pool failed -> stop create return NULL; // malloc pool failed -> revoke create
} }
/// threads memory initialize /// threads memory initialize
@ -14,7 +14,7 @@ pool_t* tiny_pool_create(uint32_t size) {
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * size); pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * size);
if (pool->threads == NULL) { if (pool->threads == NULL) {
free(pool); free(pool);
return NULL; // malloc threads failed -> stop create return NULL; // malloc threads failed -> revoke create
} }
memset(pool->threads, 0, sizeof(pthread_t) * size); // clean thread ids as zero memset(pool->threads, 0, sizeof(pthread_t) * size); // clean thread ids as zero
@ -25,64 +25,30 @@ pool_t* tiny_pool_create(uint32_t size) {
pool->task_queue_rear = NULL; pool->task_queue_rear = NULL;
pool->task_queue_front = NULL; pool->task_queue_front = NULL;
/// thread mutex initialization /// mutex and conditions initialization
if (pthread_mutex_init(&pool->mutex, NULL)) {
pthread_mutex_init(&pool->mutex, NULL); free(pool->threads);
free(pool);
// if (pthread_mutex_init(&pool->status_mutex, NULL)) { return NULL; // global mutex init error -> revoke create
// free(pool->threads); }
// free(pool);
// return NULL; // status mutex init error -> stop create
// }
// if (pthread_mutex_init(&pool->task_queue_busy, NULL)) {
// pthread_mutex_destroy(&pool->status_mutex);
// free(pool->threads);
// free(pool);
// return NULL; // queue mutex init error -> stop create
// }
// if (pthread_mutex_init(&pool->busy_thr_num_mutex, NULL)) {
// pthread_mutex_destroy(&pool->task_queue_busy);
// pthread_mutex_destroy(&pool->status_mutex);
// free(pool->threads);
// free(pool);
// return NULL; // busy thread num mutex init error -> stop create
// }
/// thread condition variable initialization
if (pthread_cond_init(&pool->task_queue_empty, NULL)) { if (pthread_cond_init(&pool->task_queue_empty, NULL)) {
// pthread_mutex_destroy(&pool->busy_thr_num_mutex);
// pthread_mutex_destroy(&pool->task_queue_busy);
// pthread_mutex_destroy(&pool->status_mutex);
pthread_mutex_destroy(&pool->mutex); pthread_mutex_destroy(&pool->mutex);
free(pool->threads); free(pool->threads);
free(pool); free(pool);
return NULL; // pthread cond init error -> stop create return NULL; // task queue cond init error -> revoke create
} }
if (pthread_cond_init(&pool->task_queue_not_empty, NULL)) { if (pthread_cond_init(&pool->task_queue_not_empty, NULL)) {
pthread_cond_destroy(&pool->task_queue_empty); pthread_cond_destroy(&pool->task_queue_empty);
// pthread_mutex_destroy(&pool->busy_thr_num_mutex);
// pthread_mutex_destroy(&pool->task_queue_busy);
// pthread_mutex_destroy(&pool->status_mutex);
pthread_mutex_destroy(&pool->mutex); pthread_mutex_destroy(&pool->mutex);
free(pool->threads); free(pool->threads);
free(pool); free(pool);
return NULL; return NULL; // task queue cond init error -> revoke create
} }
return pool; // tiny thread pool create success return pool; // tiny thread pool create success
} }
void task_queue_push(pool_t *pool, task_t *task) { void task_queue_push(pool_t *pool, task_t *task) {
printf("push new task\n");
printf("push one task\n");
// pthread_mutex_lock(&pool->task_queue_busy); // lock task queue
printf("start push process\n");
if (pool->task_queue_rear == NULL) { // task queue is empty if (pool->task_queue_rear == NULL) { // task queue is empty
pool->task_queue_front = task; pool->task_queue_front = task;
pool->task_queue_rear = task; // init task queue with one element pool->task_queue_rear = task; // init task queue with one element
@ -91,188 +57,119 @@ void task_queue_push(pool_t *pool, task_t *task) {
pool->task_queue_rear = task; pool->task_queue_rear = task;
} }
++pool->task_queue_size; ++pool->task_queue_size;
printf("push success -> size = %d\n", pool->task_queue_size); printf("push success -> size = %d\n", pool->task_queue_size);
// bool signal_flag = false;
// if (pool->status > PREPARING) {
// signal_flag = true;
// }
// pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue
pthread_mutex_unlock(&pool->mutex); // unlock task queue
// if (pool->status > PREPARING) { // avoid send signal in PREPARING stage
// if (signal_flag) {
//
// printf("signal -> queue not empty\n");
//
// pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread
// }
} }
task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
printf("%lu -> pop one task\n", pthread_self());
printf("%lu -> try pop one task\n", pthread_self()); /// wait until task queue not empty
pthread_mutex_lock(&pool->mutex); // lock pool struct
// pthread_mutex_lock(&pool->task_queue_busy); // lock task queue
pthread_mutex_lock(&pool->mutex); // lock task queue
while (pool->task_queue_front == NULL) { // loop until task queue not empty while (pool->task_queue_front == NULL) { // loop until task queue not empty
printf("%lu -> pop start wait\n", pthread_self()); printf("%lu -> pop start wait\n", pthread_self());
// TODO: at EXITING process may receive active broadcast -> we should stop pop task here
// should we cancel thread here directly, or return NULL for sub thread loop?
// pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); // wait new task added
pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added
// TODO: for now, it seems that first one is more suitable
printf("%lu -> pop exit wait\n", pthread_self()); printf("%lu -> pop exit wait\n", pthread_self());
if (pool->status == EXITING) {
pthread_exit(NULL); // sub thread exit at EXITING stage
}
} }
printf("%lu -> pop new task\n", pthread_self()); printf("%lu -> pop new task\n", pthread_self());
bool queue_empty = false; /// pop first task from queue
bool empty_flag = false;
task_t *front = pool->task_queue_front; task_t *front = pool->task_queue_front;
if (pool->task_queue_front == pool->task_queue_rear) { // only one element pool->task_queue_front = front->next; // pop first task
pool->task_queue_front = NULL; // clear task queue if (front->next == NULL) { // only one element
pool->task_queue_rear = NULL; pool->task_queue_rear = NULL; // clear task queue
queue_empty = true; empty_flag = true;
} else {
pool->task_queue_front = front->next; // pop first task
} }
--pool->task_queue_size; --pool->task_queue_size;
++pool->busy_thr_num; // task must pop by one ready thread ++pool->busy_thr_num; // task must pop by one ready thread
printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size); printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size);
// pthread_mutex_unlock(&pool->task_queue_busy); // unlock task queue
pthread_mutex_unlock(&pool->mutex); // unlock task queue pthread_mutex_unlock(&pool->mutex); // unlock task queue
if (queue_empty) { // send signal after mutex unlocked /// send signal to active blocking thread
pthread_cond_signal(&pool->task_queue_empty); // active blocking pool join thread if (empty_flag) { // send signal after mutex unlocked
pthread_cond_signal(&pool->task_queue_empty); // active pool join thread
} }
return front; // success pop one task return front; // success pop one task
} }
bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
/// pre-check to avoid invalid mutex waiting
if (pool->status > RUNNING) {
return false; // allow to add task at PREPARING and RUNNING stage
}
// TODO: pre-check status here: if > RUNNING --> return false /// initialize task structure
task_t *new_task = (task_t*)malloc(sizeof(task_t)); task_t *new_task = (task_t*)malloc(sizeof(task_t));
if (new_task == NULL) { if (new_task == NULL) {
return false; // malloc new task error -> stop submit return false; // malloc new task error -> failed submit
} }
new_task->entry = func; // load custom task new_task->entry = func; // load custom task
new_task->arg = arg; new_task->arg = arg;
new_task->next = NULL; new_task->next = NULL;
// TODO: warning -> check dead lock here /// handle task submit
// pthread_mutex_lock(&pool->status_mutex);
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
if (pool->status > RUNNING) { // pool stage recheck after mutex lock
// pthread_mutex_lock(&pool->mutex); // lock task queue
if (pool->status > RUNNING) {
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
free(new_task); free(new_task);
return false; // adding task is prohibited after RUNNING
return false; // adding task is prohibited after STOPPING
} else {
task_queue_push(pool, new_task); // push into task queue
} }
task_queue_push(pool, new_task); // push into task queue
bool signal_flag = false; bool signal_flag = false;
if (pool->status > PREPARING) { if (pool->status > PREPARING) { // only send active signal at RUNNING stage
signal_flag = true; signal_flag = true;
} }
pthread_mutex_unlock(&pool->mutex); // send signal after mutex unlock -> reduce thread churn
// pthread_mutex_unlock(&pool->status_mutex); /// send signal to active blocking thread
pthread_mutex_unlock(&pool->mutex);
if (signal_flag) { if (signal_flag) {
printf("signal -> queue not empty\n"); printf("signal -> queue not empty\n");
pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread
} }
return true; // task push success return true; // task push success
} }
void* thread_entry(void *pool_ptr) { // main loop for sub thread void* thread_entry(void *pool_ptr) { // main loop for sub thread
pool_t *pool = (pool_t*)pool_ptr; pool_t *pool = (pool_t*)pool_ptr;
printf("%lu -> sub thread begin\n", pthread_self());
printf("start thread %lu\n", pthread_self()); while (pool->status != EXITING) { // loop until enter EXITING stage
while (pool->status != EXITING) { // loop until enter EXITING mode
printf("%lu -> sub thread working\n", pthread_self()); printf("%lu -> sub thread working\n", pthread_self());
/// pop a task and execute it
task_t *task = task_queue_pop(pool); // pop one task -> blocking function task_t *task = task_queue_pop(pool); // pop one task -> blocking function
// pthread_mutex_lock(&pool->busy_thr_num_mutex);
// ++pool->busy_thr_num; // change busy thread number
// pthread_mutex_unlock(&pool->busy_thr_num_mutex);
task->entry(task->arg); // start running task function task->entry(task->arg); // start running task function
free(task); // free allocated memory free(task); // free allocated memory
// pthread_mutex_lock(&pool->busy_thr_num_mutex); /// mark thread as idle
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
--pool->busy_thr_num; // change busy thread number --pool->busy_thr_num; // change busy thread number
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
// pthread_mutex_unlock(&pool->busy_thr_num_mutex);
} }
printf("%lu -> sub thread exit\n", pthread_self()); printf("%lu -> sub thread exit\n", pthread_self());
pthread_exit(NULL); // sub thread exit
pthread_exit(NULL);
return NULL; // sub thread exit
} }
// TODO: should we return a bool value?
void tiny_pool_boot(pool_t *pool) { void tiny_pool_boot(pool_t *pool) {
/// pre-check to avoid invalid mutex waiting
if (pool->status != PREPARING) {
return; // only allow to boot at PREPARING stage
}
// TODO: avoid booting multi-times /// handle pool thread booting
// TODO: pre-check pool status: if != PREPARING --> return
// pthread_mutex_lock(&pool->status_mutex);
pthread_mutex_lock(&pool->mutex); pthread_mutex_lock(&pool->mutex);
if (pool->status != PREPARING) { if (pool->status != PREPARING) {
// pthread_mutex_unlock(&pool->status_mutex);
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
return; return; // only allow to boot at PREPARING stage
} }
pool->status = RUNNING; pool->status = RUNNING;
for (uint32_t i = 0; i < pool->thread_num; ++i) { // create working threads
// pthread_mutex_lock(&pool->task_queue_busy);
for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool); pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool);
} }
printf("thread boot complete\n"); printf("thread boot complete\n");
// pthread_mutex_unlock(&pool->task_queue_busy);
// pthread_mutex_unlock(&pool->status_mutex);
pthread_mutex_unlock(&pool->mutex); pthread_mutex_unlock(&pool->mutex);
} }
bool tiny_pool_join(pool_t *pool) { bool tiny_pool_join(pool_t *pool) {
@ -285,7 +182,6 @@ bool tiny_pool_join(pool_t *pool) {
// TODO: signal broadcast -> wait all thread exit // TODO: signal broadcast -> wait all thread exit
printf("start pool join\n"); printf("start pool join\n");
// pthread_mutex_lock(&pool->status_mutex); // pthread_mutex_lock(&pool->status_mutex);

10
tiny_pool.h

@ -45,7 +45,7 @@
/// as a fatal error in the main program). In other cases, it is recommended to use `tiny_pool_join` /// as a fatal error in the main program). In other cases, it is recommended to use `tiny_pool_join`
/// or `tiny_pool_detach` interface. /// or `tiny_pool_detach` interface.
enum pool_status { enum pool_stage {
PREPARING = 0, PREPARING = 0,
RUNNING = 1, RUNNING = 1,
STOPPING = 2, STOPPING = 2,
@ -59,22 +59,16 @@ typedef struct task_t {
} task_t; } task_t;
typedef struct pool_t { typedef struct pool_t {
pthread_mutex_t mutex; // global pool mutex pthread_mutex_t mutex; // global pool mutex
pthread_t *threads; // store thread id pthread_t *threads; // store thread id
uint32_t thread_num; // number of threads uint32_t thread_num; // number of threads
enum pool_status status; // life cycle state
// pthread_mutex_t status_mutex; // mutex for `status`
uint32_t busy_thr_num; // number of working threads uint32_t busy_thr_num; // number of working threads
// pthread_mutex_t busy_thr_num_mutex; // mutex for `busy_thr_num` enum pool_stage status; // tiny pool life cycle stage
task_t *task_queue_front; // head of task queue task_t *task_queue_front; // head of task queue
task_t *task_queue_rear; // end of task queue task_t *task_queue_rear; // end of task queue
uint32_t task_queue_size; // size of task queue uint32_t task_queue_size; // size of task queue
// pthread_mutex_t task_queue_busy; // mutex for `task_queue_xxx`
pthread_cond_t task_queue_empty; // condition for task queue becomes empty pthread_cond_t task_queue_empty; // condition for task queue becomes empty
pthread_cond_t task_queue_not_empty; // condition for task queue becomes not empty pthread_cond_t task_queue_not_empty; // condition for task queue becomes not empty

Loading…
Cancel
Save