Buffered Messaging - laforge49/Asynchronous-Functional-Programming GitHub Wiki

Passing arrays of messages is one way to speed things up, though it does add overhead when there is only one message to be passed. This is a common technique in Flow-Based Programming (FBP) Again, the code for this is pretty straight forward though it was designed to support further enhancements rather than readability.

As before, benchmarks were done with an Intel Core 2 Duo T6400 @ 2GHz. In the echo test each message took about 1.4 microseconds to process: EchoTimingTest And in the burst test each message took about 40 nanoseconds to process: BurstTimingTest

###MessageListDestination

The MessageListDestination trait serves as a destination for arrays of messages. This allows other classes besides BufferedMessenger objects to receive messages.

/**
 * A MessageListDestination receives lists of messages from objects operating
 * on a different thread.
 */
trait MessageListDestination[T] {
  /**
   * The incomingMessageList method is called to process a list of messages
   * when the current thread is different
   * from the thread being used by the object being called.
   */
  def incomingMessageList(bufferedMessages: ArrayList[T])
}

MessageListDestination

###BufferedMessenger

The BufferedMessenger class uses a Messenger object to pass arrays of messages. Outgoing messages are aggregated and then sent when there are no further incoming messages to process.

/**
 * A BufferedMessenger sends lists of messages to MessageListDestination objects
 * which are operating on a different thread.
 */
class BufferedMessenger[T](threadManager: ThreadManager,
                           _messenger: Messenger[ArrayList[T]] = null)
  extends MessageListDestination[T] with MessageProcessor[ArrayList[T]] {

  private val messenger = {
    if (_messenger != null) _messenger
    else new Messenger[ArrayList[T]](threadManager)
  }
  private val pending = new java.util.HashMap[MessageListDestination[T], ArrayList[T]]
  private var messageProcessor: MessageProcessor[T] = null

  messenger.setMessageProcessor(this)

  /**
   * Specifies the object which will process the messages.
   */
  def setMessageProcessor(_messageProcessor: MessageProcessor[T]) {
    messageProcessor = _messageProcessor
  }

  /**
   * The incomingMessageList method is called to process a list of messages
   * when the current thread is different
   * from the thread being used by the object being called.
   */
  override def incomingMessageList(messageList: ArrayList[T]) {
    messenger.put(messageList)
  }

  /**
   * The isEmpty method returns true when there are no messages to be processed,
   * though the results may not always be correct due to concurrency issues.
   */
  def isEmpty = messenger.isEmpty

  /**
   * The poll method processes any messages in the queue.
   * Once complete, any pending outgoing messages are sent.
   */
  def poll {
    if (messenger.poll) flushPendingMsgs
  }

  /**
   * The processMessage method is used to process an incoming list of messages.
   */
  override def processMessage(messageList: ArrayList[T]) {
    var i = 0
    while (i < messageList.size){
      messageProcessor.processMessage(messageList.get(i))
      i += 1
    }
  }

  /**
   * The haveMessage method is called when there is an incoming message to be processed.
   */
  override def haveMessage {
    messageProcessor.haveMessage
  }

  /**
   * The flushPendingMsgs is called
   * when there are no pending incoming messages to process.
   */
  def flushPendingMsgs {
    if (isEmpty && !pending.isEmpty) {
      val it = pending.keySet.iterator
      while (it.hasNext) {
        val buffered = it.next
        val messageList = pending.get(buffered)
        buffered.incomingMessageList(messageList)
      }
      pending.clear
    }
  }

  /**
   * The putTo message builds lists of messages to be sent to other Buffered objects.
   */
  def putTo(messageListDestination: MessageListDestination[T], message: T) {
    var messageList = pending.get(messageListDestination)
    if (messageList == null) {
      messageList = new ArrayList[T]
      pending.put(messageListDestination, messageList)
    }
    messageList.add(message)
    if (messageList.size > 1023) {
      pending.remove(messageListDestination)
      messageListDestination.incomingMessageList(messageList)
    }
  }
}

BufferedMessenger


Speed