CQRS on Windows Azure – Event Sourcing

0
151

Introduction

Although event sourcing is not a mandatory part of CQRS, and indeed event sourcing is used outside of CQRS the two are often used together.  In this article I will be looking at an implementation that uses Windows Azure Storage (either Tables, Blobs or Files) as the persistence mechanism.

Getting your head around event sourcing 

If you have 45 minutes I recommend the video of this material :-

[embedded content]

For developers schooled in the relational database model, event sourcing can seem to be a very confusing way of doing things. It reverses the existing way of storing data in that instead of storing the current state of objects and updating the state when events occur we store the entire history of events that have occured to an object and use this to derive the current state of the object.

Hopefully the following hints can help clarify:-

  1. Events only get added to the end of the event list (you can conceptualise this as a stack that doesn’t have a pop option)
  2. Events are stored by the thing they occur to rather than the type of event that the are.  For example we don’t have a separate table for “payments” and “standing orders” in a bank account type of system – these are just different events that occur in the life of the bank account.
  3. Events cannot be deleted nor can their content be modified – if you need to “undo” something that has happend a reversal or undo event needs to be added to the event store
  4. Events are described in the past-tense.

However, if you get your head out of the CRUD mindset you will see some benefits

  1. You automatically get a complete audit trail for everything that occured (If you have fields like “Last Modified”, “Updated By” in your database tables and a set of triggers that write to an audit table whenever a field is updated in the database you are already doing this and doing it backwards)
  2. You can query the event history in ways that weren’t anticipated when the system was first created
  3. Your event payload schema can be very flexible – if a particular attribute doesn’t apply to a particular type of event you don’t need to store a “null” there
  4. You can store your evfent streams separately which allows for a highly scalable architecture as your system can scale across machines and even data centres.

Fig 1: Example of an event store for vehicles, using the vehicle registration number as the aggregation identifier

Aside – a quick glossary of terms 

There are a couple of words that can cause confusion when dealing with event sources: Aggregation and Sequence. My (perhaps over-simplicstic) definition is as follows:

The Aggregation is the thing to which events occur. In a bank example, this could be something like a bank account, in a vehicle leasing company it could be the vehicle and so on.

Each aggregation must be uniquely identified. Often a business domain already has unique identifiers you can use but if that is not the case you can create them using

The Sequence is the order in which the events occured – this is almost always implemented as an incremental number (although in the case of a file based stream I use a file pointer for the start of the event).

Architecture

The above whiteboard overview shows where event streams would fit in a CQRS architecture.  Within Azure storage there are a large number of different ways you could store the events underlying the event stream – in my case I have provided implementation for four of these:  SQL , Table, File or AppendBlob.  The implementation is different depending on which of these underlying technologies you choose to use.

1) Using Azure Storage “Tables”  to persist events 

An event can be represented by a CLR class. It is usual to have an empty IEvent interface to indicate the intent of the developer that a given class is an event.

When storing the event we need to add the agrgegation identifier and sequence number – so these two are specified in an interface for storing events:

Public Interface IEventIdentity
 
 Function GetAggregateIdentifier() As String
 
 ReadOnly Property Version As UInteger
 
 ReadOnly Property EventInstance As IEvent
 
End Interface

In this case the aggregate identifier is implemented as a string so that the business can dictate what actual unique identifier to use to for it. For clarity I also added an interface that can be used to set these aggregate identifiers but this is totally optional.

Public Interface IAggregateIdentity
 
 Function GetAggregateIdentifier() As String
 
End Interface

Turning a version into a row identifier

 

Since the version is an incremental number and the azure table takes a string for its row key you need to pad the version numbers with zero so as to store it in a manner that it

will sort correctly

Private Const VERSION_FORMAT As String = "0000000000000000000"
Public Shared Function VersionToRowkey(ByVal version As Long) As String If (version <= 0) Then Return Long.MaxValue.ToString(VERSION_FORMAT) Else Return (Long.MaxValue - version).ToString(VERSION_FORMAT) End If End Function

Saving an event record  

The event record itself (everything except the partition key and row key) can be any kind of .NET class that inherits IEventIdentity.  Because the fields this record can have are dynamic (depending on the event type – recall from above that different event types are stored in the same event store) we have to use a DynamicTableEntity class and fill it with the properties of our event class passed in:

