Event Driven and Reactive Architecture

This article is a follow up from previous which explored traditional multi-threading to achieve concurrency and its challenges. As the name suggests, here we dive into Event Driven and Reactive Architecture as an alternative.

Going Event Driven

event-loop

Event driven architectures are at high level based on –

  • Event Loop
  • Events
  • Event Handlers

Event loops accept, maintain and manage events destined for defined Handlers. The events are passed on to the respective Handler where it is processed. The handler may react to the event successfully or with a failure. A failure is also passed to the event loop as just another event. The handler for the exception (shown as Exception Handler) decides to react accordingly. Event Loops in Java normally execute off a Thread Pool.  More complex systems could be organized in multiple event loops communicating with each other through – you guessed it! Events!!

One more concept that needs to be considered in this architecture is handling Backpressure. In case of event emitter (the Event Loop) and event consumer (Handlers), it is a possible to get into a situation where the events are emitted at a rate higher than what the consumer can consume and process. For example, the Event loop could be emitting 100 events per second for a handler which can only process 10 per second. Backpressure is a concept where the handler lets the emitter known about its capability and emitter (or some mediator) makes sure the events are sent to the handler based on the processing ability.

Now, how do we go about developing this system? Also, this seems like a common pattern, isn’t there a framework implements this?

As it turns out, whenever (good) developers start getting this feeling, there is a desired framework round the corner.

Introducing Akka

akka-analogy

We, the people, have been carrying out lot of complex tasks in collaboration for a long time now. Consider, big organizations for example; they are run (relatively) seamlessly, based on certain practices and methods.

The diagram above shows how a specific task, for instance, creating a movie scene could be carried out by people collaboratively.

We see various types of people involved – An actress, 2 actors (a crazy one and a dumb one!), a director, cameraman, writer and a production house. Now, the first trigger that could happen is writer informing the production house that script for the scene is ready. The production house in turn tells the director to start shooting the scene. Subsequently, the director informs various people involved to get ready for the scene. Now, at this point, various things can happen. The dumb actor might be lagging on processing previous scripts and may ask the director to wait! So as a solution, the director provides all actors and the cameraman a box to keep the scripts until the dumb actor is ready. The scripts need to be maintained in sequence and there could be a specialist to maintain scripts for all the stakeholders.

When eventually the dumb actor is ready to resume, the crazy actor (or a perfectionist as some people call them) says he can’t do the scene, because its not up to his standards! Now this puts the director in a situation, which he may resolve by trying to convince the actor a few times. Or if that’s not working he may just escalate the situation to the production house. Eventually, the situation is brought under control and everyone decides to resume and completes the scene.

The long story gives an idea of how people coordinate things – Everyone just knows what task they are supposed to do, and whom they should delegate the sub-tasks. They also know, how to handle the situation when the delegate fails to finish the task. They either handle it themselves or escalate higher in the order.

Akka works on this analogy to simplify and implement concurrency in an event driven and reactive manner.

Akka, what is involved

Akka ecosystem has various components. For the direct usage interface it defines following entities –

  1. Actors and Actor Model – Actors are the basic processing unit for Akka. Each actor knows how to process an event (or more than one events) and also is responsible for emitting events for child actors to process sub-tasks. The relationship between various actors thus forms an Actor Model.
  2. Supervision strategy – Akka gives first class importance to fault tolerance. This goes a long way in creating resilient applications. Each actor has a supervision strategy by which it can control the behavior in situations when a child actor fails. A note to show how naturally the concepts fit into Akka implementation – when we say an actor ‘has a’ supervision strategy, its actually implemented as a composition in the actor! We will soon get into the details…
  3. Actor System: Actor system is responsible for creating and initiating actors.
  4. Routers: Routers provide optimized message processing by efficiently receiving messages and passing them on to routees. Routers usually provide better performance over handling message processing directly in the router by providing concurrent pipeline instead of single threaded processing provided in the actor.

In the background akka uses following components for processing –

  1. Message Dispatchers: Dispatchers are core engine for akka. They have access to the message queue, mailboxes for actors and the thread pool for actor execution.
  2. Actor Mailboxes: Each actor is associated with a mailbox which maintains messages in-line to be processed by actor. This is also the way through which akka handles Backpressure.
  3. Message Queue: This is per message dispatcher queue for all the messages dispatched to the Message Dispatcher.

Here is how the whole system works –

akka-internals

  1. The process is initiated by a sender (usually an Actor itself) calling actorOf() method on ActorSystem.
  2. The ActorSystem creates a reference to the Actor. Note that, the actual Actor instance may not be created and instantiated at this time. ActorRef acts as a Proxy to the actual Actor and provides and interface to communicate with Actor.
  3. The sender Actor then tells the receiving Actor about the event.
  4. The ActorRef in turn dispatches the event on MessageDispatcher.
  5. MessageDispatcher enques the event on global (to the dispatcher) MessageQueue.
  6. MessageDispatcher also finds the Mailbox for receiving Actor and executes it on the available ThreadPool.
  7. Mailbox eventually schedules the task on the Actor.

