Distributed Systems 10 min read

Building Event-Driven Microservices with AWS SNS/SQS and Scala

Learn production-grade patterns for building scalable event-driven systems using AWS SNS/SQS and Scala. Covers retry logic, backpressure, message ordering, fault tolerance, and controlled parallelism for processing millions of events daily.


Introduction: The Challenge of Event-Driven Architecture

In modern distributed systems, services need to react to changes in real time without tight coupling. Traditional request-response patterns create dependencies that limit scalability and make systems brittle. Event-driven architecture solves this by allowing services to react to events asynchronously, but implementing it correctly requires handling numerous edge cases: retry logic, backpressure management, message ordering, and fault tolerance.

This article explores a production-grade event-driven framework built on AWS SNS/SQS and Scala that processes millions of events daily. We’ll dive into the architectural patterns, implementation details, and hard-earned lessons from running this system at scale.

Architecture Overview: Change Data Capture → SNS → SQS → Lambda → Processing

The architecture follows a publish-subscribe pattern with these components:

Database Change
  ↓
Change Data Capture (CDC)
  ↓
SNS Topic (with FIFO support)
  ↓
SQS Queue (Dead Letter Queue for failures)
  ↓
AWS Lambda Consumer
  ↓
Business Logic Processing
  ↓
Downstream Systems

Key Design Decisions

  1. SNS for Fan-out: Single event published to SNS can trigger multiple subscribers
  2. SQS for Reliability: Messages persist in the queue (up to its retention period, 4 days by default) even if consumers are down
  3. Lambda for Scalability: Auto-scaling based on queue depth
  4. FIFO Queues: Message ordering guarantees within message groups
  5. Dead Letter Queues: Failed messages automatically routed for analysis

Building the Consumer Framework

Base Consumer Abstraction

The core of the system is a generic abstract consumer class that handles SQS event processing:

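import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SQSEvent
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

abstract class BaseSQSConsumerLambda[M, I, SPO, O] {
  protected implicit val executionContext: ExecutionContext

  // One-time initialization (clients, configuration); no-op by default
  protected def setup(): Unit = ()

  // Override to unwrap the SNS JSON envelope when raw message delivery is disabled;
  // identity by default (raw message delivery)
  protected def parseMessage(body: String): String = body

  // Convert the unwrapped message string to the domain model
  def converter(inputStr: String): I

  // Process a single event
  def process(input: I): Future[SPO]

  // Aggregate results; outputSeq(i) is the result for inputSeq(i)
  def outputProcessing(inputSeq: Seq[I], outputSeq: Seq[Try[SPO]]): O

  // Upper bound for processing the whole batch; keep it below the function's
  // configured timeout (Lambda's hard maximum is 15 minutes)
  def singleProcessTimeout: Duration

  // Optional: group related events so that they are processed sequentially
  protected def groupLogically(inputSeq: Seq[I]): Seq[Seq[I]] =
    inputSeq.map(input => Seq(input))

  def handleRequest(event: SQSEvent, context: Context): O = {
    setup()

    // Extract messages from the SQS event
    val collectedInput: Seq[I] = event
      .getRecords
      .asScala
      .map(_.getBody)
      .map(parseMessage) // Handle SNS wrapper
      .map(converter)
      .toSeq

    val groups = groupLogically(collectedInput)

    // Controlled parallelism: groups run concurrently (bounded by the pool size),
    // events inside a group run one after another to preserve their order
    val output: Seq[Try[Seq[Try[SPO]]]] = Await.result(
      FutureUtil.groupedSequence(groups) { logicalGroup =>
        FutureUtil.doSequential(logicalGroup)(process)
      },
      singleProcessTimeout
    )

    // Flatten per-group results; a failed group becomes one Failure per event
    val flattened: Seq[Try[SPO]] = groups.zip(output).flatMap {
      case (_, Success(groupResults)) => groupResults
      case (group, Failure(ex))       => group.map(_ => Failure[SPO](ex))
    }

    // Pass inputs in the same (grouped) order as the results
    outputProcessing(groups.flatten, flattened)
  }
}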

Key Patterns

Type Safety: The generic parameters [M, I, SPO, O] ensure compile-time type safety:

  • M: Module type for dependency injection
  • I: Input event type
  • SPO: Single processing output type
  • O: Final aggregated output type

Grouped Processing: The groupLogically hook allows events to be processed sequentially within groups but concurrently between groups. This is critical for maintaining order when needed.
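As an illustration of the groupLogically hook, the sketch below groups a batch by a key, keeping arrival order inside each group and ordering the groups by where their key first appears. The ChangeEvent type and its entityId field are hypothetical stand-ins for the consumer's input type I; the real grouping key depends on your events.

```scala
object GroupingSketch {
  // Hypothetical event type standing in for the consumer's input type I
  final case class ChangeEvent(entityId: String, seqNo: Int)

  // Group inputs by key. Inside each group the arrival order is preserved;
  // groups are returned in the order in which their key first appeared.
  def groupByKeyPreservingOrder[I, K](inputs: Seq[I])(key: I => K): Seq[Seq[I]] = {
    val keysInFirstSeenOrder = inputs.map(key).distinct
    val byKey: Map[K, Seq[I]] = inputs.groupBy(key) // groupBy keeps relative order within a group
    keysInFirstSeenOrder.map(byKey)
  }

  // A consumer subclass could then override the hook along these lines:
  // override protected def groupLogically(inputSeq: Seq[ChangeEvent]): Seq[Seq[ChangeEvent]] =
  //   groupByKeyPreservingOrder(inputSeq)(_.entityId)
}
```

Keying on the entity ID keeps successive changes to the same entity in order, while changes to different entities can run concurrently.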

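Timeout Management: AWS Lambda caps a single invocation at 15 minutes, and each function has its own, often shorter, configured timeout. The framework bounds the wait on the whole batch with singleProcessTimeout, so Await.result throws a TimeoutException instead of silently running into the function timeout; set it below the configured timeout so the handler still has time to report the failure. AWS also recommends a source-queue visibility timeout of at least six times the function timeout, so messages are not redelivered while a slow invocation is still running.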
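One way to keep the batch bound inside the function's own limit is to derive it from the remaining invocation time, which the Java runtime exposes as Context.getRemainingTimeInMillis(), minus a safety margin for outputProcessing and the response. This is a sketch; the batchTimeout name and the 5-second default margin are assumptions.

```scala
import scala.concurrent.duration._

object TimeoutSketch {
  // Derive the batch-processing bound from the remaining invocation time,
  // reserving a safety margin for output processing and returning a response.
  // `remainingMillis` would come from Context.getRemainingTimeInMillis().
  def batchTimeout(remainingMillis: Long, safetyMargin: FiniteDuration = 5.seconds): FiniteDuration = {
    val available = remainingMillis.millis - safetyMargin
    // Never return a negative bound; zero makes Await.result fail immediately
    if (available > Duration.Zero) available else Duration.Zero
  }
}
```

In handleRequest, batchTimeout(context.getRemainingTimeInMillis) could then replace the fixed singleProcessTimeout in the Await.result call.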

Entity Change Consumer Pattern

Building on the base consumer, we create a specialized consumer for database change events:

import org.bson.{BsonDocument, BsonDocumentReader}
import org.bson.codecs.DecoderContext
import scala.concurrent.Future

abstract class EntityChangeConsumer[M]
  extends BaseSQSConsumerLambda[M, EntityChangeLog, EntityChangeLog, Option[EntityChangeLog]] {

  protected val repositoryStore: RepositoryStore

  // Subclasses implement specific business logic
  protected def consumerSpecificImplementation(log: EntityChangeLog): Future[EntityChangeLog]

  // Optionally skip entities that are marked as deleted
  def shouldFilterOutDeletedEntity: Boolean = false

  // Main processing pipeline: refresh, optionally filter, then delegate
  override def process(log: EntityChangeLog): Future[EntityChangeLog] =
    refreshNewEntity(log).flatMap { refreshedLog =>
      if (refreshedLog.changeDetail.newEntity.deleted && shouldFilterOutDeletedEntity) {
        Future.successful(
          constructNoProcessingResponse(refreshedLog, "Entity deleted") // helper defined elsewhere
        )
      } else {
        consumerSpecificImplementation(refreshedLog)
      }
    }

  // Refresh entity from database to get latest state
  protected def refreshNewEntity(log: EntityChangeLog): Future[EntityChangeLog] = {
    val queryContext = QueryContext.getAllAccessContext(QueryingService.TaskWorker)
    val entityId = log.changeDetail.newEntity.id

    (log.changeDetail.entityType match {
      case EntityType.Customer =>
        repositoryStore.customerRepo
          .where(Customer.IdField, Operator.Equal, entityId)(queryContext)
          .buildReadQuery
          .fetchOneOpt
      case EntityType.Order =>
        repositoryStore.orderRepo
          .where(Order.IdField, Operator.Equal, entityId)(queryContext)
          .buildReadQuery
          .fetchOneOpt
      // ... handle other entity types
    }).map {
      case Some(latest) =>
        log.copy(changeDetail = log.changeDetail.copy(newEntity = latest))
      case None =>
        log // not found (e.g. hard-deleted): keep the state carried by the event
    }
  }

  // Decode the message body (a JSON representation of the BSON document)
  // into an EntityChangeLog using the registered codec
  override def converter(inputStr: String): EntityChangeLog = {
    val bsonDocument = BsonDocument.parse(inputStr)
    val bsonReader = new BsonDocumentReader(bsonDocument)
    DatabaseProvider.CodecRegistry
      .get(classOf[EntityChangeLog])
      .decode(bsonReader, DecoderContext.builder().build())
  }
}

Why Refresh the Entity?

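A critical insight: events can be stale by the time they’re processed. Between the database write and the Lambda invocation, the entity may have changed again. We therefore refresh it from the database before processing. This narrows the race window considerably rather than eliminating it: a write that lands after the refresh is picked up when its own change event is processed.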

SNS Producer with Retry Logic and Sharding

Publishing events requires careful handling of failures and load distribution:

import com.amazonaws.services.sns.AmazonSNS
import com.amazonaws.services.sns.model.{AmazonSNSException, MessageAttributeValue, PublishRequest}
import java.util.UUID
import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

class SNSProducerClient(
  amazonSNS: AmazonSNS,
  envConcurrencyFactor: Double,
  pushSynchronously: Boolean
)(implicit ec: ExecutionContext) {

  private val maxRetryCount = 3
  private val threadSleepTime: Long = 5000
  // Error codes SNS returns when a request is throttled
  private val throttlingErrorCodes = Set("Throttling", "Throttled")

  def addData(
    topicArn: String,
    partitionKey: String,
    message: String,
    entityType: String,
    numberOfShards: Int = 15
  ): Future[Unit] = {
    // One deduplication ID per logical message, reused across retries
    val deduplicationId = UUID.randomUUID().toString
    val publish = addDataWithRetry(topicArn, partitionKey, message, numberOfShards, entityType, deduplicationId)
    // Synchronous: the returned Future completes when the publish (including retries) does.
    // Asynchronous: fire-and-forget; the publish continues in the background.
    if (pushSynchronously) publish else Future.unit
  }

  private def addDataWithRetry(
    topicArn: String,
    partitionKey: String,
    message: String,
    numberOfShards: Int,
    messageType: String,
    deduplicationId: String,
    retryCount: Int = 0
  ): Future[Unit] = {
    val groupId = getUniqueHashForSharding(partitionKey, numberOfShards)

    Future {
      Try {
        if (topicArn.endsWith(".fifo")) {
          // FIFO topic: message ordering guaranteed within group
          val messageAttributes = Map(
            "message_type" -> new MessageAttributeValue()
              .withStringValue(messageType)
              .withDataType("String")
          ).asJava

          amazonSNS.publish(
            new PublishRequest()
              .withTopicArn(topicArn)
              .withMessageDeduplicationId(deduplicationId)
              .withMessage(message)
              .withMessageAttributes(messageAttributes)
              .withMessageGroupId(groupId)
          )
        } else {
          // Standard topic: best-effort ordering
          amazonSNS.publish(
            new PublishRequest()
              .withTopicArn(topicArn)
              .withMessage(message)
          )
        }
      }
    }.flatMap {
      case Success(publishResult) =>
        logger.debug(s"Published messageId=${publishResult.getMessageId}")
        Future.unit

      case Failure(exception: AmazonSNSException)
          if throttlingErrorCodes.contains(exception.getErrorCode) && retryCount < maxRetryCount =>
        // Back off before retrying: 5s, 20s, 45s for retries 1, 2, 3
        val delayMillis = getRetryTime(retryCount + 1)
        logger.warn(s"Throttled. Retry ${retryCount + 1} of $maxRetryCount in ${delayMillis}ms")
        blocking(Thread.sleep(delayMillis)) // blocks a pool thread while waiting
        addDataWithRetry(topicArn, partitionKey, message, numberOfShards, messageType, deduplicationId, retryCount + 1)

      case Failure(exception) =>
        // Not retryable, or retries exhausted: log and surface the failure to the caller
        logger.error(s"Failed to publish after ${retryCount + 1} attempt(s): ${exception.getMessage}")
        Future.failed(exception)
    }
  }

  // Quadratic backoff: threadSleepTime * retryCount^2, i.e. 5s, 20s, 45s
  private def getRetryTime(retryCount: Int): Long =
    threadSleepTime * retryCount * retryCount

  // Shard distribution: sum of character codes modulo effective shards
  private def getUniqueHashForSharding(key: String, numberOfShards: Int): String =
    (key.map(_.toInt).sum % getEffectiveNumberOfShards(numberOfShards)).toString

  // Scale shards based on environment (staging: 1x, prod: 2x)
  private def getEffectiveNumberOfShards(numberOfShards: Int): Int =
    Math.max(1, (numberOfShards * envConcurrencyFactor).ceil.toInt)
}

Sharding Strategy

The getUniqueHashForSharding function distributes messages across FIFO message groups. Key insights:

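  1. Deterministic Hashing: The same partition key always maps to the same group, so events for one key stay in order
  2. Environment Scaling: envConcurrencyFactor scales the shard count (2x in production); more groups allow more messages to be processed in parallel, since FIFO ordering is only enforced within a group
  3. Load Distribution: The character sum is cheap and deterministic, but it can spread keys unevenly and always maps anagrams (such as "ab" and "ba") to the same group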
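The sketch below reproduces the character-sum formula and the shard scaling from SNSProducerClient, next to a String.hashCode-based alternative (Math.floorMod keeps the result non-negative). It demonstrates the determinism and the anagram collision discussed above. The function names are illustrative, not part of the framework.

```scala
object ShardingSketch {
  // Same formula as the producer: scale the base shard count by the environment factor
  def effectiveShards(numberOfShards: Int, envConcurrencyFactor: Double): Int =
    Math.max(1, (numberOfShards * envConcurrencyFactor).ceil.toInt)

  // Character-sum hash, as in getUniqueHashForSharding
  def charSumGroup(key: String, shards: Int): String =
    (key.map(_.toInt).sum % shards).toString

  // Alternative: String.hashCode is specified by the JDK, so it is stable across runs;
  // floorMod avoids negative results for negative hash codes
  def hashCodeGroup(key: String, shards: Int): String =
    Math.floorMod(key.hashCode, shards).toString
}
```

Whichever hash is used, changing numberOfShards or envConcurrencyFactor remaps keys to different groups, so per-key ordering only holds while the effective shard count stays fixed.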

Retry Strategy

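The quadratic backoff schedule 5s → 20s → 45s (base delay × attempt²) is applied only to throttling errors:

  • Throttling Errors: AWS SNS rate limits are respected by waiting longer before each retry
  • Network Glitches: Not retried by this code; the AWS SDK's built-in retry policy may already retry some transient errors before an exception reaches it
  • Downstream Outages: Any non-throttling exception skips the retry path and is logged as a publish failure, so tolerating brief interruptions requires widening the retry condition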
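The delays produced by getRetryTime can be checked in isolation. For comparison, the sketch also includes a capped exponential schedule with full jitter (a random delay between zero and the exponential bound), a common alternative when many producers are throttled at the same moment. The 60-second cap and the function names are assumptions for illustration.

```scala
import scala.util.Random

object BackoffSketch {
  private val baseMillis = 5000L

  // Schedule used by SNSProducerClient.getRetryTime: base * n^2 (quadratic)
  def quadraticDelay(attempt: Int): Long =
    baseMillis * attempt.toLong * attempt

  // Alternative: exponential growth (base * 2^(n-1)), capped, with full jitter in [0, bound)
  def exponentialJitterDelay(attempt: Int, capMillis: Long = 60000L, rng: Random = new Random): Long = {
    require(attempt >= 1, "attempts are numbered from 1")
    val bound = math.min(capMillis, baseMillis * (1L << (attempt - 1)))
    (rng.nextDouble() * bound).toLong
  }
}
```

Jitter spreads out retries from producers that were throttled together, so they do not all hit the topic again at the same instant.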

Controlled Parallelism with FutureUtil

A key challenge in async processing is controlling concurrency. Process too many events in parallel and you overwhelm databases or downstream services. Process too few and you leave capacity unused.

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object FutureUtil {
  private val DefaultConcurrentPoolSize = 10
  private val MaxConcurrentPoolSize = 25

  /**
   * Process items in controlled batches
   * - Items within a batch execute concurrently (at most concurrentPoolSize at once)
   * - Batches execute one after another; the next starts when the slowest item finishes
   */
  def groupedSequence[T, R](
    originSeq: Iterable[T],
    concurrentPoolSize: Int = DefaultConcurrentPoolSize
  )(futureCreator: T => Future[R])(implicit ec: ExecutionContext): Future[Seq[Try[R]]] = {

    require(
      concurrentPoolSize > 0 && concurrentPoolSize <= MaxConcurrentPoolSize,
      s"Pool size must be between 1 and $MaxConcurrentPoolSize"
    )

    // grouped returns an Iterator; materialize it so it can be passed as an Iterable
    doSequential(originSeq.grouped(concurrentPoolSize).toSeq) { batch =>
      doConcurrent(batch)(futureCreator)
    }.map(_.flatMap {
      case Success(value) => value
      case Failure(ex) =>
        logger.error("Batch processing failed", ex)
        throw ex
    })
  }

  /**
   * Execute futures sequentially: each future is created only after the previous one completes
   */
  def doSequential[T, R](originSeq: Iterable[T])(
    futureCreator: T => Future[R]
  )(implicit ec: ExecutionContext): Future[Seq[Try[R]]] =
    originSeq.foldLeft(Future.successful(Vector.empty[Try[R]])) { (accFuture, item) =>
      accFuture.flatMap(acc => attempt(item)(futureCreator).map(acc :+ _))
    }

  /**
   * Execute futures concurrently: all futures are started at once
   */
  def doConcurrent[T, R](originSeq: Iterable[T])(
    futureCreator: T => Future[R]
  )(implicit ec: ExecutionContext): Future[Seq[Try[R]]] =
    Future.sequence(originSeq.map(item => attempt(item)(futureCreator)).toSeq)

  // Run futureCreator on one item, turning both a synchronous throw and a
  // failed Future into a Failure value
  private def attempt[T, R](item: T)(futureCreator: T => Future[R])(
    implicit ec: ExecutionContext
  ): Future[Try[R]] =
    Future.fromTry(Try(futureCreator(item))).flatten.transform((result: Try[R]) => Success(result))

  /**
   * Combine two futures into a tuple. Both are already running when passed in,
   * so this waits for them in parallel rather than starting them.
   */
  def join[A, B](futureA: Future[A], futureB: Future[B])(
    implicit ec: ExecutionContext
  ): Future[(A, B)] =
    futureA.zip(futureB)

  // Overloaded for 3-16 futures...
}

Pattern: Grouped Sequence

This is the workhorse of the framework:

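  1. Split input into batches of concurrentPoolSize (default 10)
  2. Process each batch concurrently
  3. Wait for the whole batch to complete before starting the next batch
  4. Collect all results, capturing each failure as a Failure value instead of aborting the run

This caps concurrency at concurrentPoolSize, which protects downstream services. The cost is that each batch runs only as fast as its slowest item, so free slots sit idle while a straggler finishes.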
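Because a batch finishes only when its slowest item does, one straggler leaves the other slots idle. A sliding window avoids this by keeping up to maxInFlight calls running and starting the next item as soon as any call completes. The sketch below is an alternative to groupedSequence, not part of the framework; mapBounded is a hypothetical name, and results come back in input order with failures captured as Failure values.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Try}

object SlidingWindow {
  // Apply futureCreator to every item with at most maxInFlight futures running at once.
  // Each worker claims the next unprocessed index as soon as its previous item finishes,
  // so a slow item delays only its own worker.
  def mapBounded[T, R](items: IndexedSeq[T], maxInFlight: Int)(futureCreator: T => Future[R])(
    implicit ec: ExecutionContext
  ): Future[IndexedSeq[Try[R]]] = {
    require(maxInFlight > 0, "maxInFlight must be positive")
    val nextIndex = new AtomicInteger(0)

    // One worker: process items one at a time until no unclaimed index is left,
    // accumulating (index, result) pairs so the original order can be restored
    def worker(acc: List[(Int, Try[R])]): Future[List[(Int, Try[R])]] = {
      val i = nextIndex.getAndIncrement()
      if (i >= items.size) Future.successful(acc)
      else
        Future
          .fromTry(Try(futureCreator(items(i)))) // a synchronous throw becomes a failed future
          .flatten
          .transform((result: Try[R]) => Success(result)) // keep failures as values
          .flatMap(result => worker((i, result) :: acc))
    }

    val workers = List.fill(math.min(maxInFlight, items.size))(worker(Nil))
    Future.sequence(workers).map(_.flatten.sortBy(_._1).map(_._2).toIndexedSeq)
  }
}
```

The trade-off is a little bookkeeping: results are tagged with their index and re-sorted at the end, whereas groupedSequence gets its ordering from the batch structure.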

Handling Failures and Retries

Dead Letter Queues

Messages that fail processing after retries go to a Dead Letter Queue (DLQ):

Primary SQS Queue (maxReceiveCount=3)
  ↓
Lambda processes message
  ↓
Failure → message becomes visible in the queue again after the visibility timeout
  ↓
After 3 failed receives → moved to DLQ
  ↓
Manual investigation/replay
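Messages return to the queue only when the invocation fails. If a handler catches every error and returns normally, Lambda treats the whole batch as processed and deletes it, so nothing ever reaches the DLQ. With ReportBatchItemFailures enabled on the event source mapping, the handler can instead report just the failed message IDs (in the Java runtime, via SQSBatchResponse from aws-lambda-java-events). The sketch shows only the bookkeeping; Record and BatchResponse are hypothetical stand-ins so that it runs without AWS dependencies.

```scala
import scala.util.{Failure, Try}

object PartialBatchSketch {
  // Hypothetical stand-ins for the SQS record and the batch response
  final case class Record(messageId: String, body: String)
  final case class BatchResponse(failedMessageIds: Seq[String])

  // Pair each record with its processing result and report only the failures.
  // Lambda deletes the successful messages; the reported ones become visible again
  // and, once maxReceiveCount is exceeded, move to the DLQ.
  def toBatchResponse(records: Seq[Record], results: Seq[Try[_]]): BatchResponse = {
    require(records.size == results.size, "expected one result per record")
    BatchResponse(records.zip(results).collect { case (record, Failure(_)) => record.messageId })
  }
}
```

For FIFO queues, AWS recommends stopping at the first failure and reporting that message together with every unprocessed message after it, so that each message group keeps its order.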

Rate Limit Handling

Third-party API rate limits require special handling:

def processWithRateLimitRetry(log: EntityChangeLog): Future[EntityChangeLog] =
  accountingIntegration.syncInvoice(log).recoverWith {
    case _: RateLimitException =>
      // Record the new attempt count in the message itself, so the retry consumer sees it
      val scheduled = log.copy(
        operationStatus = OperationStatus.Scheduled,
        retryAttempts = log.retryAttempts + 1
      )
      // Publish to the retry topic; chain on the publish so a failed publish
      // is not reported as Scheduled
      snsProducer
        .addData(
          topicArn = retryTopicArn,
          partitionKey = log.id,
          message = scheduled.toJson,
          entityType = "EntityChangeLog"
        )
        .map(_ => scheduled)
  }

The SQS queue subscribed to the retry topic is configured with a delivery delay (SQS supports 0 to 15 minutes), giving the third-party rate limit time to reset before the message is processed again.
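Re-publishing on every rate-limit error can cycle a message indefinitely if the third party stays unavailable. The sketch below bounds that loop: given how many times a message has already been scheduled (the retryAttempts field in the snippet above), it either re-publishes or gives up, so the message can be routed to a DLQ or an alert instead. maxAttempts, the doubling delay, and the RetryDecision type are assumptions; the delay is clamped to the 900-second SQS maximum.

```scala
object RetryPolicySketch {
  sealed trait RetryDecision
  final case class Republish(delaySeconds: Int) extends RetryDecision
  case object GiveUp extends RetryDecision

  val MaxDelaySeconds = 900 // SQS upper limit for DelaySeconds (15 minutes)

  // attemptsSoFar: how many times this message has already been scheduled for retry.
  // Delay doubles per attempt (60s, 120s, 240s, ...) and is clamped to the SQS maximum.
  def decide(attemptsSoFar: Int, maxAttempts: Int = 5, baseDelaySeconds: Int = 60): RetryDecision =
    if (attemptsSoFar >= maxAttempts) GiveUp
    else Republish(math.min(MaxDelaySeconds.toLong, baseDelaySeconds.toLong << attemptsSoFar).toInt)
}
```

With a fixed queue-level delay, only the give-up branch matters; the computed delay would apply when sending directly to a standard SQS queue with the DelaySeconds parameter (FIFO queues support only queue-level delays).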

Performance Optimization

1. Batch Processing

Instead of processing one event at a time:

// Anti-pattern: Sequential processing
events.foreach { event =>
  Await.result(process(event), 30.seconds)
}

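// Optimized: grouped concurrent processing (at most 10 events in flight)
FutureUtil.groupedSequence(events, concurrentPoolSize = 10) { event =>
  process(event)
}

Result: 10x throughput improvement with controlled resource usage. That is also the ceiling for a pool size of 10: each batch takes as long as its slowest event, so the speedup over one-at-a-time processing cannot exceed the number of events in flight.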

2. Entity Refresh Optimization

Refreshing entities one at a time costs a database round trip per event:

// Before: N database queries
events.map { event =>
  database.findById(event.entityId)
}

// After: a single batch query over the distinct IDs
val entityIds = events.map(_.entityId).distinct
val entities = database.findByIds(entityIds)
val entityMap = entities.map(e => e.id -> e).toMap
// Fall back to the event's own (possibly stale) snapshot if the entity was not found
events.map { event =>
  entityMap.getOrElse(event.entityId, event.staleEntity)
}

3. Connection Pooling

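Lambda reuses execution environments across invocations, so we keep clients in a singleton object and reuse their connection pools while the environment stays warm:

import java.net.http.HttpClient
import java.time.{Duration => JDuration}
import org.mongodb.scala.MongoClient

object ConnectionPool {
  // Connection string read from an environment variable (name is illustrative)
  lazy val mongoClient: MongoClient = MongoClient(sys.env("MONGO_URI"))
  lazy val httpClient: HttpClient = HttpClient.newBuilder()
    .connectTimeout(JDuration.ofSeconds(10))
    .build()
}

Important: a lazy val defers creating each client until its first use and runs the initializer exactly once, even when several threads access it concurrently. A plain val in an object is also initialized thread-safely, but eagerly, as soon as the object is first accessed.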
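The initialization behavior can be checked with a small experiment: a lazy val inside an object runs its initializer once, on first access, even when several threads race to read it. ExpensiveClient is a hypothetical stand-in for a database or HTTP client.

```scala
import java.util.concurrent.atomic.AtomicInteger

object LazyInitSketch {
  // Counts how many times the client constructor has run
  val initCount = new AtomicInteger(0)

  // Hypothetical stand-in for a client that is expensive to construct
  final class ExpensiveClient {
    initCount.incrementAndGet()
  }

  object Clients {
    // Initialized on first access and then reused for the lifetime of the JVM
    // (in Lambda, for as long as the execution environment stays warm)
    lazy val client: ExpensiveClient = new ExpensiveClient
  }
}
```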

Production Lessons Learned

1. Always Handle Stale Data

Events can arrive out of order or be delayed. Always fetch the latest entity state before processing.

2. Make Consumers Idempotent

The same event may be delivered multiple times (at-least-once delivery). Ensure processing is idempotent:

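def processPayment(event: PaymentEvent): Future[Unit] = {
  // Check if already processed.
  // Caveat: check-then-act is racy if two copies arrive at the same time; back the
  // key with a unique index or conditional write so only one copy can record it.
  database.findByIdempotencyKey(event.id).flatMap {
    case Some(_) =>
      logger.info(s"Already processed ${event.id}")
      Future.unit
    case None =>
      // Process, then store the idempotency key together with the result
      processNewPayment(event).flatMap { result =>
        database.storeIdempotencyKey(event.id, result).map(_ => ())
      }
  }
}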
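The check-then-act shape above can let two concurrent deliveries of the same event both see None and both process it. A common fix is to claim the key atomically before doing the work, so that exactly one caller wins. In this sketch, TrieMap.putIfAbsent stands in for a conditional write or unique index in a real store, and IdempotentProcessor is a hypothetical name.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

object IdempotencySketch {
  sealed trait Status
  case object InProgress extends Status
  case object Done extends Status

  final class IdempotentProcessor(implicit ec: ExecutionContext) {
    // Stand-in for a table with a unique index on the idempotency key
    private val claims = TrieMap.empty[String, Status]

    // Returns true if this call did the work, false if the key was already claimed
    def processOnce(key: String)(work: => Future[Unit]): Future[Boolean] =
      claims.putIfAbsent(key, InProgress) match {
        case Some(_) => Future.successful(false) // duplicate delivery: skip
        case None =>
          Future.delegate(work).transform(
            (_: Unit) => { claims.update(key, Done); true },
            (error: Throwable) => { claims.remove(key); error } // release so a redelivery can retry
          )
      }
  }
}
```

If the process dies between claiming a key and finishing, the key stays InProgress; real stores usually attach a timestamp or lease to such claims so that a later delivery can reclaim them.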

3. Monitor Queue Depth

High queue depth indicates consumers can’t keep up:

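# CloudWatch alarm (pseudo-config)
QueueDepthAlarm:
  Metric: ApproximateNumberOfMessagesVisible (AWS/SQS)
  Threshold: 1000 messages
  Duration: 5 minutes
  Action: Scale up Lambda concurrency (raise the function's concurrency limit)

Lambda already adds SQS pollers automatically as the backlog grows, so a sustained alarm usually means a concurrency limit or a slow downstream dependency is the bottleneck. ApproximateAgeOfOldestMessage is a useful companion metric, since it shows how long the oldest event has been waiting.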

4. Use Structured Logging

JSON logs with correlation IDs enable tracing events across services:

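import io.circe.syntax._ // .asJson here assumes circe

logger.info(
  Map(
    "event" -> "processing_started",
    "entityId" -> entity.id.toString,
    "entityType" -> entity.entityType.toString,
    "correlationId" -> event.correlationId
  ).asJson.noSpaces // serialize as a single-line JSON string
)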

5. Circuit Breakers for Downstream Services

Prevent cascading failures with circuit breakers:

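// Assumes Akka's akka.pattern.CircuitBreaker (Apache Pekko provides the same API)
import akka.actor.ActorSystem
import akka.pattern.{CircuitBreaker, CircuitBreakerOpenException}
import scala.concurrent.duration._

val system = ActorSystem("consumer")
import system.dispatcher // ExecutionContext used by recover

val circuitBreaker = CircuitBreaker(
  system.scheduler,
  maxFailures = 5,
  callTimeout = 10.seconds,
  resetTimeout = 1.minute
)

circuitBreaker.withCircuitBreaker {
  downstreamService.call() // must return a Future
}.recover {
  case _: CircuitBreakerOpenException =>
    // Breaker is open: return a cached value or fail gracefully
    cachedValue
}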
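To show the state machine behind the library call, here is a minimal synchronous sketch (single-threaded, and not a replacement for a production library): the breaker counts consecutive failures, opens after maxFailures, rejects calls until resetTimeout has passed, then lets a single trial call through (half-open). The injectable clock and all names are assumptions that keep the example testable.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object CircuitBreakerSketch {
  final class CircuitOpenException extends RuntimeException("circuit breaker is open")

  // Minimal synchronous circuit breaker: single-threaded, for illustration only
  final class SimpleCircuitBreaker(
    maxFailures: Int,
    resetTimeout: FiniteDuration,
    nowNanos: () => Long = () => System.nanoTime()
  ) {
    private sealed trait State
    private case object Closed extends State
    private final case class Open(sinceNanos: Long) extends State
    private case object HalfOpen extends State

    private var state: State = Closed
    private var consecutiveFailures = 0

    def isOpen: Boolean = state match {
      case Open(_) => true
      case _       => false
    }

    def call[T](body: => T): Try[T] = state match {
      case Open(since) if nowNanos() - since < resetTimeout.toNanos =>
        Failure(new CircuitOpenException) // fail fast; body is never evaluated
      case current =>
        // Closed: normal call. Open past its reset timeout: go half-open for one trial call.
        if (current != Closed) state = HalfOpen
        val result = Try(body)
        result match {
          case Success(_) =>
            consecutiveFailures = 0
            state = Closed
          case Failure(_) =>
            consecutiveFailures += 1
            // A failed trial call, or too many consecutive failures, (re)opens the breaker
            if (state == HalfOpen || consecutiveFailures >= maxFailures) state = Open(nowNanos())
        }
        result
    }
  }
}
```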

When to Use This Pattern

✅ Use When:

  • Processing database change events
  • Fan-out to multiple subscribers needed
  • Order preservation required within groups
  • Retry and failure handling critical
  • Scalability from 10 to 10,000 events/sec

❌ Avoid When:

  • Synchronous responses required (use REST API)
  • Processing must complete in <100ms (Lambda cold start overhead)
  • Strict total ordering needed (single consumer may be better)
  • Cost is primary concern (Lambda + SQS can be expensive at scale)

Conclusion

Building a production-grade event-driven system requires more than just connecting AWS services. The framework presented here handles the hard problems: retry logic, backpressure, entity consistency, and fault tolerance.

Key takeaways:

  1. Type-safe abstractions catch errors at compile time
  2. Controlled parallelism maximizes throughput without overwhelming services
  3. Retry strategies handle transient failures gracefully
  4. Entity refresh prevents stale data bugs
  5. Monitoring and logging enable rapid debugging

This architecture has processed billions of events over multiple years with 99.9% success rates. The patterns are battle-tested and ready for your production systems.