Public Shared Function MakeDynamicTableEntity(ByVal eventToSave As IEventContext) 
 As DynamicTableEntity

 Dim ret As New DynamicTableEntity

 ret.PartitionKey = eventToSave.GetAggregateIdentifier()
 ret.RowKey = VersionToRowkey(eventToSave.Version)

 
 ret.Properties.Add("EventType", 
 New EntityProperty(eventToSave.EventInstance.GetType().Name))

 
 If (eventToSave.SequenceNumber <= 0) Then
 
 ret.Properties.Add("SequenceNumber", 
 New EntityProperty(DateTime.UtcNow.Ticks))
 Else
 ret.Properties.Add("SequenceNumber", 
 New EntityProperty(eventToSave.SequenceNumber))
 End If

 If (Not String.IsNullOrWhiteSpace(eventToSave.Commentary)) Then
 ret.Properties.Add("Commentary", 
 New EntityProperty(eventToSave.Commentary))
 End If



 If (Not String.IsNullOrWhiteSpace(eventToSave.Who)) Then
 ret.Properties.Add("Who", New EntityProperty(eventToSave.Who))
 End If


 If (Not String.IsNullOrWhiteSpace(eventToSave.Source)) Then
 ret.Properties.Add("Source", New EntityProperty(eventToSave.Source))
 End If


 
 For Each pi As System.Reflection.PropertyInfo In eventToSave.EventInstance.GetType().GetProperties()
 If (pi.CanRead) Then
 ret.Properties.Add(pi.Name, MakeEntityProperty(pi, eventToSave.EventInstance))
 End If
 Next pi
 End Function 

Then turning the DynamicTableEntity back into an appropriate event class is a matter of reading the event type, creating an instance of that type and then populating its properties from the DynamicTableEntity.Properties collection by reflection.

Points of Interest 

When using Windows Azure Table Storage, queries that use the partition key and row identifier are very fast. Those that don’t are much slower – therefore mapping these two fields

to the aggregate identifier and sequence number is a very sensible way to start.

2) Using Azure Storage “Append Blob” to persist events 

An append blob is a new, special type of binary large object store in Windows Azure storage which is optimised such that you can only add data onto the end of it. 

The append blob has a maximum size of 195Mb (or 50,000 events) so the usual setup is to create one blob per unique event stream.  This also allows for a very high degree of prallelism.

The additional metadata needed for each event stream (such as the record count) can then be stored using the file metadata for the Azure blob:-

Protected Const METATDATA_DOMAIN As String = "DOMAIN"
Protected Const METADATA_AGGREGATE_CLASS As String = "AGGREGATECLASS"
Protected Const METADATA_SEQUENCE As String = "SEQUENCE"
Protected Const METADATA_RECORD_COUNT As String = "RECORDCOUNT"
Protected Const METADATA_AGGREGATE_KEY As String = "AGGREGATEKEY"

The blob itself is created when the event stream reader or writer class is instantiated : 

Protected Sub New(ByVal AggregateDomainName As String, ByVal AggregateKey As TAggregationKey) MyBase.New(AggregateDomainName) m_key = AggregateKey If (BlobContainer IsNot Nothing) Then m_blob = BlobContainer.GetAppendBlobReference(EventStreamBlobFilename) If Not m_blob.Exists() Then m_blob.CreateOrReplace() m_blob.Metadata(METATDATA_DOMAIN) = DomainName m_blob.Metadata(METADATA_AGGREGATE_CLASS) = GetType(TAggregate).Name m_blob.Metadata(METADATA_AGGREGATE_KEY) = m_key.ToString() m_blob.Metadata(METADATA_SEQUENCE) = "0" m_blob.Metadata(METADATA_RECORD_COUNT) = "0" m_blob.SetMetadata() Else m_blob.FetchAttributes() End If End If
End Sub

This is then used to append an event as a two-step process:  first wrap the raw event in a class that indicates the event type, sequence number and other event level metadata and then write the whole to the end of the append blob.