In comparison to traditional multi-threaded concurrency, the Actor execution starts as a reaction to an event destined for it instead of starting the execution and waiting on a condition. This inverts the dependency making it driven by events.

Now lets go back to the use case we started in previous article and see how it has changed.

use-case-2 use-case-with-akka

The basic difference to notice here as mentioned above is the inversion of arrows. The system is driven by events signaled after each actor finishes the task.

Lets zoom into two cases to see how the Actor model is implemented –

  1. Create/Launce instance task has to wait for only one signal/event to start instance creation.
  2. Assign Key Pair task can only start assignment after it has received two events – Key Pair Creation event and Instance Creation event.
Here is the snippet of how create/launch instance actor could be implemented –
public class LaunchInstanceActor extends UntypedActor {
    @Inject
    private InstanceServices instanceServices;
    ...
    public void onReceive(Object event) throws Exception {
        if (event instanceof LaunchInstance) {
            LaunchInstance launchInstanceRequest = (LaunchInstance) event;
            Instance instance = instanceServices.launchInstance(launchInstanceRequest);
            actorOf(KeyPairAssignmentActor.class).tell(new InstanceLaunched(instance), getSelf());
            ...
            ...
        } else
            unhandled(event);
        }
    }
}

Here is what’s happening in the code above –

  1. The LaunchInstanceActor extends from UntypedActor. UntypedActor is a class provided by Akka which provides basic functionality for Actors and a hook method (template method) onReceive() that the deriving Actor has to implement.
  • onReceive() method gets the event passed as an argument.
  • The LaunchInstanceActor in onReceive() method checks if it can process the event by checking if the event is of type LaunchInstance. This could be any condition. If true, it continues processing otherwise just marks it unhandled and leaves it up to akka (UntypedActor) to handle.
  • As an implementation it calls instanceServices.launchInstance() to actually launch the instance.
  • The instanceService is @injected in the actor as dependency by normal dependency injection mechanism. This is just a choice available if a dependency injection framework is available. This also makes the actors and with dependencies better suitable for testing.
  • Finally, the actor tells another actor, KeyPairAssignementActor about the instance being launched.

As mentioned earlier, this is a very straightforward way for an Actor to react to supported event and spawn relevant actors. The second case, reacting to multiple events is little more complex –

reacting to multiple events before resuming the actual processing –
public class KeyPairAssignmentActor extends AbstractActor {
    private PartialFunction<Object, BoxedUnit> readyWithKeyPair;
    private PartialFunction<Object, BoxedUnit> instanceLaunched;
    @Inject
    private KeyPairServices keyPairServices;
    public KeyPairAssignmentActor() {
       //Constructor, define ReceiveBuilder  
       receive(ReceiveBuilder.
              match(KeyPairAvailable.class, s -> {
                 context().become(readyWithKeyPair);
              }).
              match(InstanceLaunched.class, s -> {
                 context().become(instanceLaunched);
              }).build();
       instanceLaunched = ReceiveBuilder.
              match(InstanceLaunched.class, s -> {
                 //Ignore or do something else
              }).
              match(KeyPairAvailable.class, s -> {
                 //Instance was already launched, now assignKeyPair
                 KeyPairServices.assignKeyPair(s);
                 ...
              }).build();
       readyWithKeyPair = ReceiveBuilder.
              match(KeyPairAvailable.class, s -> {
                 //ignore or do something else
              }).
              match(InstanceLaunched.class, s -> {
                 
                 //Key pair was already available, now assign Key Pair
                 KeyPairServices.assignKeyPair(s);
                 ...
              }).build();
    );
    //Constructor finished
    }
}

What we are doing in the code above is defining a State Machine which transforms through the states based on the events received. Here is how the state transformation happens –

  • Initial State : If an event KeyPairAvailable is received, the actor just goes into state readyWithKeyPair. Conversly, if event InstanceLaunched is received, the actor goes into instanceLaunched state.
  • instanceLaunched state : In this state if actor receives InstanceLaunched event, it ignores that, because the event has already occurred cuasing the actor go in current, instanceLaunched state. However, if an event KeyPairAvailable is received, the actor determines it has received both necessary events and proceeds key pair assignment by calling assignKeyPair() on KeyPairServices
  • readyWithKeyPair state : The actor behaves exactly opposite to instanceLaunched state but follows similar assumptions before proceeding with actual key pair assignments.

And that’s it! Go over the example code again if you feel a little dizzy until you get hang of it.

Alright, what about failures?

As mentioned earlier, akka gives failures and fault tolerance first class importance. Any Actor can define a failure handling strategy, which akka calls SupervisionStrategy for its children actor. Here is how our LaunchInstanceActor might define the strategy for reacting to various failures.

public class LaunchClusterActor extends UntypedActor {
   ...
   private static SupervisorStrategy supervisorStrategy = new OneForOneStrategy(5, Duration.create("1 minute"),
   failureCause -> {
       if (failureCause instanceof EC2ServiceUnavailableException) {
          return restart();
       } else if (failureCause instanceof RuntimeException) {
          return stop();
       } else {
          return escalate();
       }});
   ...
   ...
}

