Building aggregations with DynamoDB Streams
Jun 08, 2023 • 9 Minute Read
With the Cloud Adventure: Race To Cloud Castle game in full swing (get your paths in by Sept 4!), now seems like a good time to break down one interesting aspect of the game's construction: the leaderboard aggregations.
How the game works
In "Cloud Adventure", players compete to find and report paths marked by gems hidden across various cloud services. When you click "Submit Path" on the game page, the following event flow is triggered:
If your path is valid, we store it as an item in a DynamoDB table.
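Roughly speaking, a stored path item looks like this. The "pk" and "sk" key names match the table definition later in this post, and user_submitted_path is the attribute the aggregation code will look for; the sample values themselves are purely illustrative:

# A stored path item (sample values only)
{
    "pk": "jane-doe",                             # partition key: the user identifier
    "sk": "s3#lambda#dynamodb",                   # sort key: the submitted path
    "user_submitted_path": "s3#lambda#dynamodb"   # attribute the aggregator filters on
}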
This works great for storing and retrieving paths. By storing our user identifier in the partition key and our submitted path in the sort key, we can query the user identifier to get all paths associated with this user in one shot -- great for checking your progress in the game.
Adding Aggregations to DynamoDB
But we want to do more than just store and retrieve paths. As you can see on the leaderboard page, we want to ask the following questions about our data:
- How many people ("active players") have submitted at least one valid path to the leaderboard?
- How many people have found X valid paths, where X is any number between 1 and 12?
- How many total paths (with overlap) have been discovered by all players combined?
There's no simple query you can write against the DynamoDB model laid out above to find the answers to these questions; you'd have to scan the whole table for each and do a bunch of client-side math. Slow and expensive!
Instead, we need to build some aggregates. And if you're feeling like this must be much more complex than putting the data in a relational database and writing SUM queries, don't worry: with a bit of help from AWS SAM, we'll be up and running in just a few minutes.
DynamoDB Table Layout
Remember, DynamoDB tables are "schema-on-read", not "schema-on-write" -- you can throw any data into the attributes you want; it's up to the client to interpret them correctly. This allows us to do something called index overloading. We can store our aggregations as additional records in the same DynamoDB table. After all, why configure two tables when we can just as easily use one?
This is why I gave my partition and sort keys generic names ("pk" and "sk"), so I don't limit myself to thinking of them as "username" or "path" fields.
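To make that concrete, here's roughly how path items and aggregate items will sit side by side in the single table. The aggregate key values match the ones used in the aggregation code later in this post; the counts shown are just illustrative:

# Path items and aggregate items overloaded into one table (sample values)
{"pk": "jane-doe",   "sk": "s3#lambda#dynamodb", "user_submitted_path": "s3#lambda#dynamodb"}
{"pk": "AGGREGATES", "sk": "TOTAL_PATHS_FOUND",  "data_value": 1042}
{"pk": "AGGREGATES", "sk": "3_PATHS_FOUND",      "data_value": 57}
{"pk": "AGGREGATES", "sk": "TOTAL_PLAYERS",      "data_value": 213}

One query on pk = "AGGREGATES" can then return every counter at once -- which is exactly what the leaderboard needs.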
Setting up DynamoDB streams
Now all we have to do is keep these aggregates up to date. We could do this by synchronously updating a bunch of aggregation fields in DynamoDB whenever a user reports a new path. But that would quickly start to bog down the user experience.
Instead, we can treat our path updates as events and process them asynchronously, behind the user's back. Sneaky!
Handily, DynamoDB gives us a cool feature called Streams, which behaves a lot like a direct Kinesis integration with our table. Once we set this up, we can get any changes to table data exported as events for downstream processing.
(And I do mean "any events". DynamoDB Streams don't support filtering. So our handlers are going to have to pick through all the table changes in order to find the ones they care about.)
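To give you a feel for what our handler will receive, a single INSERT record on the stream looks roughly like this (heavily trimmed -- only the fields the aggregation code actually reads are shown, and the values are illustrative):

# One trimmed-down DynamoDB Streams record, as delivered inside event['Records']
{
    "eventName": "INSERT",
    "dynamodb": {
        "Keys": {
            "pk": {"S": "jane-doe"},
            "sk": {"S": "s3#lambda#dynamodb"}
        },
        "NewImage": {
            "pk": {"S": "jane-doe"},
            "sk": {"S": "s3#lambda#dynamodb"},
            "user_submitted_path": {"S": "s3#lambda#dynamodb"}
        }
        # MODIFY events also carry an "OldImage" of the item before the change
    }
}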
The good news is that we can define DynamoDB Streams with minimal config in an AWS SAM template. Here is my table resource:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: SAM template for DynamoDB aggregations

Resources:
  ResultsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      AttributeDefinitions:
        - AttributeName: "pk"
          AttributeType: "S"
        - AttributeName: "sk"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "pk"
          KeyType: "HASH"
        - AttributeName: "sk"
          KeyType: "RANGE"
      BillingMode: "PAY_PER_REQUEST"
SAM uses the StreamViewType attribute to intuit that we want to wire up a stream to this table. The NEW_AND_OLD_IMAGES value means that we want to see both the old and modified data on the stream each time we make a change to the table.
Notice that we are NOT defining any additional secondary indexes on this table -- we are going to make our aggregations work using nothing but the main table index!
Now we just need to connect the stream to the Lambda function that will process the aggregations. Under the Events section of our Lambda resource, we'll add a function trigger of type DynamoDB, wired to the stream we implicitly created on our table. The StartingPosition of TRIM_HORIZON means we always want to start from the oldest unprocessed record in the stream, and the BatchSize means we'll work with up to ten records at a time.
  AggregatorFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: backend/
      Handler: handler.aggregation_handler
      Runtime: python3.8
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref ResultsTable
      Environment:
        Variables:
          TABLE_NAME: !Ref ResultsTable
      Events:
        DynamoDBEvent:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt ResultsTable.StreamArn
            StartingPosition: TRIM_HORIZON
            BatchSize: 10
Building the aggregates
As you can see in the function definition, I chose to write my Lambda in Python 3. You'll note that I'm using Python's logging module to write structured output to CloudWatch -- very helpful if you are trying to parse through mounds of debug logs! I've also chosen to initialize my boto3 resource and table object in global space, outside the function handler, so I only incur the initialization time penalty on cold start.
import os
import logging

import boto3
from boto3.dynamodb.conditions import Key

logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ["TABLE_NAME"])


def aggregation_handler(event, context):
    logger.info(event)
    for record in event['Records']:
        # Only aggregate newly inserted items that contain a submitted path
        if record['eventName'] == "INSERT" and "user_submitted_path" in record["dynamodb"]["NewImage"]:
            # Make sure the user hasn't already reached their 12-path limit
            num_paths = len(get_recorded_paths_for(record["dynamodb"]["Keys"]["pk"]["S"]))
            if num_paths <= 12:
                # Perform aggregations -- see the update_item calls below
                ...
Inside the aggregation_handler itself (the code that gets triggered when DynamoDB Streams invokes the function), I'm looping through the set of up to 10 records that DynamoDB will send me.
Note: because this is a single-producer system where my own code controls what gets written to the table, I'm not super concerned about malformed events making their way onto the stream. But if I was, this code would need more error handling to deal with unexpected data in the records. Otherwise, the stream will back up as Lambda tries and fails to process the same record set over and over again.
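If you did want to harden it, a minimal sketch might look something like this -- wrap each record in its own try/except so a malformed record gets logged and skipped instead of jamming up the whole batch (process_record here is a hypothetical helper holding the filtering and aggregation logic described below):

def aggregation_handler(event, context):
    logger.info(event)
    for record in event['Records']:
        try:
            process_record(record)  # hypothetical helper: filtering check + aggregations
        except (KeyError, ValueError) as err:
            # Log and skip the bad record so the rest of the batch still processes
            logger.error("Skipping malformed record %s: %s", record.get('eventID'), err)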
Inside the loop, we'll first need to perform a filtering check on the record, because not every record on the stream is relevant for our aggregations. In our case, we are looking for newly inserted records that contain a user_submitted_path -- i.e., a path we need to aggregate into our various metrics.
Assuming this is a relevant record, we will then have to make a single DynamoDB QUERY to retrieve the full set of previously reported paths for this user. It's important to note that this is the only additional query we will need to make in our aggregation efforts; all other information is contained in the streamed record. The query helper function looks something like this:
def get_recorded_paths_for(user):
    response = table.query(KeyConditionExpression=Key('pk').eq(user))
    logger.info(response)
    paths = response['Items'] if 'Items' in response else []
    return paths
Okay, now that we have our data, we'll just build the aggregations as follows:
#Aggregation #1: update total paths found
response = table.update_item(
    Key={
        'pk': 'AGGREGATES',
        'sk': 'TOTAL_PATHS_FOUND'
    },
    ReturnValues='UPDATED_NEW',
    UpdateExpression='ADD data_value :d',
    ExpressionAttributeValues={':d': 1}
)
logger.info(response)

#Aggregation #2: update number of players who have found this number of paths
response = table.update_item(
    Key={
        'pk': 'AGGREGATES',
        'sk': str(num_paths) + '_PATHS_FOUND'
    },
    ReturnValues='UPDATED_NEW',
    UpdateExpression='ADD data_value :d',
    ExpressionAttributeValues={':d': 1}
)
logger.info(response)

#Aggregation #3: update total number of players (if necessary)
if num_paths == 1:
    response = table.update_item(
        Key={
            'pk': 'AGGREGATES',
            'sk': 'TOTAL_PLAYERS'
        },
        ReturnValues='UPDATED_NEW',
        UpdateExpression='ADD data_value :d',
        ExpressionAttributeValues={':d': 1}
    )
    logger.info(response)
I'm using the ADD action here on my DynamoDB updates, rather than the generally recommended SET, because ADD creates the counter record if it doesn't already exist -- exactly the behavior we want in this case.
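For comparison, getting the same create-if-missing behavior out of SET requires DynamoDB's if_not_exists function and an extra expression value -- a small but real bit of added ceremony:

# Equivalent counter increment using SET + if_not_exists (more verbose than ADD)
response = table.update_item(
    Key={'pk': 'AGGREGATES', 'sk': 'TOTAL_PATHS_FOUND'},
    UpdateExpression='SET data_value = if_not_exists(data_value, :zero) + :d',
    ExpressionAttributeValues={':zero': 0, ':d': 1},
    ReturnValues='UPDATED_NEW'
)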
And in case you're wondering, yes, each of these updates triggers a change on the table, which itself places a new record on the stream and triggers this same aggregation function over again! Now you see why it's so important to place that filter check at the top of the code -- otherwise we'd be stuck in an infinite Lambda loop.
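If you want to be extra careful, you could also add an explicit guard at the top of the loop -- my own belt-and-braces addition, not strictly required given the filter check above:

# Ignore changes to the aggregate records themselves, just in case
if record["dynamodb"]["Keys"]["pk"]["S"] == "AGGREGATES":
    continue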
You might be wondering: would it make more sense just to put our aggregates in a separate DynamoDB table, so we wouldn't trigger a recursive streaming action when we update them? Yeah, maybe! As long as we don't think we'd ever need to write a query that retrieves aggregate and individual data at the same time.
Retrieving our aggregates
Now, from my front end, I can make a single API call that requests the aggregates, and retrieve them as follows:
table.query(KeyConditionExpression=Key('pk').eq("AGGREGATES"))
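If you're curious how that result might get shaped for the leaderboard, here's a rough sketch -- the helper name and response shape are my own illustration, not the game's actual API code:

def get_leaderboard_aggregates():
    # One query returns every counter item under the AGGREGATES partition
    response = table.query(KeyConditionExpression=Key('pk').eq("AGGREGATES"))
    # Flatten into a simple {sk: count} dict, e.g. {"TOTAL_PLAYERS": 213, "TOTAL_PATHS_FOUND": 1042}
    return {item['sk']: int(item['data_value']) for item in response.get('Items', [])}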
This makes loading the leaderboard quite fast - try it out! And in case you're wondering, the total time to asynchronously build and update the aggregates is consistently sub-second, so the leaderboard updates in what feels like real time.
Takeaways
Yes, this may seem like a more complex and fragile way to build aggregations than just putting our data in a relational database and doing a bit of math every time we load the leaderboard. But because the aggregates are precomputed, this leaderboard will load quickly, in constant time -- even at millions of records. And at small scale, my table costs remain comfortably in the free tier.
Moreover, once you get the DynamoDB Streams infrastructure set up, the work to add additional aggregations doesn't feel any worse to me than mapping a new backend function to a relational database with an ORM library. Your mileage there may vary.
Either way, enjoy the game, and I hope the knowledge of how the leaderboard works makes it a little more fun!