Public Sub AppendEvent(EventInstance As IEvent) Implements IEventStreamWriter(Of TAggregate, TAggregationKey).AppendEvent
 If (AppendBlob IsNot Nothing) Then
 Dim nextSequence As Long = IncrementSequence()
 Dim evtToWrite As New BlobBlockWrappedEvent(nextSequence, 0, EventInstance)
 
 Dim recordWritten As Boolean = False
 Try
 Using es As System.IO.Stream = evtToWrite.ToBinaryStream()
 Dim offset As Long = AppendBlob.AppendBlock(es)
 End Using
 recordWritten = True
 Catch exBlob As Microsoft.WindowsAzure.Storage.StorageException
 Throw New EventStreamWriteException(DomainName, AggregateClassName, Key.ToString(), _ 
 nextSequence, "Unable to save a record to the event stream - " & evtToWrite.EventName, exBlob)
 End Try
 If (recordWritten) Then
 IncrementRecordCount()
 End If
 End If
End Sub 

To read events we simply open a snapshot of the append blob as a stream and deserialise wrapped events from that stream:

Private Function GetAppendBlobSnapshot() As CloudAppendBlob
 If (AppendBlob IsNot Nothing) Then
 Return AppendBlob.CreateSnapshot()
 Else
 Return Nothing
 End If
End Function

Private Function GetUnderlyingStream() As System.IO.Stream
 If (AppendBlob IsNot Nothing) Then
 Dim targetStream As New System.IO.MemoryStream()
 Try
 GetAppendBlobSnapshot().DownloadToStream(targetStream)
 Catch exBlob As Microsoft.WindowsAzure.Storage.StorageException
 Throw New EventStreamReadException(DomainName, AggregateClassName, m_key.ToString(), 
 0, "Unable to access underlying event stream", exBlob)
 End Try
 targetStream.Seek(0, IO.SeekOrigin.Begin)
 Return targetStream
 Else
 Return Nothing
 End If
End Function

Public Function GetEvents() As IEnumerable(Of IEvent) 
 Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEvents
 If (AppendBlob IsNot Nothing) Then
 Dim ret As New List(Of IEvent)
 Dim bf As New BinaryFormatter()
 Using rawStream As System.IO.Stream = GetUnderlyingStream()
 While Not (rawStream.Position >= rawStream.Length)
 Dim record As BlobBlockWrappedEvent = CTypeDynamic(Of BlobBlockWrappedEvent)(bf.Deserialize(rawStream))
 If (record IsNot Nothing) Then
 ret.Add(record.EventInstance)
 End If
 End While
 End Using
 Return ret
 Else
 Throw New EventStreamReadException(DomainName, AggregateClassName, MyBase.m_key.ToString(),
 0, "Unable to read events - Azure blob not initialised")
 End If
End Function 

3) Using Azure Storage “Files” to persist events 

The implementation that uses Azure files is quite similar to that which uses append blobs except that it pre-allocates the full size of file the first time it is used and thereafter just fills this in as events are added.  In addition it co-opts the file pointer of the start of the event record to be the sequence number of that event.

The record count, aggregate type, aggregate key and current sequence number are also stored as attributes in each event stream file.

