The producer-consumer problem describes two processes, a producer and a consumer, that share a common fixed-size buffer used as a queue. The producer continuously generates data and puts it into the buffer, while the consumer continuously removes data from it. The problem is to make sure that the producer does not try to add data to a full buffer and that the consumer does not try to remove data from an empty one. This version is also known as the bounded-buffer problem. With an unbounded buffer, only the consumer side needs care: the consumer must not try to remove data from an empty buffer. The producer-consumer problem is a classic example of a multi-process synchronization problem; in Java, the two parties are usually threads within one process.
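The two boundary conditions can be seen without any threads at all. The following is a minimal sketch, not part of the original example: the class name BufferBoundaryDemo and the helper fill() are made up for illustration, and it assumes the non-blocking methods offer() and poll(), which report a full or empty buffer instead of waiting. LinkedBlockingQueue created without a capacity stands in for the unbounded buffer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the name is illustrative only.
class BufferBoundaryDemo {

    // Tries to add `count` items without blocking and returns how many were accepted.
    static int fill(BlockingQueue<String> buffer, int count) {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            // offer() returns false instead of waiting when the buffer is full
            if (buffer.offer("item" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Bounded buffer: capacity fixed at 3
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(3);
        // No capacity given: the limit is Integer.MAX_VALUE, effectively unbounded
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

        System.out.println("bounded accepted: " + fill(bounded, 5));     // prints 3
        System.out.println("unbounded accepted: " + fill(unbounded, 5)); // prints 5

        // poll() returns null instead of waiting when the buffer is empty
        bounded.clear();
        System.out.println("poll on empty: " + bounded.poll());          // prints null
    }
}
```

The bounded queue rejects the fourth and fifth items, which is exactly the situation a producer has to handle; the unbounded queue never rejects, so only the empty case remains.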
We can easily implement the producer-consumer problem using a BlockingQueue. A BlockingQueue already supports operations that wait for the queue to become non-empty when retrieving an element, and that wait for space to become available when storing an element. Without BlockingQueue, every time the producer puts data into the queue, we would have to check whether the queue is full and, if so, wait for some time, check again, and continue. Similarly, on the consumer side, we would have to check whether the queue is empty and, if so, wait, check again, and continue. With BlockingQueue we don't have to write any of that logic: the producer calls put(), which blocks while the queue is full, and the consumer calls take() or a timed poll(), which wait while the queue is empty. Note that add() does not block; it throws an IllegalStateException when a bounded queue is full.
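To make the saved effort concrete, here is a rough sketch of what a hand-written bounded buffer might look like. It is not from the original example: the class names ManualBoundedBuffer and ManualBoundedBufferDemo and the helper transfer() are hypothetical. Instead of sleeping and re-checking, this version uses the standard wait()/notifyAll() monitor idiom, which is one common way to do the waiting.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical hand-written bounded buffer, shown for comparison only.
class ManualBoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    ManualBoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        // Re-check in a loop: the condition may no longer hold after waking up
        while (items.size() == capacity) {
            wait();
        }
        items.add(item);
        notifyAll(); // wake consumers waiting for data
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake producers waiting for space
        return item;
    }
}

class ManualBoundedBufferDemo {

    // Moves `count` integers through the buffer with one producer and one
    // consumer thread, and returns them in the order the consumer saw them.
    static List<Integer> transfer(int count, int capacity) {
        ManualBoundedBuffer<Integer> buffer = new ManualBoundedBuffer<>(capacity);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(buffer.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(10, 3)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

All of the locking, condition checking, and waking up in ManualBoundedBuffer is what ArrayBlockingQueue provides out of the box through put() and take().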
Producer-Consumer Example using BlockingQueue
Producer1.java
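import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Producer1 extends Thread {
    // Bounded queue shared with the consumer; holds at most 5 elements
    final BlockingQueue<String> bq;

    Producer1() {
        bq = new ArrayBlockingQueue<>(5);
    }

    @Override
    public void run() {
        // Produce 10 items, matching the 10 that Consumer1 reads
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("adding str" + i);
                bq.put("str" + i); // blocks while the queue is full
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}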
Consumer1.java
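import java.util.concurrent.TimeUnit;

class Consumer1 extends Thread {
    // The producer owns the shared queue (prod.bq)
    final Producer1 prod;

    Consumer1(Producer1 prod) {
        this.prod = prod;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                // poll() waits up to 10 seconds and returns null if nothing arrives
                System.out.println("received " + prod.bq.poll(10, TimeUnit.SECONDS));
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}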
ProducerConsumer1.java
public class ProducerConsumer1 {
    public static void main(String[] args) {
        Producer1 producer = new Producer1();
        Consumer1 consumer = new Consumer1(producer);
        // Both classes extend Thread, so they can be started directly
        // instead of being wrapped in another Thread
        consumer.start();
        producer.start();
    }
}
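Change the values passed to Thread.sleep() in both the producer and the consumer to see how a fast producer with a slow consumer behaves, and how a slow producer with a fast consumer behaves. In the first case the queue fills up and put() blocks until the consumer makes room; in the second the queue stays nearly empty and the consumer spends most of its time waiting in poll().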
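One way to run that experiment without editing the classes each time is to pass the delays in as parameters. The sketch below is an illustrative variation, not the original code: the class DelayExperiment and the method maxBacklog() are hypothetical names, it uses take() rather than a timed poll(), and the backlog it reports is only a sample taken by the producer right after each put().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for experimenting with producer and consumer speeds.
class DelayExperiment {

    // Runs one producer and one consumer with the given per-item delays (in ms)
    // and returns the largest queue size the producer observed after a put().
    static int maxBacklog(int items, int capacity, long producerDelayMs, long consumerDelayMs) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger max = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i); // blocks while the queue is full
                    max.accumulateAndGet(queue.size(), Math::max);
                    Thread.sleep(producerDelayMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.take(); // blocks while the queue is empty
                    Thread.sleep(consumerDelayMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        // Fast producer, slow consumer: the backlog tends to grow toward the capacity
        System.out.println("fast producer: " + maxBacklog(10, 5, 0, 20));
        // Slow producer, fast consumer: the backlog tends to stay at 0 or 1
        System.out.println("fast consumer: " + maxBacklog(10, 5, 20, 0));
    }
}
```

The exact numbers depend on thread scheduling, so they can differ between runs, but the backlog can never exceed the queue capacity.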