Wednesday, December 7, 2022

Build a CQRS event store with Amazon DynamoDB

The command query responsibility segregation (CQRS) pattern, derived from the principle of command-query separation, has been popularized by the domain-driven design community. CQRS architectures that use event sourcing save generated events in an append-only log called an event store. Among other benefits, event sourcing lets you:

Design applications to update a database and send events or messages without having to use expensive distributed transactions (also known as two-phase commit protocol transactions) that span a database and a message broker.
Mitigate object-relational impedance mismatch issues.
Store the full history of all data changes.
Use the event store as an audit log and to assist with debugging.
Support high-performance queries using dedicated denormalized views.

Note that the CQRS and event sourcing patterns are complex and should not be used for simple create, read, update, and delete (CRUD) style applications. Furthermore, implementers must allow for and handle eventual consistency. However, CQRS and event sourcing may prove beneficial if you are looking to build a system that:

Supports a rich domain model that encapsulates complicated business logic and rules.
Is required to propagate events between microservices or over system boundaries.
Is designed with an event-first approach using methods like event storming.

In this post, I discuss how to build an event store using Amazon DynamoDB.

Before you get started

This post assumes that you have a basic understanding of CQRS, event sourcing, and DynamoDB.

Event sourcing and CQRS

Figure 1 that follows shows that a CQRS architecture includes both a READ and a WRITE model.

Figure 1 – CQRS READ and WRITE models with event sourcing

CQRS READ and WRITE models can be designed independently to meet an application’s requirements. The WRITE model consists of aggregates that process commands. Commands represent an intent to modify the system. The READ model receives queries, which are run against materialized views built to support read requests.

An aggregate is composed of one or more component objects that encapsulate and enforce a domain’s business logic. When a CQRS architecture uses event sourcing, the data generated by an aggregate is stored as a collection of ordered events. For example, assume there is a Widget aggregate that’s responsible for managing the domain of widgets where a widget is a small device being sold on a website. The Widget aggregate includes functionality for changing a widget’s name. A client system can request a widget name change by sending the ChangeWidgetName command to this aggregate. Upon successfully processing the command, the Widget aggregate yields its updated state in the form of the WidgetNameChanged event. This new event is then saved to the event store as a new record.
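The command-to-event flow described above can be sketched in a few lines of Python. This is a minimal illustration, not a reference implementation: the class layout, the `pending_events` list, and the empty-name business rule are assumptions made for the example, and only the ChangeWidgetName and WidgetNameChanged names come from the text.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ChangeWidgetName:
    """Command: an intent to rename a widget."""
    widget_id: int
    new_name: str


@dataclass(frozen=True)
class WidgetNameChanged:
    """Event: the fact that a widget was renamed."""
    widget_id: int
    event_number: int
    name: str


@dataclass
class Widget:
    widget_id: int
    name: str
    event_count: int = 0  # number of events emitted so far
    pending_events: list = field(default_factory=list)  # not yet saved

    def handle(self, command: ChangeWidgetName) -> WidgetNameChanged:
        # Hypothetical business rule enforced before the command is accepted.
        if not command.new_name.strip():
            raise ValueError("widget name must not be empty")
        event = WidgetNameChanged(
            self.widget_id, self.event_count + 1, command.new_name
        )
        self.apply(event)
        self.pending_events.append(event)
        return event

    def apply(self, event: WidgetNameChanged) -> None:
        # State only ever changes by applying an event.
        self.name = event.name
        self.event_count = event.event_number


widget = Widget(widget_id=123, name="Gizmo", event_count=1)
event = widget.handle(ChangeWidgetName(widget_id=123, new_name="Gadget"))
```

Keeping `apply` separate from `handle` means the same method can be reused when events are replayed to restore the aggregate.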

To restore its state, an aggregate loads its events from the event store in chronological order. The events are then streamed to the READ model of the architecture where denormalizing components use these events to build materialized views that support query requests.
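Restoring state from events amounts to folding the ordered events over an empty starting state. The sketch below shows one way to do that; the dictionary shapes of the events and of the resulting state are assumptions chosen for illustration.

```python
from functools import reduce


def apply_event(state: dict, event: dict) -> dict:
    """Return the new state produced by applying one event to the old state."""
    kind, data = event["type"], event["data"]
    if kind == "WidgetCreated":
        return {"id": data["id"], "name": data["name"], "description": ""}
    if kind == "WidgetNameChanged":
        return {**state, "name": data["name"]}
    if kind == "WidgetDescriptionChanged":
        return {**state, "description": data["description"]}
    raise ValueError(f"unknown event type: {kind}")


def restore(events: list) -> dict:
    """Rebuild aggregate state by applying events in event-number order."""
    ordered = sorted(events, key=lambda e: e["number"])
    return reduce(apply_event, ordered, {})


# Events may arrive unordered; restore() sorts them before replaying.
events = [
    {"number": 2, "type": "WidgetNameChanged", "data": {"name": "Gadget"}},
    {"number": 1, "type": "WidgetCreated", "data": {"id": 123, "name": "Gizmo"}},
    {
        "number": 3,
        "type": "WidgetDescriptionChanged",
        "data": {"description": "A small device"},
    },
]
state = restore(events)
```

A denormalizer on the READ side can use the same fold, writing the resulting state to a materialized view instead of keeping it in memory.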

DynamoDB event store design

DynamoDB is a key-value NoSQL database that delivers single-digit-millisecond performance at any scale. It’s a fully managed, multi-Region, multi-active database with built-in security, backup and restore, and in-memory caching for internet-scale applications.

Important: When working with DynamoDB, use the best practices for designing and architecting with DynamoDB. In DynamoDB, table schemas are flexible. Items with differing structure can be stored in the same table and only the primary key for the table and secondary indexes is enforced. To support item uniqueness, you can prefix different values to the partition key or sort keys if you’re using a composite primary key. Understanding the access patterns for your application is important to the success of your DynamoDB implementation.

An event store can be implemented using traditional relational database tables. However, by using DynamoDB you gain a number of advantages over this approach including:

DynamoDB belongs to the serverless technology family which means you don’t have to manage servers or spend significant amounts of time on infrastructure management. This means you can spend more time on coding business logic and enhancing your product.
You gain performance benefits associated with NoSQL technology. An event store design is not characterized by joins between tables or relations which makes it an ideal candidate for DynamoDB.
DynamoDB supports change data capture (CDC) natively using Amazon DynamoDB Streams which is discussed in detail later in this post. Adding CDC to a relational database can be a non-trivial exercise and may involve purchasing additional tools.
The DynamoDB global tables feature makes it an excellent choice for building distributed architectures where you can write to different Amazon Web Services (AWS) Regions and let the service coordinate the data replication and conflict resolution for you.

For the event store, as shown in Figure 2 that follows, there are three entity types:

Aggregate: A CQRS pattern that encapsulates the implementation of business logic.
Event: An event represents something that has occurred.
Snapshot: A snapshot represents the derived state of existing events at a certain point in time. A snapshot is used so that the full history of events doesn’t need to be stored or loaded at runtime.

Figure 2 – Event store entity relationship diagram

In this design, each entity type is read using its own access patterns, so the model includes a table per entity-type. This lets you optimize the primary key and data type for efficiency in storage and throughput, and also supports table-level choices such as backups, exports, periodic scans, storage class, and capacity mode based on the differing requirements for each entity type and its access patterns.
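As a rough sketch, the three tables could be described with the parameters of the DynamoDB CreateTable operation. The table names (`aggregate`, `event`, `snapshot`), the `PK` and `SK` attribute names, the partition-key-only aggregate table, and on-demand capacity are all assumptions made for illustration; adjust them to your own access patterns.

```python
def table_definition(name: str, with_sort_key: bool) -> dict:
    """Build keyword arguments for the DynamoDB CreateTable operation."""
    key_schema = [{"AttributeName": "PK", "KeyType": "HASH"}]
    attributes = [{"AttributeName": "PK", "AttributeType": "N"}]
    if with_sort_key:
        key_schema.append({"AttributeName": "SK", "KeyType": "RANGE"})
        attributes.append({"AttributeName": "SK", "AttributeType": "N"})
    return {
        "TableName": name,
        "KeySchema": key_schema,
        "AttributeDefinitions": attributes,
        "BillingMode": "PAY_PER_REQUEST",  # assumption: on-demand capacity
    }


TABLES = [
    table_definition("aggregate", with_sort_key=False),  # one item per aggregate
    table_definition("event", with_sort_key=True),       # SK = event number
    table_definition("snapshot", with_sort_key=True),    # SK = snapshot number
]

# With boto3 installed and credentials configured, each definition could be
# passed to the low-level client, for example:
#   boto3.client("dynamodb").create_table(**TABLES[1])
```

Because each entity type has its own table, settings such as capacity mode or backups can differ per table without affecting the others.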

Event table

Assume that there is a Widget aggregate with the identifier (ID) of 123. Using this example, Figure 3 that follows shows how, over time, this aggregate’s state is stored as a series of events to the event table.

Figure 3 – DynamoDB event table

In DynamoDB, items are equivalent to rows in a relational database and attributes are comparable to fields. The event table in the preceding Figure 3 includes three events:

WidgetCreated, which has partition key (PK) = 123 and sort key (SK) = 1.
WidgetNameChanged, which has PK = 123 and SK = 2.
WidgetDescriptionChanged, which has PK = 123 and SK = 3.

The design featured in the event table uses a composite primary key that consists of both a partition key attribute as a number and a sort key attribute as a number. When the sort key is a number, you can issue range queries and sort the events based on the event’s number, which increases chronologically. When using strings as sort keys, DynamoDB collates and compares strings using the bytes of the underlying UTF-8 string encoding.
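The difference between number and string sort keys is easy to see with a small, DynamoDB-free example. The sketch below mimics the two comparison rules in plain Python; the zero-padding width is an arbitrary choice for illustration.

```python
event_numbers = [1, 2, 10]

# Number sort keys are compared numerically.
numeric_order = sorted(event_numbers)

# String sort keys are compared by their UTF-8 bytes, so "10" sorts before "2".
string_order = sorted(
    (str(n) for n in event_numbers), key=lambda s: s.encode("utf-8")
)

# Zero-padding to a fixed width restores chronological order for strings.
padded_order = sorted(
    (f"{n:010d}" for n in event_numbers), key=lambda s: s.encode("utf-8")
)
```

If you ever need a string sort key for event numbers, fixed-width padding of this kind keeps range queries aligned with event order.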

Note: A production-ready CQRS solution requires more attribute values but for the purpose of describing the event store design, the attributes listed will suffice.

Event payloads are stored in the payload attribute and are serialized to JSON. However, because the payload attribute is specified as a string, you can use a different serialization format for this value (for example protocol buffers).
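To make the item shape concrete, here is a minimal sketch that builds an event item in DynamoDB’s low-level attribute-value format with a JSON-serialized payload. The `payload` attribute is taken from the text; the `PK`, `SK`, and `event_type` attribute names are assumptions for illustration.

```python
import json


def event_item(
    aggregate_id: int, event_number: int, event_type: str, payload: dict
) -> dict:
    """Build an event item in DynamoDB's low-level attribute-value format."""
    return {
        "PK": {"N": str(aggregate_id)},    # numbers are sent as strings
        "SK": {"N": str(event_number)},
        "event_type": {"S": event_type},   # hypothetical attribute name
        # sort_keys gives a stable serialized form for the same payload
        "payload": {"S": json.dumps(payload, sort_keys=True)},
    }


item = event_item(123, 2, "WidgetNameChanged", {"name": "Gadget"})
restored_payload = json.loads(item["payload"]["S"])
```

Swapping the serialization format only changes the two lines that write and read the `payload` string.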

The sort key value for events represents the sequence in which these entities are created. This value is unique within the context of the owning aggregate and is managed by the aggregate itself which tracks event counts and order.

Event items should be immutable as they represent a fact. To protect them, you can create identity-based access policies to control access to specific tables and items.
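One possible shape for such an identity-based policy is sketched below as a Python dictionary that can be serialized to JSON. The table ARN, account ID, and statement IDs are placeholders, and the exact set of actions your application role needs is an assumption.

```python
import json


def event_table_policy(table_arn: str) -> dict:
    """Identity-based policy that lets a role append and read events only."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AppendAndReadEvents",
                "Effect": "Allow",
                "Action": [
                    "dynamodb:PutItem",
                    "dynamodb:GetItem",
                    "dynamodb:Query",
                ],
                "Resource": table_arn,
            },
            {
                "Sid": "DenyEventMutation",
                "Effect": "Deny",
                "Action": ["dynamodb:UpdateItem", "dynamodb:DeleteItem"],
                "Resource": table_arn,
            },
        ],
    }


# Placeholder ARN for illustration only.
EVENT_TABLE_ARN = "arn:aws:dynamodb:us-east-1:111122223333:table/event"
policy = event_table_policy(EVENT_TABLE_ARN)
policy_json = json.dumps(policy, indent=2)
```

Note that PutItem replaces an existing item that has the same key, so a policy like this is best paired with a condition expression such as attribute_not_exists(PK) on each event write. Any process that sets TTL attributes on event items would need a separate role that is allowed to update them.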

Aggregate table

Figure 4 that follows depicts the design of the aggregate table with one example: the primary aggregate item Widget, which has PK = 123.

Figure 4 – DynamoDB aggregate table

For the aggregate item 123, only one entry is required: the main item, which describes a Widget aggregate. The item includes a last_events attribute, which contains a sorted map of the last set of events generated by the Widget aggregate. In this example, the last time aggregate 123 was invoked by a command, event 4 (WidgetStockUpdated) was created, but it isn’t recorded as a separate event item at this stage. This event will eventually be created as an individual item in the event table as part of a subsequent step, which is discussed in the later section Loading an aggregate algorithm. The other attribute of interest is version. The main aggregate item 123 has a version value of 2, which means it’s been updated twice since its creation. The version attribute value is initially set to 0. DynamoDB supports optimistic locking through the use of a version attribute. This concurrency-control mechanism, which you can use to handle race conditions, relies on DynamoDB condition expressions.
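To illustrate how a version check resolves a race, the following sketch uses a toy in-memory table in place of DynamoDB. The class and exception names are stand-ins invented for this example; in DynamoDB the same check would be expressed as a condition expression on the write.

```python
class ConditionalCheckFailed(Exception):
    """Stand-in for DynamoDB's ConditionalCheckFailedException."""


class InMemoryAggregateTable:
    """Toy table that mimics a conditional write on the version attribute."""

    def __init__(self):
        self.items = {}

    def put(self, item: dict, expected_version=None) -> None:
        current = self.items.get(item["PK"])
        if expected_version is None:
            # New aggregate: the item must not exist yet.
            if current is not None:
                raise ConditionalCheckFailed("aggregate already exists")
        elif current is None or current["version"] != expected_version:
            raise ConditionalCheckFailed("version changed since the item was read")
        self.items[item["PK"]] = dict(item)


table = InMemoryAggregateTable()
table.put({"PK": 123, "version": 0, "name": "Gizmo"})

# Two writers both read version 0 and both try to write version 1.
table.put({"PK": 123, "version": 1, "name": "Gadget"}, expected_version=0)
try:
    table.put({"PK": 123, "version": 1, "name": "Doohickey"}, expected_version=0)
    second_write_succeeded = True
except ConditionalCheckFailed:
    second_write_succeeded = False
```

The losing writer would typically reload the aggregate and retry the command against the new state.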

Important: For production systems, ensure that your own aggregate ID partition key values are unique and support the even distribution of data.

Snapshot table

Figure 5 shows the design of the snapshot table, which includes one snapshot item with PK = 123 and SK = 1. This snapshot represents all events up to and including event 2.

Figure 5 – DynamoDB Snapshot table

The sort key value for snapshots, like events, represents the sequence in which snapshot items are created. This value is unique within the context of the owning aggregate and is managed by the aggregate itself which tracks snapshot counts and order. The event_number attribute for a snapshot is the number of the last event that was processed at the time the snapshot was taken.

Note: A production-ready CQRS solution might only take a snapshot after tens, hundreds, or even thousands of events have been created since the last time a snapshot was taken.
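A snapshot policy of this kind can be as simple as counting events since the last snapshot. The sketch below is one hedged way to express it: the threshold of 100, the function names, and the `payload` attribute for the snapshot state are assumptions, while `event_number` follows the design described above.

```python
import json

SNAPSHOT_INTERVAL = 100  # hypothetical threshold; tune per aggregate


def should_snapshot(
    current_event_number: int, last_snapshot_event_number: int = 0
) -> bool:
    """True once enough events have accumulated since the last snapshot."""
    return current_event_number - last_snapshot_event_number >= SNAPSHOT_INTERVAL


def snapshot_item(
    aggregate_id: int, snapshot_number: int, event_number: int, state: dict
) -> dict:
    """Build a snapshot item in DynamoDB's low-level attribute-value format."""
    return {
        "PK": {"N": str(aggregate_id)},
        "SK": {"N": str(snapshot_number)},
        # Number of the last event folded into this snapshot.
        "event_number": {"N": str(event_number)},
        "payload": {"S": json.dumps(state, sort_keys=True)},
    }


first_snapshot = snapshot_item(123, 1, 100, {"name": "Gadget"})
```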

Expiring event and snapshot items with Time to Live attributes

Once an event or snapshot has been superseded by a newer snapshot, DynamoDB Time to Live (TTL) attributes are added to the older items. Using TTL, the size of the event and snapshot tables is minimized as DynamoDB cleans and removes items marked with expiry timestamps. However, the items can be archived prior to the expiry event taking place. When a snapshot item is created, the TTL can be set by using Amazon DynamoDB Streams and an AWS Lambda trigger.

Figure 6 that follows shows how a Lambda function, initiated by DynamoDB Streams, can be used to set the TTL attribute on event items in the event table as a result of a snapshot item being written to the snapshot table.

Figure 6 – DynamoDB Streams Lambda function

Note: Items deleted using TTL can also be processed with DynamoDB Streams. DynamoDB attempts to clean items within 48 hours of the TTL value.
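The stream-triggered function could look roughly like the sketch below. It assumes the stream on the snapshot table includes new images, that the TTL attribute on the event table is named `ttl`, that tables are named `event` and `snapshot`, and a seven-day grace period; all of these are illustrative choices. The `client` argument is assumed to behave like `boto3.client("dynamodb")` and is passed in so the logic can be exercised with a stub.

```python
import time

RETENTION_SECONDS = 7 * 24 * 3600  # hypothetical grace period before expiry


def expire_events_for_snapshots(stream_event: dict, client, now=None) -> int:
    """Set a TTL on the events covered by each newly inserted snapshot.

    Returns the number of event items that were updated.
    """
    now = int(time.time()) if now is None else now
    updated = 0
    for record in stream_event["Records"]:
        if record["eventName"] != "INSERT":
            continue  # only react to new snapshot items
        snapshot = record["dynamodb"]["NewImage"]
        # Find every event up to and including the snapshot's last event.
        page = client.query(
            TableName="event",
            KeyConditionExpression="PK = :pk AND SK <= :last",
            ExpressionAttributeValues={
                ":pk": snapshot["PK"],
                ":last": snapshot["event_number"],
            },
            ProjectionExpression="PK, SK",
        )
        for item in page["Items"]:
            client.update_item(
                TableName="event",
                Key={"PK": item["PK"], "SK": item["SK"]},
                UpdateExpression="SET #ttl = :expiry",
                # TTL is a DynamoDB reserved word, so use a placeholder name.
                ExpressionAttributeNames={"#ttl": "ttl"},
                ExpressionAttributeValues={
                    ":expiry": {"N": str(now + RETENTION_SECONDS)}
                },
            )
            updated += 1
    return updated
```

A Lambda handler would call this function with the incoming event and a client created once at module level. The sketch reads a single query page; a fuller version would follow LastEvaluatedKey to page through large item collections and skip events that already carry a TTL.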

Loading an aggregate algorithm

An aggregate’s state is loaded or restored when it receives a command for processing. The command carries the ID of the aggregate (in the example provided, this is 123).

The load aggregate algorithm uses queries with key condition expressions, where the aggregate ID is the partition key that scopes reads to the associated item collection. Because the tables use sort keys, the most recent snapshot is returned by setting the ScanIndexForward parameter to false and Limit to 1. New events can then be loaded by using range conditions such as between, greater than or equal to, and less than or equal to.
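The two queries can be sketched as parameter sets for the DynamoDB Query operation. The table names and the `PK` and `SK` attribute names are assumptions, as is the choice of a strongly consistent read for events.

```python
def latest_snapshot_query(aggregate_id: int) -> dict:
    """Query parameters that return only the most recent snapshot item."""
    return {
        "TableName": "snapshot",
        "KeyConditionExpression": "PK = :pk",
        "ExpressionAttributeValues": {":pk": {"N": str(aggregate_id)}},
        "ScanIndexForward": False,  # descending sort key order
        "Limit": 1,
    }


def events_query(aggregate_id: int, first_event_number: int = 1) -> dict:
    """Query parameters that return events from a given number onward."""
    return {
        "TableName": "event",
        "KeyConditionExpression": "PK = :pk AND SK >= :first",
        "ExpressionAttributeValues": {
            ":pk": {"N": str(aggregate_id)},
            ":first": {"N": str(first_event_number)},
        },
        "ScanIndexForward": True,  # ascending: oldest event first
        "ConsistentRead": True,    # assumption: read freshly written events
    }


# With boto3, either dictionary could be passed as keyword arguments:
#   boto3.client("dynamodb").query(**events_query(123, first_event_number=3))
```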

Note: If the command being processed creates a new aggregate, the loading process described in the preceding paragraph isn’t used.

The steps for restoring an aggregate’s state are:

1. Load the primary aggregate item.
2. Prepare existing aggregate state (preparation phase).
3. Load the last snapshot (if present).
4. Load the remaining events.

Figure 7 that follows shows the sequence of steps for loading an aggregate’s state from the event store tables. The diagram also depicts the service and repository application layers and the interactions between those layers.

Figure 7 – Load aggregate sequence diagram

The steps to load an aggregate’s state, depicted in Figure 7, are as follows:

Load the primary aggregate item – uses the GetItem operation to retrieve the relevant aggregate item from the aggregate table using the received identifier. If the operation finds no matching item, an error is returned to the client.
Prepare existing aggregate state – consists of the event data preparation phase. The events that were created the last time the aggregate was saved are now persisted as separate items in the event table. A PutItem operation is made for each event in the last_events attribute. If an event already exists in the event table, the event doesn’t need to be persisted as events are immutable. Generally, only one event will be present in the map so this doesn’t represent a significant overhead. This is an important step as it will guarantee event and aggregate data is in the correct state before the application of business logic.

Important: The maximum item size in DynamoDB is 400 KB so it’s important to save the events as separate items to ensure the aggregate item size remains under this limit.

Load the last snapshot – starts by determining if the aggregate has a snapshot. To do this, issue a query to DynamoDB to find the most recent snapshot in the snapshot table. As part of this query, reverse the sort order on the sort key by setting ScanIndexForward to false, and set Limit to 1. This makes certain that only the latest snapshot item is returned. A snapshot item contains an event_number attribute, which is used to ensure that only those events that have occurred since the snapshot was taken are retrieved from the event table in the next step.
Load the remaining events – restores the aggregate state by loading all relevant events. These are the events that have been created since the last snapshot was taken, or all events if no snapshot is available. You can query for events using range conditions on the sort key. To do this, query for event items in the event table using sort key >= n, where n is the number of the earliest event that needs to be retrieved.

With the completion of step 4, the aggregate and its data are fully loaded into memory and the current received command can be processed.
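The four steps can be put together in one function, sketched below. The table names, the `PK` and `SK` attribute names, and the layout of `last_events` (a map from event number to a serialized payload string) are assumptions. The `client` argument is assumed to behave like `boto3.client("dynamodb")`, including its `exceptions` attribute.

```python
def load_aggregate(client, aggregate_id: int) -> dict:
    """Load an aggregate item, its latest snapshot, and its remaining events."""
    pk = {"N": str(aggregate_id)}

    # Step 1: load the primary aggregate item.
    found = client.get_item(
        TableName="aggregate", Key={"PK": pk}, ConsistentRead=True
    )
    if "Item" not in found:
        raise LookupError(f"aggregate {aggregate_id} not found")
    aggregate = found["Item"]

    # Step 2: persist the events held in last_events as separate items.
    last_events = aggregate.get("last_events", {"M": {}})["M"]
    for number, payload in last_events.items():
        try:
            client.put_item(
                TableName="event",
                Item={"PK": pk, "SK": {"N": number}, "payload": payload},
                # Events are immutable: never overwrite an existing item.
                ConditionExpression="attribute_not_exists(PK)",
            )
        except client.exceptions.ConditionalCheckFailedException:
            pass  # the event was already persisted by an earlier load

    # Step 3: load the last snapshot, if one exists.
    snapshots = client.query(
        TableName="snapshot",
        KeyConditionExpression="PK = :pk",
        ExpressionAttributeValues={":pk": pk},
        ScanIndexForward=False,
        Limit=1,
    )["Items"]
    snapshot = snapshots[0] if snapshots else None
    first = int(snapshot["event_number"]["N"]) + 1 if snapshot else 1

    # Step 4: load the events created since the snapshot (or all events).
    events = client.query(
        TableName="event",
        KeyConditionExpression="PK = :pk AND SK >= :first",
        ExpressionAttributeValues={":pk": pk, ":first": {"N": str(first)}},
        ConsistentRead=True,
    )["Items"]
    return {"aggregate": aggregate, "snapshot": snapshot, "events": events}
```

The caller would rebuild state from the snapshot and then apply the returned events in order. This sketch reads one query page only; a fuller version would follow LastEvaluatedKey when an aggregate has many events.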

Saving an aggregate algorithm

An aggregate is saved after it processes a command successfully. The aggregate will emit events and optionally a snapshot.

To save an aggregate’s state to the event store:

1. Save the primary aggregate item.
2. Save the snapshot item (optional).

Note: A precondition for step 1 is that if the aggregate already exists, its state has been fully loaded into memory. This doesn’t apply to new aggregates.

Figure 8 that follows shows the sequence of steps for saving an aggregate to the event store tables.

Figure 8 – Save aggregate sequence diagram

The steps to save an aggregate, depicted in Figure 8, are as follows:

Save the primary aggregate item – consists of a single PutItem operation against the aggregate table. This operation uses the version attribute to ensure existing data is not incorrectly overwritten. DynamoDB supports using condition expressions with single item atomic writes. Transactions are recommended when you have more than one item to modify. In this example, the design only requires a single item write as part of the aggregate save process for step 1. The PutItem operation for the aggregate item includes a sorted map (the last_events attribute) of the events generated by the aggregate in response to processing the current command. The events in the map will be saved as individual items as part of the preparation phase when an aggregate is loaded. This is described in the section Loading an aggregate algorithm.
Save the snapshot item – an optional step that’s run if the aggregate emits a snapshot in addition to the events. Saving a snapshot requires an additional PutItem operation, which is applied against the snapshot table.
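The conditional write in step 1 can be sketched as a set of PutItem parameters. The function name, the table and key attribute names, and the map layout of `last_events` (event number to serialized payload string) are assumptions; the `version` and `last_events` attributes follow the design described above.

```python
from typing import Optional


def save_aggregate_request(
    aggregate_id: int, loaded_version: Optional[int], new_events: dict
) -> dict:
    """Build PutItem parameters for the aggregate item.

    loaded_version is None for a new aggregate; new_events maps each event
    number to its serialized payload string.
    """
    is_new = loaded_version is None
    item = {
        "PK": {"N": str(aggregate_id)},
        "version": {"N": "0" if is_new else str(loaded_version + 1)},
        "last_events": {
            "M": {str(n): {"S": p} for n, p in sorted(new_events.items())}
        },
    }
    request = {"TableName": "aggregate", "Item": item}
    if is_new:
        # A new aggregate must not replace an existing one.
        request["ConditionExpression"] = "attribute_not_exists(PK)"
    else:
        # Optimistic lock: write only if the stored version is unchanged.
        request["ConditionExpression"] = "#v = :loaded"
        request["ExpressionAttributeNames"] = {"#v": "version"}
        request["ExpressionAttributeValues"] = {
            ":loaded": {"N": str(loaded_version)}
        }
    return request


update_request = save_aggregate_request(123, 2, {4: '{"stock": 5}'})
create_request = save_aggregate_request(124, None, {1: '{"name": "Gizmo"}'})
```

If the condition fails, DynamoDB rejects the write, and the application can reload the aggregate and retry the command.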

Loading and saving aggregate examples

This section provides examples that show how the event store tables are updated when an aggregate is saved and loaded. The save and load processes are initiated when an aggregate processes a command.

CreateWidget command

As shown in Figure 9 that follows, a Widget aggregate item is created in the aggregate table after the successful processing of the CreateWidget command.

Figure 9 – CreateWidget command updates to the aggregate table

ChangeWidgetName command

Figure 10 that follows shows the updates to the event table when a ChangeWidgetName command is received for processing. The WidgetCreated event saved previously is created as a new item in the event table.

Figure 10 – ChangeWidgetName command pre-processing updates to the event table

Figure 11 that follows shows the updates to the aggregate item in the aggregate table after the aggregate processes a ChangeWidgetName command.

Figure 11 – ChangeWidgetName command updates to the aggregate table

ChangeWidgetDescription command

Figure 12 that follows shows the updates to the event table when a ChangeWidgetDescription command is received for processing. The WidgetNameChanged event saved previously is created as a new item in the event table.

Figure 12 – ChangeWidgetDescription command pre-processing updates to the event table

Figure 13 that follows shows the updates to the aggregate item in the aggregate table after the aggregate processes a ChangeWidgetDescription command.

Figure 13 – ChangeWidgetDescription command updates to the aggregate table

Event store change data capture with DynamoDB Streams

All event table changes can be made available through DynamoDB Streams, a time-ordered change log that retains records for 24 hours. DynamoDB Streams captures every modification made to items in a table; each stream record appears exactly once in the stream and, for a given item, records appear in the same order as the modifications. Applications can read from this change log and propagate events to one or more subscribers.
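A consumer of the event table stream might look like the following sketch. It assumes the stream is configured to include new images, that event items use the `PK`, `SK`, and `payload` attributes with a JSON payload, and that subscribers are plain callables; a real system would more likely publish to a service such as Amazon EventBridge.

```python
import json


def events_from_stream(stream_event: dict) -> list:
    """Extract newly inserted event items from a DynamoDB Streams batch."""
    events = []
    for record in stream_event["Records"]:
        if record["eventName"] != "INSERT":
            # Skip MODIFY (for example, a TTL being set) and REMOVE records.
            continue
        image = record["dynamodb"]["NewImage"]
        events.append(
            {
                "aggregate_id": int(image["PK"]["N"]),
                "event_number": int(image["SK"]["N"]),
                "payload": json.loads(image["payload"]["S"]),
            }
        )
    return events


def publish(stream_event: dict, subscribers: list) -> int:
    """Hand each new event to every subscriber; return the event count."""
    events = events_from_stream(stream_event)
    for event in events:
        for subscriber in subscribers:
            subscriber(event)
    return len(events)
```

Filtering on INSERT keeps subscribers from seeing the same business event again when an event item is later modified or expired.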

Conclusion

If your organization is data-driven, you need to ensure that your designs support data insights now and in the future. By building CQRS and event sourcing solutions on AWS with DynamoDB at their core, you can make significant progress towards future-proofing your data platforms.

In this post, you received an overview of how to build an event store using DynamoDB. You saw the basic steps required to implement the event store using DynamoDB features, including:

TTL attributes.
Optimistic locking and condition expressions.
DynamoDB Streams for change data capture.
Maximizing the efficiency of DynamoDB queries with partition and sort keys.

AWS provides services that you can use to build other components of a CQRS architecture, including:

Amazon Kinesis Data Streams and Amazon EventBridge to provide event bus and choreography functionality.
AWS Step Functions to support orchestration and the Saga pattern.
AWS Lambda to host aggregate and denormalization code.
Amazon S3 to archive events and to host a data lake composed of the rich business events streamed from the event store.

For more information, see:

Martin Fowler’s articles at martinfowler.com about event sourcing and CQRS.
Chris Richardson’s articles at Microservices.io about event sourcing and CQRS.

About the author

Luke Popplewell works primarily with federal entities in the Australian Government. In his role as an architect, Luke uses his knowledge and experience to help organizations reach their goals on the AWS cloud. Luke has a keen interest in serverless technology, digital innovation, modernization, DevOps, and event-driven architectures.

Read more: AWS Database Blog