<Serializable()>
<DataContract()>
Public Class FileBlockWrappedEvent <DataMember(Name:="EventName", Order:=0)> Private ReadOnly m_eventName As String Public ReadOnly Property EventName As String Get Return m_eventName End Get End Property <DataMember(Name:="Sequence", Order:=1)> Private ReadOnly m_sequence As Long Public ReadOnly Property Sequence As Long Get Return m_sequence End Get End Property <DataMember(Name:="Version", Order:=2)> Private ReadOnly m_version As UInteger Public ReadOnly Property Version As UInteger Get Return m_version End Get End Property <DataMember(Name:="Timestamp", Order:=3)> Private ReadOnly m_timestamp As DateTime Public ReadOnly Property Timestamp As DateTime Get Return m_timestamp End Get End Property <DataMember(Name:="Size", Order:=4)> Private ReadOnly m_eventSize As UInteger Public ReadOnly Property EventSize As UInteger Get Return m_eventSize End Get End Property <DataMember(Name:="Class", Order:=4)> Private ReadOnly m_eventClassName As String Public ReadOnly Property ClassName As String Get Return m_eventClassName End Get End Property <DataMember(Name:="Data", Order:=5)> Private ReadOnly m_eventData As Byte() Public ReadOnly Property EventInstance As IEvent Get If (String.IsNullOrWhiteSpace(m_eventClassName)) Then Throw New SerializationException("Unable to determine the event type that wrote this event instance") End If If (m_eventSize = 0) Then Throw New SerializationException("Unable to return the event data for this event instance - size is zero") End If Dim evtType As Type = Type.GetType(m_eventClassName, True, True) If (evtType IsNot Nothing) Then Dim bf As New BinaryFormatter() Using memStream As New System.IO.MemoryStream(m_eventData) Return CTypeDynamic(bf.Deserialize(memStream), evtType) End Using End If Return Nothing End Get End Property Public Sub New(ByVal sequenceInit As String, ByVal versionInit As UInteger, ByVal timestampInit As DateTime, ByVal eventInstanceInit As IEvent) m_eventName = EventNameAttribute.GetEventName(eventInstanceInit) m_sequence = sequenceInit m_version = versionInit m_timestamp = timestampInit Dim bf As New BinaryFormatter() Using memStream As New System.IO.MemoryStream() bf.Serialize(memStream, eventInstanceInit) m_eventSize = memStream.Length m_eventData = memStream.ToArray() End Using m_eventClassName = eventInstanceInit.GetType().AssemblyQualifiedName End Sub Public Sub WriteToBinaryStream(ByVal stream As System.IO.Stream) Dim bf As New BinaryFormatter() bf.Serialize(stream, Me) End Sub End Class

4) Using local files to store events

If you want to perform unit testing on an event sourcing system or want to set up a trial project to get your head around the concepts before committing to use the technique in a full scale project you can use files on your local machine to store the event streams. 

To do this it uses the aggregate identifier to create an unique filename and appends events to each file accordingly.  There is an “information record” at the start of the file that indicates what domain and full class name the event stream pertains to:

Private Sub AppendEventInternal(EventInstance As IEvent(Of TAggregate)) If (MyBase.m_file IsNot Nothing) Then Dim evtToWrite As New LocalFileWrappedEvent(m_eventStreamDetailBlock.SequenceNumber, EventInstance.Version, DateTime.UtcNow, EventInstance, m_setings.UnderlyingSerialiser) If (evtToWrite IsNot Nothing) Then Using fs = m_file.OpenWrite() fs.Seek(0, IO.SeekOrigin.End) evtToWrite.WriteToBinaryStream(fs) m_eventStreamDetailBlock.SequenceNumber = fs.Position End Using m_eventStreamDetailBlock.RecordCount += 1 End If End If End Sub

5) Using local memory to store events

For unit testing or for scenarios where the whole event stream can be stored in the local memory of a machine (for example when an undo-redo buffer is implemented using this technology).

Choosing the storage type to use

In my experience the choice of storage technology depends on the specifics of the system you are building but I would recommend using Azure Tables (or even SQL on Azure) if you want to be able to look into the event streams but use Append Blobs or Files when you need the maximum performance and horizontal scaling. 

In particular if your event streams are likely to have a higher write rate (for example in any IoT instrumentation scenario, a multi player game or a trading platform) then the AppendBlob scales well and is very very fast.

In order to switch between these without major rewriting I have allowed for configuration by specific configuration settings (I think there is still some work to do on this however) to map the aggregate class to the backing technology used to store its event streams and projection:

