A thread pool is a collection of pre-allocated threads that manage the execution of
tasks in a concurrent system, improving efficiency by reusing threads instead of
creating new ones for each task. In a producer-consumer model, the producer generates
tasks, and the consumer threads in the pool handle their execution. This reduces the
overhead of thread creation and destruction and keeps resource usage under control,
making thread pools well suited to high-concurrency environments such as web servers
or databases. Because the pool caps the number of concurrent threads, performance
stays predictable as load grows, especially in systems with frequent short tasks or
I/O-bound work.
How it works
Initialization
When a thread pool is created, a set number of worker threads is started up front and
kept idle, waiting to perform tasks. The number of threads is typically configurable,
and some pools can also adjust it at runtime based on demand.
Task Submission
Tasks are submitted to the thread pool, usually in the form of callable functions or
runnable tasks. These tasks are placed into a queue, where they wait until a worker
thread becomes available.
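As a rough illustration of the submission step, the sketch below wraps each callable (a function pointer plus its argument) in a node and appends it to a mutex-protected FIFO. The names task_fn, job_t, job_queue_t, job_queue_init, job_queue_push, and job_queue_pop are hypothetical and are kept separate from the implementation later in this section; the sketch also omits waking idle workers.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef void (*task_fn)(void *arg);  // hypothetical: the "callable" a caller submits

typedef struct job_s {
    struct job_s *next;  // next job in FIFO order
    task_fn func;        // function to run
    void *arg;           // argument passed to func
} job_t;

typedef struct {
    job_t *head, *tail;     // pop from head, push at tail
    pthread_mutex_t mutex;  // protects head and tail
} job_queue_t;

void job_queue_init(job_queue_t *q) {
    q->head = q->tail = NULL;
    pthread_mutex_init(&q->mutex, NULL);
}

// Wrap (func, arg) in a heap node and append it; returns 0 on success, -1 on OOM.
int job_queue_push(job_queue_t *q, task_fn func, void *arg) {
    assert(func != NULL);
    job_t *job = malloc(sizeof *job);
    if (job == NULL)
        return -1;
    job->next = NULL;
    job->func = func;
    job->arg = arg;
    pthread_mutex_lock(&q->mutex);
    if (q->tail)
        q->tail->next = job;  // non-empty: link after the current last job
    else
        q->head = job;        // empty: the new job is also the first one
    q->tail = job;
    pthread_mutex_unlock(&q->mutex);
    return 0;
}

// Detach the oldest job, or return NULL if the queue is empty.
// The caller runs job->func(job->arg) and then frees the node.
job_t *job_queue_pop(job_queue_t *q) {
    pthread_mutex_lock(&q->mutex);
    job_t *job = q->head;
    if (job) {
        q->head = job->next;
        if (q->head == NULL)
            q->tail = NULL;
    }
    pthread_mutex_unlock(&q->mutex);
    return job;
}
```

A worker would loop on job_queue_pop, call job->func(job->arg), and free the node; tasks therefore run in the order they were submitted.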
Task Execution
Once a thread is available, it picks up a task from the queue and begins execution.
The thread works on the task until it finishes, at which point the thread becomes
available again to pick up new tasks.
Thread Reuse
The primary advantage of thread pools is thread reuse. Rather than creating a new
thread each time a task needs to be executed (which can be costly in terms of system
resources), the threads in the pool are reused to execute multiple tasks over time.
This reduces overhead and improves efficiency, especially when tasks are short-lived
or frequent.
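To make initialization, execution, and reuse concrete, here is a deliberately simplified sketch in which a fixed batch of tasks stands in for the queue: nthreads workers are created once, and each repeatedly claims the next unclaimed task index with an atomic counter until the batch is exhausted. The names run_batch, worker_main, run_task, batch_t, worker_t, and the MAX_WORKERS limit are hypothetical, and a real pool would use a blocking queue instead of a fixed batch.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

#define MAX_WORKERS 16  // hypothetical upper bound for this sketch

typedef struct {
    atomic_size_t next;  // index of the next unclaimed task; stands in for the queue
    size_t ntasks;       // number of tasks in the batch
    atomic_long sum;     // result accumulated by the tasks
} batch_t;

typedef struct {
    batch_t *batch;
    size_t executed;  // number of tasks this particular thread ran
} worker_t;

// The "task": add its own index to a shared sum.
static void run_task(batch_t *b, size_t i) {
    atomic_fetch_add(&b->sum, (long)i);
}

// Each worker keeps claiming tasks until the batch is exhausted, so a single
// thread executes many tasks instead of one thread being created per task.
static void *worker_main(void *arg) {
    worker_t *w = arg;
    for (;;) {
        size_t i = atomic_fetch_add(&w->batch->next, 1);
        if (i >= w->batch->ntasks)
            break;
        run_task(w->batch, i);
        w->executed++;
    }
    return NULL;
}

// Runs ntasks tasks on nthreads threads created once up front and stores how many
// tasks each thread executed in executed[0..nthreads-1].
// Returns the sum computed by the tasks, or -1 if any thread failed to start.
long run_batch(size_t nthreads, size_t ntasks, size_t executed[]) {
    pthread_t tids[MAX_WORKERS];
    worker_t workers[MAX_WORKERS];
    batch_t batch;
    if (nthreads == 0 || nthreads > MAX_WORKERS)
        return -1;
    atomic_init(&batch.next, 0);
    batch.ntasks = ntasks;
    atomic_init(&batch.sum, 0);

    size_t started = 0;
    for (; started < nthreads; started++) {
        workers[started].batch = &batch;
        workers[started].executed = 0;
        if (pthread_create(&tids[started], NULL, worker_main, &workers[started]) != 0)
            break;
    }
    for (size_t t = 0; t < nthreads; t++) {
        if (t < started)
            pthread_join(tids[t], NULL);
        executed[t] = (t < started) ? workers[t].executed : 0;
    }
    return started == nthreads ? atomic_load(&batch.sum) : -1;
}
```

For example, run_batch(4, 1000, executed) runs 1000 tasks on only four threads, and the per-thread counts in executed add up to 1000.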
Task Queuing and Blocking
If all threads in the pool are busy when a new task arrives, the task is typically
placed in a task queue until a thread becomes available. Some implementations limit
the queue size; once the queue is full, new tasks may be rejected or handled
differently (e.g. the submitter waits until space frees up, or the task is executed
some other way, such as on the submitting thread).
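The two main overflow policies can be sketched with a fixed-capacity ring buffer, assuming a hypothetical capacity QUEUE_CAP and hypothetical names bounded_queue_t, bq_init, bq_try_push, bq_push, and bq_pop: bq_try_push rejects the task when the queue is full, while bq_push makes the submitter wait until a worker frees a slot.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAP 4  // hypothetical fixed capacity

typedef struct {
    void *items[QUEUE_CAP];    // ring buffer of task pointers
    size_t head, count;        // index of the oldest item, number of items
    pthread_mutex_t mutex;     // protects items, head, count
    pthread_cond_t not_full;   // signaled when a slot frees up
    pthread_cond_t not_empty;  // signaled when an item arrives
} bounded_queue_t;

void bq_init(bounded_queue_t *q) {
    q->head = q->count = 0;
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}

// Caller must hold q->mutex and have checked that there is room.
static void bq_put_locked(bounded_queue_t *q, void *item) {
    q->items[(q->head + q->count) % QUEUE_CAP] = item;
    q->count++;
    pthread_cond_signal(&q->not_empty);
}

// Rejection policy: fail immediately when the queue is full.
bool bq_try_push(bounded_queue_t *q, void *item) {
    pthread_mutex_lock(&q->mutex);
    bool ok = q->count < QUEUE_CAP;
    if (ok)
        bq_put_locked(q, item);
    pthread_mutex_unlock(&q->mutex);
    return ok;
}

// Blocking policy: the submitter waits until a worker frees a slot.
void bq_push(bounded_queue_t *q, void *item) {
    pthread_mutex_lock(&q->mutex);
    while (q->count == QUEUE_CAP)
        pthread_cond_wait(&q->not_full, &q->mutex);
    bq_put_locked(q, item);
    pthread_mutex_unlock(&q->mutex);
}

// Worker side: block until an item is available, then remove the oldest one.
void *bq_pop(bounded_queue_t *q) {
    pthread_mutex_lock(&q->mutex);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->mutex);
    void *item = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mutex);
    return item;
}
```

A caller that gets false from bq_try_push can report an error, retry later, or run the task itself on the submitting thread (the "caller runs" approach).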
Dynamic Sizing
Some thread pools can dynamically adjust the number of threads based on the system
load or the volume of tasks being submitted. For example, if there are a lot of tasks
waiting in the queue, the pool may create additional threads to handle the increased
load. Conversely, when the system is idle, surplus threads may be terminated to save resources.
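One possible sizing rule, sketched as a pure function under assumed tuning knobs (sizing_policy_t, its fields, and desired_threads are hypothetical, not taken from any particular library): grow by one worker when the backlog exceeds a per-thread threshold, shrink by one when nothing is queued and some worker is idle, and always clamp to [min_threads, max_threads].

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical tuning knobs for this sketch.
typedef struct {
    size_t min_threads;       // never shrink below this
    size_t max_threads;       // never grow above this
    size_t tasks_per_thread;  // grow when the backlog exceeds this many tasks per thread
} sizing_policy_t;

// Decide the thread count for the next interval.
// current: workers alive now; queued: tasks waiting; idle: workers with no task.
size_t desired_threads(const sizing_policy_t *p, size_t current,
                       size_t queued, size_t idle) {
    size_t target = current;
    if (queued > current * p->tasks_per_thread)
        target = current + 1;  // backlog is building up: add one worker
    else if (queued == 0 && idle > 0 && current > 0)
        target = current - 1;  // nothing waiting and a worker is idle: retire one
    if (target < p->min_threads)
        target = p->min_threads;
    if (target > p->max_threads)
        target = p->max_threads;
    return target;
}
```

The pool would evaluate this periodically and create or retire workers to match. Moving one thread per evaluation, rather than jumping straight to a computed size, is one simple way to damp oscillation between sizes.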
Shutdown and Cleanup
When the thread pool is no longer needed, it can be shut down. Any remaining tasks in
the queue may either be completed or discarded depending on the shutdown policy.
Threads are then safely terminated and cleaned up.
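The difference between the two shutdown policies can be sketched with a toy pool whose queue is just a counter of pending tasks (toy_pool_t, toy_pool_start, toy_pool_submit, toy_pool_shutdown, and the fixed NWORKERS are hypothetical). Shutdown sets a flag under the mutex, broadcasts so idle workers wake up, and joins every worker; with SHUTDOWN_DRAIN the workers keep taking tasks until the queue is empty, while with SHUTDOWN_DISCARD they stop after their current task and whatever is left is reported as discarded.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define NWORKERS 2  // hypothetical fixed pool size for the sketch

typedef enum { SHUTDOWN_DRAIN, SHUTDOWN_DISCARD } shutdown_policy_t;

typedef struct {
    size_t pending;            // tasks waiting in the "queue"
    size_t executed;           // tasks a worker actually ran
    bool stopping;             // set once at shutdown
    shutdown_policy_t policy;  // what happens to pending tasks at shutdown
    pthread_mutex_t mutex;     // protects every field above
    pthread_cond_t cond;       // signaled on new work and on shutdown
    pthread_t threads[NWORKERS];
} toy_pool_t;

static void *toy_worker(void *arg) {
    toy_pool_t *p = arg;
    pthread_mutex_lock(&p->mutex);
    for (;;) {
        while (p->pending == 0 && !p->stopping)
            pthread_cond_wait(&p->cond, &p->mutex);  // idle until work or shutdown
        if (p->stopping && (p->policy == SHUTDOWN_DISCARD || p->pending == 0))
            break;  // discard: stop now; drain: stop only when nothing is left
        p->pending--;  // take one task
        pthread_mutex_unlock(&p->mutex);
        /* a real pool would run the task here, outside the lock */
        pthread_mutex_lock(&p->mutex);
        p->executed++;
    }
    pthread_mutex_unlock(&p->mutex);
    return NULL;
}

static void toy_pool_stop_and_join(toy_pool_t *p, shutdown_policy_t policy, int nstarted) {
    pthread_mutex_lock(&p->mutex);
    p->policy = policy;
    p->stopping = true;
    pthread_cond_broadcast(&p->cond);  // wake every idle worker
    pthread_mutex_unlock(&p->mutex);
    for (int i = 0; i < nstarted; i++)
        pthread_join(p->threads[i], NULL);
}

int toy_pool_start(toy_pool_t *p) {
    p->pending = p->executed = 0;
    p->stopping = false;
    p->policy = SHUTDOWN_DRAIN;
    pthread_mutex_init(&p->mutex, NULL);
    pthread_cond_init(&p->cond, NULL);
    for (int i = 0; i < NWORKERS; i++) {
        if (pthread_create(&p->threads[i], NULL, toy_worker, p) != 0) {
            toy_pool_stop_and_join(p, SHUTDOWN_DISCARD, i);  // undo the workers already started
            pthread_cond_destroy(&p->cond);
            pthread_mutex_destroy(&p->mutex);
            return -1;
        }
    }
    return 0;
}

void toy_pool_submit(toy_pool_t *p) {
    pthread_mutex_lock(&p->mutex);
    p->pending++;
    pthread_cond_signal(&p->cond);
    pthread_mutex_unlock(&p->mutex);
}

// Stops the pool under the given policy; returns how many tasks were discarded.
size_t toy_pool_shutdown(toy_pool_t *p, shutdown_policy_t policy) {
    toy_pool_stop_and_join(p, policy, NWORKERS);
    size_t discarded = p->pending;  // safe to read: every worker has been joined
    p->pending = 0;
    pthread_cond_destroy(&p->cond);
    pthread_mutex_destroy(&p->mutex);
    return discarded;
}
```

With SHUTDOWN_DRAIN every submitted task is executed; with SHUTDOWN_DISCARD the executed and discarded counts always add up to the number submitted, but how they split depends on timing.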
Optimal Threads
The optimal number of threads depends on various factors such as the type of tasks,
the system's hardware resources (e.g. CPU cores), and the workload characteristics.
CPU-bound Tasks
Optimal Threads = Number of CPU Cores
CPU-bound tasks are those that require significant processing power and do not spend
much time waiting for external resources (e.g. disk I/O or network requests). The
optimal number of threads for CPU-bound tasks is generally equal to the number of
available CPU cores. This is because each thread can run on a separate core, allowing
the system to perform tasks in parallel and avoid excessive context switching.
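A minimal sketch for sizing a CPU-bound pool on a POSIX system, using sysconf(_SC_NPROCESSORS_ONLN) to count online logical CPUs. That constant is a widely available extension (glibc, musl, macOS, and the BSDs provide it) rather than a POSIX requirement, and it ignores CPU affinity masks and container CPU quotas, so treat the result as an upper bound. The function name cpu_bound_pool_size is hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <unistd.h>

// One worker per online logical CPU; falls back to 1 if the query is unavailable or fails.
long cpu_bound_pool_size(void) {
#ifdef _SC_NPROCESSORS_ONLN
    long cores = sysconf(_SC_NPROCESSORS_ONLN);  // -1 on failure
#else
    long cores = 1;
#endif
    return cores > 0 ? cores : 1;
}
```

The returned value would be passed as the thread count when the pool is created.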
I/O-bound Tasks
Optimal Threads = Number of CPU Cores x Multiplier (2-3)
I/O-bound tasks, on the other hand, often spend more time waiting for I/O operations
(e.g. file reads/writes, network requests) than actually performing computations. In
these cases, the optimal number of threads can exceed the number of CPU cores because
a thread waiting for I/O does not use a core, leaving it free for another thread to
run. The multiplier typically ranges from 2 to 3, depending on how much time is spent
waiting on I/O operations.
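Where the 2-3 multiplier comes from can be made explicit with a commonly cited rule of thumb (popularized by Java Concurrency in Practice): threads = cores x (1 + W/C), where W is the time a task spends waiting on I/O and C is the time it spends computing, assuming the goal is to keep every core fully busy. A task that waits as long as it computes (W/C = 1) gives a multiplier of 2, and one that waits twice as long (W/C = 2) gives 3. The helper io_bound_pool_size below is hypothetical and rounds the result up.

```c
#include <assert.h>

// threads = cores * (1 + wait_ms / compute_ms), rounded up.
// wait_ms:    average time a task spends blocked on I/O
// compute_ms: average time a task spends running on a CPU (must be > 0)
long io_bound_pool_size(long cores, double wait_ms, double compute_ms) {
    if (cores < 1)
        cores = 1;
    if (compute_ms <= 0.0 || wait_ms < 0.0)
        return cores;  // defensive: fall back to the CPU-bound size on bad input
    double exact = (double)cores * (1.0 + wait_ms / compute_ms);
    long threads = (long)exact;
    if ((double)threads < exact)
        threads++;  // round up rather than leave a core partly idle
    return threads;
}
```

For example, on 4 cores with 20 ms of waiting and 10 ms of computation per task, io_bound_pool_size(4, 20.0, 10.0) returns 12.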
Empirical Testing
In practice, we use empirical testing to fine-tune the thread pool size by
experimenting with different configurations and observing how the system performs
under various loads. This involves measuring key metrics like task completion time,
CPU usage, and throughput, then adjusting the number of threads to find the optimal
balance between resource usage and performance.
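A bare-bones timing harness for this kind of experiment might look like the following, assuming tasks that simulate I/O by sleeping. time_pool_size, bench_worker, workload_t, now_sec, and MAX_THREADS are hypothetical; the harness measures only wall-clock time for a fixed batch, whereas a real benchmark would also record CPU usage and throughput with a workload resembling production.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <time.h>

#define MAX_THREADS 64  // hypothetical upper bound for the experiment

typedef struct {
    atomic_int next;  // next task index to claim
    int ntasks;       // tasks in the batch
    long sleep_ns;    // simulated I/O wait per task (must be < 1e9)
} workload_t;

static double now_sec(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (double)ts.tv_sec + (double)ts.tv_nsec / 1e9;
}

// Stand-in for an I/O-bound task: block for sleep_ns without using the CPU.
static void *bench_worker(void *arg) {
    workload_t *w = arg;
    while (atomic_fetch_add(&w->next, 1) < w->ntasks) {
        struct timespec ts = { .tv_sec = 0, .tv_nsec = w->sleep_ns };
        nanosleep(&ts, NULL);
    }
    return NULL;
}

// Wall-clock seconds to finish ntasks tasks with nthreads workers, or -1.0 on error.
double time_pool_size(int nthreads, int ntasks, long sleep_ns) {
    pthread_t tids[MAX_THREADS];
    workload_t w = { .ntasks = ntasks, .sleep_ns = sleep_ns };
    atomic_init(&w.next, 0);
    if (nthreads < 1 || nthreads > MAX_THREADS || sleep_ns < 0 || sleep_ns >= 1000000000L)
        return -1.0;
    double start = now_sec();
    int started = 0;
    while (started < nthreads &&
           pthread_create(&tids[started], NULL, bench_worker, &w) == 0)
        started++;
    for (int i = 0; i < started; i++)
        pthread_join(tids[i], NULL);
    if (started == 0)
        return -1.0;
    return now_sec() - start;
}
```

Calling time_pool_size for a range of sizes (for example 1, 2, 4, 8, 16) and keeping the smallest size beyond which the elapsed time stops improving is one simple way to pick a starting point for further tuning.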
Implementation
A typical thread pool implementation uses a queue (often a blocking queue) to manage
tasks. Tasks are submitted to the queue, and the worker threads in the pool retrieve
and execute them; when no worker is free, tasks stay in the queue until one becomes
available. In the implementation below, pending tasks are kept in a singly linked
list and the worker thread IDs in a plain array. Shared state is protected with
synchronization primitives: a spinlock guards the task list itself, while a mutex and
a condition variable let idle workers sleep until a task arrives (or the pool shuts
down). Here is a concise implementation of the key ideas:
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct spinlock_s {
    atomic_flag flag;  // a flag used to implement the spinlock
} spinlock_t;

// initialize the spinlock
void spinlock_init(spinlock_t* lock) {
    atomic_flag_clear(&lock->flag);  // unlocked
}

// acquire the spinlock
void spinlock_lock(spinlock_t* lock) {
    while (atomic_flag_test_and_set(&lock->flag)) {
        // busy-wait while the lock is held by another thread
    }
}

// release the spinlock
void spinlock_unlock(spinlock_t* lock) {
    atomic_flag_clear(&lock->flag);  // clear the flag (lock released)
}

// destroy the spinlock
void spinlock_destroy(spinlock_t* lock) {
    (void)lock;  // nothing specific needs to be done for this spinlock implementation
}

// handler type for tasks that the thread pool will execute
typedef void (*handler)(void* arg);

// task structure that will be added to the task queue
typedef struct task_s {
    void* next;    // pointer to the next task in the queue (must stay the first member)
    handler func;  // function pointer to the task handler (function to execute)
    void* arg;     // argument to pass to the handler function
} task_t;

// task queue structure to store tasks waiting to be processed by worker threads
typedef struct task_queue_s {
    void* head;             // head of the linked list of tasks
    void** tail;            // address of the last task's next pointer (&head when empty)
    int block;              // whether workers block when the queue is empty (1: blocking, 0: non-blocking)
    spinlock_t lock;        // spinlock protecting head and tail
    pthread_mutex_t mutex;  // mutex protecting block and pairing with cond
    pthread_cond_t cond;    // condition variable to signal threads when new tasks arrive
} task_queue_t;

typedef struct thrdpool_s {
    task_queue_t* task_queue;  // pointer to the task queue
    atomic_int quit;           // once set, thrdpool_post rejects new tasks
    int thrd_count;            // number of worker threads
    pthread_t* threads;        // array of worker thread IDs
} thrdpool_t;

// create a task queue
static task_queue_t* __taskqueue_create(void) {
    task_queue_t* queue = (task_queue_t*)malloc(sizeof(task_queue_t));
    if (queue) {
        pthread_mutex_init(&queue->mutex, NULL);
        pthread_cond_init(&queue->cond, NULL);
        spinlock_init(&queue->lock);
        queue->head = NULL;
        queue->tail = &queue->head;
        queue->block = 1;  // default behavior is blocking
        return queue;
    }
    return NULL;
}

// set the task queue to non-blocking mode
static void __set_nonblock(task_queue_t* queue) {
    pthread_mutex_lock(&queue->mutex);  // lock the mutex to safely modify queue
    queue->block = 0;
    pthread_mutex_unlock(&queue->mutex);
    pthread_cond_broadcast(&queue->cond);  // wake up all threads waiting on the condition variable
}

// add a task to the task queue
static inline void __add_task(task_queue_t* queue, void* task) {
    void** link = (void**)task;  // point to the task's next pointer (its first member)
    *link = NULL;

    // lock the queue to ensure thread safety while modifying it
    spinlock_lock(&queue->lock);
    *queue->tail = link;  // link the task after the current last task (or into head)
    queue->tail = link;   // now this task's next pointer is the new tail
    spinlock_unlock(&queue->lock);

    // signal while holding the mutex: a worker that found the queue empty holds this
    // mutex until it is inside pthread_cond_wait, so the wakeup cannot be lost
    pthread_mutex_lock(&queue->mutex);
    pthread_cond_signal(&queue->cond);
    pthread_mutex_unlock(&queue->mutex);
}

// pop a task from the task queue
static inline void* __pop_task(task_queue_t* queue) {
    spinlock_lock(&queue->lock);
    if (queue->head == NULL) {  // queue is empty
        spinlock_unlock(&queue->lock);
        return NULL;  // no task to pop
    }
    // pop the task from the queue
    task_t* task = queue->head;
    void** link = (void**)task;
    queue->head = *link;  // move the head pointer to the next task
    if (queue->head == NULL) {
        queue->tail = &queue->head;  // if the queue is now empty, reset the tail
    }
    spinlock_unlock(&queue->lock);
    return task;
}

// destroy the task queue by freeing all tasks and related resources
static void __taskqueue_destroy(task_queue_t* queue) {
    task_t* task;
    while ((task = __pop_task(queue))) {
        free(task);  // the task's arg belongs to the caller and is not freed here
    }
    spinlock_destroy(&queue->lock);
    pthread_cond_destroy(&queue->cond);
    pthread_mutex_destroy(&queue->mutex);
    free(queue);
}

// get a task from the queue, blocking if none is available
static inline void* __get_task(task_queue_t* queue) {
    task_t* task = __pop_task(queue);  // fast path: no mutex needed
    if (task)
        return task;

    pthread_mutex_lock(&queue->mutex);
    // re-check under the mutex before waiting; the loop also handles spurious wakeups
    while ((task = __pop_task(queue)) == NULL) {
        // if the queue is non-blocking, return NULL when no tasks are left
        if (queue->block == 0)
            break;
        pthread_cond_wait(&queue->cond, &queue->mutex);
    }
    pthread_mutex_unlock(&queue->mutex);
    return task;
}

// worker thread function that processes tasks from the queue
static void* __thrdpool_worker(void* arg) {
    thrdpool_t* pool = (thrdpool_t*)arg;
    task_t* task;

    // __get_task returns NULL only once the queue is non-blocking and empty,
    // so tasks still queued at shutdown are completed before the worker exits
    while ((task = (task_t*)__get_task(pool->task_queue)) != NULL) {
        // extract the task function and argument
        handler func = task->func;
        void* ctx = task->arg;
        free(task);  // free the task structure before running the handler
        func(ctx);
    }
    return NULL;  // exit the worker thread
}

// terminate all threads in the pool
static void __threads_terminate(thrdpool_t* pool) {
    atomic_store(&pool->quit, 1);      // reject new tasks
    __set_nonblock(pool->task_queue);  // ensure threads don't block waiting for tasks
    for (int i = 0; i < pool->thrd_count; i++) {  // join all threads
        pthread_join(pool->threads[i], NULL);
    }
}

// create threads for the pool
static int __threads_create(thrdpool_t* pool, size_t thrd_count) {
    pthread_attr_t attr;
    int ret;

    ret = pthread_attr_init(&attr);  // initialize thread attributes
    if (ret == 0) {
        // allocate memory for threads
        pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrd_count);
        if (pool->threads) {
            size_t i = 0;
            for (; i < thrd_count; i++) {
                if (pthread_create(&pool->threads[i], &attr, __thrdpool_worker, pool) != 0) {
                    break;  // break on failure to create a thread
                }
            }
            pool->thrd_count = (int)i;  // set the actual number of created threads
            if (i == thrd_count) {      // if all threads were created, return success
                pthread_attr_destroy(&attr);
                return 0;
            }
            __threads_terminate(pool);  // roll back: stop the threads that did start
            free(pool->threads);        // free memory allocated for threads
        }
        pthread_attr_destroy(&attr);  // destroy thread attributes on every failure path
        ret = -1;  // return failure if memory allocation or thread creation fails
    }
    return ret;
}

// create a thread pool
thrdpool_t* thrdpool_create(int thrd_count) {
    thrdpool_t* pool;
    if (thrd_count <= 0)  // defensive: reject invalid sizes
        return NULL;
    pool = (thrdpool_t*)malloc(sizeof(*pool));  // allocate memory for the thread pool
    if (pool) {
        task_queue_t* queue = __taskqueue_create();
        if (queue) {
            pool->task_queue = queue;
            atomic_init(&pool->quit, 0);  // initialize the quit flag
            if (__threads_create(pool, (size_t)thrd_count) == 0)  // create worker threads
                return pool;
            __taskqueue_destroy(queue);  // destroy the queue if thread creation fails
        }
        free(pool);  // free memory if thread pool creation fails
    }
    return NULL;  // return NULL if thread pool creation fails
}

// post a task to the thread pool
int thrdpool_post(thrdpool_t* pool, handler func, void* arg) {
    if (atomic_load(&pool->quit) == 1)  // check if the pool is quitting
        return -1;
    task_t* task = (task_t*)malloc(sizeof(task_t));
    if (!task)
        return -1;
    task->func = func;
    task->arg = arg;
    __add_task(pool->task_queue, task);
    return 0;  // return success
}

// stop accepting tasks and let the workers exit once the queue is drained
void thrdpool_terminate(thrdpool_t* pool) {
    atomic_store(&pool->quit, 1);
    __set_nonblock(pool->task_queue);
}

// wait for all worker threads to exit, then free the pool;
// call thrdpool_terminate first, otherwise idle workers never exit
void thrdpool_waitdone(thrdpool_t* pool) {
    for (int i = 0; i < pool->thrd_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    __taskqueue_destroy(pool->task_queue);
    free(pool->threads);
    free(pool);
}

void sample_task(void* arg) {
    int* num = (int*)arg;
    printf("Task started with arg: %d\n", *num);
    // simulate some work
    for (volatile int i = 0; i < 1000000; i++) {
    }
    printf("Task completed with arg: %d\n", *num);
    free(num);  // the argument was malloc'd by main
}

int main(void) {
    // create a thread pool with 4 threads
    thrdpool_t* pool = thrdpool_create(4);
    if (pool == NULL) {
        printf("Failed to create thread pool\n");
        return -1;
    }
    // post 10 tasks to the pool
    for (int i = 0; i < 10; i++) {
        int* arg = (int*)malloc(sizeof(int));
        if (arg == NULL)
            break;
        *arg = i;  // set the argument for the task
        if (thrdpool_post(pool, sample_task, arg) != 0) {
            printf("Failed to post task %d\n", i);
            free(arg);
        }
    }
    // stop accepting tasks; workers finish the queued ones and then exit
    thrdpool_terminate(pool);
    // wait for all workers to finish, then release the pool
    thrdpool_waitdone(pool);
    printf("Thread pool terminated.\n");
    return 0;
}
./build/main
Task started with arg: 0
Task started with arg: 1
Task started with arg: 2
Task started with arg: 3
Task completed with arg: 1
Task started with arg: 4
Task completed with arg: 0
Task started with arg: 5
Task completed with arg: 2
Task started with arg: 6
Task completed with arg: 3
Task started with arg: 7
Task completed with arg: 4
Task started with arg: 8
Task completed with arg: 5
Task started with arg: 9
Task completed with arg: 6
Task completed with arg: 7
Task completed with arg: 8
Task completed with arg: 9
Rollback or Defensive
The rollback style suits resource allocation that happens in stages, as in
thrdpool_create: the pool structure, the task queue, and the worker threads are
acquired one after another, and if any stage fails, the resources acquired so far
are released in reverse order to prevent leaks and leave no half-built pool behind.
The defensive style for logic flow focuses on preventing errors by validating inputs
and checking conditions at every step, using safeguards such as assertions, bounds
checks, and early returns (thrdpool_post, for example, returns -1 as soon as the
pool is quitting or the task allocation fails).
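Both styles can be sketched on a hypothetical three-stage allocation (resources_t, resources_create, resources_destroy, and the fail_at test hook are made up for illustration). The thread pool code expresses rollback with nested if blocks; a widespread C alternative, shown here, uses goto labels ordered so that a failure at a given stage jumps to the label that undoes only the stages already completed, in reverse order. The n == 0 check and the NULL-tolerant destroy function are the defensive part.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct {
    pthread_mutex_t mutex;  // stage 1
    pthread_cond_t cond;    // stage 2
    int* buffer;            // stage 3
} resources_t;

// fail_at (1..3) simulates a failure at that stage for testing; 0 means no injected failure.
resources_t* resources_create(size_t n, int fail_at) {
    if (n == 0)
        return NULL;  // defensive: validate input before allocating anything
    resources_t* r = malloc(sizeof *r);
    if (r == NULL)
        return NULL;
    if (fail_at == 1 || pthread_mutex_init(&r->mutex, NULL) != 0)
        goto err_free;
    if (fail_at == 2 || pthread_cond_init(&r->cond, NULL) != 0)
        goto err_mutex;
    r->buffer = (fail_at == 3) ? NULL : calloc(n, sizeof *r->buffer);
    if (r->buffer == NULL)
        goto err_cond;
    return r;

    // rollback: each label undoes one stage, falling through to the earlier ones
err_cond:
    pthread_cond_destroy(&r->cond);
err_mutex:
    pthread_mutex_destroy(&r->mutex);
err_free:
    free(r);
    return NULL;
}

// Release everything in reverse order of creation; tolerates NULL like free().
void resources_destroy(resources_t* r) {
    if (r == NULL)
        return;
    free(r->buffer);
    pthread_cond_destroy(&r->cond);
    pthread_mutex_destroy(&r->mutex);
    free(r);
}
```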
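void**
Converting a void* to a void** is a common technique in systems programming. In the
implementation above, tail does not point to the last task itself; it points to the
next pointer of the last task, or to head when the queue is empty. Because of this,
appending never needs a special case for an empty queue: the new task is always
written through *tail. The same conversion appears when a task is appended:
__add_task casts the task pointer to void** to reach the task's next field. This works
because next is the first member of task_t, so a task and its next pointer start at
the same address. Viewed as a void*, that address refers to the whole task with its
type information erased; viewed as a void**, the same address refers only to the
first pointer-sized field (8 bytes on a 64-bit platform), which is the task's next
pointer. (A void** is a pointer to a void*, which means it holds the address of a
void* object.)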
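The tail trick can be seen in isolation in this small sketch (node_t, list_t, list_init, and list_append are hypothetical names). The only requirement is that next is the first member of the node, so that a pointer to the node, converted to void**, points at its next field.

```c
#include <assert.h>
#include <stddef.h>

// Any node type works as long as its FIRST member is the next pointer.
typedef struct node_s {
    void* next;
    int value;
} node_t;

typedef struct {
    void* head;   // first node, or NULL
    void** tail;  // address of the last node's next field, or &head when empty
} list_t;

void list_init(list_t* l) {
    l->head = NULL;
    l->tail = &l->head;
}

// Append without an "is the list empty?" branch: *tail is either head or the
// previous last node's next field, and both are simply a void* object.
void list_append(list_t* l, void* node) {
    void** link = (void**)node;  // view the node's first member (next) as a void*
    *link = NULL;                // the new node terminates the list
    *l->tail = node;             // hook it onto head or onto the previous next field
    l->tail = link;              // the new node's next field becomes the tail slot
}
```

After appending three nodes a, b, and c, head points to a, each next field points to the following node, and tail holds the address of c.next.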