diff --git a/main.c b/main.c index 08fe349..67aa713 100644 --- a/main.c +++ b/main.c @@ -1,23 +1,46 @@ #include #include +#include #include "tiny_pool.h" -void* demo_fun(void *i) { - printf("demo func -> %d\n", *(int*)i); - return NULL; +void demo_fun(void *i) { + + int k = *(int*)i; + + printf("demo func sleep %ds\n", k); + + sleep(k); + + printf("demo func %d wake up\n", k); + } int main() { - pthread_t tid; - - int d = 123; +// pthread_t tid; pool_t *pool = tiny_pool_create(0); - tiny_pool_submit(pool, demo_fun, (void*)&d); + int dat[] = {1, 2, 3, 4, 5, 6}; + tiny_pool_submit(pool, demo_fun, (void*)&dat[0]); + tiny_pool_submit(pool, demo_fun, (void*)&dat[1]); + + printf("pool booting\n"); + tiny_pool_boot(pool); + printf("pool running\n"); + + printf("main thread sleep\n"); + sleep(5); + printf("main thread wake up\n"); + + tiny_pool_submit(pool, demo_fun, (void*)&dat[2]); + tiny_pool_submit(pool, demo_fun, (void*)&dat[3]); + tiny_pool_submit(pool, demo_fun, (void*)&dat[4]); + // TODO: tiny pool join + sleep(15); + // TODO: tiny pool destroy return 0; } diff --git a/tiny_pool.c b/tiny_pool.c index 7f76da7..8a015bb 100644 --- a/tiny_pool.c +++ b/tiny_pool.c @@ -9,55 +9,94 @@ pool_t* tiny_pool_create(uint32_t size) { pool->threads[i] = 0; } + pool->status = PREPARING; + pthread_mutex_init(&pool->busy_thread_num_mutex, NULL); + pool->task_queue_front = NULL; pool->task_queue_rear = NULL; pool->task_queue_size = 0; + pthread_mutex_init(&pool->task_queue_busy, NULL); + +// pthread_cond_init(&pool->task_queue_empty, NULL); + pthread_cond_init(&pool->task_queue_not_empty, NULL); return pool; } -void tiny_pool_submit(pool_t *pool, void* (*func)(void*), void *arg) { - - task_t *new_task = (task_t*)malloc(sizeof(task_t)); +void task_queue_push(pool_t *pool, task_t *task) { - new_task->func = func; - new_task->arg = arg; - new_task->next = NULL; + printf("push one task\n"); // TODO: lock task queue + pthread_mutex_lock(&pool->task_queue_busy); - if (pool->task_queue_rear == NULL) { // queue without element + if (pool->task_queue_rear == NULL) { // empty queue - pool->task_queue_front = new_task; - pool->task_queue_rear = new_task; + pool->task_queue_front = task; + pool->task_queue_rear = task; - } else { // queue emplace back + } else { // queue with element - pool->task_queue_rear->next = new_task; - pool->task_queue_rear = new_task; + pool->task_queue_rear->next = task; + pool->task_queue_rear = task; } ++pool->task_queue_size; // TODO: unlock task queue + pthread_mutex_unlock(&pool->task_queue_busy); + + printf("push success\n"); + + if (pool->status == RUNNING) { + + // TODO: send cond signal + + printf("send signal -> queue not empty\n"); + + pthread_cond_signal(&pool->task_queue_not_empty); + + } + + +} + +void tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { + + task_t *new_task = (task_t*)malloc(sizeof(task_t)); + + new_task->func = func; + new_task->arg = arg; + new_task->next = NULL; + + // TODO: new task push into task queue + task_queue_push(pool, new_task); } task_t* task_queue_pop(pool_t *pool) { + printf("try pop one task\n"); + // TODO: lock task queue + pthread_mutex_lock(&pool->task_queue_busy); + + while (pool->task_queue_front == NULL) { - if (pool->task_queue_front == NULL) { + printf("pop enter wait\n"); - return NULL; // pop failed -> empty queue + // TODO: wait new task added + pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy); } - task_t *tmp = pool->task_queue_front; + printf("pop exit wait\n"); + + task_t *front = pool->task_queue_front; - if (pool->task_queue_front == pool->task_queue_rear) { + if (pool->task_queue_front == pool->task_queue_rear) { // only one element // queue is empty now pool->task_queue_front = NULL; @@ -65,18 +104,21 @@ task_t* task_queue_pop(pool_t *pool) { } else { - pool->task_queue_front = tmp->next; + pool->task_queue_front = front->next; } // TODO: unlock task queue + pthread_mutex_unlock(&pool->task_queue_busy); - return tmp; + printf("pop success\n"); + + return front; } -void* thread_working() { +void* thread_entry(void *pool_ptr) { // TODO: main loop for one thread @@ -85,6 +127,23 @@ void* thread_working() { // TODO: pop one task --failed--> blocking wait // --success--> start running and then free task_t + pool_t *pool = (pool_t*)pool_ptr; + + while (pool->status != EXITING) { + + printf("thread working\n"); + + task_t *task = task_queue_pop(pool); + + // task working + task->func(task->arg); + + free(task); + + } + + printf("sub thread exit\n"); + return NULL; } @@ -96,5 +155,24 @@ void tiny_pool_boot(pool_t *pool) { // TODO: create N work-threads (using N = 8 in dev) + // TODO: avoid booting multi-times + + + pthread_mutex_lock(&pool->task_queue_busy); + + for (uint32_t i = 0; i < 8; ++i) { + + printf("start thread %d\n", i); + + pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool); + + } + + printf("thread boot complete\n"); + + pool->status = RUNNING; + + pthread_mutex_unlock(&pool->task_queue_busy); + } diff --git a/tiny_pool.h b/tiny_pool.h index 2c8a01d..9bc8ce5 100644 --- a/tiny_pool.h +++ b/tiny_pool.h @@ -5,27 +5,41 @@ #include typedef struct task_t { - void* (*func)(void*); + void (*func)(void*); void *arg; struct task_t *next; } task_t; +enum tiny_pool_status { + PREPARING, + RUNNING, + EXITING, +}; typedef struct { pthread_t threads[8]; + enum tiny_pool_status status; + pthread_mutex_t status_changing; + + uint32_t busy_thread_num; + pthread_mutex_t busy_thread_num_mutex; + task_t *task_queue_front; task_t *task_queue_rear; uint32_t task_queue_size; pthread_mutex_t task_queue_busy; +// pthread_cond_t task_queue_empty; + pthread_cond_t task_queue_not_empty; + } pool_t; pool_t* tiny_pool_create(uint32_t size); -void tiny_pool_submit(pool_t *pool, void* (*func)(void*), void *arg); +void tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg); // TODO: confirm just run once void tiny_pool_boot(pool_t *pool);