<CQRSAzureEventSourcingConfiguration> <ImplementationMaps> <Map AggregateDomainQualifiedName="HospitalWard.Nurse" ImplementationName="InMemoryImplementationExample" SnapshotSettingsName="InMemorySnapshotExample" /> </ImplementationMaps> <Implementations> <Implementation Name="InMemoryImplementationExample" ImplementationType="InMemory"> <InMemorySettings /> </Implementation> <Implementation Name="AzureBlobImplementationExample" ImplementationType="AzureBlob"> <BlobSettings ConnectionStringName="UnitTestStorageConnectionString" /> </Implementation> <Implementation Name="AzureBlobImplementationDomainExample" ImplementationType="AzureBlob"> <BlobSettings ConnectionStringName="UnitTestStorageConnectionString" DomainName="Test" /> </Implementation> <Implementation Name="AzureFileImplementationExample" ImplementationType="AzureFile"> <FileSettings ConnectionStringName="UnitTestStorageConnectionString" InitialSize="20000" /> </Implementation> <Implementation Name="AzureSQLImplementationExample" ImplementationType="AzureSQL"> <SQLSettings ConnectionStringName="UnitTestStorageConnectionString" AggregateIdentifierField="AggregateKey" /> </Implementation> <Implementation Name="AzureTableImplementationExample" ImplementationType="AzureTable"> <TableSettings ConnectionStringName="UnitTestStorageConnectionString" SequenceNumberFormat="00000000" /> </Implementation> <Implementation Name="LocalFileSettingsExample" ImplementationType="LocalFileSettings"> <LocalFileSettings EventStreamRootFolder="C:\CQRS\Data\EventStreams" UnderlyingSerialiser="JSON"/> </Implementation> </Implementations> <SnapshotSettings> <SnapshotSetting Name="InMemorySnapshotExample" ImplementationType="InMemory"> <InMemorySettings /> </SnapshotSetting> </SnapshotSettings> </CQRSAzureEventSourcingConfiguration>

Consuming events and projections 

In order to turn your event stream into something interesting (at least, interesting to a user that wants to query the data) you need to create a projection.  A projection is a view of the effect of a set of events.  For example a financial projection on the above cars event example would be interested in any event that impacted the cost or profit from any given car.   

To consume events you need to create a class that “knows” what kind of events it deals with and what to do with them.  These specific projections can be run over a single aggregate’s event stream in order to perform some calculation or operation based on the underlying data of that event stream.

The underlying interface for any projection is:-

 Public Interface IProjection(Of TAggregate As IAggregationIdentifier, TAggregateKey)
 Inherits IProjection

 Sub HandleEvent(Of TEvent As IEvent(Of TAggregate))(ByVal eventToHandle As TEvent)

 

End Interface

Each specific projection that implements the class decides what action to perform with the event’s data payload

 Public Overrides Sub HandleEvent(Of TEvent As IEvent(Of MockAggregate))(eventToHandle As TEvent) _ Implements IProjection(Of MockAggregate, String).HandleEvent Select Case eventToHandle.GetType() Case GetType(MockEventTypeOne) HandleMockEventOne(CTypeDynamic(Of MockEventTypeOne)(eventToHandle)) Case GetType(MockEventTypeTwo) HandleMockEventTwo(CTypeDynamic(Of MockEventTypeTwo)(eventToHandle)) Case Else Throw New ArgumentException("Unexpected event type - " & eventToHandle.GetType().Name) End Select End Sub Private Sub HandleMockEventOne(ByVal eventToHandle As MockEventTypeOne) AddOrUpdateValue(Of Integer)(NameOf(Total), _ ProjectionSnapshotProperty.NO_ROW_NUMBER, Total + eventToHandle.EventOneIntegerProperty) AddOrUpdateValue(Of String)(NameOf(LastString), _ ProjectionSnapshotProperty.NO_ROW_NUMBER, eventToHandle.EventOneStringProperty) End Sub

Snapshots

To allow the current state of a projection to be saved – both for use by any readers and for allowing us to have a starting point if we have to rebuild a projection after a service interruption – an interface to define a snapshot of a projection is also defined. This is a way of saying “it was like this at a given known point”

Public Interface ISnaphot(Of In IAggregateIdentity)

 ReadOnly Property AsOfVersion As Long

End Interface

In practice I save these snapshots as a blob (file) in JSON format – this makes (some of) the query side of the CQRS architecture as simple as finding the snapshot and reading it.

Sequencing and synchronising events

It is often useful to be able to sequence events that occurred to different aggregations together – for example you might have one aggregation for a stock price and another for an account and need to combine the two to give an account valuation at a given point in time.

To facilitate this a master synchronisation field is needed – this could be an incremental number or you can use the date/time of the event occurrence.

To do this an abstract class is used as the base class of all the event types and it handles the synchronisation key.

Public MustInherit Class EventBase Public Property SynchronisationStamp As Long Public Sub New() SynchronisationStamp = DateTime.UtcNow.Ticks End Sub
End Class