LaunchInstanceActor above has defined OneForOneStrategy where it defines –

  • If the failureCause is EC2ServiceUnavailableException, restart() the child actor that failed to try again.
  • If the cause is a RuntimeException though, stop() the child actor that failed.
  • In case of any other cause, just escalate() it to the parent actor!

Note that the OneForOneStrategy also takes two more parameters. First parameter (with value 5) defines, how many times the strategy should be employed. This provides out-of the box Circuit Breaker facility for handling failures. The Second parameter is the duration after which the retry will happen.

Akka also supports AllForOneStrategy which basically directs action applicable to all child actors spawned by current actor.

To be used in cases where the ensemble of children has such tight dependencies among them, that a failure of one child affects the function of the others. Default strategy is OneForOneStrategy

Testing Akka

Non testable frameworks/patterns/implementations smell of bad practices.

Akka provides a class JavaTestKit ( for Java ) – Inheriting from this class enables reception of replies from actors, which are queued by an internal actor and can be examined using the expectMsg… methods. Assertions and bounds concerning timing are available in the form of Within blocks

Apart from above other testing strategies remain similar to normal Java objects. Akka Actors can be considered common Java objects from that perspective.

For actors with dependencies on external services such as DB or external APIs, use DI with factory for different implementations for tests. This is a normal pattern which works with akka as well.

Summarizing the benefits of Akka

Akka really simplifies dealing with concurrency and makes it easy to reason about. It shines from various perspectives –

  1. Actors are managed by Akka, they only process one message at a time so they can have state without worrying much about multi-threaded access.
  2. Actor models create a very decoupled system. The Actors act as simplified workers.
  3. Akka provides fault tolerance as first class behavior
  4. Out of the box handling for Backpressure
  5. Actors are lightweight. Akka claims to support ~2.5 million actors per GB of heap, which in modern Java applications is fairly reasonable to assume.
  6. Although not in the scope of this article, actor systems can be distributed fairly easily.
  7. Testable

Pretty interesting stuff really! I am sure you have heard though –

“With great power comes great responsibility”

Some Caveats/things to remember

In process of providing the simplicity, akka makes certain assumptions and expects developers to be responsible enough to follow certain conventions –

  • Beware of blocking, use alternatives:
    This is not really specific to Akka, but to any event driven systems work efficiently its imperative that your code doesn’t block for long time usually doing I/O. The async capabilities are available for many services nowadays so make sure you utilize them. Fow examples Mongo, Cassandra, Postgre all provide async reads/writes. There are also HTTP clients available to communicate over the network asynchronously.
    If you still need to block for some reason, make sure you isolate the MessageDispatcher for such actors so that the impact is limited to underlying thread pool.
  • Immutability:
    Akka events/messages are basic form of communication between actors that are executed on different threads. To ensure state consistency, they must be kept immutable. Java doesn’t provide an assertion or checks on immutability, so it remains developer’s responsibility.
    A typical scenario to keep in mind is also when Actors use asynchronous processing internally to achieve certain functionality. Since Actors themselves can have state, measures need to be taken not to share this state with threads executing the asynchronous functionality. Here is a very simple example to clarify –
private String actorLocalState;
Future<Void> someFuture = future(() -> {
   actorLocalState = “a change”;
   return null;
}, getContext().dispatcher());

someFuture.onSuccess(new OnSuccess<Void>(){
   @Override
   public void onSuccess(Void aVoid) throws Throwable {
   }
}, getContext().dispatcher());

someFuture.onFailure(new OnFailure(){
   @Override
   public void onFailure(Throwable throwable) throws Throwable {
   }
}, getContext().dispatcher());

This is a snippet from an Actor which maintains an internal state variable actorLocalState. It also defines a future (Akka feature to implement asynchronous behavior) which internally modifies the state. Since there is no guarantee that the asynchronous code will be executed on the same thread, this could possibly lead to a situation where different threads are modifying the actorLocalState. Developers need to beware of using the async execution this way. Nevertheless, the code above illustrates another cool akka utility for executing code asynchronously.

You might ask – this is great, but, who is building a PAAS everyday?

With CPUs getting faster, and number of CPUs getting more, concurrency and parallel computing has become critical to building responsive applications. With the simplicity provided by akka, its really easy to visualize any application use case with Actor Model and use concurrency during development.

I am going to end the discussion about akka with actor models for two use cases which are not PAAS applications.

use-case-2

use-case-3

I am going to leave you with mention of two more frameworks to build reactive frameworks, just to point out that akka is not the only framework to build event driven and reactive applications.

Reactive Extensions: Very Popular framework provides libraries for building reactive applications in many languages including

RxJava

RxScala

RxJS

Others

Spring Reactor for Java: A framework by spring.io for reactive programming

Reference: Akka – http://akka.io/

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s