Browse Source

update: enhance cpp usage demo

master
Dnomd343 1 year ago
parent
commit
dc2f088de0
  1. 175
      demo.cc
  2. 84
      src/tiny_pool.c

175
demo.cc

@ -1,144 +1,91 @@
#include <iostream>
#include <stdexcept>
#include "tiny_pool.h"
#include <unistd.h>
#include <future>
#include <functional>
void demo_func(void *arg) {
std::cout << *(int*)arg << std::endl;
sleep(1);
}
int test_func(char c) {
printf("char -> `%c`\n", c);
return 233;
}
void convert(void *f) {
// auto *func = ( std::function<void()> * ) f;
//
// (*func)();
( *static_cast< std::function<void()>* >(f) )();
// auto t = (std::function<void(void*)>*)task;
//
// (*t)(NULL);
free(f);
}
#include "tiny_pool.h"
class TinyPool {
pool_t *pool;
public:
TinyPool(uint32_t size);
~TinyPool();
template <typename Func, typename ...Args>
auto submit(Func &&func, Args &&...args) -> std::future<decltype(func(args...))> {
std::function<decltype(func(args...))()> wrap_func = std::bind(
std::forward<Func>(func), std::forward<Args>(args)...
);
auto func_ptr = std::make_shared<
std::packaged_task<decltype(func(args...))()>
>(wrap_func);
// std::function<void(void*)> task_func = [func_ptr](void*) {
// (*func_ptr)();
// };
auto *t = new std::function<void()>;
*t = [func_ptr](){
(*func_ptr)();
};
// TODO: run task_func
// (*t)();
// convert( (void*)t );
// auto t = reinterpret_cast<void(void*)>(task_func);
tiny_pool_submit(pool, convert, (void*)t);
return func_ptr->get_future();
static void wrap_c_func(void *func) {
(*static_cast<std::function<void()>*>(func))();
free(func);
}
public:
void boot() { tiny_pool_boot(pool); }
void join() { tiny_pool_join(pool); }
void kill() { tiny_pool_kill(pool); }
void detach() { tiny_pool_detach(pool); }
explicit TinyPool(uint32_t size) { pool = tiny_pool_create(size); }
void boot() {
tiny_pool_boot(pool);
}
template <typename Func, typename ...Args>
auto submit(Func &&func, Args &&...args) -> std::future<decltype(func(args...))>;
};
template <typename Func, typename ...Args>
auto TinyPool::submit(Func &&func, Args &&...args) -> std::future<decltype(func(args...))> {
std::function<decltype(func(args...))()> wrap_func = std::bind(
std::forward<Func>(func), std::forward<Args>(args)...
);
auto func_ptr = std::make_shared<
std::packaged_task<decltype(func(args...))()>
>(wrap_func);
tiny_pool_submit(pool, TinyPool::wrap_c_func, (void*)(
new std::function<void()> (
[func_ptr]() { (*func_ptr)(); }
)
));
return func_ptr->get_future();
}
/// -------------------------------- start test --------------------------------
#include <iostream>
#include <unistd.h>
TinyPool::TinyPool(uint32_t size) {
pool = tiny_pool_create(size);
// if (pool == (void*)0) { // NULL in c-style
// throw std::runtime_error("thread pool create error");
// }
}
TinyPool::~TinyPool() {
tiny_pool_kill(pool);
int test_func(char c) {
int num = c - '0';
printf("char -> `%c`\n", c);
for (int i = 0; i < num; ++i) {
printf("task %d running...\n", num);
usleep(500 * 1000);
}
return num;
}
int main() {
std::cout << "tiny thread pool demo start" << std::endl;
auto p = TinyPool(1);
auto pool = TinyPool(3);
auto f = p.submit(test_func, 'A');
auto f0 = pool.submit(test_func, '0');
auto f1 = pool.submit(test_func, '1');
auto f2 = pool.submit(test_func, '2');
auto f3 = pool.submit(test_func, '3');
p.boot();
pool.boot();
std::cout << "get future: " << f.get() << std::endl;
auto f4 = pool.submit(test_func, '4');
auto f5 = pool.submit(test_func, '5');
printf("get future: %d\n", f0.get());
printf("get future: %d\n", f4.get());
printf("get future: %d\n", f3.get());
// auto f = submit(test_func, 'D');
auto f6 = pool.submit(test_func, '6');
auto f7 = pool.submit(test_func, '7');
auto f8 = pool.submit(test_func, '8');
auto f9 = pool.submit(test_func, '9');
// auto pool = tiny_pool_create(3);
printf("get future: %d\n", f2.get());
printf("get future: %d\n", f5.get());
printf("get future: %d\n", f8.get());
// int dat[] = {0, 1, 2, 3, 4, 5, 6};
// tiny_pool_submit(pool, demo_func, (void*)&dat[0]);
// tiny_pool_submit(pool, demo_func, (void*)&dat[1]);
// tiny_pool_submit(pool, demo_func, (void*)&dat[2]);
// tiny_pool_submit(pool, demo_func, (void*)&dat[3]);
// tiny_pool_submit(pool, demo_func, (void*)&dat[4]);
// tiny_pool_submit(pool, demo_func, (void*)&dat[5]);
// tiny_pool_submit(pool, demo_func, (void*)&dat[6]);
//
// tiny_pool_boot(pool);
//
// tiny_pool_join(pool);
pool.join();
sleep(10);
printf("get future: %d\n", f6.get());
printf("get future: %d\n", f1.get());
printf("get future: %d\n", f9.get());
printf("get future: %d\n", f7.get());
std::cout << "tiny thread pool demo exit" << std::endl;
return 0;

84
src/tiny_pool.c

@ -56,7 +56,7 @@ pool_t* tiny_pool_create(uint32_t size) {
}
void task_queue_push(pool_t *pool, task_t *task) {
printf("push new task %d\n", *(int*)task->arg);
// printf("push new task %d\n", *(int*)task->arg);
if (pool->task_queue_rear == NULL) { // task queue is empty
pool->task_queue_front = task;
pool->task_queue_rear = task; // init task queue with one element
@ -65,25 +65,25 @@ void task_queue_push(pool_t *pool, task_t *task) {
pool->task_queue_rear = task;
}
++pool->task_queue_size;
printf("push success -> size = %d\n", pool->task_queue_size);
// printf("push success -> size = %d\n", pool->task_queue_size);
}
task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
printf("%lu -> start pop task\n", pthread_self());
// printf("%lu -> start pop task\n", pthread_self());
/// wait until task queue not empty
pthread_mutex_lock(&pool->mutex); // lock pool struct
while (pool->task_queue_front == NULL) { // loop until task queue not empty
printf("%lu -> pop wait\n", pthread_self());
// printf("%lu -> pop wait\n", pthread_self());
pthread_cond_wait(&pool->task_queue_not_empty, &pool->mutex); // wait new task added
printf("%lu -> pop exit wait\n", pthread_self());
// printf("%lu -> pop exit wait\n", pthread_self());
if (pool->status == EXITING) {
pthread_mutex_unlock(&pool->mutex);
printf("%lu -> sub thread exit from idle\n", pthread_self());
// printf("%lu -> sub thread exit from idle\n", pthread_self());
pthread_exit(NULL); // sub thread exit at EXITING stage
}
}
printf("%lu -> pop new task %d\n", pthread_self(), *(int*)pool->task_queue_front->arg);
// printf("%lu -> pop new task %d\n", pthread_self(), *(int*)pool->task_queue_front->arg);
/// pop first task from queue
bool empty_flag = false;
@ -95,12 +95,12 @@ task_t* task_queue_pop(pool_t *pool) { // pop one task with blocking wait
}
--pool->task_queue_size;
++pool->busy_thr_num; // task must pop by one ready thread
printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size);
// printf("%lu -> pop success -> size = %d\n", pthread_self(), pool->task_queue_size);
pthread_mutex_unlock(&pool->mutex); // unlock task queue
/// send signal to active blocking thread
if (empty_flag) { // send signal after mutex unlocked
printf("signal -> task queue empty\n");
// printf("signal -> task queue empty\n");
pthread_cond_signal(&pool->task_queue_empty); // active pool join thread
}
return front; // success pop one task
@ -137,7 +137,7 @@ bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
/// send signal to active blocking thread
if (signal_flag) {
printf("signal -> queue not empty\n");
// printf("signal -> queue not empty\n");
pthread_cond_signal(&pool->task_queue_not_empty); // active one blocking thread
}
return true; // task push success
@ -145,9 +145,9 @@ bool tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
void* thread_entry(void *pool_ptr) { // main loop for sub thread
pool_t *pool = (pool_t*)pool_ptr;
printf("%lu -> sub thread begin\n", pthread_self());
// printf("%lu -> sub thread begin\n", pthread_self());
while (pool->status != EXITING) { // loop until enter EXITING stage
printf("%lu -> sub thread working\n", pthread_self());
// printf("%lu -> sub thread working\n", pthread_self());
/// pop a task and execute it
task_t *task = task_queue_pop(pool); // pop one task -> blocking function
@ -163,7 +163,7 @@ void* thread_entry(void *pool_ptr) { // main loop for sub thread
pthread_cond_signal(&pool->without_busy_thread);
}
}
printf("%lu -> sub thread exit\n", pthread_self());
// printf("%lu -> sub thread exit\n", pthread_self());
pthread_exit(NULL); // sub thread exit
}
@ -183,36 +183,30 @@ void tiny_pool_boot(pool_t *pool) {
for (uint32_t i = 0; i < pool->thread_num; ++i) { // create working threads
pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool);
}
printf("thread boot complete\n");
// printf("thread boot complete\n");
pthread_mutex_unlock(&pool->mutex);
}
void free_tiny_pool(pool_t *pool) {
printf("start free pool resource\n");
// pthread_mutex_unlock(&pool->mutex);
// printf("flag 1\n");
// printf("start free pool resource\n");
pthread_cond_destroy(&pool->without_busy_thread);
pthread_cond_destroy(&pool->task_queue_not_empty);
pthread_cond_destroy(&pool->task_queue_empty);
// printf("flag 2\n");
pthread_mutex_destroy(&pool->mutex);
free(pool->threads);
free(pool);
printf("free pool resource complete\n");
// printf("free pool resource complete\n");
}
#include <unistd.h>
bool tiny_pool_join(pool_t *pool) {
printf("start pool join\n");
// printf("start pool join\n");
/// pre-check to avoid invalid mutex waiting
if (pool->status != RUNNING) {
@ -226,47 +220,35 @@ bool tiny_pool_join(pool_t *pool) {
return false; // only allow to join at RUNNING stage
}
pool->status = STOPPING;
printf("pool status -> STOPPING\n");
// printf("pool status -> STOPPING\n");
printf("wait task queue\n");
// printf("wait task queue\n");
while (pool->task_queue_front != NULL) {
pthread_cond_wait(&pool->task_queue_empty, &pool->mutex);
}
printf("task queue empty\n");
// printf("task queue empty\n");
pool->status = EXITING;
printf("pool status -> EXITING\n");
// printf("pool status -> EXITING\n");
printf("start wait busy threads -> %d\n", pool->busy_thr_num);
// printf("start wait busy threads -> %d\n", pool->busy_thr_num);
while (pool->busy_thr_num != 0) {
pthread_cond_wait(&pool->without_busy_thread, &pool->mutex);
}
printf("all thread idle\n");
// printf("all thread idle\n");
pthread_mutex_unlock(&pool->mutex); // prevent other functions blocking waiting
printf("start sub threads joining\n");
// for (uint32_t i = 0; i < pool->thread_num; ++i) {
// pthread_cond_broadcast(&pool->task_queue_not_empty); // trigger idle threads
// while (pthread_mutex_trylock(&pool->mutex)) {
// printf("try lock again\n");
// }
// pthread_mutex_unlock(&pool->mutex);
// }
//
// printf("free threads complete\n");
// printf("start sub threads joining\n");
for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_cond_broadcast(&pool->task_queue_not_empty); // trigger idle threads
printf("start join sub thread %lu\n", pool->threads[i]);
// printf("start join sub thread %lu\n", pool->threads[i]);
pthread_join(pool->threads[i], NULL);
printf("sub thread %lu joined\n", pool->threads[i]);
// printf("sub thread %lu joined\n", pool->threads[i]);
}
// sleep(10);
printf("sub threads join complete\n");
// printf("sub threads join complete\n");
free_tiny_pool(pool);
@ -274,27 +256,27 @@ bool tiny_pool_join(pool_t *pool) {
}
void* run_pool_join(void *pool) {
printf("run pool join from detach\n");
// printf("run pool join from detach\n");
tiny_pool_join((pool_t*)pool);
printf("pool join complete\n");
// printf("pool join complete\n");
pthread_exit(NULL);
}
void tiny_pool_detach(pool_t *pool) {
pthread_t tid;
printf("run pool detach\n");
// printf("run pool detach\n");
pthread_create(&tid, NULL, run_pool_join, (void*)pool);
pthread_detach(tid);
}
void tiny_pool_kill(pool_t *pool) {
printf("run pool kill\n");
// printf("run pool kill\n");
if (pool->status > PREPARING) {
printf("kill sub threads\n");
// printf("kill sub threads\n");
for (uint32_t i = 0; i < pool->thread_num; ++i) {
pthread_cancel(pool->threads[i]);
}
printf("kill complete\n");
// printf("kill complete\n");
}
free_tiny_pool(pool);
}

Loading…
Cancel
Save