Running a projection

Now we want to run a projection over the event stream in order to read the current state of the aggregate from the events that have occured to it.  

Because a projection is a business object rather than a framework object I want to separate it from the underlying implementation of the event stream itself. (This is also very handy to allow you to perform unit tests on a projection by using an in-memory event stream created by the unit test method itself).

In order to do this we have an interface IEventStreamReader which provides the event stream to the projection. Each implementation of a different backing store to an event stream must support this interface:

Public Interface IEventStreamReader(Of TAggregate As IAggregationIdentifier, 
 TAggregationKey)

 Function GetEvents() As IEnumerable(Of IEvent(Of TAggregate))

 Function GetEvents(ByVal StartingVersion As UInteger) As IEnumerable(Of IEvent(Of TAggregate))

 Function GetEventsWithContext() As IEnumerable(Of IEventContext)


End Interface

The particular event stream reader to use is passed to a “Projection Processor” which in turn can take a projection class and run that over the event stream:-

 Public Class ProjectionProcessor(Of TAggregate As IAggregationIdentifier,
 TAggregateKey)

 Private ReadOnly m_streamReader As IEventStreamReader(Of TAggregate, 
 TAggregateKey)


 Public Sub Process(ByVal projectionToProcess As IProjection(Of TAggregate, 
 TAggregateKey))

 If (m_streamReader IsNot Nothing) Then
 If (projectionToProcess IsNot Nothing) Then
 
 Dim startingSequence As UInteger = 0
 If (projectionToProcess.SupportsSnapshots) Then
 

 
 End If
 For Each evt In m_streamReader.GetEvents(startingSequence)
 If (projectionToProcess.HandlesEventType(evt.GetType())) Then
 projectionToProcess.HandleEvent(evt)
 End If
 Next
 End If
 Else
 
 End If

 End Sub

 Friend Sub New(ByVal readerTouse As IEventStreamReader(Of TAggregate, TAggregateKey))
 m_streamReader = readerTouse
 End Sub

End Class

The specific event stream reader implementation class has a factory method to create a projection processor for any given aggregate instance thus:-

 #Region "Factory methods"
 Public Shared Function Create(ByVal instance As IAggregationIdentifier(Of TAggregationKey)) 
 As IEventStreamReader(Of TAggregate, TAggregationKey)

 Return New BlobEventStreamReader(Of TAggregate, TAggregationKey)
 (DomainNameAttribute.GetDomainName(instance), instance.GetKey())

 End Function

 Public Shared Function CreateProjectionProces(
 ByVal instance As IAggregationIdentifier(Of TAggregationKey)) 
 As ProjectionProcessor(Of TAggregate, TAggregationKey)

 Return New ProjectionProcessor(Of TAggregate, TAggregationKey )
 (Create(instance))

 End Function

#End Region

Having created this framework code we can now create our aggregate, event and projection (business logic) classes totally independent of the underlying implementation of their event stream and allowing them to be portable between these different technologies.

Running a classifier

A classifier is a special kind of projection that is used to decide whether or not a given instance of an aggregate is in or out of some defined business meaningful group.  For example if you were implementing a banking system you might have a classifier which ran over the event stream of each account so as to decide which accounts were in the group “accounts in arrears”.

Once you have a framework to use as the basis for an event stream based system you have to create the classes that represent the business parts of that system: the aggregates, events and projections.  You can do this with a graphical designer or if you prefer you can just create the classes in code yourself.

Starting with the aggregate identifier (the “thing” to which events can occur and be recorded) you need to create a class that defines how that aggregate can be uniquely identified.  For this we need to identify the data type for its unique key and, because so many systems use strings for their data storage, a consistent way to turn that unique key into a string.

If we take the example of a bank account that is identified by an unique account number we would end up with an aggregate class like:-

<DomainName("CloudBank")>
Public Class Account Implements IAggregationIdentifier(Of String) Private m_bankAccount As String Public Sub SetKey(key As String) _ Implements IAggregationIdentifier(Of String).SetKey m_bankAccount = key End Sub Public Function GetAggregateIdentifier() As String _ Implements IAggregationIdentifier.GetAggregateIdentifier Return m_bankAccount End Function Public Function GetBankAccountNumber() As String _ Implements IAggregationIdentifier(Of String).GetKey Return m_bankAccount End Function Public Sub New(ByVal accountNumber As String) m_bankAccount = accountNumber End Sub End Class

