From dc2f088de0ab6097649fbf3a63b59376fa95a000 Mon Sep 17 00:00:00 2001 From: Dnomd343 Date: Sun, 5 Feb 2023 21:38:28 +0800 Subject: [PATCH] update: enhance cpp usage demo --- demo.cc | 175 +++++++++++++++++------------------------------- src/tiny_pool.c | 84 +++++++++-------------- 2 files changed, 94 insertions(+), 165 deletions(-) diff --git a/demo.cc b/demo.cc index d042a6d..7c64aa8 100644 --- a/demo.cc +++ b/demo.cc @@ -1,144 +1,91 @@ -#include -#include -#include "tiny_pool.h" - -#include #include #include - -void demo_func(void *arg) { - - std::cout << *(int*)arg << std::endl; - - sleep(1); - -} - -int test_func(char c) { - - - printf("char -> `%c`\n", c); - - return 233; - -} - - -void convert(void *f) { - -// auto *func = ( std::function * ) f; -// -// (*func)(); - - ( *static_cast< std::function* >(f) )(); - -// auto t = (std::function*)task; -// -// (*t)(NULL); - - free(f); - -} +#include "tiny_pool.h" class TinyPool { pool_t *pool; -public: - - TinyPool(uint32_t size); - - ~TinyPool(); - - - template - auto submit(Func &&func, Args &&...args) -> std::future { - - std::function wrap_func = std::bind( - std::forward(func), std::forward(args)... - ); - - auto func_ptr = std::make_shared< - std::packaged_task - >(wrap_func); - -// std::function task_func = [func_ptr](void*) { -// (*func_ptr)(); -// }; - - auto *t = new std::function; - *t = [func_ptr](){ - (*func_ptr)(); - }; - - - // TODO: run task_func - -// (*t)(); - -// convert( (void*)t ); - -// auto t = reinterpret_cast(task_func); - - tiny_pool_submit(pool, convert, (void*)t); - - return func_ptr->get_future(); - + static void wrap_c_func(void *func) { + (*static_cast*>(func))(); + free(func); } +public: + void boot() { tiny_pool_boot(pool); } + void join() { tiny_pool_join(pool); } + void kill() { tiny_pool_kill(pool); } + void detach() { tiny_pool_detach(pool); } + explicit TinyPool(uint32_t size) { pool = tiny_pool_create(size); } - void boot() { - tiny_pool_boot(pool); - } - + template + auto submit(Func &&func, Args &&...args) -> std::future; }; +template +auto TinyPool::submit(Func &&func, Args &&...args) -> std::future { + std::function wrap_func = std::bind( + std::forward(func), std::forward(args)... + ); + auto func_ptr = std::make_shared< + std::packaged_task + >(wrap_func); + tiny_pool_submit(pool, TinyPool::wrap_c_func, (void*)( + new std::function ( + [func_ptr]() { (*func_ptr)(); } + ) + )); + return func_ptr->get_future(); +} +/// -------------------------------- start test -------------------------------- +#include +#include -TinyPool::TinyPool(uint32_t size) { - - pool = tiny_pool_create(size); - -// if (pool == (void*)0) { // NULL in c-style -// throw std::runtime_error("thread pool create error"); -// } - -} - -TinyPool::~TinyPool() { - tiny_pool_kill(pool); +int test_func(char c) { + int num = c - '0'; + printf("char -> `%c`\n", c); + for (int i = 0; i < num; ++i) { + printf("task %d running...\n", num); + usleep(500 * 1000); + } + return num; } int main() { std::cout << "tiny thread pool demo start" << std::endl; - auto p = TinyPool(1); + auto pool = TinyPool(3); - auto f = p.submit(test_func, 'A'); + auto f0 = pool.submit(test_func, '0'); + auto f1 = pool.submit(test_func, '1'); + auto f2 = pool.submit(test_func, '2'); + auto f3 = pool.submit(test_func, '3'); - p.boot(); + pool.boot(); - std::cout << "get future: " << f.get() << std::endl; + auto f4 = pool.submit(test_func, '4'); + auto f5 = pool.submit(test_func, '5'); + printf("get future: %d\n", f0.get()); + printf("get future: %d\n", f4.get()); + printf("get future: %d\n", f3.get()); -// auto f = submit(test_func, 'D'); + auto f6 = pool.submit(test_func, '6'); + auto f7 = pool.submit(test_func, '7'); + auto f8 = pool.submit(test_func, '8'); + auto f9 = pool.submit(test_func, '9'); -// auto pool = tiny_pool_create(3); + printf("get future: %d\n", f2.get()); + printf("get future: %d\n", f5.get()); + printf("get future: %d\n", f8.get()); -// int dat[] = {0, 1, 2, 3, 4, 5, 6}; -// tiny_pool_submit(pool, demo_func, (void*)&dat[0]); -// tiny_pool_submit(pool, demo_func, (void*)&dat[1]); -// tiny_pool_submit(pool, demo_func, (void*)&dat[2]); -// tiny_pool_submit(pool, demo_func, (void*)&dat[3]); -// tiny_pool_submit(pool, demo_func, (void*)&dat[4]); -// tiny_pool_submit(pool, demo_func, (void*)&dat[5]); -// tiny_pool_submit(pool, demo_func, (void*)&dat[6]); -// -// tiny_pool_boot(pool); -// -// tiny_pool_join(pool); + pool.join(); - sleep(10); + printf("get future: %d\n", f6.get()); + printf("get future: %d\n", f1.get()); + printf("get future: %d\n", f9.get()); + printf("get future: %d\n", f7.get()); std::cout << "tiny thread pool demo exit" << std::endl; return 0; diff --git a/src/tiny_pool.c b/src/tiny_pool.c index 97e0d83..955a26b 100644 --- a/src/tiny_pool.c +++ b/src/tiny_pool.c @@ -56,7 +56,7 @@ pool_t* tiny_pool_create(uint32_t size) { } void task_queue_push(pool_t *pool, task_t *task) { - printf("push new task %d\n", *(int*)task->arg); +// printf("push new task %d\n", *(int*)task->arg); if (pool->task_queue_rear == NULL) { // task queue is empty pool->task_queue_front = task; pool->task_queue_rear = task; // init task queue with one element @@ -65,25 +65,25 @@ void task_queue_push(pool_t *pool, task_t *task) { pool->task_queue_rear = task; } ++pool->task_queue_size; - printf("push success -> size = %d\n", pool->task_queue_size); +// printf("push success -> size = %d\n", pool->task_queue_size); } task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait - printf("%lu -> start pop task\n", pthread_self()); +// printf("%lu -> start pop task\n", pthread_self()); /// wait until task queue not empty pthread_mutex_lock(&pool->mutex); // lock pool struct while (pool->task_queue_front == NULL) { // loop until task queue not empty - printf("%lu -> pop wait\n", pthread_self()); +// printf("%lu -> pop wait\n", pthread_self()); pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added - printf("%lu -> pop exit wait\n", pthread_self()); +// printf("%lu -> pop exit wait\n", pthread_self()); if (pool->status == EXITING) { pthread_mutex_unlock(&pool->mutex); - printf("%lu -> sub thread exit from idle\n", pthread_self()); +// printf("%lu -> sub thread exit from idle\n", pthread_self()); pthread_exit(NULL); // sub thread exit at EXITING stage } } - printf("%lu -> pop new task %d\n", pthread_self(), *(int*)pool->task_queue_front->arg); +// printf("%lu -> pop new task %d\n", pthread_self(), *(int*)pool->task_queue_front->arg); /// pop first task from queue bool empty_flag = false; @@ -95,12 +95,12 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait } --pool->task_queue_size; ++pool->busy_thr_num; // task must pop by one ready thread - printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size); +// printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size); pthread_mutex_unlock(&pool->mutex); // unlock task queue /// send signal to active blocking thread if (empty_flag) { // send signal after mutex unlocked - printf("signal -> task queue empty\n"); +// printf("signal -> task queue empty\n"); pthread_cond_signal(&pool->task_queue_empty); // active pool join thread } return front; // success pop one task @@ -137,7 +137,7 @@ bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { /// send signal to active blocking thread if (signal_flag) { - printf("signal -> queue not empty\n"); +// printf("signal -> queue not empty\n"); pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread } return true; // task push success @@ -145,9 +145,9 @@ bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) { void* thread_entry(void *pool_ptr) { // main loop for sub thread pool_t *pool = (pool_t*)pool_ptr; - printf("%lu -> sub thread begin\n", pthread_self()); +// printf("%lu -> sub thread begin\n", pthread_self()); while (pool->status != EXITING) { // loop until enter EXITING stage - printf("%lu -> sub thread working\n", pthread_self()); +// printf("%lu -> sub thread working\n", pthread_self()); /// pop a task and execute it task_t *task = task_queue_pop(pool); // pop one task -> blocking function @@ -163,7 +163,7 @@ void* thread_entry(void *pool_ptr) { // main loop for sub thread pthread_cond_signal(&pool->without_busy_thread); } } - printf("%lu -> sub thread exit\n", pthread_self()); +// printf("%lu -> sub thread exit\n", pthread_self()); pthread_exit(NULL); // sub thread exit } @@ -183,36 +183,30 @@ void tiny_pool_boot(pool_t *pool) { for (uint32_t i = 0; i < pool->thread_num; ++i) { // create working threads pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool); } - printf("thread boot complete\n"); +// printf("thread boot complete\n"); pthread_mutex_unlock(&pool->mutex); } void free_tiny_pool(pool_t *pool) { - printf("start free pool resource\n"); - -// pthread_mutex_unlock(&pool->mutex); - -// printf("flag 1\n"); +// printf("start free pool resource\n"); pthread_cond_destroy(&pool->without_busy_thread); pthread_cond_destroy(&pool->task_queue_not_empty); pthread_cond_destroy(&pool->task_queue_empty); -// printf("flag 2\n"); - pthread_mutex_destroy(&pool->mutex); free(pool->threads); free(pool); - printf("free pool resource complete\n"); +// printf("free pool resource complete\n"); } #include bool tiny_pool_join(pool_t *pool) { - printf("start pool join\n"); +// printf("start pool join\n"); /// pre-check to avoid invalid mutex waiting if (pool->status != RUNNING) { @@ -226,47 +220,35 @@ bool tiny_pool_join(pool_t *pool) { return false; // only allow to join at RUNNING stage } pool->status = STOPPING; - printf("pool status -> STOPPING\n"); +// printf("pool status -> STOPPING\n"); - printf("wait task queue\n"); +// printf("wait task queue\n"); while (pool->task_queue_front != NULL) { pthread_cond_wait(&pool->task_queue_empty, &pool->mutex); } - printf("task queue empty\n"); +// printf("task queue empty\n"); pool->status = EXITING; - printf("pool status -> EXITING\n"); +// printf("pool status -> EXITING\n"); - printf("start wait busy threads -> %d\n", pool->busy_thr_num); +// printf("start wait busy threads -> %d\n", pool->busy_thr_num); while (pool->busy_thr_num != 0) { pthread_cond_wait(&pool->without_busy_thread, &pool->mutex); } - printf("all thread idle\n"); +// printf("all thread idle\n"); pthread_mutex_unlock(&pool->mutex); // prevent other functions blocking waiting - printf("start sub threads joining\n"); - -// for (uint32_t i = 0; i < pool->thread_num; ++i) { -// pthread_cond_broadcast(&pool->task_queue_not_empty); // trigger idle threads -// while (pthread_mutex_trylock(&pool->mutex)) { -// printf("try lock again\n"); -// } -// pthread_mutex_unlock(&pool->mutex); -// } -// -// printf("free threads complete\n"); +// printf("start sub threads joining\n"); for (uint32_t i = 0; i < pool->thread_num; ++i) { pthread_cond_broadcast(&pool->task_queue_not_empty); // trigger idle threads - printf("start join sub thread %lu\n", pool->threads[i]); +// printf("start join sub thread %lu\n", pool->threads[i]); pthread_join(pool->threads[i], NULL); - printf("sub thread %lu joined\n", pool->threads[i]); +// printf("sub thread %lu joined\n", pool->threads[i]); } -// sleep(10); - - printf("sub threads join complete\n"); +// printf("sub threads join complete\n"); free_tiny_pool(pool); @@ -274,27 +256,27 @@ bool tiny_pool_join(pool_t *pool) { } void* run_pool_join(void *pool) { - printf("run pool join from detach\n"); +// printf("run pool join from detach\n"); tiny_pool_join((pool_t*)pool); - printf("pool join complete\n"); +// printf("pool join complete\n"); pthread_exit(NULL); } void tiny_pool_detach(pool_t *pool) { pthread_t tid; - printf("run pool detach\n"); +// printf("run pool detach\n"); pthread_create(&tid, NULL, run_pool_join, (void*)pool); pthread_detach(tid); } void tiny_pool_kill(pool_t *pool) { - printf("run pool kill\n"); +// printf("run pool kill\n"); if (pool->status > PREPARING) { - printf("kill sub threads\n"); +// printf("kill sub threads\n"); for (uint32_t i = 0; i < pool->thread_num; ++i) { pthread_cancel(pool->threads[i]); } - printf("kill complete\n"); +// printf("kill complete\n"); } free_tiny_pool(pool); }