Synchronous Messaging - laforge49/Asynchronous-Functional-Programming GitHub Wiki

When doing long computations or blocking I/O, there is a real advantage to asynchronous messaging. But synchronous messaging should be the default, because it is so much faster. This is implemented by the Exchange class, which runs the EchoTimingTest at about 98 nanoseconds per message and the BurstTimingTest at about 101 nanoseconds per message.

## ExchangeRequest

Every request sent to an Exchange object must be an ExchangeRequest or a subclass of it.

/**
 * The request type passed between Exchanges. It extends ExchangeMessengerRequest
 * with a flag that records whether the request was delivered synchronously.
 */
class ExchangeRequest(_sender: ExchangeMessengerSource, rf: Any => Unit)
  extends ExchangeMessengerRequest(_sender, rf) {

  /**
   * Starts out false and is set to true when the request is sent synchronously.
   */
  var fastSend: Boolean = false
}

ExchangeRequest

## Exchange

Exchanges almost always send messages synchronously, except when the target exchange has a non-empty message queue or when the target exchange was instantiated with the async argument set to true.

/**
 * An ExchangeMessenger that processes a request synchronously whenever the sender
 * already shares, or can take, control of the receiving exchange. In every other
 * case (including when async is true) the request is passed to the inherited sendReq.
 */
abstract class Exchange(threadManager: ThreadManager,
                        async: Boolean = false,
                        _bufferedMessenger: BufferedMessenger[ExchangeMessengerMessage] = null)
  extends ExchangeMessenger(threadManager, _bufferedMessenger) {

  /**
   * Holds the exchange that currently controls this one, or null when nobody does.
   * An exchange that gains control of another may send requests to it synchronously.
   */
  val atomicControl: AtomicReference[Exchange] = new AtomicReference[Exchange]()

  /**
   * The inherited curReq, downcast to ExchangeRequest.
   */
  override def curReq: ExchangeRequest = super.curReq.asInstanceOf[ExchangeRequest]

  /**
   * The exchange currently in control of this one, or null.
   */
  def controllingExchange: Exchange = atomicControl.get

  /**
   * Called by a thread once it has been assigned to this exchange. For an async
   * exchange the inherited behavior is used unchanged. Otherwise poll is invoked,
   * but only if this exchange can take control of itself, i.e. no exchange
   * currently holds control.
   */
  override def haveMessage: Unit = {
    if (async) {
      super.haveMessage
    } else {
      val tookControl = atomicControl.compareAndSet(null, this)
      if (tookControl) {
        try {
          poll
        } finally {
          // Always hand control back, even if poll throws.
          atomicControl.set(null)
        }
      }
    }
  }

  /**
   * Processes a request synchronously: a non-null request is flagged as a fast
   * send and passed to exchangeReq; poll is then called in either case.
   */
  private def _sendReq(exchangeMessengerRequest: ExchangeMessengerRequest): Unit = {
    if (exchangeMessengerRequest != null) {
      val fastRequest = exchangeMessengerRequest.asInstanceOf[ExchangeRequest]
      fastRequest.fastSend = true
      exchangeReq(exchangeMessengerRequest)
    }
    poll
  }

  /**
   * Sends a request to this exchange. When the sender's controlling exchange already
   * controls this exchange, or control can be acquired on its behalf, the request is
   * processed synchronously; otherwise it is handed to the inherited sendReq.
   * An async exchange always uses the inherited sendReq.
   */
  final override def sendReq(targetActor: ExchangeMessengerActor,
                             exchangeMessengerRequest: ExchangeMessengerRequest,
                             srcExchange: ExchangeMessenger): Unit = {
    if (async) {
      super.sendReq(targetActor, exchangeMessengerRequest, srcExchange)
    } else {
      exchangeMessengerRequest.setOldRequest(srcExchange.curReq)
      val senderControl = srcExchange.asInstanceOf[Exchange].controllingExchange
      if (controllingExchange == senderControl) {
        // Sender and target are already under the same control.
        _sendReq(exchangeMessengerRequest)
      } else if (atomicControl.compareAndSet(null, senderControl)) {
        // Control acquired for the duration of this request only.
        try {
          _sendReq(exchangeMessengerRequest)
        } finally {
          atomicControl.set(null)
        }
      } else {
        // Another exchange is in control: fall back to the inherited send.
        super.sendReq(targetActor, exchangeMessengerRequest, srcExchange)
      }
      // Anything still pending is handled by sendRem.
      if (!isEmpty) sendRem(srcExchange)
    }
  }

  /**
   * Handles a potential race condition by polling repeatedly, for as long as this
   * exchange is not empty and control is shared with, or can be acquired for, the
   * sender's controlling exchange. Stops as soon as control cannot be obtained.
   */
  @tailrec final def sendRem(srcExchange: ExchangeMessenger): Unit = {
    val senderControl = srcExchange.asInstanceOf[Exchange].controllingExchange
    val polled =
      if (controllingExchange == senderControl) {
        poll
        true
      } else if (atomicControl.compareAndSet(null, senderControl)) {
        try {
          poll
        } finally {
          atomicControl.set(null)
        }
        true
      } else {
        false
      }
    // isEmpty is consulted only after a successful poll.
    if (polled && !isEmpty) sendRem(srcExchange)
  }

  /**
   * Returns a response the same way the current request arrived: directly through
   * the sender's exchangeRsp for a fast send, otherwise through the inherited
   * sendResponse.
   */
  override def sendResponse(senderExchange: ExchangeMessenger, rsp: ExchangeMessengerResponse): Unit = {
    if (!curReq.fastSend) {
      super.sendResponse(senderExchange, rsp)
    } else {
      senderExchange.exchangeRsp(rsp)
    }
  }
}

Exchange


Speed