The Producer-Consumer problem is a classic multi-thread synchronization problem in computer science. Essentially, we have a producer that produces some data and a consumer that waits to consume it. Until the producer produces some data, the consumer cannot do anything. We can draw a parallel with a restaurant here: until the cook sends out the food, the customer has to wait. The cook is the producer and the customer is the consumer. We can also add an additional constraint. If the customer already has too many dishes on the table, the cook had better wait before sending out the next part of the meal, or else the food will get cold (or warm, if ice cream is what is coming out next). This means the producer also has to wait for the consumer to finish processing the data.
What we can do to simulate this in C++ is have two threads and their associated functions: Producer and Consumer. We will also have a queue into which the producer adds data. In our case, the producer simply adds numbers to the queue, and the consumer pops them off the queue and consumes them.
The following is the full code with comments; I will explain a few more details afterwards:
#include <iostream>
#include <queue>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <mutex>
#include <string>
using namespace std;
// Queue into which producer adds data and from which the consumer picks data
queue<long> dataQ;
// first one is for the producer consumer operation, the second one to sync the prints
mutex pc_mu, print_mu;
// Condition variable for the producer consumer sync
condition_variable consumerwait_cv;
void Print(string str)
{
// Let us have the Print also guarded so that the output appears sequential
lock_guard<mutex> lg(print_mu);
cout << str << endl;
}
void Producer()
{
int index = 0;
while (1)
{
unique_lock<mutex> ul(pc_mu);
// If more than 4 unprocessed items are in the data queue,
// wait until the consumer has popped one before adding more
if (dataQ.size() >= 5)
{
consumerwait_cv.wait(ul, []() {return !(dataQ.size() >= 5); });
}
dataQ.push(index);
// Unlock the lock and notify the consumer that data is available
ul.unlock();
consumerwait_cv.notify_one();
// Just adding some random delay
this_thread::sleep_for(chrono::milliseconds(100));
Print(" Producer produced " + to_string(index));
index++;
}
}
void Consumer()
{
while (1)
{
unique_lock<mutex> ul(pc_mu);
if (dataQ.empty()) // If the data queue is empty,
// wait for the producer to add something to it
{
// Predicate should return false to continue waiting.
// Thus, if the queue is empty, predicate should return false (!q.empty())
consumerwait_cv.wait(ul, []() {return !dataQ.empty(); });
}
// Pick the element from the queue while the lock is still held,
// since the queue is shared with the producer
long element = dataQ.front();
dataQ.pop();
ul.unlock(); // Unlock the lock to unblock the producer.
// If this statement is commented, the producer is blocked till this loop ends
consumerwait_cv.notify_one(); // Tell the producer that they can go ahead
// since 1 element is now popped off for processing
// Wait for some time to show that the consumer is slower than the producer
this_thread::sleep_for(chrono::milliseconds(1000));
Print(" Consumer got " + to_string(element));
}
}
int main()
{
thread prod(Producer);
thread cons(Consumer);
// Both threads loop forever, so joining them keeps main alive
// to see the producer consumer flow happen (without a busy-wait loop)
prod.join();
cons.join();
}
The basic idea here is that the producer adds some data to the data queue and notifies anyone waiting on the condition variable that data is available. The consumer waits for data to arrive on the queue. As soon as the producer adds the data and calls “consumerwait_cv.notify_one();”, the consumer thread is woken up and starts processing. Another thing to note is the “ul.unlock();” in the Consumer, which comes after the wait and before the slow processing step. If we don’t do this, the producer will be blocked for the whole time the consumer is processing, since it is also trying to lock the same mutex. The queue itself, however, should only be read or popped while the lock is held, because both threads share it.
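The handshake described above can be sketched in a form that terminates, which makes it easier to check. This is only a sketch under my own assumptions: `BoundedQueue` and `run_pipeline` are hypothetical names that are not part of the listing, and the item count and capacity are arbitrary parameters.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper: a bounded queue that only touches the underlying
// std::queue while the mutex is held.
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    void push(long value) {
        std::unique_lock<std::mutex> lock(mu_);
        // Block while the queue is full; the predicate also handles spurious wakeups.
        cv_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(value);
        lock.unlock();      // release the mutex before waking the other side
        cv_.notify_one();
    }

    long pop() {
        std::unique_lock<std::mutex> lock(mu_);
        // Block while the queue is empty.
        cv_.wait(lock, [this] { return !items_.empty(); });
        long value = items_.front();  // read and pop under the lock
        items_.pop();
        lock.unlock();      // any slow processing happens after this point
        cv_.notify_one();
        return value;
    }

private:
    std::size_t capacity_;
    std::queue<long> items_;
    std::mutex mu_;
    std::condition_variable cv_;
};

// Runs one producer and one consumer over `count` items and returns
// what the consumer received, in order.
std::vector<long> run_pipeline(long count, std::size_t capacity) {
    BoundedQueue q(capacity);
    std::vector<long> consumed;
    std::thread producer([&] { for (long i = 0; i < count; ++i) q.push(i); });
    std::thread consumer([&] { for (long i = 0; i < count; ++i) consumed.push_back(q.pop()); });
    producer.join();
    consumer.join();
    return consumed;
}
```

Calling `run_pipeline(100, 5)` should hand back the numbers 0 to 99 in the order they were produced, since there is a single producer and a single consumer.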
Now assume that we want to unblock the producer and make it independent of the consumer’s speed. We can simply remove the “if (dataQ.size() >= 5)” block, and the producer is then free to keep adding new data to the queue. If we do this, we can also remove the “consumerwait_cv.notify_one();” in the consumer, since no producer will ever be waiting on it.
One thing I also found out during this implementation is that the same condition variable can be used to wait on different predicates. I should have realized this earlier, since the predicate is passed to the wait call and not to the constructor, but I didn’t. 🙂 So in this case, I am using the same condition variable both to block the consumer until the producer produces some data and to block the producer when the data queue has too many unprocessed items.
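A minimal sketch of that unbounded variant, assuming we wrap the queue in a small class: `UnboundedQueue` and `backlog_after_producer` are hypothetical names of mine, not part of the original listing. The producer never waits, and the consumer side no longer notifies anyone.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical helper: the queue from the post with the capacity check removed.
class UnboundedQueue {
public:
    void push(long value) {
        {
            std::lock_guard<std::mutex> lock(mu_);
            items_.push(value);   // no capacity check, so the producer never blocks here
        }
        cv_.notify_one();         // wake a consumer that may be waiting for data
    }

    long pop() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return !items_.empty(); });
        long value = items_.front();
        items_.pop();
        return value;             // no notify: nobody ever waits for free space
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mu_);
        return items_.size();
    }

private:
    std::queue<long> items_;
    std::mutex mu_;
    std::condition_variable cv_;
};

// The producer runs to completion with no consumer at all,
// and the function reports how many items are left queued.
std::size_t backlog_after_producer(long count) {
    UnboundedQueue q;
    std::thread producer([&] { for (long i = 0; i < count; ++i) q.push(i); });
    producer.join();
    return q.size();
}
```

The trade-off is that nothing limits the backlog any more: if the consumer stays slower than the producer, the queue simply keeps growing.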
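Sharing one condition variable works here because there is exactly one producer and one consumer, so the two can never both be waiting at the same time. With several producers and consumers, a `notify_one()` on a shared condition variable can in principle wake a thread of the wrong kind. A common alternative, sketched below under my own assumptions, is to use one condition variable per predicate. The names `TwoCvQueue` and `sum_through_queue` are hypothetical and not from the original code.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper: a bounded queue with one condition variable per predicate.
class TwoCvQueue {
public:
    explicit TwoCvQueue(std::size_t capacity) : capacity_(capacity) {}

    void push(long value) {
        std::unique_lock<std::mutex> lock(mu_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(value);
        lock.unlock();
        not_empty_.notify_one();  // only consumers wait on not_empty_
    }

    long pop() {
        std::unique_lock<std::mutex> lock(mu_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        long value = items_.front();
        items_.pop();
        lock.unlock();
        not_full_.notify_one();   // only producers wait on not_full_
        return value;
    }

private:
    std::size_t capacity_;
    std::queue<long> items_;
    std::mutex mu_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};

// Starts `workers` producers and `workers` consumers. Each producer pushes
// 1..per_worker, each consumer pops per_worker items, and the function
// returns the sum of everything consumed.
long sum_through_queue(int workers, long per_worker, std::size_t capacity) {
    TwoCvQueue q(capacity);
    std::atomic<long> sum{0};
    std::vector<std::thread> threads;
    for (int w = 0; w < workers; ++w) {
        threads.emplace_back([&] { for (long i = 1; i <= per_worker; ++i) q.push(i); });
        threads.emplace_back([&] { for (long i = 0; i < per_worker; ++i) sum += q.pop(); });
    }
    for (auto& t : threads) t.join();
    return sum.load();
}
```

With 3 workers and 100 items each, every item from 1 to 100 passes through the queue three times, so `sum_through_queue(3, 100, 5)` should return 3 * 5050.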
This is how the output appears:
As you can see, the producer first produces some items but is then stuck until the consumer consumes them. After this, each time the consumer pops one element off to process, the producer can add one more element to the queue. If the consumer speeds up its processing, we might also end up back in the situation where the consumer has to wait for the producer again.
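One way to check this behaviour without reading the console output is to record how deep the queue ever gets. The sketch below is my own illustration, with `high_water_mark` as a hypothetical function name. It runs the same wait and notify pattern without the artificial delays and returns the largest queue size the producer observed.

```cpp
#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical helper: runs one producer and one consumer over `count` items
// and returns the deepest the queue ever got.
std::size_t high_water_mark(long count, std::size_t capacity) {
    std::queue<long> items;
    std::mutex mu;
    std::condition_variable cv;
    std::size_t deepest = 0;  // written only by the producer, under the lock

    std::thread producer([&] {
        for (long i = 0; i < count; ++i) {
            std::unique_lock<std::mutex> lock(mu);
            // The producer is stuck here whenever the queue is full.
            cv.wait(lock, [&] { return items.size() < capacity; });
            items.push(i);
            deepest = std::max(deepest, items.size());
            lock.unlock();
            cv.notify_one();
        }
    });

    std::thread consumer([&] {
        for (long i = 0; i < count; ++i) {
            std::unique_lock<std::mutex> lock(mu);
            cv.wait(lock, [&] { return !items.empty(); });
            items.pop();       // each pop frees one slot for the producer
            lock.unlock();
            cv.notify_one();
        }
    });

    producer.join();
    consumer.join();
    return deepest;
}
```

Whatever the relative speed of the two threads, the returned value should never exceed `capacity`. How close it gets to `capacity` depends on scheduling.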
I have a doubt about whether the consumer code should call ul.unlock() before accessing the data element. Isn't the queue a critical resource here, so that access to it needs to be synchronised between the consumer and the producer? Correct me if I am wrong, but I think ul.unlock() should come after dataQ.pop().
BTW, enjoyed going through the code, thanks!!