Akka Actor - Yash-777/MyWorld GitHub Wiki

Actors (Java) 1.3.1

The Actor Model provides a higher level of abstraction for writing concurrent and distributed systems. It alleviates the developer from having to deal with explicit locking and thread management, making it easier to write correct concurrent and parallel systems.

Defining an Actor class: Actors in Java are created either by extending the ‘UntypedActor’ class and implementing the ‘onReceive’ method. This method takes the message as a parameter.

Here is an example:

import akka.actor.UntypedActor;
import akka.event.EventHandler;
 
public class SampleUntypedActor extends UntypedActor {
 
  public void onReceive(Object message) throws Exception {
    if (message instanceof String)
      EventHandler.info(this, String.format("Received String message: %s",
        message));
    else
      throw new IllegalArgumentException("Unknown message: " + message);
  }
}

Receive messages: When an actor receives a message it is passed into the ‘onReceive’ method, this is an abstract method on the ‘UntypedActor’ base class that needs to be defined.

Creating Actors

Creating an Actor is done using the ‘akka.actor.Actors.actorOf’ factory method. This method returns a reference to the UntypedActor’s ActorRef. This ‘ActorRef’ is an immutable serializable reference that you should use to communicate with the actor, send messages, link to it etc. This reference also functions as the context for the actor and holds run-time type information such as sender of the last message,

ActorRef myActor = Actors.actorOf(SampleUntypedActor.class);
myActor.start();

Messages and immutability

IMPORTANT: Messages can be any kind of object but have to be immutable. Akka can’t enforce immutability (yet) so this has to be by convention.

Send messages: Messages are sent to an Actor through one of the ‘send’ methods.

  1. Fire-forget
Fire-forget Receive messages
This is the preferred way of sending messages. No blocking waiting for a message. Give best concurrency and scalability characteristics.
actor.tell("Hello");
// Or with the sender reference passed along:
actor.tell("Hello", getContext());
When an actor receives a message it is passed into the ‘onReceive’ method, this is an abstract method on the ‘UntypedActor’ base class that needs to be defined. Here is an example:
public class SampleUntypedActor extends UntypedActor {
  public void onReceive(Object message) throws Exception {
    if (message instanceof String)
      EventHandler.info(this, String.format("Received String message: %s", message));
    else
      throw new IllegalArgumentException("Unknown message: " + message);
  }
}
      
  1. Send-And-Receive-Eventually

Using ‘sendRequestReply’ will send a message to the receiving Actor asynchronously but it will wait for a reply on a ‘Future’, blocking the sender Actor until either:

  • A reply is received, or
  • The Future times out and an ‘ActorTimeoutException’ is thrown.

You can pass an explicit time-out to the ‘sendRequestReply’ method and if none is specified then the default time-out defined in the sender Actor will be used.

Here are some examples:

UntypedActorRef actorRef = ...
 
try {
  Object result = actorRef.sendRequestReply("Hello", getContext(), 1000);
  ... // handle reply
} catch(ActorTimeoutException e) {
  ... // handle timeout
}


🛠️ Working Example: 🧾 Spring Boot + Akka Integration baeldung.com

🔧 Objective

This guide demonstrates how to integrate Akka Actors with Spring Boot, where an actor handles asynchronous integration tasks like sending messages to external systems. It enables the use of Spring-managed beans as Akka actors, thus combining Spring DI with Akka’s powerful actor model.

🏗️ Project Structure

src/main/java/com/example/integration/
├── actor/
   └── JpaIntegrationActor.java
├── config/
   └── SpringExtension.java
   └── SpringActorProducer.java
├── model/
   └── IntegrationDataMsg.java
├── service/
   └── IntegrationServiceImpl.java
   └── IntegrationExtService.java
   └── IntegrationExtServiceImpl.java
├── controller/
   └── IntegrationController.java
└── Application.java

1️⃣ Define the Actor – JpaIntegrationActor

Vanilla Spring – Separate AkkaConfig.java (WAR file (.war)) Spring Boot – Application.java with Embedded ActorSystem (Runnable JAR file (.jar))
📦 Dependencies and Build
<!-- Akka Actor -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.11</artifactId>
    <version>2.4.17</version>
</dependency>
<!--  -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-persistence_2.11</artifactId>
    <version>2.4.17</version>
</dependency>
<!-- Akka Actor -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.32</version>
</dependency>
<!-- Spring-Akka Integration -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-spring</artifactId>
    <version>2.5.32</version>
</dependency>
Actor Definition
@Component("integrationActorObj")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // A new actor instance per usage
public class JpaIntegrationActor extends akka.actor.UntypedActor {
    @Value("${integration.enabled:false}")
    private boolean integrationEnabled;
    @Override
    public void onReceive(Object message) throws Throwable {
        if (integrationEnabled && message instanceof IntegrationDataMsg) {
            IntegrationDataMsg data = (IntegrationDataMsg) message;
            // Perform your integration logic here (e.g., call REST API, save to DB)
            System.out.println("Received integration message: " + data);
        } else {
            unhandled(message);
        }
    }
}
@Component("integrationActor")
public class JpaIntegrationActor extends akka.actor.AbstractActor {
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(IntegrationDataMsg.class, msg -> {
                    // Process the message
                })
                .build();
    }
}

