Messenger - laforge49/Asynchronous-Functional-Programming GitHub Wiki
The Messenger class provides a basic facility for sending messages that will be processed on another thread.
When a Messenger object calls the haveMessage method on a MessageProcessor object, the MessageProcessor object needs to call the poll method on its Messenger object. The Messenger object then calls the processMessage method on the MessageProcessor object for each of the messages found in the queue.
/**
 * A Messenger receives messages, queues them, and then processes them on another thread.
 *
 * Messages are added with `put`. When the messenger is not already marked as running,
 * `put` hands it to the thread manager's `process` method. `run` then signals the
 * message processor through `haveMessage`, and `poll` passes the queued messages to
 * `processMessage` one at a time.
 *
 * @tparam T the type of the messages being queued
 * @param threadManager is given this messenger (via `process`) whenever it must be run
 */
class Messenger[T](threadManager: ThreadManager)
  extends Runnable {
  // Receiver of the haveMessage/processMessage calls; stays null until
  // setMessageProcessor has been called.
  private var messageProcessor: MessageProcessor[T] = null
  private val queue = new ConcurrentLinkedBlockingQueue[T]
  // Set by put when it hands this messenger to the thread manager, cleared by run
  // when it finds the queue empty.
  private val running = new AtomicBoolean
  // A message that run has already removed from the queue but poll has not yet processed.
  private var incomingMessage: T = null.asInstanceOf[T]

  /**
   * Specifies the object which will process the messages.
   *
   * @param _messageProcessor the processor whose haveMessage and processMessage
   *                          methods will be called
   */
  def setMessageProcessor(_messageProcessor: MessageProcessor[T]): Unit = {
    messageProcessor = _messageProcessor
  }

  /**
   * The isEmpty method returns true when there are no messages in the queue,
   * though the results may not always be correct due to concurrency issues.
   * A message already taken from the queue by run but not yet processed is not counted.
   */
  def isEmpty: Boolean = queue.size() == 0

  /**
   * The put method adds a message to the queue of messages to be processed.
   * If the messenger was not already running, it is handed to the thread manager.
   *
   * @param message the message to be queued
   */
  def put(message: T): Unit = {
    queue.put(message)
    // Only the caller that flips running from false to true schedules the messenger.
    if (running.compareAndSet(false, true)) {
      threadManager.process(this)
    }
  }

  /**
   * The poll method processes any messages in the queue, starting with the message
   * (if any) that run has already taken from the queue.
   *
   * @return true if any messages were processed
   */
  def poll: Boolean = {
    if (incomingMessage == null) incomingMessage = queue.poll
    val processedAny = incomingMessage != null
    while (incomingMessage != null) {
      val msg = incomingMessage
      // The field is cleared before the message is dispatched.
      // NOTE(review): presumably this guards against a nested poll seeing it again - confirm.
      incomingMessage = null.asInstanceOf[T]
      messageProcessor.processMessage(msg)
      incomingMessage = queue.poll
    }
    processedAny
  }

  /**
   * The run method is used to process the messages in the message queue.
   * It takes the next message from the queue and calls haveMessage on the message
   * processor, repeating until the queue is found to be empty.
   */
  @tailrec final override def run(): Unit = {
    incomingMessage = queue.poll
    if (incomingMessage == null) {
      running.set(false)
      // A put may have queued a message while running was still true and so did not
      // schedule this messenger. Re-check the queue and keep going only if this call
      // wins the running flag back; otherwise whoever won it is responsible.
      if (queue.peek == null || !running.compareAndSet(false, true)) return
    }
    messageProcessor.haveMessage
    run()
  }
}