Thursday, August 26, 2010

EDA Akka as EventBus

It has been some time since the last entry in the EDA series, but here we are again. Today we are going to do something really interesting. A friend and colleague of ours, Jonas Bonér, is the creator of a super interesting framework called Akka.
Akka is an event-driven platform for constructing highly scalable and fault-tolerant applications. It is built with Scala, but also has a rich API for Java. It follows the Actor Model and, together with Software Transactional Memory (STM), raises the abstraction level and provides an easy-to-use tool for building highly concurrent applications.
So, today we are going to take advantage of the Java API in Akka to write our own EventBus implementation.

First, update your pom with what Akka needs (repositories and dependency):
<repository>
    <id>Akka</id>
    <name>Akka Maven2 Repository</name>
    <url>http://www.scalablesolutions.se/akka/repository/</url>
</repository>

<repository>
    <id>Multiverse</id>
    <name>Multiverse Maven2 Repository</name>
    <url>http://multiverse.googlecode.com/svn/maven-repository/releases/</url>
</repository>

<repository>
    <id>GuiceyFruit</id>
    <name>GuiceyFruit Maven2 Repository</name>
    <url>http://guiceyfruit.googlecode.com/svn/repo/releases/</url>
</repository>

<repository>
    <id>JBoss</id>
    <name>JBoss Maven2 Repository</name>
    <url>https://repository.jboss.org/nexus/content/groups/public/</url>
</repository>

...

<dependency>
    <groupId>se.scalablesolutions.akka</groupId>
    <artifactId>akka-core_2.8.0</artifactId>
    <version>0.10</version>
</dependency>


Second, create your implementation of the bus, AkkaEventBus.java:


package org.foo;

import org.fornax.cartridges.sculptor.framework.event.Event;
import org.fornax.cartridges.sculptor.framework.event.EventBus;
import org.fornax.cartridges.sculptor.framework.event.EventSubscriber;

import se.scalablesolutions.akka.actor.ActorRef;
import se.scalablesolutions.akka.actor.ActorRegistry;
import se.scalablesolutions.akka.actor.UntypedActor;
import se.scalablesolutions.akka.actor.UntypedActorFactory;

public class AkkaEventBus implements EventBus {

    // Starts one actor per subscription; the actor's id is the topic name.
    public boolean subscribe(final String topic, final EventSubscriber subscriber) {
        UntypedActor.actorOf(new UntypedActorFactory() {
            public UntypedActor create() {
                return new ActorListener(topic, subscriber);
            }
        }).start();

        return true;
    }

    public boolean unsubscribe(String topic, EventSubscriber subscriber) {
        // TODO : implement mapping between arbitrary subscriber and actor in registry
        return true;
    }

    // Looks up all actors whose id equals the topic and sends the event
    // to each of them without waiting for a reply.
    public boolean publish(String topic, Event event) {
        ActorRef[] actorsForTopic = ActorRegistry.actorsFor(topic);
        for (int i = 0; i < actorsForTopic.length; i++) {
            actorsForTopic[i].sendOneWay(event);
        }
        return true;
    }

    // Actor that forwards every received message to the wrapped subscriber.
    @SuppressWarnings("unchecked")
    private class ActorListener extends UntypedActor {
        final String topic;
        final EventSubscriber subscriber;

        ActorListener(String topic, EventSubscriber subscriber) {
            this.topic = topic;
            this.subscriber = subscriber;
            // Use the topic as actor id so publish() can find this actor.
            this.getContext().setId(topic);
        }

        @Override
        public void onReceive(Object message) throws Exception {
            this.subscriber.receive((Event) message);
        }

    }
}

As you can see, the unsubscribe implementation is missing, and I have left out the equals and hashCode overrides, but those I leave to you.
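
One possible way to approach the TODO in unsubscribe is to remember which actor was started for each (topic, subscriber) pair. The sketch below is plain Java and does not touch Akka at all: SubscriptionRegistry is a hypothetical helper, and the handle type H is a placeholder for whatever you get back when starting the actor (presumably an ActorRef). It assumes Java 8 or later for the lambda.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Akka or Sculptor: remembers which handle
// (for example an ActorRef) was created for each (topic, subscriber) pair.
class SubscriptionRegistry<S, H> {
    private final Map<String, Map<S, H>> handlesByTopic = new ConcurrentHashMap<>();

    // Returns false if the subscriber is already registered for the topic.
    boolean register(String topic, S subscriber, H handle) {
        Map<S, H> handles =
                handlesByTopic.computeIfAbsent(topic, t -> new ConcurrentHashMap<>());
        return handles.putIfAbsent(subscriber, handle) == null;
    }

    // Removes and returns the handle so the caller can stop it,
    // or returns null if the pair was never registered.
    H remove(String topic, S subscriber) {
        Map<S, H> handles = handlesByTopic.get(topic);
        return handles == null ? null : handles.remove(subscriber);
    }
}
```

With something like this in place, subscribe could call register with the started actor, and unsubscribe could call remove and stop the actor it gets back. Note that the lookup relies on the subscriber's equals and hashCode, which is exactly why those overrides matter.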
Third, register it as our bus implementation in the Spring config:


<bean id="akkaEventBus" class="org.foo.AkkaEventBus"/>
<alias name="akkaEventBus" alias="eventBus"/>


What we have now built is a highly scalable and concurrent EventBus that dispatches events asynchronously.
Pretty easy, right? :-)

It doesn't run over the network, but Akka has some nice modules for that as well, so that is our task for next time.

Wednesday, August 11, 2010

EDA events over system boundaries - with camel

Last time we examined how we could make our events travel over system boundaries with the help of JMS and Spring-integration.
Today we will do exactly the same, but with Apache Camel as the event engine instead. The task is to enable application domain events over system boundaries through JMS. If you don't remember our model, here it is again:
Application Universe {
    basePackage=org.helloworld

    Module milkyway {
        Service PlanetService {
            @BigLandingSuccess landOnPlanet(String planetName, String astronautName)
                publish to milkywayChannel;
        }

        DomainEvent BigLandingSuccess {
            String planetName
            String astronautName
        }
    }
    Module houston {
        Service GroundControlService {
            subscribe to milkywayChannel
            bringOutTheChampagne(int noOfBottles);
        }
    }
}
To use Apache Camel as the event engine, you need to specify it in the sculptor-generator.properties file:
integration.product=camel
And add the needed dependencies to the project's pom file:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-jms</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-camel</artifactId>
    <version>5.3.2</version>
</dependency>
<!-- xbean is required for ActiveMQ broker configuration in the spring xml file -->
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.7</version>
</dependency>
<dependency>
    <groupId>javax.xml.bind</groupId>
    <artifactId>jaxb-api</artifactId>
    <version>2.1</version>
</dependency>
Regenerate and you will have a new camel.xml file in src/main/resources. This file is only generated once, so you can edit it to add configuration for Camel.
To do the same as we did with spring-integration, i.e. put a domain event on a JMS topic, we just have to add a route rule to the camel.xml file:

<camel:route>
    <camel:from uri="direct:shippingChannel"/>
    <camel:to uri="jms:topic:shippingEvent"/>
</camel:route>

And that's it. Now we are publishing our domain event to an ActiveMQ topic. It may seem a bit strange that this works, but it gets clearer if we look in camel.xml and see what was already there:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <bean id="camelEventBusImpl" class="org.fornax.cartridges.sculptor.framework.event.CamelEventBusImpl"/>
    <alias name="camelEventBusImpl" alias="eventBus"/>

    <camel:camelContext id="camel">
        <camel:package>org.sculptor.shipping</camel:package>
        <camel:template id="producerTemplate"/>
    </camel:camelContext>
    <!--
    Camel ActiveMQ to use the ActiveMQ broker
    -->
    <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
</beans>

We have a bean that wires the JMS API to the actual ActiveMQ instance. So you need to have an ActiveMQ instance up and running, listening on port 61616.

All for today, next time...we'll see what the subject is...

Monday, August 2, 2010

EDA events over system boundaries

In the last post we explained why we have the 'event bus' notion. And earlier we saw how to use it to publish and subscribe, either through the DSL in the model or through plain Java code.
Today we thought we should look at how you can make domain events part of the public API of your business component and publish them to the rest of the world.
To do this we will use the event bus implementation that is based on spring-integration. We will use its JMS outbound channel adapter to publish the 'BigLandingSuccess' domain event to a JMS topic that the rest of the world can listen to.
If you don't remember, here is the model:

Application Universe {
    basePackage=org.helloworld

    Module milkyway {
        Service PlanetService {
            @BigLandingSuccess landOnPlanet(String planetName, String astronautName)
                publish to milkywayChannel;
        }

        DomainEvent BigLandingSuccess {
            String planetName
            String astronautName
        }
    }
    Module houston {
        Service GroundControlService {
            subscribe to milkywayChannel
            bringOutTheChampagne(int noOfBottles);
        }
    }
}

But first, how do we replace the default event bus implementation with the spring-integration based one?
In sculptor-generator.properties, add the property:
integration.product=spring-integration

And in the project pom file, add the dependency:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>org.springframework.integration</artifactId>
    <version>1.0.4.RELEASE</version>
</dependency>
Regenerate, and you are home. All event routing will now use spring-integration as its engine, and without any other changes it works exactly as before.
Now you have to decide what your public API should look like. In our case we decided that the 'BigLandingSuccess' domain event should be made available for other applications to listen to.
And now we take advantage of the power of spring-integration. When we regenerated with spring-integration enabled, we ended up with the file:
src/main/resources/spring-integration.xml
This is the place where all spring-integration definitions land. It is generated only once, so you are free to edit it. Open the file and add:
<jms:outbound-channel-adapter id="publicMilkywayChannel"  destination="publicMilkywayTopic" channel="milkywayChannel"/>
You will need a Spring JMS bean named 'publicMilkywayTopic', and that bean needs to map to an actual message broker instance, for example ActiveMQ. But the Spring bean and message broker setup is out of scope for this blog entry, so that one we leave to you.
The only caveat now is that we are exposing our Java objects in the message. This can be cured with a transformation step before the JMS adapter. So add a transformer to your spring-integration configuration:
<object-to-string-transformer input-channel="milkywayChannel" output-channel="milkywayMessagesAsStringChannel"/>
And change the jms outbound channel to wire up with the new channel:
<jms:outbound-channel-adapter id="publicMilkywayChannel"  destination="publicMilkywayTopic" channel="milkywayMessagesAsStringChannel"/>
Spring-integration has a lot of transformers you can use. The above just transforms a java object to a string through its toString method. That is of course not always what you want, but it works for this example.
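
To make the toString-based transformation concrete, here is a small sketch of what the string payload could look like. The class below is a hand-written stand-in for the generated BigLandingSuccess event, and the string format is made up for illustration; the class Sculptor actually generates, and its toString output, may well differ.

```java
// Hand-written stand-in for the generated BigLandingSuccess event.
// The real generated class will differ; the string format is an assumption.
class BigLandingSuccessSketch {
    private final String planetName;
    private final String astronautName;

    BigLandingSuccessSketch(String planetName, String astronautName) {
        this.planetName = planetName;
        this.astronautName = astronautName;
    }

    // This is the text an object-to-string transformation would put on the topic.
    @Override
    public String toString() {
        return "BigLandingSuccess[planetName=" + planetName
                + ", astronautName=" + astronautName + "]";
    }
}
```

Consumers of the topic then only see a string and no longer need our classes on their classpath, but they do have to parse whatever format toString happens to produce.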

So that was all that had to be done to make a domain event part of your public api. Of course, you need to have proper documentation to give others a fair chance of finding your event.

Let us take one step back and consider what we have done.
First, we declared the 'BigLandingSuccess' domain event and published it internally.
Second, we added an adapter that listens to the channel and in turn publishes the event on a JMS topic, i.e. makes it public.
Third, we added a transformation step before the public publish to remove the dependency on our classes.

Now, this is nice, isn't it? We can have a lot of domain events internally in our application and by that take advantage of all the nice attributes of EDA. And we can by choice make a domain event public with 'just' configuration changes.

Quite a long post, but we got pretty much done. Next time we will look at how to do the same with Apache Camel.

Sunday, August 1, 2010

EDA why the event bus in sculptor?

We have come to our seventh entry about Event Driven Architecture. We are on the topic of how Sculptor supports EDA. In previous posts we have covered how to publish and subscribe, both through the DSL in the model and through plain Java code.
Today we will talk a bit more on the thin layer we call 'the event bus'.

As stated before, there are a lot of different approaches to EDA. You can use/implement it locally or apply it to the entire enterprise. But in its simplest form it is about the observer pattern, regardless of whether we talk about big or small implementations. This leads us to the main motivations for our event bus abstraction.
We want to keep it simple, but at the same time still have the power to do event handling big and small, in the enterprise or locally, and to do this with the same programming interfaces.
So, based on the above, we have the event bus API. And at the risk of repeating myself, it is a very simple one: methods for publishing, subscribing, and unsubscribing.
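
As a rough sketch of how small such an API can be, here is a trivial in-memory bus in plain Java. The method signatures mirror the ones AkkaEventBus implements in the Akka post, but the interfaces below are local stand-ins rather than Sculptor's own, and InMemoryEventBus is a hypothetical illustration, not the implementation that ships with Sculptor. It assumes Java 8 or later.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Local stand-ins for the Event, EventSubscriber and EventBus types.
interface Event {}

interface EventSubscriber {
    void receive(Event event);
}

interface EventBus {
    boolean subscribe(String topic, EventSubscriber subscriber);
    boolean unsubscribe(String topic, EventSubscriber subscriber);
    boolean publish(String topic, Event event);
}

// Synchronous, in-memory bus: the observer pattern keyed by topic name.
class InMemoryEventBus implements EventBus {
    private final Map<String, List<EventSubscriber>> subscribers = new ConcurrentHashMap<>();

    public boolean subscribe(String topic, EventSubscriber subscriber) {
        return subscribers
                .computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>())
                .add(subscriber);
    }

    // Returns true only if the subscriber was registered for the topic.
    public boolean unsubscribe(String topic, EventSubscriber subscriber) {
        List<EventSubscriber> list = subscribers.get(topic);
        return list != null && list.remove(subscriber);
    }

    // Delivers the event to every subscriber of the topic, in the caller's thread.
    public boolean publish(String topic, Event event) {
        List<EventSubscriber> list =
                subscribers.getOrDefault(topic, Collections.<EventSubscriber>emptyList());
        for (EventSubscriber subscriber : list) {
            subscriber.receive(event);
        }
        return true;
    }
}

class EventBusDemo {
    public static void main(String[] args) {
        EventBus bus = new InMemoryEventBus();
        EventSubscriber houston = event -> System.out.println("Houston received: " + event);
        bus.subscribe("milkywayChannel", houston);
        bus.publish("milkywayChannel", new Event() {});
        bus.unsubscribe("milkywayChannel", houston);
    }
}
```

Because callers only ever see the EventBus interface, swapping this for an implementation backed by an integration framework does not change the publishing or subscribing code.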

It also ships with a default implementation named (you got it) simple event bus. This one is really easy (hey, I found another word instead of simple) to use and fully functional on its own. Though, if you need to integrate with another system you are going to need another implementation.
And that is another of our motivations for the event bus: it should be easy to swap implementations.
Besides the default, we currently have two implementations, based on Spring-integration and Apache Camel. Both of these are what are called "lightweight integration frameworks". Supporting these two frameworks brings a lot of power to the solution when it comes to integration.
And that brings us to the next topic of this series. Some examples of how to bring your events to life over system boundaries, i.e. integration stuff.