2️⃣ Enable Spring-managed Actor Creation – SpringActorProducer

package com.example.integration.config;

import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;

public class SpringActorProducer implements IndirectActorProducer {

    private final ApplicationContext applicationContext;
    private final String beanActorName;

    public SpringActorProducer(ApplicationContext applicationContext, String beanActorName) {
        this.applicationContext = applicationContext;
        this.beanActorName = beanActorName;
    }

    @Override
    public Actor produce() {
        return (Actor) applicationContext.getBean(beanActorName);
    }

    @Override
    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext.getType(beanActorName);
    }
}

3️⃣ Akka Extension to Access Spring Context – SpringExtension

package com.example.integration.config;

import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import org.springframework.context.ApplicationContext;

public class SpringExtension extends AbstractExtensionId<SpringExtension.SpringExt> {

    public static final SpringExtension SPRING_EXTENSION_PROVIDER = new SpringExtension();

    @Override
    public SpringExt createExtension(ExtendedActorSystem system) {
        return new SpringExt();
    }

    public static class SpringExt implements Extension {

        private volatile ApplicationContext applicationContext;

        public void initialize(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }

        public Props props(String actorBeanName) {
            return new RoundRobinPool(1).props(
                Props.create(SpringActorProducer.class, applicationContext, actorBeanName)
            );
        }

        public Props txnProps(String actorBeanName) {
            return new RoundRobinPool(1).props(
                Props.create(SpringActorProducer.class, applicationContext, actorBeanName)
            );
        }
    }
}

4️⃣ Integration Message Class – IntegrationDataMsg

package com.example.integration.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.Map;

@Data
@AllArgsConstructor
public class IntegrationDataMsg {
    private String message;
    private Map<String, String> properties;
}

5️⃣ Actor Wrapper Service – IntegrationExtService

package com.example.integration.service;

import akka.actor.ActorRef;

public interface IntegrationExtService {
    ActorRef getActor();
}

6️⃣ Actor Initialization Service – IntegrationExtServiceImpl

package com.example.integration.service;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.example.integration.config.SpringExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import static com.example.integration.config.SpringExtension.SPRING_EXTENSION_PROVIDER;

@Service
public class IntegrationExtServiceImpl implements IntegrationExtService {

    @Autowired
    private ActorSystem actorSystemRef;

    private static ActorRef actor;

    @Override
    public ActorRef getActor() {
        if (actor == null) {
            actor = actorSystemRef.actorOf(
                SPRING_EXTENSION_PROVIDER.get(actorSystemRef).props("integrationActorObj"),
                "integrationActor"
            );
        }
        return actor;
    }
}

7️⃣ Send Messages Asynchronously – IntegrationServiceImpl

package com.example.integration.service;

import com.example.integration.model.IntegrationDataMsg;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
public class IntegrationServiceImpl {

    @Autowired
    private IntegrationExtService integrationExtService;

    public void sendIntegrationMessage(String msg, Map<String, String> properties) {
        IntegrationDataMsg integrationData = new IntegrationDataMsg(msg, properties);
        integrationExtService.getActor().tell(integrationData, null); // Asynchronous fire-and-forget
    }
}

🔁 Setup and Configuration

The AkkaConfig class is essential — it bootstraps the Akka ActorSystem within the Spring context and enables the Spring-Akka integration using the custom SpringExtension.

✅ Vanilla Spring vs Spring Boot (with Akka Actor Integration)

Vanilla Spring – Separate AkkaConfig.java (WAR file (.war)) Spring Boot – Application.java with Embedded ActorSystem (Runnable JAR file (.jar))
@Configuration
@ComponentScan(basePackages = "com.example")
public class AkkaConfig {
    @Autowired
    private ApplicationContext applicationContext;
    @Bean
    public ActorSystem actorSystem() {
        ActorSystem system = ActorSystem.create("IntegrationActorSystem");
        SpringExtension.SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
        return system;
    }
}
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Bean
    public ActorSystem actorSystem(ApplicationContext context) {
        ActorSystem system = ActorSystem.create("IntegrationActorSystem");
        SpringExtension.SPRING_EXTENSION_PROVIDER.get(system).initialize(context);
        return system;
    }
}
      
Development Speed
⏳ Slower: more config, less convention ⚡ Faster: convention over config
Unit Testing Ease
✅ Good but requires manual config or mocks ✅ Easier with @SpringBootTest, profiles, autoconfig
🚀 Running the Application
* You need to deploy to an external servlet container (like Tomcat or Jetty).
* More common in legacy or enterprise WAR deployments.
* Runs as a standalone application with embedded server.
* Just run the main() method or use `mvn spring-boot:run`.
✅ Recommendation
✅ If you use Spring Boot, move the `@Bean` to your `main @SpringBootApplication class` (Application.java). ✅ If you use vanilla Spring, keep the separate `AkkaConfig class`.

Controller

@RestController
@RequestMapping("/integration")
public class IntegrationController {

    @Autowired
    private IntegrationService integrationService;

    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody String message) {
        integrationService.sendIntegrationMessage(message);
        return ResponseEntity.ok("Message sent to actor");
    }
}
⚠️ **GitHub.com Fallback** ⚠️