Understanding Broadcaster - Atmosphere/atmosphere GitHub Wiki

Understanding Broadcaster

A Broadcaster implements the publish/subscribe paradigm. An application can subscribe to one or many Broadcasters to get notified about events. By default, a single Broadcaster is created by the framework and associated with every new AtmosphereResource. The default Broadcaster's id is always "/*", which means that if you invoke

 Future<Object> f = broadcaster.broadcast("hello");

all AtmosphereResource objects associated with the broadcaster will get notified and will have a chance to handle the event. By default, broadcaster.broadcast("hello") is an asynchronous operation and won't block. If you want to be notified when the asynchronous execution completes, you can either use the returned Future

  Future<Object> f = broadcaster.broadcast("hello");
  f.get(); // blocks until the broadcast has completed

or you can add a BroadcasterListener that will be invoked when the broadcast operation completes.

broadcaster.addBroadcasterListener(new BroadcasterListener() {
   public void onComplete(Broadcaster b) {
       // do something
   }
}).broadcast("hello");
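The two completion styles above (blocking on the returned Future, or registering a listener) can be modelled in plain Java. The following is only a sketch: ToyBroadcaster, subscribe and addCompletionListener are hypothetical names invented for illustration and are not part of Atmosphere. It shows that broadcast(...) returns immediately, that Future.get() waits for delivery, and that the listener runs once delivery has finished.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Toy stand-in for a Broadcaster (hypothetical; NOT the Atmosphere class).
final class ToyBroadcaster {
    private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();
    private final List<Runnable> completionListeners = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    ToyBroadcaster subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
        return this;
    }

    // Returns this so that calls can be chained.
    ToyBroadcaster addCompletionListener(Runnable listener) {
        completionListeners.add(listener);
        return this;
    }

    // Returns immediately; delivery happens on the executor thread.
    Future<Object> broadcast(Object message) {
        return executor.submit(() -> {
            subscribers.forEach(s -> s.accept(message));
            completionListeners.forEach(Runnable::run); // fired after all subscribers ran
            return message;
        });
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Calling broadcast("hello").get() on this toy class blocks the caller until every subscriber has received the message, which is the same trade-off as f.get() above: simple to reason about, but it ties up the calling thread.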

You can associate a Broadcaster with an AtmosphereResource by doing

  atmosphereResource.setBroadcaster(broadcaster);

  ...

  // Always returns the Broadcaster at position 0, or the one passed to setBroadcaster(...)
  Broadcaster b = atmosphereResource.getBroadcaster();

or

  atmosphereResource.addBroadcaster(broadcaster);

  ...

  // Return all Broadcasters
  List<Broadcaster> l = atmosphereResource.broadcasters();

You can associate an AtmosphereResource with several Broadcasters by calling, on each Broadcaster,

 broadcaster.addAtmosphereResource(atmosphereResource);

As an example, implementing a Twitter-like application using the Atmosphere Framework would consist of creating one Broadcaster per Twitter account (check this page for information on how to retrieve a BroadcasterFactory instance):

 Broadcaster b = broadcasterFactory.get("jfarcand");
 Broadcaster b2 = broadcasterFactory.get("atmo_framework");

If user 'jfarcand' wants to be notified when 'atmo_framework' publishes a tweet, you just associate the AtmosphereResource representing 'jfarcand' with the 'atmo_framework' Broadcaster:

  b2.addAtmosphereResource(atmosphereResource);

You can create channels (or Broadcasters) on the fly, using the BroadcasterFactory class.
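To make the "create on first use" idea concrete, here is a minimal plain-Java sketch of a channel registry. ToyChannelRegistry and its get and lookup methods are hypothetical and only mimic the get-or-create behaviour described above; they are not Atmosphere's BroadcasterFactory.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy registry (hypothetical; NOT Atmosphere's BroadcasterFactory).
final class ToyChannelRegistry {
    // channel id -> ids of the subscribers following that channel
    private final Map<String, Set<String>> channels = new ConcurrentHashMap<>();

    // Returns the subscriber set of a channel, creating the channel on first use.
    Set<String> get(String channelId) {
        return channels.computeIfAbsent(channelId, id -> ConcurrentHashMap.newKeySet());
    }

    // Looks a channel up without creating it; returns null when it does not exist.
    Set<String> lookup(String channelId) {
        return channels.get(channelId);
    }
}
```

With this sketch, registry.get("atmo_framework").add("jfarcand") plays the role of associating the 'jfarcand' resource with the 'atmo_framework' channel, and the channel is created the first time its name is requested.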

MetaBroadcaster

The MetaBroadcaster utility class can be really helpful when you want to broadcast messages to all or a subset of existing Broadcasters. For example, executing

metaBroadcaster.broadcastTo("/", "hello world");

will broadcast the "hello world" message to every AtmosphereResource associated with any Broadcaster. To be more specific, assume you create

  broadcasterFactory.get("/a");
  broadcasterFactory.get("/a/a1");
  broadcasterFactory.get("/a/a2");
  broadcasterFactory.get("/b");
  broadcasterFactory.get("/c");

Doing

   metaBroadcaster.broadcastTo("/a/*", "hello world");

is equivalent to:

  // Retrieve the Broadcaster named "/a/a1"
  broadcasterFactory.lookup("/a/a1", true).broadcast("hello world");
  // Retrieve the Broadcaster named "/a/a2"
  broadcasterFactory.lookup("/a/a2", true).broadcast("hello world");
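The selection step in the example above can be sketched as a small matcher. This is a simplification made up for illustration (ToyPathMatcher is a hypothetical class): it only handles a trailing "/*" wildcard and exact ids, and it is not the matching algorithm Atmosphere uses internally.

```java
import java.util.List;
import java.util.stream.Collectors;

// Toy matcher (hypothetical; NOT Atmosphere's mapping logic).
final class ToyPathMatcher {
    // Assumed rule: a pattern ending in "/*" selects every id that starts with
    // the text before the "*"; any other pattern must equal the id exactly.
    static boolean matches(String pattern, String id) {
        if (pattern.endsWith("/*")) {
            String prefix = pattern.substring(0, pattern.length() - 1); // keeps the trailing "/"
            return id.startsWith(prefix);
        }
        return pattern.equals(id);
    }

    // Returns the ids selected by the pattern, in their original order.
    static List<String> select(String pattern, List<String> ids) {
        return ids.stream().filter(id -> matches(pattern, id)).collect(Collectors.toList());
    }
}
```

Under that assumed rule, selecting "/a/*" from the five ids created above yields "/a/a1" and "/a/a2" only; "/a" itself is not selected because it does not start with "/a/".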

You can also schedule periodic tasks. Calling

   metaBroadcaster.scheduleTo("/a/*", new Callable<String>() {
         public String call() {
             return "Hello world";   
         }
   }, 10, TimeUnit.SECONDS);

is equivalent to:

  // Retrieve the Broadcaster named "/a/a1"
  broadcasterFactory.lookup("/a/a1", true).scheduleFixedBroadcast(new Callable<String>() {
         public String call() {
             return "Hello world";   
         }
   }, 10, TimeUnit.SECONDS);
  // Retrieve the Broadcaster named "/a/a2"
  broadcasterFactory.lookup("/a/a2", true).scheduleFixedBroadcast(new Callable<String>() {
         public String call() {
             return "Hello world";   
         }
   }, 10, TimeUnit.SECONDS);
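Conceptually, a fixed-rate scheduled broadcast calls the Callable once per period and publishes whatever it returns. The sketch below models that with a standard ScheduledExecutorService. ToyScheduler and its delivered list are hypothetical, and the initial delay of zero is a choice made for this sketch, not a statement about Atmosphere's behaviour.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Toy scheduler (hypothetical; NOT an Atmosphere class).
final class ToyScheduler {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // Stands in for the messages that would be written to the clients.
    final List<Object> delivered = new CopyOnWriteArrayList<>();

    // Calls the task once per period and records its result as the "broadcast" message.
    ScheduledFuture<?> scheduleFixedBroadcast(Callable<?> task, long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                delivered.add(task.call());
            } catch (Exception e) {
                // With scheduleAtFixedRate, a thrown exception suppresses later runs.
                throw new IllegalStateException(e);
            }
        }, 0, period, unit);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Because the Callable is invoked on every run, the message can differ each time, for example a timestamp or a freshly computed statistic.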

By default there is no cache associated with a MetaBroadcaster. That means broadcast messages can be lost if you call

  metaBroadcaster.broadcastTo("/a/*", "hello world");

when no associated AtmosphereResource is found. You can configure a MetaBroadcasterCache by doing:

  metaBroadcaster.cache(new MetaBroadcaster.ThirtySecondsCache(m, framework.getAtmosphereConfig()));

Finally, MetaBroadcaster also supports BroadcasterListener:

metaBroadcaster.addBroadcasterListener(new BroadcasterListener() {
   public void onComplete(Broadcaster b) {
       // do something
   }
}).broadcastTo("/*", "hello");

BroadcasterConfig

A unique instance of BroadcasterConfig is always associated with a Broadcaster. You can use a BroadcasterConfig to set ExecutorServices, add BroadcastFilters and set a BroadcasterCache.

Configuring the default Broadcaster

By default, Atmosphere uses the DefaultBroadcaster, or the JerseyBroadcaster if atmosphere-jersey is used. You can either extend those Broadcasters or write your own, and then configure Atmosphere to use your class.

In web.xml

        <init-param>
            <param-name>org.atmosphere.cpr.broadcasterClass</param-name>
            <param-value>org....</param-value>
        </init-param>

or in atmosphere.xml

        <applicationConfig>
            <param-name>org.atmosphere.cpr.broadcasterClass</param-name>
            <param-value>org...</param-value>
        </applicationConfig>

You can also annotate your Broadcaster implementation by using the BroadcasterService annotation:

   @BroadcasterService
   public class MyBroadcaster implements Broadcaster {...}

Atmosphere auto discovery of Broadcaster

Atmosphere is able to auto-discover the following Broadcasters when they are available on the classpath

That means you don't have to specify them by default.

Asynchronous I/O and Broadcast

By default, a Broadcaster always creates three ExecutorServices: one for supporting asynchronous broadcast, one for supporting asynchronous write and one for scheduling tasks. If you don't need asynchronous I/O, it is recommended that you use the SimpleBroadcaster or SimpleJerseyBroadcaster, which do not use any ExecutorServices.

Preventing Out Of Memory

Using shareable ExecutorServices

If your application creates a lot of Broadcasters, you may run into Out Of Memory errors because too many ExecutorServices have been created (number of Broadcasters * 3). If that's the case, you can configure Atmosphere to share ExecutorServices amongst Broadcasters. In that case only two ExecutorServices will be created:

Programmatically

 Broadcaster b = broadcasterFactory.get();
 b.getBroadcasterConfig().setExecutorService(...).setAsyncWriteService(...);

Using web/atmosphere.xml

In web.xml

        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.shareableThreadPool</param-name>
            <param-value>true</param-value>
        </init-param>

or in atmosphere.xml

        <applicationConfig>
            <param-name>org.atmosphere.cpr.broadcaster.shareableThreadPool</param-name>
            <param-value>true</param-value>
        </applicationConfig>

Configuring the maximum threads used by a Broadcaster

To configure the maximum number of threads a Broadcaster creates for message delivery, just add

        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.maxProcessingThreads</param-name>
            <param-value>10</param-value>
        </init-param>

and for the one used for the write operation,

        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.maxAsyncWriteThreads</param-name>
            <param-value>10</param-value>
        </init-param>

Using BroadcastFilter

Instead of creating several Broadcasters, you can reduce their number by adding BroadcastFilter and PerRequestBroadcastFilter instances to a single Broadcaster. You can then filter which broadcast messages get delivered to which AtmosphereResource (client).

 Broadcaster b = broadcasterFactory.get();
 b.getBroadcasterConfig().addFilter(...);
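The idea of replacing many channels with one filtered channel can be sketched in plain Java. Everything below is hypothetical (ToyFilteredChannel, addFilter, deliverTo) and does not use Atmosphere's filter interfaces. A filter receives the client id and the message, and either returns a possibly transformed message or an empty Optional to drop it for that client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

// Toy filtered channel (hypothetical; NOT Atmosphere's filter API).
final class ToyFilteredChannel {
    // Each filter sees (clientId, message); Optional.empty() drops the message
    // for that client only.
    private final List<BiFunction<String, Object, Optional<Object>>> filters = new ArrayList<>();

    ToyFilteredChannel addFilter(BiFunction<String, Object, Optional<Object>> filter) {
        filters.add(filter);
        return this;
    }

    // Runs the message through every filter, in registration order, for one client.
    Optional<Object> deliverTo(String clientId, Object message) {
        Optional<Object> current = Optional.of(message);
        for (BiFunction<String, Object, Optional<Object>> filter : filters) {
            if (!current.isPresent()) {
                break; // an earlier filter dropped the message
            }
            current = filter.apply(clientId, current.get());
        }
        return current;
    }
}
```

A single channel with a filter such as "drop everything for muted clients" can then serve clients that would otherwise each need their own channel.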

Using BroadcasterLifeCyclePolicy

Another way to reduce memory usage is to configure a BroadcasterLifeCyclePolicy. Supported policies are:

  • IDLE: Release all resources associated with the Broadcaster when the idle time expires. Suspended AtmosphereResource will NOT get resumed and instead be closed right away.
  • IDLE_DESTROY: Release all resources associated with the Broadcaster when the idle time expires and destroy the Broadcaster. This operation removes the Broadcaster from its associated BroadcasterFactory. Suspended AtmosphereResource will NOT get resumed and instead be closed right away.
  • IDLE_RESUME: Release all resources associated with the Broadcaster when the idle time expires. All associated AtmosphereResource WILL BE resumed and the Broadcaster will be destroyed.
  • EMPTY: If there is no AtmosphereResource associated with the Broadcaster, release all resources.
  • EMPTY_DESTROY: If there is no AtmosphereResource associated with the Broadcaster, release all resources and destroy the Broadcaster. This operation removes the Broadcaster from its associated BroadcasterFactory.
  • NEVER: Never release or destroy the Broadcaster from its associated BroadcasterFactory.
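The list above can be summarized as a small table of flags. The enum below is only a reading aid built from those descriptions: ToyLifeCyclePolicy, Trigger and the field names are hypothetical and are not Atmosphere's BroadcasterLifeCyclePolicy.

```java
// Toy summary of the policy list (hypothetical; NOT Atmosphere's BroadcasterLifeCyclePolicy).
enum ToyLifeCyclePolicy {
    //            trigger               release destroy resume
    IDLE(         Trigger.IDLE_TIMEOUT, true,   false,  false),
    IDLE_DESTROY( Trigger.IDLE_TIMEOUT, true,   true,   false),
    IDLE_RESUME(  Trigger.IDLE_TIMEOUT, true,   true,   true),
    EMPTY(        Trigger.NO_RESOURCES, true,   false,  false),
    EMPTY_DESTROY(Trigger.NO_RESOURCES, true,   true,   false),
    NEVER(        Trigger.NONE,         false,  false,  false);

    // What has to happen before the policy is executed.
    enum Trigger { IDLE_TIMEOUT, NO_RESOURCES, NONE }

    final Trigger trigger;
    final boolean releasesResources;
    final boolean destroysBroadcaster;
    final boolean resumesSuspendedResources;

    ToyLifeCyclePolicy(Trigger trigger, boolean releasesResources,
                       boolean destroysBroadcaster, boolean resumesSuspendedResources) {
        this.trigger = trigger;
        this.releasesResources = releasesResources;
        this.destroysBroadcaster = destroysBroadcaster;
        this.resumesSuspendedResources = resumesSuspendedResources;
    }
}
```

Read this way, the IDLE family reacts to the idle timeout, the EMPTY family reacts to the last AtmosphereResource going away, and IDLE_RESUME is the only policy that resumes suspended resources.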

The default is NEVER, which means that a fair number of Broadcasters may 'pollute' the BroadcasterFactory if not handled properly. The BroadcasterLifeCyclePolicy can be configured in the following ways.

Programmatically

You can configure the policy on a Broadcaster directly:

 Broadcaster b = broadcasterFactory.get();
 b.setBroadcasterLifeCyclePolicy(BroadcasterLifeCyclePolicy.IDLE);

You can also associate a BroadcasterLifeCyclePolicyListener with a Broadcaster so that you are notified when a policy is executed.

 b.addBroadcasterLifeCyclePolicyListener(new BroadcasterLifeCyclePolicyListener() {...});

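If recovery of destroyed Broadcasters is enabled (see "Recovering a destroyed Broadcaster" below), then instead of creating a new one, the "destroyed" Broadcaster is marked as "alive" and can be re-used.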

Using web/atmosphere.xml

In web.xml

        <init-param>
            <param-name>org.atmosphere.cpr.broadcasterLifeCyclePolicy</param-name>
            <param-value>IDLE</param-value>
        </init-param>

or in atmosphere.xml

        <applicationConfig>
            <param-name>org.atmosphere.cpr.broadcasterLifeCyclePolicy</param-name>
            <param-value>IDLE</param-value>
        </applicationConfig>

NOTE: If you set the policy to xxx_DESTROY, the associated BroadcasterCache will also be destroyed, and your application may start LOSING messages until a Broadcaster with the same name has been recreated.

Recovering a destroyed Broadcaster

A Broadcaster may have been destroyed but still be available from the BroadcasterFactory. To re-use a destroyed Broadcaster, add the following to web.xml:

        <init-param>
            <param-name>org.atmosphere.cpr.recoverFromDestroyedBroadcaster</param-name>
            <param-value>true</param-value>
        </init-param>