Friday, October 29, 2010

Event Sourcing with Sculptor - Snapshots

Yesterday I described how events can be used as a storage mechanism. Instead of storing the current state, we store each change as a Domain Event. The current state is reconstructed by loading and replaying all historical events. For Entities with a long life cycle there can be too many events to replay. This can be solved with an optimization based on periodically storing a rolling snapshot of the current state.

Let us look at the code in the Sculptor port of the Simple CQRS Example to understand what this means in practice.

When loading an InventoryItem we start by applying the latest snapshot, if any, and then replay the events that occurred after the snapshot. This is the code in the Repository:


@Override
public InventoryItem findByKey(String itemId)
        throws InventoryItemNotFoundException {
    InventoryItem result = super.findByKey(itemId);

    loadFromHistory(result);

    return result;
}

private void loadFromHistory(InventoryItem entity) {
    InventoryItemSnapshot snapshot = getInventoryItemSnapshotRepository()
            .getLatestSnapshot(entity.getItemId());
    entity.applySnapshot(snapshot);
    long snapshotVersion = snapshot == null ? 0 : snapshot.getVersion();

    List<InventoryItemEvent> history = getInventoryItemEventRepository()
            .findAllAfter(entity.getItemId(), snapshotVersion);
    entity.loadFromHistory(history);
}
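
To make the loading order concrete, here is a minimal, self-contained sketch of the same idea in plain Java. It is not the Sculptor-generated code: `Snap`, `Evt`, `Item` and `load` are hypothetical stand-ins for the snapshot, the event, the entity and the repository lookup, and the sketch assumes that every event carries the aggregate version it produced.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for InventoryItemSnapshot, InventoryItemEvent and InventoryItem.
class SnapshotLoadSketch {

    static final class Snap {
        final boolean activated;
        final long version;

        Snap(boolean activated, long version) {
            this.activated = activated;
            this.version = version;
        }
    }

    static final class Evt {
        final long version;
        final boolean activates; // true = item becomes active, false = item is deactivated

        Evt(long version, boolean activates) {
            this.version = version;
            this.activates = activates;
        }
    }

    static final class Item {
        boolean activated;
        long version;
        int replayed; // number of events replayed, kept only for illustration
    }

    // Start from the snapshot (if any), then replay only the events newer than it.
    static Item load(Snap snapshot, List<Evt> allEvents) {
        Item item = new Item();
        long snapshotVersion = 0;
        if (snapshot != null) {
            item.activated = snapshot.activated;
            item.version = snapshot.version;
            snapshotVersion = snapshot.version;
        }
        for (Evt e : allEvents) {
            if (e.version > snapshotVersion) {
                item.activated = e.activates;
                item.version = e.version;
                item.replayed++;
            }
        }
        return item;
    }

    public static void main(String[] args) {
        List<Evt> events = Arrays.asList(
                new Evt(1, true), new Evt(2, false), new Evt(3, true));
        Item full = load(null, events);
        Item fromSnapshot = load(new Snap(false, 2), events);
        System.out.println(full.activated + " after replaying " + full.replayed);
        System.out.println(fromSnapshot.activated + " after replaying " + fromSnapshot.replayed);
    }
}
```

Both calls end up in the same state, but the second one replays only the events that are newer than the snapshot.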



That is how the snapshots are used when loading an InventoryItem. Let us see how they are saved. We define the Snapshot Value Object for InventoryItem like this:


ValueObject InventoryItemSnapshot {
    String itemId index
    boolean activated
    Long version

    Repository InventoryItemSnapshotRepository {
        @InventoryItemSnapshot getLatestSnapshot(String itemId);
        protected findByCondition(PagingParameter pagingParameter);
        save;
    }
}


Here we store the state as explicit attributes, which is simple with Sculptor, since we get the persistence (to MongoDB or JPA) for free, but it can also be stored as a blob (encoded as XML, protobuf, or whatever you prefer). In this example the only state is the activated flag, but there can be much more in a real application.

The storage of the snapshot is triggered by a subscriber on the ordinary Domain Event flow. It is simply defined like this in the model:

Service InventoryItemSnapshotter {
    subscribe to inventoryItemTopic
    inject @InventoryItemRepository
    inject @InventoryItemSnapshotRepository
}


The implementation calculates how many events have passed since the previous snapshot by comparing version numbers. When the delta reaches a threshold (e.g. 100 events) a snapshot is created and saved.

public void receive(Event event) {
    if (!(event instanceof InventoryItemEvent)) {
        return;
    }

    InventoryItemEvent inventoryItemEvent = (InventoryItemEvent) event;
    String itemId = inventoryItemEvent.getItemId();

    InventoryItemSnapshot snapshot = getInventoryItemSnapshotRepository()
            .getLatestSnapshot(itemId);
    long snapshotVersion = snapshot == null ? 1 : snapshot.getVersion();
    long eventVersion = inventoryItemEvent.getAggregateVersion() == null ? 1
            : inventoryItemEvent.getAggregateVersion();
    if (eventVersion - snapshotVersion >= VERSION_DELTA) {
        takeSnapshot(itemId);
    }
}

private void takeSnapshot(String itemId) {
    InventoryItem item;
    try {
        item = getInventoryItemRepository().findByKey(itemId);
    } catch (InventoryItemNotFoundException e) {
        log.warn("takeSnapshot failed: " + e.getMessage());
        return;
    }

    InventoryItemSnapshot snapshot = item.createSnapshot();
    getInventoryItemSnapshotRepository().save(snapshot);
}
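
The threshold decision can be isolated into a small function, which makes the boundary cases easy to check. The following is only a sketch: `shouldTakeSnapshot` is a hypothetical helper, and the value 100 for `VERSION_DELTA` is taken from the example figure mentioned in the text, not from the actual source code.

```java
class SnapshotThresholdSketch {

    // Assumed threshold; the text mentions 100 events as an example.
    static final long VERSION_DELTA = 100;

    // A null argument models "no snapshot yet" or "event without a version".
    // Both fall back to 1, mirroring the receive method.
    static boolean shouldTakeSnapshot(Long latestSnapshotVersion, Long eventVersion) {
        long snapshotVersion = latestSnapshotVersion == null ? 1 : latestSnapshotVersion;
        long version = eventVersion == null ? 1 : eventVersion;
        return version - snapshotVersion >= VERSION_DELTA;
    }

    public static void main(String[] args) {
        System.out.println(shouldTakeSnapshot(null, 100L)); // delta 99
        System.out.println(shouldTakeSnapshot(null, 101L)); // delta 100
        System.out.println(shouldTakeSnapshot(101L, 150L)); // delta 49
    }
}
```

With these defaults the first snapshot is taken at version 101, and the next one 100 versions after the latest stored snapshot.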


By using snapshots we can dramatically improve performance for loading Entities with many historical changes. However, you can always start development without snapshotting and add it later, as a performance enhancement.

Also, note that snapshots and events are immutable, which gives us great opportunities to use caching to improve performance.

The complete source code for this example is available here: http://github.com/patriknw/sculptor-simplecqrs/

Thursday, October 28, 2010

Event Sourcing with Sculptor

A while ago Greg Young published the Super Simple CQRS Example. There are also some good documents on how to use events as a storage mechanism at the CQRS Info Site.

I have implemented the same example with Java and Sculptor. In this post I will describe how the Event Sourcing is implemented. Even though my implementation is facilitated by Sculptor and MongoDB, the same design can easily be implemented with other techniques, such as JPA or plain JDBC, and without Sculptor.

A domain model typically holds the current state of the world. Event Sourcing makes it possible to see how we got to this state. Essentially it means that we have to capture all changes to the application state as a sequence of events. These events can be used to reconstruct the current state as well as past states.
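
As a small illustration of reconstructing past states, consider a stock count that is never stored, only derived from events. This is a sketch under assumptions: `StockChanged` and `countAt` are hypothetical names and are not part of the sample application.

```java
import java.util.Arrays;
import java.util.List;

class PastStateSketch {

    // Hypothetical event: a signed change to the stock count at a given version.
    static final class StockChanged {
        final long version;
        final int delta;

        StockChanged(long version, int delta) {
            this.version = version;
            this.delta = delta;
        }
    }

    // Replays all events up to and including the given version.
    static int countAt(List<StockChanged> history, long version) {
        int count = 0;
        for (StockChanged e : history) {
            if (e.version <= version) {
                count += e.delta;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<StockChanged> history = Arrays.asList(
                new StockChanged(1, 10),  // check in 10 items
                new StockChanged(2, -3),  // remove 3 items
                new StockChanged(3, 5));  // check in 5 items
        System.out.println("count at version 2: " + countAt(history, 2));
        System.out.println("current count: " + countAt(history, Long.MAX_VALUE));
    }
}
```

Replaying the full history gives the current state, and stopping at an earlier version gives the state as it was at that point.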

The sample application is a simplified inventory of items. In line with CQRS it has a strict separation of commands and queries. Changes to the InventoryItem domain object are published as Domain Events. The InventoryItem can be modified with a few commands that I have represented as operations in a Service.


Service InventoryFacade {
    inject @InventoryItemRepository
    createInventoryItem(String itemId, String name);
    deactivateInventoryItem(String itemId);
    renameInventoryItem(String itemId, String newName);
    checkInItemsToInventory(String itemId, int count);
    removeItemsFromInventory(String itemId, int count);
}


All these commands are handled in the same way, except createInventoryItem, which is slightly different. The Service looks up the Entity and calls the corresponding method on the domain object.

public void deactivateInventoryItem(String itemId) {
    InventoryItem item = tryGetItem(itemId);
    item.deactivate();
    getInventoryItemRepository().save(item);
}


The Entity performs validation and creates a Domain Event for each state change.

public void deactivate() {
    if (!isActivated())
        throw new IllegalStateException("already deactivated");
    applyChange(new InventoryItemDeactivated(new Date(), getItemId()));
}


In this case InventoryItem has a flag (state) indicating whether the item is activated. Note that this state is not changed directly; it is changed later as a result of the InventoryItemDeactivated event. All changes are done with Domain Events, which is important, because we don't save the current state, we only save the Domain Events. The activated flag is not stored explicitly.

The Domain Event is applied and added to a list of changes, which will later be stored and published.


private void applyChange(InventoryItemEvent event) {
    applyChange(event, true);
}

private void applyChange(InventoryItemEvent event, boolean isNew) {
    DynamicMethodDispatcher.dispatch(this, event, "apply");
    if (isNew) {
        changes.add(event);
    } else {
        setVersion(event.getAggregateVersion());
    }
}

public void apply(InventoryItemDeactivated event) {
    setActivated(false);
}
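
The dispatch call selects the `apply` overload that matches the runtime type of the event. How Sculptor's DynamicMethodDispatcher does this internally is not shown here. The following is only a rough sketch of the general idea using plain Java reflection, and the classes `Created`, `Deactivated`, `Renamed` and `Item` are hypothetical.

```java
import java.lang.reflect.Method;

class DispatchSketch {

    static class Created {}
    static class Deactivated {}
    static class Renamed {}

    static class Item {
        boolean activated;

        public void apply(Created event) { activated = true; }
        public void apply(Deactivated event) { activated = false; }
        public void apply(Object other) { /* ignore events without a specific handler */ }
    }

    // Invokes target.<name>(event), choosing the overload by the event's runtime type.
    // The event's own class is tried first, then its superclasses up to Object.
    static void dispatch(Object target, Object event, String name) {
        for (Class<?> type = event.getClass(); type != null; type = type.getSuperclass()) {
            try {
                Method method = target.getClass().getMethod(name, type);
                method.invoke(target, event);
                return;
            } catch (NoSuchMethodException e) {
                // no overload for this type, continue with the superclass
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        throw new IllegalArgumentException("No " + name + " method for " + event.getClass());
    }

    public static void main(String[] args) {
        Item item = new Item();
        dispatch(item, new Created(), "apply");
        System.out.println("after Created: " + item.activated);
        dispatch(item, new Renamed(), "apply"); // falls back to apply(Object)
        dispatch(item, new Deactivated(), "apply");
        System.out.println("after Deactivated: " + item.activated);
    }
}
```

The benefit of this style is that the entity needs no `instanceof` chain: adding support for a new event type is just one more `apply` method.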


When saving, the InventoryItem instance itself is saved, but only as a key and a version. The version is used for detecting concurrent modifications (optimistic locking). In addition, the changes, i.e. the Domain Events, are stored. The version number of the InventoryItem is stored in each Domain Event. An additional sequence number is used to ensure the correct order of the events.

We need to override the default save method in the Repository to handle these sequence numbers and also save the events.


@Override
public InventoryItem save(InventoryItem entity) {
    InventoryItem saved = super.save(entity);

    List<InventoryItemEvent> changes = entity.getUncommittedChanges();
    changes = applyVersionToChanges(changes, saved.getVersion());
    for (InventoryItemEvent each : changes) {
        getInventoryItemEventRepository().save(each);
    }
    entity.markChangesAsCommitted();

    return saved;
}

private List<InventoryItemEvent> applyVersionToChanges(
        List<InventoryItemEvent> changes, long version) {
    List<InventoryItemEvent> result = new ArrayList<InventoryItemEvent>();
    long sequence = version * 1000;
    for (InventoryItemEvent each : changes) {
        result.add(each.withAggregateVersion(version).withChangeSequence(
                sequence));
        sequence++;
    }
    return result;
}
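
The sequence arithmetic is easiest to see with concrete numbers. This sketch repeats only the numbering part of applyVersionToChanges, and `sequencesFor` is a hypothetical helper. Note that the scheme only keeps events in order across saves as long as a single save contains fewer than 1000 events and each save increases the version by at least one.

```java
import java.util.ArrayList;
import java.util.List;

class ChangeSequenceSketch {

    // Returns the change sequence numbers assigned to the events of one save.
    static List<Long> sequencesFor(long version, int numberOfChanges) {
        List<Long> result = new ArrayList<Long>();
        long sequence = version * 1000;
        for (int i = 0; i < numberOfChanges; i++) {
            result.add(sequence);
            sequence++;
        }
        return result;
    }

    public static void main(String[] args) {
        // Three events saved together at aggregate version 3.
        System.out.println(sequencesFor(3, 3)); // [3000, 3001, 3002]
        // One event saved at the next version sorts after all of them.
        System.out.println(sequencesFor(4, 1)); // [4000]
    }
}
```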


When the Domain Events are saved they are also published to a topic, which the read side subscribes to. That is handled with the publish/subscribe mechanism in Sculptor. In the model we simply need to specify publish on the save method.


abstract DomainEvent InventoryItemEvent {
    persistent
    String itemId index
    Long aggregateVersion nullable
    Long changeSequence nullable

    Repository InventoryItemEventRepository {
        save publish to inventoryItemTopic;
        List<@InventoryItemEvent> findAllForItem(String itemId);
        protected findByCondition;
    }
}

DomainEvent InventoryItemDeactivated extends @InventoryItemEvent {
}


On the read side we add subscribers to this topic.

Service InventoryListView {
    subscribe to inventoryItemTopic
    inject @InventoryItemListRepository
}

Service InventoryItemDetailView {
    subscribe to inventoryItemTopic
    inject @InventoryItemDetailsRepository
}
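
The publish/subscribe wiring itself is generated by Sculptor from the model. Purely to illustrate the pattern, here is a minimal in-memory sketch. `Topic`, `Subscriber` and `RecordingView` are hypothetical names and are not part of Sculptor's event bus API.

```java
import java.util.ArrayList;
import java.util.List;

class TopicSketch {

    interface Subscriber {
        void receive(Object event);
    }

    // A topic delivers every published event to all of its subscribers, in order.
    static final class Topic {
        private final List<Subscriber> subscribers = new ArrayList<Subscriber>();

        void subscribe(Subscriber subscriber) {
            subscribers.add(subscriber);
        }

        void publish(Object event) {
            for (Subscriber subscriber : subscribers) {
                subscriber.receive(event);
            }
        }
    }

    // Stand-in for a read-side view: it just records what it has received.
    static final class RecordingView implements Subscriber {
        final List<Object> received = new ArrayList<Object>();

        public void receive(Object event) {
            received.add(event);
        }
    }

    public static void main(String[] args) {
        Topic inventoryItemTopic = new Topic();
        RecordingView listView = new RecordingView();
        RecordingView detailView = new RecordingView();
        inventoryItemTopic.subscribe(listView);
        inventoryItemTopic.subscribe(detailView);

        inventoryItemTopic.publish("InventoryItemDeactivated(item-1)");
        System.out.println(listView.received + " " + detailView.received);
    }
}
```

The write side only knows the topic, so new read-side views can be added without touching the code that saves the events.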


Alright, then we are almost done. One more thing though: when retrieving an InventoryItem we must replay all events to recreate the current state. We do that by overriding the default findByKey method in the Repository.


@Override
public InventoryItem findByKey(String itemId)
        throws InventoryItemNotFoundException {
    InventoryItem result = super.findByKey(itemId);

    loadFromHistory(result);

    return result;
}

private void loadFromHistory(InventoryItem entity) {
    List<InventoryItemEvent> history = getInventoryItemEventRepository()
            .findAllForItem(entity.getItemId());
    entity.loadFromHistory(history);
}


To retrieve all events we use a simple query in the InventoryItemEventRepository.

public List<InventoryItemEvent> findAllForItem(String itemId) {
    List<ConditionalCriteria> criteria = criteriaFor(
            InventoryItemEvent.class).withProperty(itemId()).eq(itemId)
            .orderBy(changeSequence()).build();
    return findByCondition(criteria);
}
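
For readers who do not use Sculptor's criteria builder, the query amounts to a filter on itemId followed by a sort on changeSequence. Here is a sketch of the equivalent over an in-memory list, where `StoredEvent` is a hypothetical stand-in for the persisted event:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class EventQuerySketch {

    // Hypothetical stand-in for a persisted InventoryItemEvent.
    static final class StoredEvent {
        final String itemId;
        final long changeSequence;

        StoredEvent(String itemId, long changeSequence) {
            this.itemId = itemId;
            this.changeSequence = changeSequence;
        }
    }

    // Keeps only the events of the given item, ordered by change sequence.
    static List<StoredEvent> findAllForItem(List<StoredEvent> store, String itemId) {
        return store.stream()
                .filter(e -> e.itemId.equals(itemId))
                .sorted(Comparator.comparingLong((StoredEvent e) -> e.changeSequence))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<StoredEvent> store = Arrays.asList(
                new StoredEvent("a", 2000),
                new StoredEvent("b", 1000),
                new StoredEvent("a", 1001),
                new StoredEvent("a", 1000));
        for (StoredEvent e : findAllForItem(store, "a")) {
            System.out.println(e.itemId + " " + e.changeSequence);
        }
    }
}
```

The ordering matters because the events are replayed one by one, and the state after replay depends on the order in which they are applied.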


The loaded events are applied to the InventoryItem Domain Object.


public void loadFromHistory(List<InventoryItemEvent> history) {
    for (InventoryItemEvent each : history) {
        applyChange(each, false);
    }
}

private void applyChange(InventoryItemEvent event, boolean isNew) {
    DynamicMethodDispatcher.dispatch(this, event, "apply");
    if (isNew) {
        changes.add(event);
    } else {
        setVersion(event.getAggregateVersion());
    }
}

public void apply(InventoryItemCreated event) {
    setActivated(true);
}

public void apply(InventoryItemDeactivated event) {
    setActivated(false);
}

public void apply(Object other) {
    // ignore
}


In this example we have used a naive approach when loading the InventoryItem: we replay all historical events. For Entities with a long life cycle there can be too many events to replay. In that case we can use a snapshot technique, which I will describe in a separate blog post.

The complete source code for this example can be found here: http://github.com/patriknw/sculptor-simplecqrs/tree/event_sourcing_without_snapshots

Thursday, October 21, 2010

Switching an existing app to MongoDB

Ron has switched a project from Hibernate/MySQL to MongoDB and written notes on the changes and gotchas.
Read it: Switching an existing app to MongoDB