From 6df01de590f14cb24e851f422ce044c355402842 Mon Sep 17 00:00:00 2001 From: bkk07 <180952443+bkk07@users.noreply.github.com> Date: Mon, 1 Jun 2026 10:09:25 +0530 Subject: [PATCH] Fix thread safety issues in producer-consumer implementation --- .../baeldung/producerconsumer/Consumer.java | 51 ++++++++++------- .../baeldung/producerconsumer/DataQueue.java | 56 +++++++----------- .../baeldung/producerconsumer/Producer.java | 57 +++++++++++-------- 3 files changed, 81 insertions(+), 83 deletions(-) diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/Consumer.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/Consumer.java index de350a40c41b..ada9df9504f3 100644 --- a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/Consumer.java +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/Consumer.java @@ -3,8 +3,12 @@ import java.util.logging.Logger; public class Consumer implements Runnable { - private static final Logger log = Logger.getLogger(Consumer.class.getCanonicalName()); - private boolean running = false; + + private static final Logger log = + Logger.getLogger(Consumer.class.getCanonicalName()); + + private volatile boolean running = false; + private final DataQueue dataQueue; public Consumer(DataQueue dataQueue) { @@ -22,36 +26,39 @@ public void stop() { } public void consume() { + while (running) { - if (dataQueue.isEmpty()) { - try { - dataQueue.waitIsNotEmpty(); - } catch (InterruptedException e) { - log.severe("Error while waiting to Consume messages."); - break; - } - } + try { - // avoid spurious wake-up - if (!running) { - break; - } + Message message = dataQueue.poll(); + + useMessage(message); + + ThreadUtil.sleep((long) (Math.random() * 100)); - Message message = dataQueue.poll(); - useMessage(message); + } catch (InterruptedException e) { - //Sleeping on random time to make it realistic - ThreadUtil.sleep((long) (Math.random() * 100)); + log.severe("Error while consuming messages."); + + Thread.currentThread().interrupt(); + + break; + } } + log.info("Consumer Stopped"); } private void useMessage(Message message) { + if (message != null) { - log.info(String.format("[%s] Consuming Message. Id: %d, Data: %f%n", - Thread.currentThread().getName(), message.getId(), message.getData())); + + log.info(String.format( + "[%s] Consuming Message. Id: %d, Data: %f%n", + Thread.currentThread().getName(), + message.getId(), + message.getData())); } } - -} +} \ No newline at end of file diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/DataQueue.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/DataQueue.java index 7286ed8af87d..44abbaa7c872 100644 --- a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/DataQueue.java +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/DataQueue.java @@ -4,59 +4,41 @@ import java.util.Queue; public class DataQueue { + private final Queue queue = new LinkedList<>(); private final int maxSize; - private final Object IS_NOT_FULL = new Object(); - private final Object IS_NOT_EMPTY = new Object(); DataQueue(int maxSize) { this.maxSize = maxSize; } - public boolean isFull() { - return queue.size() == maxSize; - } - - public boolean isEmpty() { - return queue.isEmpty(); - } - - public void waitIsNotFull() throws InterruptedException { - synchronized (IS_NOT_FULL) { - IS_NOT_FULL.wait(); - } - } + public synchronized void add(Message message) + throws InterruptedException { - public void waitIsNotEmpty() throws InterruptedException { - synchronized (IS_NOT_EMPTY) { - IS_NOT_EMPTY.wait(); + while (queue.size() == maxSize) { + wait(); } - } - public void add(Message message) { queue.add(message); - notifyIsNotEmpty(); - } - public Message poll() { - Message mess = queue.poll(); - notifyIsNotFull(); - return mess; + notifyAll(); } - public Integer getSize() { - return queue.size(); - } + public synchronized Message poll() + throws InterruptedException { - private void notifyIsNotFull() { - synchronized (IS_NOT_FULL) { - IS_NOT_FULL.notify(); + while (queue.isEmpty()) { + wait(); } + + Message message = queue.poll(); + + notifyAll(); + + return message; } - private void notifyIsNotEmpty() { - synchronized (IS_NOT_EMPTY) { - IS_NOT_EMPTY.notify(); - } + public synchronized Integer getSize() { + return queue.size(); } -} +} \ No newline at end of file diff --git a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/Producer.java b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/Producer.java index 4bd0e9e6d154..4f78ab693b84 100644 --- a/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/Producer.java +++ b/core-java-modules/core-java-concurrency-advanced-7/src/main/java/com/baeldung/producerconsumer/Producer.java @@ -4,9 +4,15 @@ import java.util.logging.Logger; public class Producer implements Runnable { - private static final Logger log = Logger.getLogger(Producer.class.getCanonicalName()); - private static final AtomicInteger idSequence = new AtomicInteger(0); - private boolean running = false; + + private static final Logger log = + Logger.getLogger(Producer.class.getCanonicalName()); + + private static final AtomicInteger idSequence = + new AtomicInteger(0); + + private volatile boolean running = false; + private final DataQueue dataQueue; public Producer(DataQueue dataQueue) { @@ -27,37 +33,40 @@ public void produce() { while (running) { - if (dataQueue.isFull()) { - try { - dataQueue.waitIsNotFull(); - } catch (InterruptedException e) { - log.severe("Error while waiting to Produce messages."); - break; - } - } + try { - // avoid spurious wake-up - if (!running) { - break; - } + dataQueue.add(generateMessage()); + + log.info("Size of the queue is: " + + dataQueue.getSize()); - dataQueue.add(generateMessage()); + ThreadUtil.sleep((long) (Math.random() * 100)); - log.info("Size of the queue is: " + dataQueue.getSize()); + } catch (InterruptedException e) { - //Sleeping on random time to make it realistic - ThreadUtil.sleep((long) (Math.random() * 100)); + log.severe("Error while producing messages."); + + Thread.currentThread().interrupt(); + + break; + } } log.info("Producer Stopped"); } private Message generateMessage() { - Message message = new Message(idSequence.incrementAndGet(), Math.random()); - log.info(String.format("[%s] Generated Message. Id: %d, Data: %f%n", - Thread.currentThread().getName(), message.getId(), message.getData())); + + Message message = + new Message(idSequence.incrementAndGet(), + Math.random()); + + log.info(String.format( + "[%s] Generated Message. Id: %d, Data: %f%n", + Thread.currentThread().getName(), + message.getId(), + message.getData())); return message; } - -} +} \ No newline at end of file