Pradeep P

A simple thread pool (using a blocking queue) in C++

A common practice for running tasks on separate threads is to create new threads and run the required tasks on them. Creating and destroying threads is indeed lighter than creating and destroying processes, but it still takes a fair amount of time. Also consider a web server that spins up a new thread to handle each request. If we don’t have a check, the web server might end up creating too many threads, and this will result in a lot of context switching and reduce the overall efficiency of the system.

A better approach is to have a limit on the number of threads that are spun up so that we reduce these switches. To achieve this, we can make use of thread pools. Thread pools allow us to create a set of threads up front and have them wait for tasks to perform. As soon as a task is received, one of the threads picks it up, performs it and goes back to waiting for new tasks to do. An analogy is how we work in our software service companies, right? Companies don’t hire a new person for each new project and let them go at the end of it. They hire a bunch of people who wait for work. As soon as something is assigned, they do the work and then go back to waiting. The onboarding period for each new person is like the thread creation time :).

There are many things to consider when implementing a thread pool. Some of these are:

  1. How many threads do we support?

  2. Should the user be allowed to specify the thread count or should it be based on the hardware concurrency supported?

  3. Should we have a limit on the number of tasks that can be submitted to the thread pool or can the thread pool’s task queue keep growing to accommodate all the requests?

  4. Should we have tasks that can accept parameters and return values to the caller? (A small sketch of this idea follows this list.)

and so on.
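I won’t do that in this post, but to give an idea of what point 4 above could look like, here is a minimal sketch (the MakeTask name and the approach are my own illustration, not part of the pool built below) that wraps a callable in a std::packaged_task so the caller gets a std::future for the result while the task queue still only stores plain void() callables:

#include <functional>
#include <future>
#include <memory>
#include <utility>
 
// Illustrative only: turn a callable that returns a value into a void() task
// plus a std::future through which the caller can collect the result
template <typename Func>
auto MakeTask(Func func)
{
	using Result = decltype(func());
 
	// std::packaged_task is move-only, so keep it behind a shared_ptr to fit
	// inside the copyable std::function<void()> that the task queue stores
	auto packaged = std::make_shared<std::packaged_task<Result()>>(std::move(func));
	std::future<Result> result = packaged->get_future();
 
	std::function<void()> task = [packaged]() { (*packaged)(); };
	return std::make_pair(std::move(task), std::move(result));
}

A caller could push the first element of the pair into the pool just like any other task and later call get() on the future to read the result.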

In this example, I will consider a simple thread pool that can accept any number of requests but will have a limited set of worker threads. Once a worker thread is free, it shall pull a task from the queue and perform it. Once done, it will check if it can get any new tasks. If the queue contains something, it continues. Else, it will wait till a new task arrives.

To implement this, I am going to make use of a thread safe blocking queue. This queue shall block a caller that tries to get the front element till one is available. By offloading this capability to the queue, the thread pool does not have to worry about checking whether the queue contains any elements.

The blocking queue code is thus:

threadsafe_queue.h

#pragma once
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
 
template <typename  T>
class threadsafe_queue
{
	std::queue<T> _tsQueue;
	std::mutex _mu;
	std::condition_variable _cv;
 
public:
	void Push(T value);
	bool Empty();
	void WaitAndPop(T& value);
};
 
template <typename  T>
void threadsafe_queue<T>::Push(T value)
{
	std::lock_guard<std::mutex> lg(_mu);
	// An optimization could be that we move the value instead of copying it
	_tsQueue.push(value); 
	// Notify if the condition variable is waiting at the wait and pop function
	_cv.notify_one(); 
}
 
 
template <typename  T>
bool threadsafe_queue<T>::Empty()
{
	// Lock the mutex before checking if the queue is indeed empty. 
	// This is needed since some other thread might try to add 
	// some elements or remove some elements from the queue when this call occurs
	std::lock_guard<std::mutex> lg(_mu);
	return _tsQueue.empty();
}
 
template <typename  T>
void threadsafe_queue<T>::WaitAndPop(T& value)
{
	/*
	Instead of having a simple pop that shall pop the top element and return it (if available)
	or throw an exception if the queue is empty, we can have this wait and pop function that
	will block the caller till some element is pushed into the queue (if it is currently empty).
	This way, the caller doesn't have to continuously keep polling to see if the queue contains any elements.
	*/
	std::unique_lock<std::mutex> ul(_mu);
	_cv.wait(ul, [this]()
		{
			// To guard against spurious wakeups of the condition variable, we
			// check to be sure that the queue indeed has some element
			return !_tsQueue.empty(); 
		});
	value = _tsQueue.front(); // set the value in the reference
	_tsQueue.pop(); // Remove the element
} 

What we have here is a class that wraps a std::queue. Push adds an element to the queue and calls notify_one on the condition variable. The WaitAndPop function waits for the queue to have an element. Once it does, the condition variable unblocks, and the front element of the queue is retrieved, popped off and returned through the reference parameter.
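As a quick standalone illustration of how the queue behaves (a minimal sketch, separate from the thread pool code), a consumer thread calling WaitAndPop simply sleeps until a producer pushes something:

#include <iostream>
#include <thread>
#include <chrono>
#include "threadsafe_queue.h"
 
int main()
{
	threadsafe_queue<int> queue;
 
	// Consumer: blocks inside WaitAndPop until an element is available
	std::thread consumer([&queue]()
		{
			int value = 0;
			queue.WaitAndPop(value);
			std::cout << "Got " << value << "\n";
		});
 
	// Producer: push after a short delay; the consumer wakes up and prints 42
	std::this_thread::sleep_for(std::chrono::milliseconds(100));
	queue.Push(42);
 
	consumer.join();
}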

Next, the actual thread pool code:

thread_pool.h


#pragma once
#include <thread>
#include <vector>
#include <atomic>
#include <functional>
#include <iostream>
#include "threadsafe_queue.h"
 
 
 
class thread_pool
{
	/*
	The thread pool should instantiate and keep ready a set of threads
	When a new task is added to the threadpool, one of the threads should pick it and complete it
	When no tasks are available, the threadpool should remain idle
	We should have an option to stop the threads
	*/
 
	std::atomic_bool done{ false }; // A flag to indicate whether the threads should run or exit
	std::vector<std::thread> workers;
	threadsafe_queue<std::function<void()>> taskQueue;
 
	void workerTask() // This is the function that shall be run on each thread
	{
		while (!done) // Loop till the done flag is set to true
		{
			std::function<void()> task;
			// This will block till a task is available in the queue
			taskQueue.WaitAndPop(task); 
			task(); // Run the task
		}
	}
 
 
public:
	// If no count specified, better to use the hardware_concurrency value to avoid too many context switches
	thread_pool(size_t threadCount = std::thread::hardware_concurrency()) 
	{
		workers.reserve(threadCount);
		for (size_t index = 0; index < threadCount; index++)
		{
			// Create a new thread and initialize it with the worker task
			workers.push_back(std::thread(&thread_pool::workerTask, this));
		}
 
		for (std::thread& thrd : workers)
		{
			std::cout << " Thread Id : " << thrd.get_id() << " is now created \n\n";
			thrd.detach(); // Detach from the main thread
		}
	}
 
	void AddTask(std::function<void()> task)
	{
		taskQueue.Push(task);
	}
 
	void TerminateThreadPool()
	{
		done = true;
 
		for (size_t index = 0; index < workers.size(); index++)
		{
			AddTask([]()
				{
					std::cout << std::this_thread::get_id() << " exiting\n\n";
					// Dummy task to enable the threads to break off from the wait and pop function
				});
		}
	}
};
 

If the user doesn’t specify the thread count for the thread pool, we set the value to the hardware concurrency value. We have a vector that holds the handles of all the threads we create. The “workerTask” function acts as an infinitely looping thread function. Each thread keeps looping through it, waiting for a task from the task queue. Since we are using the blocking queue, the threads sleep till a task is available. The function loops till the “done” flag is set.

The queue itself is initialized to hold function objects that return void and accept no parameters (“std::function<void()>”).

The AddTask function accepts functions from the client using the thread pool and adds them to the queue.
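Since the queue stores only void() callables, any work that needs arguments has to be wrapped in a lambda (or std::bind) before being handed to AddTask. Here is a small hypothetical example, where doSomething is just a stand-in for a real work function:

#include <iostream>
#include <string>
#include <chrono>
#include "thread_pool.h"
 
// Hypothetical stand-in for any function that takes parameters
void doSomething(int id, const std::string& name)
{
	std::cout << "Task " << id << " for " << name << "\n";
}
 
int main()
{
	thread_pool tp;
 
	// Capture the arguments in the lambda so the stored task is still void()
	int id = 7;
	std::string name = "example";
	tp.AddTask([id, name]() { doSomething(id, name); });
 
	// Give the detached workers a moment to run the task, then stop the pool
	std::this_thread::sleep_for(std::chrono::milliseconds(100));
	tp.TerminateThreadPool();
	std::this_thread::sleep_for(std::chrono::milliseconds(100));
}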

Next, let us look at how we can use the thread pool. Below is a simple program that does so:


#include <iostream>
#include <mutex>
#include <thread>
#include <chrono>
#include "thread_pool.h"
 
using namespace std;
 
mutex mu;
 
int main()
{
	thread_pool tp;
 
	/*
	Here we are adding a few dummy tasks for the thread pool to run.
	Just to see if we are able to push different tasks, we are 
	adding different lambdas when the index is even and when it is odd
	*/
	while (true)
	{
 
		for (int index = 0; index < 100; index++)
		{
			if (index % 2 == 0)
			{
				tp.AddTask([=]()
					{
						// Adding this to prevent the output from different threads overlapping
						lock_guard<mutex> lg(mu); 
						cout << " Printing the index " << index << " from thread id : " 
							<< this_thread::get_id() << endl;
					}
				);
			}
			else
			{
				tp.AddTask([=]()
					{
						lock_guard<mutex> lg(mu);
						cout << " Printing the index x 2 " << index * 2 << " from thread id : " 
							<< this_thread::get_id() << endl;
					}
				);
			}
 
			// Add a pause between adding tasks just to see the output appear clearly
			this_thread::sleep_for(chrono::milliseconds(10)); 
		}
 
		cout << "Do you want to repeat the operation? 1 for continue, anything else to exit\n";
 
		char input;
		cin >> input;
 
		if (input != '1')
		{
			tp.TerminateThreadPool();
			cout << "\n\nBye!\n\n";
			// Wait for the threads to exit (not an actual need but just to see the thread exit output)
			this_thread::sleep_for(chrono::milliseconds(1000)); 
			break;
		}
		else
		{
			cout << "\n\nRepeating the same 100 operations\n";
		}
	}
} 

Here, we are just adding new tasks into the thread pool. Just to be sure that we can add different functions, I have created a separation between odd and even numbers. The task here is simply to print the input number. I am using lambdas as the tasks. The program will print the numbers from 0 to 99 on different threads. You can run the code and see the output.

As you can see, the numbers are being printed from different threads. 🙂
