You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

230 lines
8.0 KiB

#include <malloc.h>
#include <memory.h>
#include "tiny_pool.h"
void* run_pool_join(void *pool) { // single function for detach
tiny_pool_join((pool_t*)pool);
pthread_exit(NULL);
}
void tiny_pool_detach(pool_t *pool) {
pthread_t tid;
pthread_create(&tid, NULL, run_pool_join, (void*)pool); // new thread for detach
pthread_detach(tid);
}
void free_tiny_pool(pool_t *pool) { // free memory and destroy thread pool
pthread_cond_destroy(&pool->without_busy_thread);
pthread_cond_destroy(&pool->task_queue_not_empty);
pthread_cond_destroy(&pool->task_queue_empty);
pthread_mutex_destroy(&pool->mutex);
free(pool->threads);
free(pool);
}
void tiny_pool_kill(pool_t *pool) {
if (pool->status > PREPARING) { // threads are running
for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_cancel(pool->threads[i]); // kill sub threads
}
}
free_tiny_pool(pool); // release thread pool
}
void task_queue_push(pool_t *pool, task_t *task) {
if (pool->task_queue_rear == NULL) { // task queue is empty
pool->task_queue_front = task;
pool->task_queue_rear = task; // init task queue with one element
} else {
pool->task_queue_rear->next = task; // task queue push back
pool->task_queue_rear = task;
}
++pool->task_queue_size;
}
task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
/// wait until task queue not empty
pthread_mutex_lock(&pool->mutex); // lock pool struct
while (pool->task_queue_front == NULL) { // loop until task queue not empty
pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added
if (pool->status == EXITING) {
pthread_mutex_unlock(&pool->mutex);
pthread_exit(NULL); // sub thread exit at EXITING stage
}
}
/// pop first task from queue
bool empty_flag = false;
task_t *front = pool->task_queue_front;
pool->task_queue_front = front->next; // pop first task
if (front->next == NULL) { // only one element
pool->task_queue_rear = NULL; // clear task queue
empty_flag = true;
}
--pool->task_queue_size;
++pool->busy_thr_num; // task must pop by one ready thread
pthread_mutex_unlock(&pool->mutex); // unlock task queue
/// send signal to active blocking thread
if (empty_flag) { // send signal after mutex unlocked
pthread_cond_signal(&pool->task_queue_empty); // active pool join thread
}
return front; // success pop one task
}
pool_t* tiny_pool_create(uint32_t size) {
/// thread pool struct create
pool_t *pool = (pool_t*)malloc(sizeof(pool_t));
if (pool == NULL) {
return NULL; // malloc pool failed -> revoke create
}
/// threads memory initialize
pool->thread_num = size;
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * size);
if (pool->threads == NULL) {
free(pool);
return NULL; // malloc threads failed -> revoke create
}
memset(pool->threads, 0, sizeof(pthread_t) * size); // clean thread ids as zero
/// variable initialization
pool->busy_thr_num = 0;
pool->status = PREPARING;
pool->task_queue_size = 0;
pool->task_queue_rear = NULL;
pool->task_queue_front = NULL;
/// mutex and conditions initialization
if (pthread_mutex_init(&pool->mutex, NULL)) {
free(pool->threads);
free(pool);
return NULL; // global mutex init error -> revoke create
}
if (pthread_cond_init(&pool->task_queue_empty, NULL)) {
pthread_mutex_destroy(&pool->mutex);
free(pool->threads);
free(pool);
return NULL; // task queue cond init error -> revoke create
}
if (pthread_cond_init(&pool->task_queue_not_empty, NULL)) {
pthread_cond_destroy(&pool->task_queue_empty);
pthread_mutex_destroy(&pool->mutex);
free(pool->threads);
free(pool);
return NULL; // task queue cond init error -> revoke create
}
if (pthread_cond_init(&pool->without_busy_thread, NULL)) {
pthread_cond_destroy(&pool->task_queue_not_empty);
pthread_cond_destroy(&pool->task_queue_empty);
pthread_mutex_destroy(&pool->mutex);
free(pool->threads);
free(pool);
return NULL; // busy thread num cond init error -> revoke create
}
return pool; // tiny thread pool create success
}
bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
/// pre-check to avoid invalid mutex waiting
if (pool->status > RUNNING) {
return false; // allow to add task at PREPARING and RUNNING stage
}
/// initialize task structure
task_t *new_task = (task_t*)malloc(sizeof(task_t));
if (new_task == NULL) {
return false; // malloc new task error -> failed submit
}
new_task->entry = func; // load custom task
new_task->arg = arg;
new_task->next = NULL;
/// handle task submit
pthread_mutex_lock(&pool->mutex);
if (pool->status > RUNNING) { // pool stage recheck after mutex lock
pthread_mutex_unlock(&pool->mutex);
free(new_task);
return false; // adding task is prohibited after RUNNING
}
task_queue_push(pool, new_task); // push into task queue
bool signal_flag = false;
if (pool->status > PREPARING) { // only send active signal at RUNNING stage
signal_flag = true;
}
pthread_mutex_unlock(&pool->mutex); // send signal after mutex unlock -> reduce thread churn
/// send signal to active blocking thread
if (signal_flag) {
pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread
}
return true; // task push success
}
void* thread_entry(void *pool_ptr) { // main loop for sub thread
pool_t *pool = (pool_t*)pool_ptr;
while (pool->status != EXITING) { // loop until enter EXITING stage
/// pop a task and execute it
task_t *task = task_queue_pop(pool); // pop one task -> blocking function
task->entry(task->arg); // start running task function
free(task); // free allocated memory
/// mark thread as idle
pthread_mutex_lock(&pool->mutex);
--pool->busy_thr_num; // change busy thread number
bool busy_flag = (pool->busy_thr_num == 0);
pthread_mutex_unlock(&pool->mutex);
if (busy_flag) {
pthread_cond_signal(&pool->without_busy_thread);
}
}
pthread_exit(NULL); // sub thread exit
}
void tiny_pool_boot(pool_t *pool) {
/// pre-check to avoid invalid mutex waiting
if (pool->status != PREPARING) {
return; // only allow to boot at PREPARING stage
}
/// handle pool booting
pthread_mutex_lock(&pool->mutex);
if (pool->status != PREPARING) {
pthread_mutex_unlock(&pool->mutex);
return; // only allow to boot at PREPARING stage
}
pool->status = RUNNING;
for (uint32_t i = 0; i < pool->thread_num; ++i) { // create working threads
pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool);
}
pthread_mutex_unlock(&pool->mutex);
}
bool tiny_pool_join(pool_t *pool) {
/// pre-check to avoid invalid mutex waiting
if (pool->status != RUNNING) {
return false;
}
/// handle pool threads joining
pthread_mutex_lock(&pool->mutex);
if (pool->status != RUNNING) {
pthread_mutex_unlock(&pool->mutex);
return false; // only allow to join at RUNNING stage
}
pool->status = STOPPING;
while (pool->task_queue_front != NULL) { // wait empty task queue
pthread_cond_wait(&pool->task_queue_empty, &pool->mutex);
}
pool->status = EXITING;
while (pool->busy_thr_num != 0) { // wait all threads exit
pthread_cond_wait(&pool->without_busy_thread, &pool->mutex);
}
pthread_mutex_unlock(&pool->mutex); // prevent other functions blocking waiting
for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_cond_broadcast(&pool->task_queue_not_empty); // trigger idle threads
pthread_join(pool->threads[i], NULL);
}
free_tiny_pool(pool); // release thread pool
return true;
}