
Thread Pool

A thread pool is a collection of pre-allocated threads that execute tasks in a concurrent system, improving efficiency by reusing threads instead of creating a new one for each task. In a producer-consumer model, producers generate tasks and the consumer threads in the pool execute them. This reduces the overhead of thread creation and destruction and optimizes resource management, making thread pools well suited to highly concurrent environments such as web servers or databases. The pool bounds the number of concurrent threads, ensuring scalable and predictable performance, especially in systems with frequent or I/O-bound tasks.

How it works

Initialization

When a thread pool is created, a certain number of worker threads are pre-created and kept idle, waiting to perform tasks. The number of threads is typically configurable, and the pool can adjust this number based on demand.

Task Submission

Tasks are submitted to the thread pool, usually in the form of callable functions or runnable tasks. These tasks are placed into a queue, where they wait until a worker thread becomes available.

Task Execution

Once a thread is available, it picks up a task from the queue and begins execution. The thread works on the task until it finishes, at which point it becomes available again to pick up new tasks.

Thread Reuse

The primary advantage of thread pools is thread reuse. Rather than creating a new thread each time a task needs to be executed (which can be costly in terms of system resources), the threads in the pool are reused to execute multiple tasks over time. This reduces overhead and improves efficiency, especially when tasks are short-lived or frequent.

Task Queuing and Blocking

If all threads in the pool are busy when a new task arrives, the task is typically placed in a task queue to wait until a thread becomes available. Some implementations impose a limit on the queue size, after which new tasks may be rejected or handled differently (e.g. by blocking the submitter or running the task on the calling thread); a sketch of such a bounded queue follows below.
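The sketch below illustrates one bounded-queue policy: a capacity check in front of the enqueue step, rejecting the submission when the backlog is full. All names here (bounded_queue_t, pending, capacity) are illustrative assumptions, not part of the implementation shown later in this article.

#include <pthread.h>

// Minimal bounded-submission sketch: if the backlog has reached its capacity,
// the submission is rejected instead of being queued.
typedef struct {
  pthread_mutex_t mutex; // protects the counters below
  int pending;           // number of tasks currently waiting
  int capacity;          // maximum backlog allowed
} bounded_queue_t;

// returns 0 if a slot was reserved for the task, -1 if the queue is full
int bounded_try_reserve(bounded_queue_t *q) {
  int ok = -1;
  pthread_mutex_lock(&q->mutex);
  if (q->pending < q->capacity) {
    q->pending++; // reserve a slot; the caller then enqueues the task
    ok = 0;
  }
  pthread_mutex_unlock(&q->mutex);
  return ok;      // on -1 the caller can block, retry, or run the task inline
}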

Dynamic Sizing

Some thread pools can dynamically adjust the number of threads based on the system load or the volume of tasks being submitted. For example, if there are a lot of tasks waiting in the queue, the pool may create additional threads to handle the increased load. Conversely, if the system is idle, threads may be reduced to save resources.
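As a rough sketch of the scaling decision only, the function below grows the pool while the backlog outpaces idle workers and shrinks it when workers sit idle; the fields and thresholds are illustrative assumptions (the pool implemented later in this article is fixed-size).

// Sketch of a dynamic-sizing policy. All fields are illustrative.
typedef struct {
  int thread_count; // current number of worker threads
  int min_threads;  // lower bound kept alive even when idle
  int max_threads;  // hard upper bound
  int queued_tasks; // tasks currently waiting in the queue
  int idle_threads; // workers currently blocked waiting for work
} pool_stats_t;

// returns +1 to add a worker, -1 to retire one, 0 to leave the size unchanged
int resize_decision(const pool_stats_t *s) {
  if (s->queued_tasks > s->idle_threads && s->thread_count < s->max_threads)
    return +1;
  if (s->queued_tasks == 0 && s->idle_threads > 0 && s->thread_count > s->min_threads)
    return -1;
  return 0;
}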

Shutdown and Cleanup

When the thread pool is no longer needed, it can be shut down. Any remaining tasks in the queue may either be completed or discarded depending on the shutdown policy. Threads are then safely terminated and cleaned up.

Optimal Threads

The optimal number of threads depends on various factors such as the type of tasks, the system's hardware resources (e.g. CPU cores), and the workload characteristics.

CPU-bound Tasks

Optimal Threads = Number of CPU Cores

CPU-bound tasks are those that require significant processing power and do not spend much time waiting for external resources (e.g. disk I/O or network requests). The optimal number of threads for CPU-bound tasks is generally equal to the number of available CPU cores. This is because each thread can run on a separate core, allowing the system to perform tasks in parallel while avoiding excessive context switching.

I/O-bound Tasks

Optimal Threads = Number of CPU Cores x Multiplier (2-3)

I/O-bound tasks, on the other hand, spend more time waiting for I/O operations (e.g. file reads/writes, network requests) than performing computations. In these cases, the optimal number of threads can exceed the number of CPU cores, because threads are often idle while waiting for I/O, allowing other threads to make progress in the meantime. The multiplier typically ranges from 2 to 3, depending on how much time is spent waiting on I/O operations.
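As a small illustration of both formulas, the snippet below queries the number of online cores with sysconf (POSIX) and derives the two pool sizes; the multiplier of 2 for I/O-bound work is just one point in the suggested 2-3 range.

#include <stdio.h>
#include <unistd.h>

int main(void) {
  long cores = sysconf(_SC_NPROCESSORS_ONLN); // online CPU cores (POSIX)
  if (cores < 1) cores = 1;

  long cpu_bound_threads = cores;     // CPU-bound: one thread per core
  long io_bound_threads  = cores * 2; // I/O-bound: cores x multiplier (2-3)

  printf("cores=%ld cpu-bound=%ld io-bound=%ld\n",
         cores, cpu_bound_threads, io_bound_threads);
  return 0;
}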

Empirical Testing

In practice, we use empirical testing to fine-tune the thread pool size by experimenting with different configurations and observing how the system performs under various loads. This involves measuring key metrics like task completion time, CPU usage, and throughput, then adjusting the number of threads to find the optimal balance between resource usage and performance.
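One way to run such an experiment is to time the same batch of work for each candidate pool size and compare, as in the sketch below. The workload here is a placeholder; in practice run_batch would create a pool of the given size, post the batch of tasks, and wait for completion.

#include <stdio.h>
#include <time.h>

// Placeholder workload: replace with pool creation + task posting + shutdown.
static void run_batch(int threads) {
  for (volatile long i = 0; i < 1000000L * threads; i++) {}
}

int main(void) {
  int candidates[] = {2, 4, 8, 16};
  for (int i = 0; i < 4; i++) {
    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    run_batch(candidates[i]);
    clock_gettime(CLOCK_MONOTONIC, &end);
    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;
    printf("%2d threads -> %.3f s\n", candidates[i], elapsed);
  }
  return 0;
}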

Implementation

A typical thread pool implementation uses a queue (often a blocking queue) to manage tasks. Tasks are submitted to the queue, and the worker threads stored in the pool retrieve and execute them; the tasks themselves are typically kept in a thread-safe linked list or a similar data structure. When no threads are available, tasks remain in the queue until a thread becomes free. The pool also manages synchronization using primitives such as mutexes to ensure thread safety when accessing shared state like the task queue. Here is a concise implementation of the key ideas:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct spinlock_s {
  atomic_flag flag; // a flag used to implement the spinlock
} spinlock_t;

// initialize the spinlock
void spinlock_init(spinlock_t *lock) {
  atomic_flag_clear(&lock->flag); // unlocked
}

// acquire the spinlock
void spinlock_lock(spinlock_t *lock) {
  while (atomic_flag_test_and_set(&lock->flag)) {
    // busy-wait while the lock is held by another thread
  }
}

// release the spinlock
void spinlock_unlock(spinlock_t *lock) {
  atomic_flag_clear(&lock->flag); // clear the flag (lock released)
}

// destroy the spinlock
void spinlock_destroy(spinlock_t *lock) {
  // nothing specific needs to be done for this spinlock implementation
}


// handler type for tasks that the thread pool will execute
typedef void (*handler)(void *arg);

// task structure that will be added to the task queue
typedef struct task_s {
  void *next; // pointer to the next task in the queue (linked list)
  handler func; // function pointer to the task handler (function to execute)
  void *arg; // argument to pass to the handler function
} task_t;

// task queue structure to store tasks waiting to be processed by worker threads
typedef struct task_queue_s {
  void *head; // head of the linked list of tasks
  void **tail; // tail of the linked list (pointer to the last task)
  int block; // whether the queue is blocking threads when empty (1: blocking, 0: non-blocking)
  spinlock_t lock; // spinlock for protecting shared data in the queue
  pthread_mutex_t mutex; // mutex paired with the condition variable, used to block and wake worker threads
  pthread_cond_t cond; // condition variable to signal threads when new tasks arrive
} task_queue_t;

typedef struct thrdpool_s {
  task_queue_t *task_queue; // pointer to the task queue
  atomic_int quit; // flag to signal threads to terminate
  int thrd_count; // number of worker threads
  pthread_t *threads; // array of worker threads IDs
} thrdpool_t;

// create a task queue
static task_queue_t* __taskqueue_create() {
  task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
  if (queue) {
    pthread_mutex_init(&queue->mutex, NULL);
    pthread_cond_init(&queue->cond, NULL);
    spinlock_init(&queue->lock);

    queue->head = NULL;
    queue->tail = &queue->head;
    queue->block = 1; // default behavior is blocking
    return queue;
  }
  return NULL;
}

// set the task queue to non-blocking mode
static void __set_nonblock(task_queue_t *queue) {
  pthread_mutex_lock(&queue->mutex); // lock the mutex to safely modify queue
  queue->block = 0;
  pthread_mutex_unlock(&queue->mutex);
  pthread_cond_broadcast(&queue->cond); // wake up all threads waiting on the condition variable
}

// add a task to the task queue
static inline void __add_task(task_queue_t *queue, void *task) {
  void **link = (void **)task; // `next` is the first member, so the task's address is also the address of its next pointer
  *link = NULL;

  // lock the queue to ensure thread safety while modifying it
  spinlock_lock(&queue->lock);

  // add the task to the queue and update the tail pointer
  *queue->tail = link;
  queue->tail = link; // now this task is the new tail of the queue

  spinlock_unlock(&queue->lock);
  pthread_cond_signal(&queue->cond); // signal a waiting thread that a task is available
}

// pop a task from the task queue
static inline void* __pop_task(task_queue_t *queue) {
  spinlock_lock(&queue->lock);
  if (queue->head == NULL) {
    // queue is empty
    spinlock_unlock(&queue->lock);
    return NULL; // no task to pop
  }

  // pop the task from the queue
  task_t *task = queue->head;
  void **link = (void**)task;
  queue->head = *link; // move the head pointer to the next task

  if (queue->head == NULL) {
    queue->tail = &queue->head; // if the queue is now empty, reset the tail
  }

  spinlock_unlock(&queue->lock);
  return task;
}

// destroy the task queue by freeing all tasks and related resources
static void __taskqueue_destroy(task_queue_t *queue) {
  task_t *task;
  while ((task = __pop_task(queue))) {
    free(task);
  }
  spinlock_destroy(&queue->lock);
  pthread_cond_destroy(&queue->cond);
  pthread_mutex_destroy(&queue->mutex);
  free(queue);
}


// get a task from the queue, blocking if none available
static inline void* __get_task(task_queue_t *queue) {
  task_t *task;
  // loop until a task is obtained; this also handles spurious wakeups
  while ((task = __pop_task(queue)) == NULL) {
    pthread_mutex_lock(&queue->mutex); // lock mutex to prevent race conditions

    // if the queue is non-blocking, return NULL if no tasks are available
    // without waiting for a signal
    if (queue->block == 0) {
      pthread_mutex_unlock(&queue->mutex);
      return NULL;
    }

    // wait until a task is added to the queue (spurious wakeups are handled
    // by the outer loop, which re-checks the queue)
    pthread_cond_wait(&queue->cond, &queue->mutex);
    pthread_mutex_unlock(&queue->mutex); // unlock after waking up
  }
  return task;
}

// worker thread function that processes tasks from the queue
static void* __thrdpool_worker(void *arg) {
  thrdpool_t *pool = (thrdpool_t *) arg;
  task_t *task;
  void *ctx;

  // keep running until the quit flag is set
  while (atomic_load(&pool->quit) == 0) {
    task = (task_t *)__get_task(pool->task_queue); // get a task from the queue
    if (!task) break; // if no task, break out of the loop

    // extract the task function and argument
    handler func = task->func;
    ctx = task->arg;

    free(task); // free the task structure after use
    func(ctx);
  }

  return NULL; // exit the worker thread
}

// terminate all threads in the pool
static void __threads_terminate(thrdpool_t *pool) {
  atomic_store(&pool->quit, 1); // set the quit flag to terminate threads
  __set_nonblock(pool->task_queue); // ensure threads don't block waiting for tasks
  for (int i = 0; i < pool->thrd_count; i++) { // join all threads
    pthread_join(pool->threads[i], NULL);
  }
}

// creates threads for the pool
static int __threads_create(thrdpool_t *pool, size_t thrd_count) {
  pthread_attr_t attr;
  int ret;

  ret = pthread_attr_init(&attr); // initialize thread attributes

  if (ret == 0) {
    // allocate memory for threads
    pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thrd_count);
    if (pool->threads) {
      int i = 0;
      for (; i < thrd_count; i++) {
        if (pthread_create(&pool->threads[i], &attr, __thrdpool_worker, pool) != 0) {
          break; // break on failure to create a thread
        }
      }
      pool->thrd_count = i; // set the actual number of created threads
      pthread_attr_destroy(&attr); // destroy thread attributes after use
      if (i == thrd_count) // if all threads were created, return success
        return 0;
      __threads_terminate(pool); // terminate threads if creation fails
      free(pool->threads); // free memory allocated for threads
    }
    ret = -1; // failure: allocation or thread creation did not complete
  }

  return ret;
}

// create a thread pool
thrdpool_t* thrdpool_create(int thrd_count) {
  thrdpool_t *pool;
  pool = (thrdpool_t *)malloc(sizeof(*pool)); // allocate memory for the thread pool
  if (pool) {
    task_queue_t *queue = __taskqueue_create();
    if (queue) {
      pool->task_queue = queue;
      atomic_init(&pool->quit, 0); // initialize the quit flag
      if (__threads_create(pool, thrd_count) == 0) // create worker threads
        return pool;
      __taskqueue_destroy(queue); // destroy the queue if thread creation fails
    }
    free(pool); // free memory if thread pool creation fails
  }

  return NULL; // return NULL if thread pool creation fails
}

// post a task to the thread pool
int thrdpool_post(thrdpool_t *pool, handler func, void *arg) {
  if (atomic_load(&pool->quit) == 1) // check if the pool is quitting
    return -1;
  task_t *task = (task_t *)malloc(sizeof(task_t));
  if (!task) return -1;
  task->func = func;
  task->arg = arg;
  __add_task(pool->task_queue, task);
  return 0; // return success
}

// terminate the thread pool
void thrdpool_terminate(thrdpool_t *pool) {
  atomic_store(&pool->quit, 1);
  __set_nonblock(pool->task_queue);
}

// wait for all threads in the pool to finish their tasks
void thrdpool_waitdone(thrdpool_t *pool) {
  for (int i = 0; i < pool->thrd_count; i++) {
    pthread_join(pool->threads[i], NULL);
  }
  __taskqueue_destroy(pool->task_queue);
  free(pool->threads);
  free(pool);
}

void sample_task(void *arg) {
  int *num = (int *)arg;
  printf("Task started with arg: %d\n", *num);
  // simulate some work
  for (volatile int i = 0; i < 1000000; i++) {}
  printf("Task completed with arg: %d\n", *num);
  free(num); // the argument was heap-allocated by the submitter
}

int main() {
  // create a thread pool with 4 threads
  thrdpool_t *pool = thrdpool_create(4);
  if (pool == NULL) {
    printf("Failed to create thread pool\n");
    return -1;
  }

  // post 10 tasks to the pool
  for (int i = 0; i < 10; i++) {
    int *arg = (int *)malloc(sizeof(int));
    *arg = i; // set the argument for the task
    if (thrdpool_post(pool, sample_task, arg) != 0) {
      printf("Failed to post task %d\n", i);
      free(arg);
    }
  }

  // stop accepting new tasks and wake any idle workers
  thrdpool_terminate(pool);
  // wait for the worker threads to exit and release the pool
  thrdpool_waitdone(pool);
  printf("Thread pool terminated.\n");

  return 0;
}

CMakeLists.txt:
cmake_minimum_required(VERSION 3.10)

project(ThreadPoolDemo)

set(CMAKE_C_STANDARD 99)

add_executable(main main.c)

target_link_libraries(main pthread)
./build/main

Task started with arg: 0
Task started with arg: 1
Task started with arg: 2
Task started with arg: 3
Task completed with arg: 1
Task started with arg: 4
Task completed with arg: 0
Task started with arg: 5
Task completed with arg: 2
Task started with arg: 6
Task completed with arg: 3
Task started with arg: 7
Task completed with arg: 4
Task started with arg: 8
Task completed with arg: 5
Task started with arg: 9
Task completed with arg: 6
Task completed with arg: 7
Task completed with arg: 8
Task completed with arg: 9

Rollback or Defensive

The rollback style is typically used for complex resource acquisition, such as creating the task queue and thread pool above, where resources are acquired in stages. If an acquisition fails at any point, previously acquired resources are released in reverse order to prevent leaks and keep state consistent. The defensive style, by contrast, focuses on preventing errors by validating inputs, checking conditions at every step, and using safeguards such as assertions, bounds checks, and early returns.
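A minimal sketch of the rollback style, mirroring the staged allocation in thrdpool_create above; the resources here are illustrative placeholders.

#include <stdlib.h>

// Rollback style: acquire resources in stages and, on failure, release what
// was already acquired in reverse order.
typedef struct { void *a, *b, *c; } bundle_t;

bundle_t *bundle_create(void) {
  bundle_t *bundle = malloc(sizeof(bundle_t));
  if (!bundle) return NULL;

  bundle->a = malloc(64);
  if (!bundle->a) goto rollback_bundle;

  bundle->b = malloc(64);
  if (!bundle->b) goto rollback_a;

  bundle->c = malloc(64);
  if (!bundle->c) goto rollback_b;

  return bundle;   // all stages succeeded

rollback_b:        // the later a stage fails, the more there is to undo
  free(bundle->b);
rollback_a:
  free(bundle->a);
rollback_bundle:
  free(bundle);
  return NULL;
}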

void**

Converting a void* pointer to a void** is a common technique in systems programming. In the implementation above, tail does not point to the last task itself but to the next pointer of the last task (or to head when the queue is empty), which simplifies appending to the linked list. There is also a conversion from void* to void** when appending a task. Considering the memory layout: a task_t pointer refers to the whole task, casting it to void* discards the type information, and a void** at the same address spans the first pointer-sized field (8 bytes on a 64-bit system) and refers to the task's next pointer. (A void** is a pointer to a void* pointer, i.e. it holds the address of a void*.)
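A small standalone sketch of the same trick: because next is the first member of the node, the node's address can be reinterpreted as a void** that designates its next pointer, and the queue's tail always points at "the next pointer to fill in". The node type and values below are illustrative.

#include <stdio.h>

// Node whose first member is the `next` pointer, as in task_t above.
typedef struct node_s {
  void *next;
  int value;
} node_t;

int main(void) {
  node_t a = { .next = NULL, .value = 1 };
  node_t b = { .next = NULL, .value = 2 };

  void *head = NULL;
  void **tail = &head; // tail always points at the next-pointer slot to fill in

  // append `a`: store its address in the slot tail points to, then let
  // `a`'s own next pointer (same address as `a` itself) become the new tail
  *tail = &a;
  tail = (void **)&a;

  // append `b` the same way
  *tail = &b;
  tail = (void **)&b;

  // walk the list
  for (node_t *cur = head; cur; cur = cur->next)
    printf("%d\n", cur->value); // prints 1 then 2
  return 0;
}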