Thursday, August 26, 2010

EDA Akka as EventBus

It has been a while since the last entry in the EDA series, but here we are again. Today we are going to do something really interesting. A friend and colleague of ours, Jonas Bonér, is the creator of a super interesting framework called Akka.
Akka is an event-driven platform for constructing highly scalable and fault-tolerant applications. It is built with Scala, but also has a rich API for Java. It follows the Actor Model and, together with Software Transactional Memory (STM), it raises the abstraction level and provides an easy-to-use tool for building highly concurrent applications.
So, today we are going to take advantage of the Java API in Akka to write our own EventBus implementation.

First, update your pom.xml with what Akka needs (the repositories and the dependency):
<repository>
  <id>Akka</id>
  <name>Akka Maven2 Repository</name>
  <url>http://www.scalablesolutions.se/akka/repository/</url>
</repository>

<repository>
  <id>Multiverse</id>
  <name>Multiverse Maven2 Repository</name>
  <url>http://multiverse.googlecode.com/svn/maven-repository/releases/</url>
</repository>

<repository>
  <id>GuiceyFruit</id>
  <name>GuiceyFruit Maven2 Repository</name>
  <url>http://guiceyfruit.googlecode.com/svn/repo/releases/</url>
</repository>

<repository>
  <id>JBoss</id>
  <name>JBoss Maven2 Repository</name>
  <url>https://repository.jboss.org/nexus/content/groups/public/</url>
</repository>

...

<dependency>
  <groupId>se.scalablesolutions.akka</groupId>
  <artifactId>akka-core_2.8.0</artifactId>
  <version>0.10</version>
</dependency>


Second, create your implementation of the bus, AkkaEventBus.java:


package org.foo;

import org.fornax.cartridges.sculptor.framework.event.Event;
import org.fornax.cartridges.sculptor.framework.event.EventBus;
import org.fornax.cartridges.sculptor.framework.event.EventSubscriber;

import se.scalablesolutions.akka.actor.ActorRef;
import se.scalablesolutions.akka.actor.ActorRegistry;
import se.scalablesolutions.akka.actor.UntypedActor;
import se.scalablesolutions.akka.actor.UntypedActorFactory;

public class AkkaEventBus implements EventBus {

    // Each subscription gets its own actor; the actor's id is the topic name.
    public boolean subscribe(final String topic, final EventSubscriber subscriber) {
        UntypedActor.actorOf(new UntypedActorFactory() {
            public UntypedActor create() {
                return new ActorListener(topic, subscriber);
            }
        }).start();

        return true;
    }

    public boolean unsubscribe(String topic, EventSubscriber subscriber) {
        // TODO : implement mapping between arbitrary subscriber and actor in registry
        return true;
    }

    // Look up every actor registered under the topic id and send the event
    // to each one without waiting for a reply.
    public boolean publish(String topic, Event event) {
        ActorRef[] actorsForTopic = ActorRegistry.actorsFor(topic);
        for (ActorRef actor : actorsForTopic) {
            actor.sendOneWay(event);
        }
        return true;
    }

    // Actor that forwards every message it receives to the wrapped subscriber.
    private static class ActorListener extends UntypedActor {
        final String topic;
        final EventSubscriber subscriber;

        ActorListener(String topic, EventSubscriber subscriber) {
            this.topic = topic;
            this.subscriber = subscriber;
            // Use the topic as the actor id so publish() can find it in the registry.
            this.getContext().setId(topic);
        }

        @Override
        public void onReceive(Object message) throws Exception {
            this.subscriber.receive((Event) message);
        }

    }
}

As you can see, the unsubscribe implementation is missing, and I have left out the equals and hashCode overrides. I leave those to you.
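For the missing unsubscribe, one possible starting point is a small registry that remembers which handle was created for each topic/subscriber pair. The sketch below is plain Java with no Akka calls. `SubscriptionRegistry`, `register` and `remove` are hypothetical names, and the choice to compare subscribers by identity rather than by equals/hashCode is an assumption, not something the bus above requires.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical helper: remembers which handle (for example an actor
 * reference) was created for a given topic/subscriber pair.
 */
final class SubscriptionRegistry<S, H> {

    /** Map key: the topic plus the subscriber instance, compared by identity. */
    private static final class Key {
        private final String topic;
        private final Object subscriber;

        Key(String topic, Object subscriber) {
            this.topic = Objects.requireNonNull(topic);
            this.subscriber = Objects.requireNonNull(subscriber);
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Key)) {
                return false;
            }
            Key that = (Key) other;
            // Identity comparison, so subscribers need no equals/hashCode of their own.
            return topic.equals(that.topic) && subscriber == that.subscriber;
        }

        @Override
        public int hashCode() {
            return 31 * topic.hashCode() + System.identityHashCode(subscriber);
        }
    }

    private final ConcurrentMap<Key, H> handles = new ConcurrentHashMap<Key, H>();

    /** Stores the handle; returns false if this pair is already registered. */
    boolean register(String topic, S subscriber, H handle) {
        return handles.putIfAbsent(new Key(topic, subscriber), handle) == null;
    }

    /** Removes and returns the handle for this pair, or null if there is none. */
    H remove(String topic, S subscriber) {
        return handles.remove(new Key(topic, subscriber));
    }
}
```

In the bus, the handle type would presumably be the ActorRef created in subscribe, and unsubscribe would look it up here and stop that actor. That wiring is not shown and has not been tested against Akka 0.10.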
Third, register it as our bus implementation in the Spring configuration:


<bean id="akkaEventBus" class="org.foo.AkkaEventBus"/>
<alias name="akkaEventBus" alias="eventBus"/>


What we have built is a highly scalable and concurrent EventBus that dispatches events asynchronously.
Pretty easy, right? :-)
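To make "dispatches events asynchronously" a bit more concrete, here is a rough plain-JDK analogy of what the actors do for us: every subscriber gets its own queue and worker thread, and publish only enqueues the event and returns. This is not Akka code and not how Akka is implemented. `MailboxBus`, `Listener` and `shutdown` are hypothetical names used for illustration only.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-JDK stand-in for the actor-based bus; all names here are hypothetical. */
final class MailboxBus {

    /** Minimal subscriber callback, standing in for EventSubscriber. */
    interface Listener {
        void receive(Object event);
    }

    /** One listener plus its own single-threaded executor, acting as a mailbox. */
    private static final class Mailbox {
        final Listener listener;
        final ExecutorService worker = Executors.newSingleThreadExecutor();

        Mailbox(Listener listener) {
            this.listener = listener;
        }
    }

    private final ConcurrentMap<String, List<Mailbox>> mailboxesByTopic =
            new ConcurrentHashMap<String, List<Mailbox>>();

    void subscribe(String topic, Listener listener) {
        List<Mailbox> fresh = new CopyOnWriteArrayList<Mailbox>();
        List<Mailbox> existing = mailboxesByTopic.putIfAbsent(topic, fresh);
        (existing != null ? existing : fresh).add(new Mailbox(listener));
    }

    /** Queues the event for every listener on the topic and returns without waiting. */
    void publish(String topic, final Object event) {
        List<Mailbox> mailboxes = mailboxesByTopic.get(topic);
        if (mailboxes == null) {
            return;
        }
        for (final Mailbox mailbox : mailboxes) {
            mailbox.worker.execute(new Runnable() {
                public void run() {
                    mailbox.listener.receive(event);
                }
            });
        }
    }

    /** Stops accepting events; returns true if all queued deliveries finished in time. */
    boolean shutdown(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        boolean finished = true;
        try {
            for (List<Mailbox> mailboxes : mailboxesByTopic.values()) {
                for (Mailbox mailbox : mailboxes) {
                    mailbox.worker.shutdown();
                    long remaining = Math.max(0L, deadline - System.currentTimeMillis());
                    finished &= mailbox.worker.awaitTermination(remaining, TimeUnit.MILLISECONDS);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return finished;
    }
}
```

Because each mailbox has a single worker thread, one subscriber sees its events in the order they were published, while a slow subscriber does not hold up the publisher or the other subscribers.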

It doesn't run over the network, but Akka has some nice modules for that as well, so that is our task for next time.
