-
Notifications
You must be signed in to change notification settings - Fork 1
Buffered Messaging
laforge49 edited this page Nov 10, 2011
·
10 revisions
## MessageListDestination
/**
 * Contract for anything that can accept a whole batch of messages at once.
 *
 * Batches are delivered by objects that operate on a different thread
 * than the receiver.
 *
 * @tparam T the type of the individual messages in a batch
 */
trait MessageListDestination[T] {

  /**
   * Accepts one batch of messages for processing.
   *
   * Invoked when the calling thread is not the thread used by the
   * object being called.
   *
   * @param bufferedMessages the batch of messages being delivered
   */
  def incomingMessageList(bufferedMessages: ArrayList[T]): Unit
}
## BufferedMessenger
/**
 * A BufferedMessenger exchanges lists of messages with other Buffered objects,
 * where each Buffered object is operating on a different thread.
 *
 * Outgoing messages are accumulated per destination in `pending` and handed
 * over as a whole list, either when a list reaches the batch limit (see
 * `putTo`) or when `flushPendingMsgs` runs. Incoming lists are passed to the
 * wrapped `messenger`; each message of a processed list is forwarded to
 * `messageProcessor`.
 *
 * @tparam T the type of the individual messages
 * @param messageProcessor receives every incoming message and the
 *                         `haveMessage` notification
 * @param threadManager    passed through to the underlying Messenger
 */
class BufferedMessenger[T](messageProcessor: MessageProcessor[T], threadManager: ThreadManager)
  extends MessageListDestination[T] with MessageProcessor[ArrayList[T]] {

  /**
   * Number of messages at which a pending list is handed to its destination
   * immediately by `putTo` instead of waiting for the next flush.
   */
  private final val MaxBatchSize = 1024

  /** The underlying messenger; this object is registered as its message processor. */
  val messenger: Messenger[ArrayList[T]] = new Messenger[ArrayList[T]](this, threadManager)

  /** Outgoing messages that have not been handed over yet, grouped by destination. */
  val pending: java.util.HashMap[MessageListDestination[T], ArrayList[T]] =
    new java.util.HashMap[MessageListDestination[T], ArrayList[T]]

  /**
   * The incomingMessageList method is called to process a list of messages
   * when the current thread is different
   * from the thread being used by the object being called.
   * The list is simply put to the underlying messenger.
   *
   * @param messageList the list of messages being delivered
   */
  override def incomingMessageList(messageList: ArrayList[T]): Unit = {
    messenger.put(messageList)
  }

  /**
   * The isEmpty method returns true when there are no messages to be processed,
   * though the results may not always be correct due to concurrency issues.
   *
   * @return whatever the underlying messenger reports for isEmpty
   */
  def isEmpty: Boolean = messenger.isEmpty

  /**
   * The poll method processes any messages in the queue.
   * When the messenger's poll reports true, an attempt is made to send
   * any pending outgoing messages.
   */
  def poll: Unit = {
    if (messenger.poll) flushPendingMsgs
  }

  /**
   * The processMessage method is used to process an incoming list of messages.
   * Every element is forwarded, in list order, to the message processor.
   *
   * @param messageList the incoming list of messages
   */
  override def processMessage(messageList: ArrayList[T]): Unit = {
    // Index-based loop: the size is re-read on every pass and no iterator is allocated.
    var i = 0
    while (i < messageList.size) {
      messageProcessor.processMessage(messageList.get(i))
      i += 1
    }
  }

  /**
   * The haveMessage method is called when there is an incoming message to be processed.
   * The notification is passed on to the message processor.
   */
  override def haveMessage: Unit = {
    messageProcessor.haveMessage
  }

  /**
   * The flushPendingMsgs is called when there are no pending incoming messages to process.
   *
   * Nothing happens unless `isEmpty` is true and there is at least one pending list.
   * Each pending list is removed from `pending` before it is handed to its
   * destination, the same order of operations used by `putTo`. As a result a list
   * that has been handed over is never still registered here, even if a later
   * destination throws; lists not yet reached stay pending in that case.
   */
  def flushPendingMsgs: Unit = {
    if (isEmpty && !pending.isEmpty) {
      val entries = pending.entrySet.iterator
      while (entries.hasNext) {
        val entry = entries.next()
        // Read key and value before removal; an entry must not be relied on
        // once the backing map has been modified.
        val destination = entry.getKey
        val messageList = entry.getValue
        entries.remove()
        destination.incomingMessageList(messageList)
      }
    }
  }

  /**
   * The putTo message builds lists of messages to be sent to other Buffered objects.
   *
   * The message is appended to the pending list of the destination, creating that
   * list when there is none. Once the list holds MaxBatchSize messages it is removed
   * from `pending` and handed to the destination right away.
   *
   * @param buffered the destination the message is addressed to
   * @param message  the message to be buffered
   */
  def putTo(buffered: MessageListDestination[T], message: T): Unit = {
    val existing = pending.get(buffered)
    val messageList =
      if (existing != null) existing
      else {
        val created = new ArrayList[T]
        pending.put(buffered, created)
        created
      }
    messageList.add(message)
    if (messageList.size >= MaxBatchSize) {
      // Detach first so this object keeps no reference to a list it has handed over.
      pending.remove(buffered)
      buffered.incomingMessageList(messageList)
    }
  }
}