Browse Source

perf: update tiny pool core

master
Dnomd343 1 year ago
parent
commit
77351b1aac
  1. 198
      src/tiny_pool.c

198
src/tiny_pool.c

@ -2,6 +2,76 @@
#include <memory.h>
#include "tiny_pool.h"
void* run_pool_join(void *pool) { // single function for detach
tiny_pool_join((pool_t*)pool);
pthread_exit(NULL);
}
void tiny_pool_detach(pool_t *pool) {
pthread_t tid;
pthread_create(&tid, NULL, run_pool_join, (void*)pool);
pthread_detach(tid);
}
void free_tiny_pool(pool_t *pool) {
pthread_cond_destroy(&pool->without_busy_thread);
pthread_cond_destroy(&pool->task_queue_not_empty);
pthread_cond_destroy(&pool->task_queue_empty);
pthread_mutex_destroy(&pool->mutex);
free(pool->threads);
free(pool);
}
void tiny_pool_kill(pool_t *pool) {
if (pool->status > PREPARING) {
for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_cancel(pool->threads[i]);
}
}
free_tiny_pool(pool); // release thread pool
}
void task_queue_push(pool_t *pool, task_t *task) {
if (pool->task_queue_rear == NULL) { // task queue is empty
pool->task_queue_front = task;
pool->task_queue_rear = task; // init task queue with one element
} else {
pool->task_queue_rear->next = task; // task queue push back
pool->task_queue_rear = task;
}
++pool->task_queue_size;
}
task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
/// wait until task queue not empty
pthread_mutex_lock(&pool->mutex); // lock pool struct
while (pool->task_queue_front == NULL) { // loop until task queue not empty
pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added
if (pool->status == EXITING) {
pthread_mutex_unlock(&pool->mutex);
pthread_exit(NULL); // sub thread exit at EXITING stage
}
}
/// pop first task from queue
bool empty_flag = false;
task_t *front = pool->task_queue_front;
pool->task_queue_front = front->next; // pop first task
if (front->next == NULL) { // only one element
pool->task_queue_rear = NULL; // clear task queue
empty_flag = true;
}
--pool->task_queue_size;
++pool->busy_thr_num; // task must pop by one ready thread
pthread_mutex_unlock(&pool->mutex); // unlock task queue
/// send signal to active blocking thread
if (empty_flag) { // send signal after mutex unlocked
pthread_cond_signal(&pool->task_queue_empty); // active pool join thread
}
return front; // success pop one task
}
pool_t* tiny_pool_create(uint32_t size) {
/// thread pool struct create
pool_t *pool = (pool_t*)malloc(sizeof(pool_t));
@ -55,57 +125,6 @@ pool_t* tiny_pool_create(uint32_t size) {
return pool; // tiny thread pool create success
}
void task_queue_push(pool_t *pool, task_t *task) {
// printf("push new task %d\n", *(int*)task->arg);
if (pool->task_queue_rear == NULL) { // task queue is empty
pool->task_queue_front = task;
pool->task_queue_rear = task; // init task queue with one element
} else {
pool->task_queue_rear->next = task; // task queue push back
pool->task_queue_rear = task;
}
++pool->task_queue_size;
// printf("push success -> size = %d\n", pool->task_queue_size);
}
task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
// printf("%lu -> start pop task\n", pthread_self());
/// wait until task queue not empty
pthread_mutex_lock(&pool->mutex); // lock pool struct
while (pool->task_queue_front == NULL) { // loop until task queue not empty
// printf("%lu -> pop wait\n", pthread_self());
pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added
// printf("%lu -> pop exit wait\n", pthread_self());
if (pool->status == EXITING) {
pthread_mutex_unlock(&pool->mutex);
// printf("%lu -> sub thread exit from idle\n", pthread_self());
pthread_exit(NULL); // sub thread exit at EXITING stage
}
}
// printf("%lu -> pop new task %d\n", pthread_self(), *(int*)pool->task_queue_front->arg);
/// pop first task from queue
bool empty_flag = false;
task_t *front = pool->task_queue_front;
pool->task_queue_front = front->next; // pop first task
if (front->next == NULL) { // only one element
pool->task_queue_rear = NULL; // clear task queue
empty_flag = true;
}
--pool->task_queue_size;
++pool->busy_thr_num; // task must pop by one ready thread
// printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size);
pthread_mutex_unlock(&pool->mutex); // unlock task queue
/// send signal to active blocking thread
if (empty_flag) { // send signal after mutex unlocked
// printf("signal -> task queue empty\n");
pthread_cond_signal(&pool->task_queue_empty); // active pool join thread
}
return front; // success pop one task
}
bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
/// pre-check to avoid invalid mutex waiting
if (pool->status > RUNNING) {
@ -137,7 +156,6 @@ bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
/// send signal to active blocking thread
if (signal_flag) {
// printf("signal -> queue not empty\n");
pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread
}
return true; // task push success
@ -145,10 +163,7 @@ bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
void* thread_entry(void *pool_ptr) { // main loop for sub thread
pool_t *pool = (pool_t*)pool_ptr;
// printf("%lu -> sub thread begin\n", pthread_self());
while (pool->status != EXITING) { // loop until enter EXITING stage
// printf("%lu -> sub thread working\n", pthread_self());
/// pop a task and execute it
task_t *task = task_queue_pop(pool); // pop one task -> blocking function
task->entry(task->arg); // start running task function
@ -163,7 +178,6 @@ void* thread_entry(void *pool_ptr) { // main loop for sub thread
pthread_cond_signal(&pool->without_busy_thread);
}
}
// printf("%lu -> sub thread exit\n", pthread_self());
pthread_exit(NULL); // sub thread exit
}
@ -183,31 +197,10 @@ void tiny_pool_boot(pool_t *pool) {
for (uint32_t i = 0; i < pool->thread_num; ++i) { // create working threads
pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool);
}
// printf("thread boot complete\n");
pthread_mutex_unlock(&pool->mutex);
}
void free_tiny_pool(pool_t *pool) {
// printf("start free pool resource\n");
pthread_cond_destroy(&pool->without_busy_thread);
pthread_cond_destroy(&pool->task_queue_not_empty);
pthread_cond_destroy(&pool->task_queue_empty);
pthread_mutex_destroy(&pool->mutex);
free(pool->threads);
free(pool);
// printf("free pool resource complete\n");
}
#include <unistd.h>
bool tiny_pool_join(pool_t *pool) {
// printf("start pool join\n");
/// pre-check to avoid invalid mutex waiting
if (pool->status != RUNNING) {
return false;
@ -220,63 +213,18 @@ bool tiny_pool_join(pool_t *pool) {
return false; // only allow to join at RUNNING stage
}
pool->status = STOPPING;
// printf("pool status -> STOPPING\n");
// printf("wait task queue\n");
while (pool->task_queue_front != NULL) {
while (pool->task_queue_front != NULL) { // wait empty task queue
pthread_cond_wait(&pool->task_queue_empty, &pool->mutex);
}
// printf("task queue empty\n");
pool->status = EXITING;
// printf("pool status -> EXITING\n");
// printf("start wait busy threads -> %d\n", pool->busy_thr_num);
while (pool->busy_thr_num != 0) {
while (pool->busy_thr_num != 0) { // wait all threads exit
pthread_cond_wait(&pool->without_busy_thread, &pool->mutex);
}
// printf("all thread idle\n");
pthread_mutex_unlock(&pool->mutex); // prevent other functions blocking waiting
// printf("start sub threads joining\n");
for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_cond_broadcast(&pool->task_queue_not_empty); // trigger idle threads
// printf("start join sub thread %lu\n", pool->threads[i]);
pthread_join(pool->threads[i], NULL);
// printf("sub thread %lu joined\n", pool->threads[i]);
}
// printf("sub threads join complete\n");
free_tiny_pool(pool);
free_tiny_pool(pool); // release thread pool
return true;
}
void* run_pool_join(void *pool) {
// printf("run pool join from detach\n");
tiny_pool_join((pool_t*)pool);
// printf("pool join complete\n");
pthread_exit(NULL);
}
void tiny_pool_detach(pool_t *pool) {
pthread_t tid;
// printf("run pool detach\n");
pthread_create(&tid, NULL, run_pool_join, (void*)pool);
pthread_detach(tid);
}
void tiny_pool_kill(pool_t *pool) {
// printf("run pool kill\n");
if (pool->status > PREPARING) {
// printf("kill sub threads\n");
for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_cancel(pool->threads[i]);
}
// printf("kill complete\n");
}
free_tiny_pool(pool);
}

Loading…
Cancel
Save