Skip to content
laforge49 edited this page Nov 10, 2011 · 10 revisions

EchoTimingTest

BurstTimingTest

## MessageListDestination

/**
 * Sink for batches of messages handed over by objects that operate on a
 * different thread than the receiver.
 *
 * @tparam T the type of the individual messages carried in a batch
 */
trait MessageListDestination[T] {
  /**
   * Delivers a batch of messages to this destination. It is meant to be
   * invoked when the calling thread is not the thread used by the object
   * being called.
   *
   * @param bufferedMessages the batch of messages being delivered
   */
  def incomingMessageList(bufferedMessages: ArrayList[T]): Unit
}

MessageListDestination

## BufferedMessenger

/**
 * A BufferedMessenger exchanges lists of messages with other
 * MessageListDestination objects, where each such object is operating on a
 * different thread.
 *
 * Incoming lists are handed to an internal Messenger; outgoing messages are
 * accumulated per destination in `pending` and sent as whole lists, either
 * when a list reaches the flush threshold or when flushPendingMsgs runs.
 *
 * @param messageProcessor receives each individual incoming message
 * @param threadManager    passed through to the internal Messenger
 * @tparam T the type of the individual messages
 */
class BufferedMessenger[T](messageProcessor: MessageProcessor[T], threadManager: ThreadManager)
  extends MessageListDestination[T] with MessageProcessor[ArrayList[T]] {
  val messenger = new Messenger[ArrayList[T]](this, threadManager)
  val pending = new java.util.HashMap[MessageListDestination[T], ArrayList[T]]

  /** Number of buffered messages at which putTo sends a list immediately. */
  private final val FlushThreshold = 1024

  /**
   * The incomingMessageList method is called to process a list of messages
   * when the current thread is different
   * from the thread being used by the object being called.
   * The list is simply passed on to the internal messenger.
   *
   * @param messageList the list of messages being delivered
   */
  override def incomingMessageList(messageList: ArrayList[T]): Unit = {
    messenger.put(messageList)
  }

  /**
   * The isEmpty method returns true when there are no messages to be processed,
   * though the results may not always be correct due to concurrency issues.
   */
  def isEmpty: Boolean = messenger.isEmpty

  /**
   * The poll method polls the internal messenger for queued messages.
   * When the messenger's poll reports true, any pending outgoing messages
   * are sent via flushPendingMsgs.
   */
  def poll: Unit = {
    if (messenger.poll) flushPendingMsgs
  }

  /**
   * The processMessage method is used to process an incoming list of messages.
   * Each element is forwarded, in list order, to the wrapped message processor.
   *
   * @param messageList the incoming list of messages
   */
  override def processMessage(messageList: ArrayList[T]): Unit = {
    // The size is re-read on every pass, exactly as a plain indexed walk would.
    var index = 0
    while (index < messageList.size) {
      messageProcessor.processMessage(messageList.get(index))
      index += 1
    }
  }

  /**
   * The haveMessage method is called when there is an incoming message to be processed.
   * The notification is forwarded to the wrapped message processor.
   */
  override def haveMessage: Unit = {
    messageProcessor.haveMessage
  }

  /**
   * The flushPendingMsgs method sends every pending outgoing list to its
   * destination. It does nothing unless isEmpty reports true and there is
   * at least one pending list.
   */
  def flushPendingMsgs: Unit = {
    if (isEmpty && !pending.isEmpty) {
      val entries = pending.entrySet.iterator
      while (entries.hasNext) {
        val entry = entries.next
        val buffered = entry.getKey
        val messageList = entry.getValue
        // Unregister the list BEFORE handing it off, as putTo does. If a
        // hand-off throws, lists that were already delivered are no longer in
        // `pending`, so they cannot be sent twice or appended to afterwards.
        entries.remove()
        buffered.incomingMessageList(messageList)
      }
    }
  }

  /**
   * The putTo method builds lists of messages to be sent to other
   * MessageListDestination objects. The message is appended to the pending
   * list for the destination; once that list holds FlushThreshold messages
   * it is removed from `pending` and sent immediately.
   *
   * @param buffered the destination the message is addressed to
   * @param message  the message to buffer
   */
  def putTo(buffered: MessageListDestination[T], message: T): Unit = {
    val existing = pending.get(buffered)
    val messageList =
      if (existing != null) existing
      else {
        // First message for this destination since its last send.
        val fresh = new ArrayList[T]
        pending.put(buffered, fresh)
        fresh
      }
    messageList.add(message)
    if (messageList.size >= FlushThreshold) {
      pending.remove(buffered)
      buffered.incomingMessageList(messageList)
    }
  }
}

BufferedMessenger

Clone this wiki locally