Browse Source

feat: thread pool booting

master
Dnomd343 1 year ago
parent
commit
dacc9f0484
  1. 37
      main.c
  2. 116
      tiny_pool.c
  3. 18
      tiny_pool.h

37
main.c

@ -1,23 +1,46 @@
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include "tiny_pool.h"
void* demo_fun(void *i) {
printf("demo func -> %d\n", *(int*)i);
return NULL;
void demo_fun(void *i) {
int k = *(int*)i;
printf("demo func sleep %ds\n", k);
sleep(k);
printf("demo func %d wake up\n", k);
}
int main() {
pthread_t tid;
int d = 123;
// pthread_t tid;
pool_t *pool = tiny_pool_create(0);
tiny_pool_submit(pool, demo_fun, (void*)&d);
int dat[] = {1, 2, 3, 4, 5, 6};
tiny_pool_submit(pool, demo_fun, (void*)&dat[0]);
tiny_pool_submit(pool, demo_fun, (void*)&dat[1]);
printf("pool booting\n");
tiny_pool_boot(pool);
printf("pool running\n");
printf("main thread sleep\n");
sleep(5);
printf("main thread wake up\n");
tiny_pool_submit(pool, demo_fun, (void*)&dat[2]);
tiny_pool_submit(pool, demo_fun, (void*)&dat[3]);
tiny_pool_submit(pool, demo_fun, (void*)&dat[4]);
// TODO: tiny pool join
sleep(15);
// TODO: tiny pool destroy
return 0;
}

116
tiny_pool.c

@ -9,55 +9,94 @@ pool_t* tiny_pool_create(uint32_t size) {
pool->threads[i] = 0;
}
pool->status = PREPARING;
pthread_mutex_init(&pool->busy_thread_num_mutex, NULL);
pool->task_queue_front = NULL;
pool->task_queue_rear = NULL;
pool->task_queue_size = 0;
pthread_mutex_init(&pool->task_queue_busy, NULL);
// pthread_cond_init(&pool->task_queue_empty, NULL);
pthread_cond_init(&pool->task_queue_not_empty, NULL);
return pool;
}
void tiny_pool_submit(pool_t *pool, void* (*func)(void*), void *arg) {
task_t *new_task = (task_t*)malloc(sizeof(task_t));
void task_queue_push(pool_t *pool, task_t *task) {
new_task->func = func;
new_task->arg = arg;
new_task->next = NULL;
printf("push one task\n");
// TODO: lock task queue
pthread_mutex_lock(&pool->task_queue_busy);
if (pool->task_queue_rear == NULL) { // queue without element
if (pool->task_queue_rear == NULL) { // empty queue
pool->task_queue_front = new_task;
pool->task_queue_rear = new_task;
pool->task_queue_front = task;
pool->task_queue_rear = task;
} else { // queue emplace back
} else { // queue with element
pool->task_queue_rear->next = new_task;
pool->task_queue_rear = new_task;
pool->task_queue_rear->next = task;
pool->task_queue_rear = task;
}
++pool->task_queue_size;
// TODO: unlock task queue
pthread_mutex_unlock(&pool->task_queue_busy);
printf("push success\n");
if (pool->status == RUNNING) {
// TODO: send cond signal
printf("send signal -> queue not empty\n");
pthread_cond_signal(&pool->task_queue_not_empty);
}
}
void tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg) {
task_t *new_task = (task_t*)malloc(sizeof(task_t));
new_task->func = func;
new_task->arg = arg;
new_task->next = NULL;
// TODO: new task push into task queue
task_queue_push(pool, new_task);
}
task_t* task_queue_pop(pool_t *pool) {
printf("try pop one task\n");
// TODO: lock task queue
pthread_mutex_lock(&pool->task_queue_busy);
while (pool->task_queue_front == NULL) {
if (pool->task_queue_front == NULL) {
printf("pop enter wait\n");
return NULL; // pop failed -> empty queue
// TODO: wait new task added
pthread_cond_wait(&pool->task_queue_not_empty, &pool->task_queue_busy);
}
task_t *tmp = pool->task_queue_front;
printf("pop exit wait\n");
task_t *front = pool->task_queue_front;
if (pool->task_queue_front == pool->task_queue_rear) {
if (pool->task_queue_front == pool->task_queue_rear) { // only one element
// queue is empty now
pool->task_queue_front = NULL;
@ -65,18 +104,21 @@ task_t* task_queue_pop(pool_t *pool) {
} else {
pool->task_queue_front = tmp->next;
pool->task_queue_front = front->next;
}
// TODO: unlock task queue
pthread_mutex_unlock(&pool->task_queue_busy);
return tmp;
printf("pop success\n");
return front;
}
void* thread_working() {
void* thread_entry(void *pool_ptr) {
// TODO: main loop for one thread
@ -85,6 +127,23 @@ void* thread_working() {
// TODO: pop one task --failed--> blocking wait
// --success--> start running and then free task_t
pool_t *pool = (pool_t*)pool_ptr;
while (pool->status != EXITING) {
printf("thread working\n");
task_t *task = task_queue_pop(pool);
// task working
task->func(task->arg);
free(task);
}
printf("sub thread exit\n");
return NULL;
}
@ -96,5 +155,24 @@ void tiny_pool_boot(pool_t *pool) {
// TODO: create N work-threads (using N = 8 in dev)
// TODO: avoid booting multi-times
pthread_mutex_lock(&pool->task_queue_busy);
for (uint32_t i = 0; i < 8; ++i) {
printf("start thread %d\n", i);
pthread_create(&(pool->threads[i]), NULL, thread_entry, (void*)pool);
}
printf("thread boot complete\n");
pool->status = RUNNING;
pthread_mutex_unlock(&pool->task_queue_busy);
}

18
tiny_pool.h

@ -5,27 +5,41 @@
#include <pthread.h>
typedef struct task_t {
void* (*func)(void*);
void (*func)(void*);
void *arg;
struct task_t *next;
} task_t;
enum tiny_pool_status {
PREPARING,
RUNNING,
EXITING,
};
typedef struct {
pthread_t threads[8];
enum tiny_pool_status status;
pthread_mutex_t status_changing;
uint32_t busy_thread_num;
pthread_mutex_t busy_thread_num_mutex;
task_t *task_queue_front;
task_t *task_queue_rear;
uint32_t task_queue_size;
pthread_mutex_t task_queue_busy;
// pthread_cond_t task_queue_empty;
pthread_cond_t task_queue_not_empty;
} pool_t;
pool_t* tiny_pool_create(uint32_t size);
void tiny_pool_submit(pool_t *pool, void* (*func)(void*), void *arg);
void tiny_pool_submit(pool_t *pool, void (*func)(void*), void *arg);
// TODO: confirm just run once
void tiny_pool_boot(pool_t *pool);

Loading…
Cancel
Save