Then for each event that can occur against this bank account you would need to create an event class that has all of the properties that can be known about the event.  Note that there is no need to define any key event or the mandatory nature – basically any data that can be stored about an event should go into the event definition.

So the event that corresponds to money being deposited into a bank account could look something like:-

<DomainName("CloudBank")>
<AggregateIdentifier(GetType(Account))>
<EventAsOfDate(NameOf(MoneyDeposited.DepositDate))>
Public Class MoneyDeposited Implements IEvent(Of Account) Public Property DepositDate As Nullable(Of DateTime) Public Property Amount As Decimal Public ReadOnly Property Version As UInteger Implements IEvent(Of Account).Version Get Return 1 End Get End Property End Class

Notice that the event definition also includes a version number, which you should increment if you ever change or add properties. There is also an optional attribute EventAsOfDate which allows you to specify which property of the event contains the actual real world date and time that the event occured.

Having defined all of the event types you would then move to defining the projections that allow you to get the point-in-time state of the aggregate from its event stream. A projection class needs to know which event types it handles, and for each of these what to do when that event type is encountered. For example a projection that was to give you the running balance of an account would need to handle MoneyDeposited and MoneyWithdrawn events and to update the CurrentBalance property when it encounters them:-

<DomainName("CloudBank")>
<AggregateIdentifier(GetType(Account))>
Public Class CurrentBalanceProjection Inherits ProjectionBase(Of Account, String) Implements IHandleEvent(Of MoneyDeposited) Implements IHandleEvent(Of MoneyWithdrawn) Private m_currentBalance As Decimal Public ReadOnly Property CurrentBalance As Decimal Get Return m_currentBalance End Get End Property Public Overrides Function HandlesEventType(eventType As Type) As Boolean If eventType Is GetType(MoneyDeposited) Then Return True End If If eventType Is GetType(MoneyDeposited) Then Return True End If Return False End Function Public Overrides ReadOnly Property SupportsSnapshots As Boolean Get Return True End Get End Property Public Overrides Sub HandleEvent(Of TEvent As IEvent)(eventToHandle As TEvent) If GetType(TEvent) Is GetType(MoneyDeposited) Then HandleMoneyDepositedEvent(CTypeDynamic(Of MoneyDeposited)(eventToHandle)) End If If GetType(TEvent) Is GetType(MoneyWithdrawn) Then HandleMoneyWithdrawnEvent(CTypeDynamic(Of MoneyWithdrawn)(eventToHandle)) End If End Sub Public Sub HandleMoneyWithdrawnEvent(eventHandled As MoneyWithdrawn) _ Implements IHandleEvent(Of MoneyWithdrawn).HandleEvent m_currentBalance -= eventHandled.Amount End Sub Public Shadows Sub HandleMoneyDepositedEvent(eventHandled As MoneyDeposited) _ Implements IHandleEvent(Of MoneyDeposited).HandleEvent m_currentBalance += eventHandled.Amount End Sub End Class

As you can see in this example there is a clear and complete separation between the business logic code and the franework code that supports it, allowing for mock-free testing of all the business rule classes.

Notes on the source code

The source code I have attached to this project contains the framework code (and unit test projects) for all the “CQRS on Azure” articles I have uploaded:-

Each project has a read-me file and I recommend reading this and also running through the unit tests to improve the use you can get from this code.

I have deleted the App.Config from the unit test as it contains a reference to my own Azure storage account.  You will need an Azure account in order to run the on-cloud unit tests, but the in memory tests and local file tests can be run without any such access.

<connectionStrings> <add name="UnitTestStorageConnectionString" connectionString="TODO - Add your connection string here" providerName="" />  <add name="StorageConnectionString" connectionString="TODO - Add your connection string here" providerName="" />
</connectionStrings>

Further reading / resources

This article is background for a talk I did for the Dublin Microservices meetup group – video on YouTube

LEAVE A REPLY