Skip to content

Synchronous Messaging

laforge49 edited this page Nov 17, 2011 · 7 revisions

When doing long computations or blocking I/O, there is a real advantage to asynchronous messaging. But synchronous messaging should be the default, because it is so much faster. This is implemented by the Exchange class, which runs the EchoTimingTest at about 94 nanoseconds per message and the BurstTimingTest at about 98 nanoseconds per message.

##ExchangeRequest

Request sent to an Exchange object must subclass ExchangeRequest.

/**
 * ExchangeRequest is used to support synchronous message passing between Exchanges.
 */
class ExchangeRequest(_sender: ExchangeMessengerSource, rf: Any => Unit)
  extends ExchangeMessengerRequest(_sender, rf) {

  /**
   * If a request is sent synchronously, fastSend is set to true.
   */
  var fastSend = false
}

ExchangeRequest

##Exchange

Exchanges almost always send messages synchronously, except when the target exchange has a non-empty message queue or when the target exchange was instantiated with the async argument set to true.

/**
 * The Exchange class supports synchronous exchanges of messages when the exchange
 * receiving the request is idle.
 */
abstract class Exchange(threadManager: ThreadManager,
                        async: Boolean = false,
                        _bufferedMessenger: BufferedMessenger[ExchangeMessengerMessage] = null)
  extends ExchangeMessenger(threadManager, _bufferedMessenger) {

  /**
   * Tracks which exchange has control. If an exchange can gain control
   * over another exchange, it can send requests to it synchronously.
   */
  val atomicControl = new AtomicReference[Exchange]

  /**
   * A control semaphore is released when another exchange releases control.
   * This semaphore is used to wake up a thread which has been assigned to
   * the exchange.
   */
  val control = new Semaphore(1)

  /**
   * Recasts ExchangeRequest.curReq as an ExchangeRequest.
   */
  override def curReq = super.curReq.asInstanceOf[ExchangeRequest]

  /**
   * Returns the controlling exchange, or null.
   */
  def controllingExchange = atomicControl.get

  /**
   * The haveMessage method is called by a thread when the thread is been assigned to
   * the exchange. A call to haveMessage results a call to poll, but only if
   * no other exchange is in control.
   */
  override def haveMessage {
    if (async) poll
    else if (atomicControl.compareAndSet(null, this)) {
      try {
        poll
      } finally {
        atomicControl.set(null)
      }
    }
  }

  /**
   * If control can be gained over the target exchange, process the request synchronously,
   * otherwise enqueue the request for subsequent processing on another thread.
   */
  override def sendReq(targetActor: ExchangeMessengerActor,
              exchangeMessengerRequest: ExchangeMessengerRequest,
              srcExchange: ExchangeMessenger) {
    if (async) super.sendReq(targetActor, exchangeMessengerRequest, srcExchange)
    else {
      exchangeMessengerRequest.setOldRequest(srcExchange.curReq)
      val srcControllingExchange = srcExchange.asInstanceOf[Exchange].controllingExchange
      if (controllingExchange == srcControllingExchange) {
        _sendReq(exchangeMessengerRequest)
      } else if (!atomicControl.compareAndSet(null, srcControllingExchange)) {
        super.sendReq(targetActor, exchangeMessengerRequest, srcExchange)
      } else {
        control.acquire
        try {
          _sendReq(exchangeMessengerRequest)
        } finally {
          atomicControl.set(null)
          control.release
        }
      }
    }
  }

  /**
   * Process a request synchronously.
   */
  private def _sendReq(exchangeMessengerRequest: ExchangeMessengerRequest) {
    exchangeMessengerRequest.asInstanceOf[ExchangeRequest].fastSend = true
    exchangeReq(exchangeMessengerRequest)
    poll
  }

  /**
   * Return a response the same way the request was sent.
   */
  override def sendResponse(senderExchange: ExchangeMessenger, rsp: ExchangeMessengerResponse) {
    if (curReq.fastSend) {
      senderExchange.exchangeRsp(rsp)
    } else super.sendResponse(senderExchange, rsp)
  }
}

Exchange

Clone this wiki locally