Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import java.util.logging.Logger;

public class Consumer implements Runnable {
private static final Logger log = Logger.getLogger(Consumer.class.getCanonicalName());
private boolean running = false;

private static final Logger log =
Logger.getLogger(Consumer.class.getCanonicalName());

private volatile boolean running = false;

private final DataQueue dataQueue;

public Consumer(DataQueue dataQueue) {
Expand All @@ -22,36 +26,39 @@ public void stop() {
}

public void consume() {

while (running) {

if (dataQueue.isEmpty()) {
try {
dataQueue.waitIsNotEmpty();
} catch (InterruptedException e) {
log.severe("Error while waiting to Consume messages.");
break;
}
}
try {

// avoid spurious wake-up
if (!running) {
break;
}
Message message = dataQueue.poll();

useMessage(message);

ThreadUtil.sleep((long) (Math.random() * 100));

Message message = dataQueue.poll();
useMessage(message);
} catch (InterruptedException e) {

//Sleeping on random time to make it realistic
ThreadUtil.sleep((long) (Math.random() * 100));
log.severe("Error while consuming messages.");

Thread.currentThread().interrupt();

break;
}
}

log.info("Consumer Stopped");
}

private void useMessage(Message message) {

if (message != null) {
log.info(String.format("[%s] Consuming Message. Id: %d, Data: %f%n",
Thread.currentThread().getName(), message.getId(), message.getData()));

log.info(String.format(
"[%s] Consuming Message. Id: %d, Data: %f%n",
Thread.currentThread().getName(),
message.getId(),
message.getData()));
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,41 @@
import java.util.Queue;

public class DataQueue {

private final Queue<Message> queue = new LinkedList<>();
private final int maxSize;
private final Object IS_NOT_FULL = new Object();
private final Object IS_NOT_EMPTY = new Object();

DataQueue(int maxSize) {
this.maxSize = maxSize;
}

public boolean isFull() {
return queue.size() == maxSize;
}

public boolean isEmpty() {
return queue.isEmpty();
}

public void waitIsNotFull() throws InterruptedException {
synchronized (IS_NOT_FULL) {
IS_NOT_FULL.wait();
}
}
public synchronized void add(Message message)
throws InterruptedException {

public void waitIsNotEmpty() throws InterruptedException {
synchronized (IS_NOT_EMPTY) {
IS_NOT_EMPTY.wait();
while (queue.size() == maxSize) {
wait();
}
}

public void add(Message message) {
queue.add(message);
notifyIsNotEmpty();
}

public Message poll() {
Message mess = queue.poll();
notifyIsNotFull();
return mess;
notifyAll();
}

public Integer getSize() {
return queue.size();
}
public synchronized Message poll()
throws InterruptedException {

private void notifyIsNotFull() {
synchronized (IS_NOT_FULL) {
IS_NOT_FULL.notify();
while (queue.isEmpty()) {
wait();
}

Message message = queue.poll();

notifyAll();

return message;
}

private void notifyIsNotEmpty() {
synchronized (IS_NOT_EMPTY) {
IS_NOT_EMPTY.notify();
}
public synchronized Integer getSize() {
return queue.size();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@
import java.util.logging.Logger;

public class Producer implements Runnable {
private static final Logger log = Logger.getLogger(Producer.class.getCanonicalName());
private static final AtomicInteger idSequence = new AtomicInteger(0);
private boolean running = false;

private static final Logger log =
Logger.getLogger(Producer.class.getCanonicalName());

private static final AtomicInteger idSequence =
new AtomicInteger(0);

private volatile boolean running = false;

private final DataQueue dataQueue;

public Producer(DataQueue dataQueue) {
Expand All @@ -27,37 +33,40 @@ public void produce() {

while (running) {

if (dataQueue.isFull()) {
try {
dataQueue.waitIsNotFull();
} catch (InterruptedException e) {
log.severe("Error while waiting to Produce messages.");
break;
}
}
try {

// avoid spurious wake-up
if (!running) {
break;
}
dataQueue.add(generateMessage());

log.info("Size of the queue is: "
+ dataQueue.getSize());

dataQueue.add(generateMessage());
ThreadUtil.sleep((long) (Math.random() * 100));

log.info("Size of the queue is: " + dataQueue.getSize());
} catch (InterruptedException e) {

//Sleeping on random time to make it realistic
ThreadUtil.sleep((long) (Math.random() * 100));
log.severe("Error while producing messages.");

Thread.currentThread().interrupt();

break;
}
}

log.info("Producer Stopped");
}

private Message generateMessage() {
Message message = new Message(idSequence.incrementAndGet(), Math.random());
log.info(String.format("[%s] Generated Message. Id: %d, Data: %f%n",
Thread.currentThread().getName(), message.getId(), message.getData()));

Message message =
new Message(idSequence.incrementAndGet(),
Math.random());

log.info(String.format(
"[%s] Generated Message. Id: %d, Data: %f%n",
Thread.currentThread().getName(),
message.getId(),
message.getData()));

return message;
}

}
}