Building Event-Driven Microservices with AWS SNS/SQS and Scala
Introduction: The Challenge of Event-Driven Architecture
In modern distributed systems, services need to react to changes in real time without tight coupling. Traditional request-response patterns create dependencies that limit scalability and make systems brittle. Event-driven architecture solves this by allowing services to react to events asynchronously, but implementing it correctly requires handling numerous edge cases: retry logic, backpressure management, message ordering, and fault tolerance.
This article explores a production-grade event-driven framework built on AWS SNS/SQS and Scala that processes millions of events daily. We’ll dive into the architectural patterns, implementation details, and hard-earned lessons from running this system at scale.
Architecture Overview: Change Data Capture → SNS → Lambda → Processing
The architecture follows a publish-subscribe pattern with these components:
Database Change
↓
Change Data Capture (CDC)
↓
SNS Topic (with FIFO support)
↓
SQS Queue (Dead Letter Queue for failures)
↓
AWS Lambda Consumer
↓
Business Logic Processing
↓
Downstream Systems
Key Design Decisions
- SNS for Fan-out: Single event published to SNS can trigger multiple subscribers
- SQS for Reliability: Messages persisted even if consumers are down
- Lambda for Scalability: Auto-scaling based on queue depth
- FIFO Queues: Message ordering guarantees within message groups
- Dead Letter Queues: Failed messages automatically routed for analysis
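As a concrete sketch of the fan-out wiring (assuming the v1 AWS Java SDK and placeholder ARNs/URLs), a queue can be subscribed to a topic with the SDK's Topics helper, which also installs the queue policy that lets SNS deliver to it:
import com.amazonaws.services.sns.{AmazonSNS, AmazonSNSClientBuilder}
import com.amazonaws.services.sns.util.Topics
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}

// Hypothetical wiring: subscribe an existing SQS queue to an existing SNS topic
val sns: AmazonSNS = AmazonSNSClientBuilder.defaultClient()
val sqs: AmazonSQS = AmazonSQSClientBuilder.defaultClient()

val topicArn = "arn:aws:sns:us-east-1:123456789012:entity-changes"                        // placeholder
val queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/entity-changes-consumer" // placeholder

// Returns the subscription ARN; the queue policy is extended so the topic may deliver
val subscriptionArn: String = Topics.subscribeQueue(sns, sqs, topicArn, queueUrl)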
Building the Consumer Framework
Base Consumer Abstraction
The core of the system is a generic consumer trait that handles SQS event processing:
abstract class BaseSQSConsumerLambda[M, I, SPO, O] {
protected implicit val executionContext: ExecutionContext
// Convert SQS message string to domain model
def converter(inputStr: String): I
// Process single event
def process(input: I): Future[SPO]
// Aggregate results from batch processing
def outputProcessing(inputSeq: Seq[I], outputSeq: Seq[Try[SPO]]): O
// Maximum time for batch processing (AWS limit: 15 minutes)
def singleProcessTimeout: Duration
// Optional: Group related events for sequential processing
protected def groupLogically(inputSeq: Seq[I]): Seq[Seq[I]] =
inputSeq.map(input => Seq(input))
def handleRequest(event: SQSEvent, context: Context): O = {
setup()
// Extract messages from SQS event
val collectedInput = event
.getRecords
.asScala
.map(_.getBody)
.map(parseMessage) // Handle SNS wrapper
.map(converter)
.toSeq
// Process with controlled parallelism
val output = Await.result(
FutureUtil.doSequential(groupLogically(collectedInput)) { logicalGroup =>
// Events in each group processed asynchronously
FutureUtil.groupedSequence(logicalGroup)(process)
},
singleProcessTimeout
)
// Flatten per-group results; a failed group contributes a single failure entry
outputProcessing(collectedInput, output.flatMap {
  case Success(groupResults) => groupResults
  case Failure(ex) => Seq(Failure(ex))
})
}
}
Key Patterns
Type Safety: The generic parameters [M, I, SPO, O] ensure compile-time type safety:
- M: Module type for dependency injection
- I: Input event type
- SPO: Single processing output type
- O: Final aggregated output type
Grouped Processing: The groupLogically hook allows events to be processed sequentially within groups but concurrently between groups. This is critical for maintaining order when needed.
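For example, a consumer that must apply changes to the same entity in order can override the hook to group by an entity identifier; a minimal sketch, where OrderEvent and OrderEventConsumer are hypothetical names:
// Hypothetical input type carrying an entity identifier
case class OrderEvent(entityId: String, payload: String)

abstract class OrderEventConsumer[M] extends BaseSQSConsumerLambda[M, OrderEvent, Unit, Unit] {
  // Events for the same entity stay in one group and are applied in order;
  // unrelated entities land in different groups and are processed concurrently.
  override protected def groupLogically(inputSeq: Seq[OrderEvent]): Seq[Seq[OrderEvent]] =
    inputSeq.groupBy(_.entityId).values.toSeq
}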
Timeout Management: AWS Lambda has a 15-minute maximum execution time. The framework respects this limit and fails fast if processing exceeds the threshold.
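In practice singleProcessTimeout is a fixed value per consumer, but one way to sanity-check it is against the invocation's remaining time; a small sketch assuming aws-lambda-java-core's Context and a hypothetical safety buffer:
import scala.concurrent.duration._
import com.amazonaws.services.lambda.runtime.Context

// Leave a buffer below the remaining execution time so the function can still
// log and report a failure before AWS terminates the invocation.
def safeTimeout(context: Context, buffer: FiniteDuration = 30.seconds): FiniteDuration =
  (context.getRemainingTimeInMillis.millis - buffer).max(1.second)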
Entity Change Consumer Pattern
Building on the base consumer, we create a specialized consumer for database change events:
abstract class EntityChangeConsumer[M]
extends BaseSQSConsumerLambda[M, EntityChangeLog, EntityChangeLog, Option[EntityChangeLog]] {
protected val repositoryStore: RepositoryStore
// Subclasses implement specific business logic
protected def consumerSpecificImplementation(log: EntityChangeLog): Future[EntityChangeLog]
// Optionally filter deleted entities
def shouldFilterOutDeletedEntity: Boolean = false
// Main processing pipeline
override def process(log: EntityChangeLog): Future[EntityChangeLog] =
refreshNewEntity(log).flatMap { refreshedLog =>
if (refreshedLog.changeDetail.newEntity.deleted && shouldFilterOutDeletedEntity) {
Future.successful(
constructNoProcessingResponse(refreshedLog, "Entity deleted")
)
} else {
consumerSpecificImplementation(refreshedLog)
}
}
// Refresh entity from database to get latest state
protected def refreshNewEntity(log: EntityChangeLog): Future[EntityChangeLog] = {
val queryContext = QueryContext.getAllAccessContext(QueryingService.TaskWorker)
val entityId = log.changeDetail.newEntity.id
(log.changeDetail.entityType match {
case EntityType.Customer =>
repositoryStore.customerRepo
.where(Customer.IdField, Operator.Equal, entityId)(queryContext)
.buildReadQuery
.fetchOneOpt
case EntityType.Order =>
repositoryStore.orderRepo
.where(Order.IdField, Operator.Equal, entityId)(queryContext)
.buildReadQuery
.fetchOneOpt
// ... handle other entity types
}).map { refreshedEntityOpt =>
// Fall back to the entity from the event if it no longer exists in the database
log.copy(changeDetail = log.changeDetail.copy(
newEntity = refreshedEntityOpt.getOrElse(log.changeDetail.newEntity)
))
}
}
// Parse BSON from SQS message
override def converter(inputStr: String): EntityChangeLog = {
val bsonDocument = BsonDocument(inputStr)
val bsonReader = new BsonDocumentReader(bsonDocument)
DatabaseProvider.CodecRegistry
.get(classOf[EntityChangeLog])
.decode(bsonReader, DecoderContext.builder().build())
}
}
Why Refresh the Entity?
A critical insight: events can be stale by the time they’re processed. Between the database write and Lambda invocation, the entity may have changed. We always refresh from the database before processing to avoid race conditions.
SNS Producer with Retry Logic and Sharding
Publishing events requires careful handling of failures and load distribution:
class SNSProducerClient(
amazonSNS: AmazonSNS,
envConcurrencyFactor: Double,
pushSynchronously: Boolean
)(implicit ec: ExecutionContext) {
private val maxRetryCount = 3
private val threadSleepTime: Long = 5000
def addData(
topicArn: String,
partitionKey: String,
message: String,
entityType: String,
numberOfShards: Int = 15
): Future[Unit] = {
if (pushSynchronously) {
addDataWithRetry(topicArn, partitionKey, message, numberOfShards, entityType)
} else {
// Kick off publishing on the execution context; flatten the nested Future
Future(addDataWithRetry(topicArn, partitionKey, message, numberOfShards, entityType)).flatten
}
}
private def addDataWithRetry(
topicArn: String,
partitionKey: String,
message: String,
numberOfShards: Int,
messageType: String,
retryCount: Int = 0
): Future[Unit] = {
if (retryCount == maxRetryCount) {
logger.error(s"Retry limit exceeded after $retryCount attempts")
return Future.unit
}
val groupId = getUniqueHashForSharding(partitionKey, numberOfShards)
Future {
Try {
if (topicArn.endsWith("fifo")) {
// FIFO topic: message ordering guaranteed within group
val messageAttributes = Map(
"message_type" -> new MessageAttributeValue()
.withStringValue(messageType)
.withDataType("String")
).asJava
amazonSNS.publish(
new PublishRequest()
.withTopicArn(topicArn)
.withMessageDeduplicationId(generateUniqueId())
.withMessage(message)
.withMessageAttributes(messageAttributes)
.withMessageGroupId(groupId)
)
} else {
// Standard topic: best-effort ordering
amazonSNS.publish(
new PublishRequest()
.withTopicArn(topicArn)
.withMessage(message)
)
}
}
}.flatMap {
case Success(publishResult) =>
logger.debug(s"Published messageId=${publishResult.getMessageId}")
Future.unit
case Failure(exception: AmazonSNSException) if exception.getErrorCode == "Throttling" =>
// Back off on throttling, then retry
logger.warn(s"Throttled. Scheduling retry attempt ${retryCount + 1}")
Thread.sleep(getRetryTime(retryCount + 1))
addDataWithRetry(topicArn, partitionKey, message, numberOfShards, messageType, retryCount + 1)
case Failure(exception) =>
logger.error(s"Failed to publish: ${exception.getMessage}")
Future.unit
}
}
// Quadratic backoff: 5s, 20s, 45s
private def getRetryTime(retryCount: Int): Long =
threadSleepTime * scala.math.pow(retryCount.toDouble, 2.0).toLong
// Shard distribution: sum of character codes modulo effective shards
private def getUniqueHashForSharding(key: String, numberOfShards: Int): String =
(key.map(_.toInt).sum % getEffectiveNumberOfShards(numberOfShards)).toString
// Scale shards based on environment (staging: 1x, prod: 2x)
private def getEffectiveNumberOfShards(numberOfShards: Int): Int =
Math.max(1, (numberOfShards * envConcurrencyFactor).ceil.toInt)
}
Sharding Strategy
The getUniqueHashForSharding function distributes messages across FIFO message groups. Key insights:
- Deterministic Hashing: Same partition key always maps to same group
- Environment Scaling: Production gets 2x shards for higher throughput
- Load Distribution: Character sum provides reasonable distribution
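A quick standalone check of how the character-sum hash spreads realistic keys across message groups (the shard count of 30 assumes 15 configured shards and a production concurrency factor of 2.0):
import java.util.UUID

val effectiveShards = 30
val sampleKeys = Seq.fill(100000)(UUID.randomUUID().toString)

// Count how many keys land in each FIFO message group
val perShard = sampleKeys
  .map(key => key.map(_.toInt).sum % effectiveShards)
  .groupBy(identity)
  .map { case (shard, hits) => shard -> hits.size }

perShard.toSeq.sortBy(_._1).foreach { case (shard, count) =>
  println(s"shard $shard -> $count keys")
}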
Retry Strategy
The quadratic backoff schedule (5s → 20s → 45s) handles transient failures:
- Throttling Errors: AWS SNS rate limits are respected
- Network Glitches: Temporary network issues resolve themselves
- Downstream Outages: Brief service interruptions are tolerated
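The schedule falls straight out of getRetryTime; a quick worked check:
// Delays produced by getRetryTime with threadSleepTime = 5000 ms:
//   attempt 1 -> 5000 * 1^2 =  5 000 ms
//   attempt 2 -> 5000 * 2^2 = 20 000 ms
//   attempt 3 -> 5000 * 3^2 = 45 000 ms
val delaysMs = (1 to 3).map(attempt => 5000L * attempt * attempt) // Vector(5000, 20000, 45000)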
Controlled Parallelism with FutureUtil
A key challenge in async processing is controlling concurrency. Process too many events in parallel and you overwhelm databases or downstream services. Process too few and you leave capacity unused.
object FutureUtil {
private val DefaultConcurrentPoolSize = 10
private val MaxConcurrentPoolSize = 25
/**
* Process items in controlled batches
* - Individual batches execute concurrently (up to poolSize)
* - Batches execute sequentially
*/
def groupedSequence[T, R](
originSeq: Iterable[T],
concurrentPoolSize: Int = DefaultConcurrentPoolSize
)(futureCreator: T => Future[R])(implicit ec: ExecutionContext): Future[Seq[Try[R]]] = {
require(
concurrentPoolSize > 0 && concurrentPoolSize <= MaxConcurrentPoolSize,
s"Pool size must be between 1-$MaxConcurrentPoolSize"
)
doSequential(originSeq.grouped(concurrentPoolSize).toSeq) { batch =>
doConcurrent(batch)(futureCreator)
}.map(_.flatMap {
case Success(value) => value
case Failure(ex) =>
logger.error("Batch processing failed", ex)
throw ex
})
}
/**
* Execute futures sequentially
*/
def doSequential[T, R](originSeq: Iterable[T])(
futureCreator: T => Future[R]
)(implicit ec: ExecutionContext): Future[Seq[Try[R]]] = {
originSeq.headOption match {
case Some(head) =>
Future.fromTry(Try(futureCreator(head)))
.flatten
.map(Success(_))
.recover { case NonFatal(ex) => Failure(ex) }
.flatMap { headResult =>
doSequential(originSeq.tail)(futureCreator).map { tailResults =>
Seq(headResult) ++ tailResults
}
}
case None =>
Future.successful(Seq.empty)
}
}
/**
* Execute futures concurrently
*/
def doConcurrent[T, R](originSeq: Iterable[T])(
futureCreator: T => Future[R]
)(implicit ec: ExecutionContext): Future[Seq[Try[R]]] = {
Future.sequence(
originSeq.map { item =>
Future.fromTry(Try(futureCreator(item)))
.flatten
.map(Success(_))
.recover { case NonFatal(ex) => Failure(ex) }
}
).map(_.toSeq)
}
/**
* Compose multiple futures in parallel (type-safe tupling)
*/
def join[A, B](futureA: Future[A], futureB: Future[B])(
implicit ec: ExecutionContext
): Future[(A, B)] =
for {
a <- futureA
b <- futureB
} yield (a, b)
// Overloaded for 3-16 futures...
}
Pattern: Grouped Sequence
This is the workhorse of the framework:
- Split input into batches of concurrentPoolSize (default 10)
- Process each batch concurrently
- Wait for batch to complete before starting next batch
- Collect all results with error handling
This prevents overwhelming downstream services while maximizing throughput.
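As a usage sketch (Invoice, SyncResult, and the downstream call are hypothetical), syncing a batch of events against an external API without ever holding more than ten in-flight requests looks like this:
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Hypothetical domain types and downstream call
case class Invoice(id: String)
case class SyncResult(invoiceId: String, ok: Boolean)

def syncAll(invoices: Seq[Invoice], syncInvoice: Invoice => Future[SyncResult])(
  implicit ec: ExecutionContext
): Future[Seq[Try[SyncResult]]] =
  // At most 10 syncInvoice calls are in flight at any moment
  FutureUtil.groupedSequence(invoices, concurrentPoolSize = 10)(syncInvoice)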
Handling Failures and Retries
Dead Letter Queues
Messages that fail processing after retries go to a Dead Letter Queue (DLQ):
Primary SQS Queue (maxReceiveCount=3)
↓
Lambda processes message
↓
Failure → message returned to queue
↓
After 3 failures → moved to DLQ
↓
Manual investigation/replay
Rate Limit Handling
Third-party API rate limits require special handling:
def processWithRateLimitRetry(log: EntityChangeLog): Future[EntityChangeLog] = {
accountingIntegration.syncInvoice(log).recoverWith {
case ex: RateLimitException =>
// Publish to retry topic with delay; chain on the publish so it completes
// before the Lambda handler returns
snsProducer.addData(
topicArn = retryTopicArn,
partitionKey = log.id,
message = log.toJson,
entityType = "EntityChangeLog"
).map { _ =>
log.copy(
operationStatus = OperationStatus.Scheduled,
retryAttempts = log.retryAttempts + 1
)
}
}
}
The retry topic has a delivery delay configured in SQS (1-15 minutes), allowing rate limits to reset.
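A minimal sketch of configuring that delay on the retry queue (assuming the v1 AWS Java SDK and a placeholder queue URL; SQS accepts 0-900 seconds):
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest
import scala.jdk.CollectionConverters._

val sqs = AmazonSQSClientBuilder.defaultClient()
val retryQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/invoice-sync-retry" // placeholder

// Messages delivered to the retry queue stay invisible for 5 minutes,
// giving the third-party rate limit window time to reset
sqs.setQueueAttributes(
  new SetQueueAttributesRequest()
    .withQueueUrl(retryQueueUrl)
    .withAttributes(Map("DelaySeconds" -> "300").asJava)
)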
Performance Optimization
1. Batch Processing
Instead of processing one event at a time:
// Anti-pattern: Sequential processing
events.foreach { event =>
Await.result(process(event), 30.seconds)
}
// Optimized: Grouped concurrent processing
FutureUtil.groupedSequence(events, concurrentPoolSize = 10) { event =>
process(event)
}
Result: 10x throughput improvement with controlled resource usage.
2. Entity Refresh Optimization
Refreshing entities individually is expensive:
// Before: N database queries
events.map { event =>
database.findById(event.entityId)
}
// After: Single batch query
val entityIds = events.map(_.entityId)
val entities = database.findByIds(entityIds)
val entityMap = entities.map(e => e.id -> e).toMap
events.map { event =>
entityMap.getOrElse(event.entityId, event.staleEntity)
}
3. Connection Pooling
Lambda containers are reused, so we maintain connection pools:
object ConnectionPool {
lazy val mongoClient = MongoClient(uri)
lazy val httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build()
}
Important: Use lazy val for thread-safe initialization.
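A usage sketch (the endpoint and helper are hypothetical): a warm container resolves the lazy clients once, and every subsequent invocation reuses them:
import java.net.URI
import java.net.http.{HttpRequest, HttpResponse}

// Reuses the container-scoped HTTP client; no new connection pool per invocation
def notifyDownstream(payload: String): Int = {
  val request = HttpRequest.newBuilder(URI.create("https://example.com/webhook")) // placeholder
    .POST(HttpRequest.BodyPublishers.ofString(payload))
    .build()
  ConnectionPool.httpClient.send(request, HttpResponse.BodyHandlers.ofString()).statusCode()
}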
Production Lessons Learned
1. Always Handle Stale Data
Events can arrive out of order or be delayed. Always fetch the latest entity state before processing.
2. Make Consumers Idempotent
The same event may be delivered multiple times (at-least-once delivery). Ensure processing is idempotent:
def processPayment(event: PaymentEvent): Future[Unit] = {
// Check if already processed
database.findByIdempotencyKey(event.id).flatMap {
case Some(_) =>
logger.info(s"Already processed ${event.id}")
Future.unit
case None =>
// Process and store idempotency key
processNewPayment(event).flatMap { result =>
database.storeIdempotencyKey(event.id, result)
}
}
}
3. Monitor Queue Depth
High queue depth indicates consumers can’t keep up:
// CloudWatch Alarm
QueueDepthAlarm:
Threshold: 1000 messages
Duration: 5 minutes
Action: Scale up Lambda concurrency
4. Use Structured Logging
JSON logs with correlation IDs enable tracing events across services:
logger.info(
Map(
"event" -> "processing_started",
"entityId" -> entity.id,
"entityType" -> entity.entityType,
"correlationId" -> event.correlationId
).asJson
)5. Circuit Breakers for Downstream Services
Prevent cascading failures with circuit breakers:
// Assumes Akka's akka.pattern.CircuitBreaker; system is an ActorSystem providing the scheduler
val circuitBreaker = new CircuitBreaker(
system.scheduler,
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute
)
circuitBreaker.withCircuitBreaker {
downstreamService.call()
}.recover {
case _: CircuitBreakerOpenException =>
// Return cached value or fail gracefully
cachedValue
}
When to Use This Pattern
✅ Use When:
- Processing database change events
- Fan-out to multiple subscribers needed
- Order preservation required within groups
- Retry and failure handling critical
- Scalability from 10 to 10,000 events/sec
❌ Avoid When:
- Synchronous responses required (use REST API)
- Processing must complete in <100ms (Lambda cold start overhead)
- Strict total ordering needed (single consumer may be better)
- Cost is primary concern (Lambda + SQS can be expensive at scale)
Conclusion
Building a production-grade event-driven system requires more than just connecting AWS services. The framework presented here handles the hard problems: retry logic, backpressure, entity consistency, and fault tolerance.
Key takeaways:
- Type-safe abstractions catch errors at compile time
- Controlled parallelism maximizes throughput without overwhelming services
- Retry strategies handle transient failures gracefully
- Entity refresh prevents stale data bugs
- Monitoring and logging enable rapid debugging
This architecture has processed billions of events over multiple years with a 99.9% success rate. The patterns are battle-tested and ready for your production systems.