Distributed Systems 7 min read

Event-driven microservices on SNS, SQS and Scala

Notes on a Scala framework that consumes SNS/SQS change events through Lambda — the pieces that mattered: type-safe consumer base classes, controlled parallelism via grouped futures, FIFO sharding, and the entity-refresh trick that saves you from stale reads.


A retry topic was the thing that finally made me write this down. Specifically: an accounting integration that kept hitting a third-party rate limit, and the realisation that “retry” wasn’t one behaviour but three different ones depending on which failure we’d hit. That’s the post — the bits of an SNS/SQS/Lambda framework in Scala that earned their keep, and the bits where the abstraction was the whole point.

The shape is unsurprising. A database write triggers change data capture, CDC publishes to an SNS topic, SNS fans out to one or more SQS queues, a Lambda drains each queue. SNS gives you fan-out; SQS gives you durability when consumers are down; Lambda scales on queue depth; FIFO topics give you in-group ordering when you actually need it. None of that is novel. What’s worth talking about is what sits inside the Lambda.

The consumer base class

Every consumer we run inherits from one abstract class. The generics are doing real work — they keep the input type, the per-event output, and the aggregated batch output distinct, so a misuse shows up at compile time rather than as a JSON shape mismatch in production.

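import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.SQSEvent
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

// I = parsed input, SPO = per-event output, O = aggregated batch output
abstract class BaseSQSConsumerLambda[M, I, SPO, O] {
  protected implicit val executionContext: ExecutionContext

  def converter(inputStr: String): I
  def process(input: I): Future[SPO]
  def outputProcessing(inputSeq: Seq[I], outputSeq: Seq[Try[SPO]]): O
  def singleProcessTimeout: Duration // bounds the whole batch, not one event

  // Framework hooks used below (implementations not shown here)
  protected def setup(): Unit
  protected def parseMessage(body: String): String // unwraps the SNS envelope

  // Default: every event is its own group
  protected def groupLogically(inputSeq: Seq[I]): Seq[Seq[I]] =
    inputSeq.map(input => Seq(input))

  def handleRequest(event: SQSEvent, context: Context): O = {
    setup()

    val collectedInput: Seq[I] = event
      .getRecords
      .asScala
      .map(_.getBody)
      .map(parseMessage) // Handle SNS wrapper
      .map(converter)
      .toSeq

    val groups = groupLogically(collectedInput)

    // Groups run concurrently (bounded by groupedSequence's pool size);
    // events inside one group run sequentially, in order.
    val groupResults = Await.result(
      FutureUtil.groupedSequence(groups) { logicalGroup =>
        FutureUtil.doSequential(logicalGroup)(process)
      },
      singleProcessTimeout
    )

    // One Try per event, even if a whole group failed
    val output: Seq[Try[SPO]] = groups.zip(groupResults).flatMap {
      case (_, Success(results)) => results
      case (group, Failure(ex))  => group.map(_ => Failure(ex))
    }

    // Inputs in group order, so they line up with `output`
    outputProcessing(groups.flatten, output)
  }
}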

The piece that gets used the most is groupLogically. By default each event is its own group, but a consumer that needs sequential ordering inside, say, an order’s lifecycle can override it to bucket events by order ID. Inside a group we run sequentially. Across groups we run concurrently. That’s the only ordering guarantee we make, and it’s enough for everything we actually do.
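A minimal, self-contained sketch of that ordering rule, using plain Futures instead of the framework's FutureUtil. OrderEvent and GroupedProcessing are hypothetical names, and unlike groupedSequence this version puts no cap on how many groups start at once.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical event: just an order ID and a per-order sequence number
final case class OrderEvent(orderId: String, seq: Int)

object GroupedProcessing {
  // groupLogically-style bucketing: one group per order ID, arrival order kept
  def groupByOrder(events: Seq[OrderEvent]): Seq[Seq[OrderEvent]] =
    events.map(_.orderId).distinct.map(id => events.filter(_.orderId == id))

  // Inside a group: each future is created only after the previous one completes
  def runSequentially[A, B](group: Seq[A])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[Seq[B]] =
    group.foldLeft(Future.successful(Vector.empty[B])) { (acc, a) =>
      acc.flatMap(done => f(a).map(done :+ _))
    }

  // Across groups: all groups start at once (no pool limit in this sketch)
  def runGroups[A, B](groups: Seq[Seq[A]])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[Seq[Seq[B]]] =
    Future.traverse(groups)(group => runSequentially(group)(f))
}
```

The foldLeft is what makes "sequential" real: the next future is not even constructed until the previous one has finished, so events for order "a" can never overtake each other, while order "b" proceeds independently.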

singleProcessTimeout bounds the Await on the whole batch. It exists because Lambda will kill the invocation at its configured timeout (15 minutes at most) whether you’re ready or not. If a batch is going to bust that ceiling, we’d rather fail fast and let SQS redeliver than have the runtime cut us off mid-write.
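One way to pick that timeout, sketched under assumptions: instead of a fixed value, derive the Await budget from the time the runtime reports as left (the Lambda Context interface exposes getRemainingTimeInMillis) minus a safety margin. BatchBudget and the 30-second margin are hypothetical; the post itself uses a fixed singleProcessTimeout.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BatchBudget {
  // Hypothetical margin kept back for outputProcessing and final logging
  val SafetyMargin: FiniteDuration = 30.seconds

  // remainingMillis would come from the Lambda Context's getRemainingTimeInMillis;
  // it is a plain Long here so the sketch runs anywhere.
  def awaitBudget(remainingMillis: Long, margin: FiniteDuration = SafetyMargin): FiniteDuration =
    math.max(0L, remainingMillis - margin.toMillis).millis

  // Give up before the runtime does: Await.result throws a TimeoutException,
  // the invocation fails, and SQS redelivers the batch once the visibility
  // timeout expires.
  def awaitBatch[A](batch: Future[A], remainingMillis: Long): A =
    Await.result(batch, awaitBudget(remainingMillis))
}
```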

Why we always refresh the entity

The most useful pattern in the framework is also the least clever: before processing a change event, re-read the entity from the database.

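override def process(log: EntityChangeLog): Future[EntityChangeLog] =
  // Re-read the row first: the event payload may already be out of date
  refreshNewEntity(log).flatMap { refreshedLog =>
    if (refreshedLog.changeDetail.newEntity.deleted && shouldFilterOutDeletedEntity) {
      // Deleted since the event was written: record a no-op instead of acting on it
      Future.successful(constructNoProcessingResponse(refreshedLog, "Entity deleted"))
    } else {
      // Consumer-specific work always sees the refreshed state
      consumerSpecificImplementation(refreshedLog)
    }
  }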

The reason is that the event in the queue is, by the time you read it, almost always older than the row in the database. Network, queue depth, retry — pick one. If your consumer acts on the event payload directly, you’ll happily process state that no longer exists, send notifications about prices that have already changed, or worse, replay a delete against an entity that’s since been resurrected. Reading once on entry is the cheapest insurance you can buy against that whole class of bug.

There’s a cost — it’s an extra read per event — but for almost every consumer we run, the read is small and the alternative is “occasionally do the wrong thing, silently.”
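A self-contained sketch of the refresh-on-entry idea, assuming a simple read interface. Invoice, ChangeEvent, InvoiceStore and RefreshingConsumer are hypothetical stand-ins for the framework's EntityChangeLog and refreshNewEntity, and deleted rows are always skipped here rather than behind a shouldFilterOutDeletedEntity flag.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical types: the real EntityChangeLog carries much more
final case class Invoice(id: String, amount: BigDecimal, deleted: Boolean)
final case class ChangeEvent(id: String, snapshot: Invoice)

// Hypothetical read-side interface; in production this is a database read
trait InvoiceStore {
  def find(id: String): Future[Option[Invoice]]
}

object RefreshingConsumer {
  sealed trait Outcome
  final case class Processed(current: Invoice) extends Outcome
  final case class Skipped(reason: String) extends Outcome

  // Ignore the (possibly stale) snapshot carried by the event and act on
  // the row as it is now; skip rows that are gone or soft-deleted.
  def process(event: ChangeEvent, store: InvoiceStore)(act: Invoice => Future[Unit])(
      implicit ec: ExecutionContext): Future[Outcome] =
    store.find(event.id).flatMap {
      case Some(current) if !current.deleted => act(current).map(_ => Processed(current))
      case Some(_)                           => Future.successful(Skipped("Entity deleted"))
      case None                              => Future.successful(Skipped("Entity not found"))
    }
}
```

An event that says the amount is 10 while the row now says 12 gets processed with 12; the snapshot is never consulted.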

Controlled parallelism

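Mapping a hundred items to futures and handing them to Future.sequence fires a hundred concurrent calls at whatever’s downstream: each future starts running as soon as it’s created, and Future.sequence only collects the results. Several of those downstreams are shared databases, third-party APIs with quotas, or both. The framework forces every batch through a groupedSequence helper: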

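import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

private val DefaultConcurrentPoolSize = 10
private val MaxConcurrentPoolSize = 25

def groupedSequence[T, R](
  originSeq: Iterable[T],
  concurrentPoolSize: Int = DefaultConcurrentPoolSize
)(futureCreator: T => Future[R])(implicit ec: ExecutionContext): Future[Seq[Try[R]]] = {

  require(
    concurrentPoolSize > 0 && concurrentPoolSize <= MaxConcurrentPoolSize,
    s"Pool size must be between 1-$MaxConcurrentPoolSize"
  )

  // Batches of concurrentPoolSize run one after another; items inside a batch
  // run concurrently, and each item's outcome is kept as a Try.
  doSequential(originSeq.grouped(concurrentPoolSize)) { batch =>
    doConcurrent(batch)(futureCreator)
  }.map(_.flatMap {
    case Success(value) => value
    // Only a failure of a whole batch lands here; log it and fail the future
    case Failure(ex)    =>
      logger.error("Batch processing failed", ex)
      throw ex
  })
}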

The shape is “split into batches of N, run each batch concurrently, run batches sequentially, collect results with Try so one failure doesn’t poison the rest.” Default pool size is 10, hard ceiling is 25. That ceiling is set low on purpose — if a consumer thinks it needs more than 25, what it actually needs is a different downstream contract.
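The post doesn't show doSequential or doConcurrent, so here is a self-contained sketch of the same shape with those helpers inlined as a foldLeft (batches in sequence) and Future.sequence (items in a batch concurrently). BoundedParallelism and the parameter names are hypothetical; the 10 and 25 limits come from the text above.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object BoundedParallelism {
  val DefaultConcurrentPoolSize = 10
  val MaxConcurrentPoolSize = 25

  def groupedSequence[T, R](items: Seq[T], poolSize: Int = DefaultConcurrentPoolSize)(
      futureCreator: T => Future[R])(implicit ec: ExecutionContext): Future[Seq[Try[R]]] = {
    require(poolSize > 0 && poolSize <= MaxConcurrentPoolSize,
      s"Pool size must be between 1-$MaxConcurrentPoolSize")

    items.grouped(poolSize).foldLeft(Future.successful(Vector.empty[Try[R]])) { (acc, batch) =>
      acc.flatMap { done =>
        // This batch's futures are created only now, after the previous batch finished
        val attempts = batch.map { item =>
          // Also covers a creator that throws before returning a Future
          val started: Future[R] = Try(futureCreator(item)) match {
            case Success(f)  => f
            case Failure(ex) => Future.failed(ex)
          }
          // Lift every outcome into a Try so one failure can't fail the batch
          started.transform(Success(_))
        }
        Future.sequence(attempts).map(done ++ _)
      }
    }
  }
}
```

With poolSize = 5 and 40 items, at most five calls are ever in flight, results come back in input order, and a failure on item 7 shows up as a single Failure while the other 39 still succeed.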

Producer: sharding and the retry topic

The SNS producer is two things welded together. One is sharding for FIFO topics — a deterministic hash of the partition key into one of N message-group IDs, so events for the same key land on the same group and order is preserved within it:

private def getUniqueHashForSharding(key: String, numberOfShards: Int): String =
  (key.map(_.toInt).sum % getEffectiveNumberOfShards(numberOfShards)).toString

private def getEffectiveNumberOfShards(numberOfShards: Int): Int =
  Math.max(1, (numberOfShards * envConcurrencyFactor).ceil.toInt)

The character-sum hash is fine for our key distribution; if your keys cluster heavily you’d want a real hash function. The envConcurrencyFactor exists so staging can run with fewer shards than prod without changing call sites.
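If your keys do cluster, a sketch of the swap, assuming the Scala standard library's MurmurHash3 is acceptable as "a real hash function". Sharding and messageGroupId are hypothetical names; the contract matches getUniqueHashForSharding (key in, message-group ID in [0, shards) out), minus the envConcurrencyFactor scaling.

```scala
import scala.util.hashing.MurmurHash3

object Sharding {
  // Key -> message-group ID in [0, shards), deterministic for a given key
  def messageGroupId(key: String, shards: Int): String = {
    require(shards > 0, "need at least one shard")
    // stringHash can be negative, so floorMod keeps the result in range
    Math.floorMod(MurmurHash3.stringHash(key), shards).toString
  }

  // The character-sum version from the post, for comparison
  def charSumGroupId(key: String, shards: Int): String =
    (key.map(_.toInt).sum % shards).toString
}
```

The character sum ignores character order, so anagram keys such as "order-12" and "order-21" always land on the same group; a mixing hash doesn't have that property.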

The other thing the producer does is back off on Throttling errors specifically, 5s → 20s → 45s (that’s 5·n² seconds, so quadratic rather than exponential), and then give up. The give-up is deliberate. We don’t want a poison message looping forever on a transient failure that’s actually a permanent one; SQS’s maxReceiveCount plus a DLQ is the right place to catch that, not application-level retries.
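A minimal sketch of that retry loop, assuming a synchronous publish call and an injected sleep function so the schedule can be checked without waiting. ThrottleRetry, ThrottledException and publishWithRetry are hypothetical; a real producer would match on the AWS SDK's throttling error code rather than a custom exception.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object ThrottleRetry {
  // Hypothetical stand-in for the SDK's throttling error
  final class ThrottledException(message: String) extends RuntimeException(message)

  // 5s, 20s, 45s: 5 * n^2 seconds for attempts n = 1, 2, 3
  val Schedule: List[FiniteDuration] = (1 to 3).map(n => (5 * n * n).seconds).toList

  // Retries only throttling failures, sleeping per the schedule, then gives up
  // and returns the last failure; any other error fails immediately.
  def publishWithRetry[A](sleep: FiniteDuration => Unit)(publish: () => A): Try[A] = {
    def attempt(remaining: List[FiniteDuration]): Try[A] =
      Try(publish()) match {
        case Failure(_: ThrottledException) if remaining.nonEmpty =>
          sleep(remaining.head)
          attempt(remaining.tail)
        case result => result
      }
    attempt(Schedule)
  }
}
```

In production the sleep would be something like `d => Thread.sleep(d.toMillis)`; after the third wait the failure is returned and the message is left to SQS redelivery and the DLQ.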

The interesting retry case isn’t transient AWS throttling, though. It’s third-party rate limits — the kind where you’ve burned your daily quota and need to come back tomorrow:

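def processWithRateLimitRetry(log: EntityChangeLog): Future[EntityChangeLog] = {
  accountingIntegration.syncInvoice(log).recoverWith {
    // Quota exhausted: park the event on the retry topic instead of failing the batch
    case _: RateLimitException =>
      val scheduled = log.copy(
        operationStatus = OperationStatus.Scheduled,
        retryAttempts   = log.retryAttempts + 1
      )
      // Publish the updated copy so the retry count travels with the event.
      // If addData is asynchronous, chain on its result so a failed publish
      // isn't reported as Scheduled.
      snsProducer.addData(
        topicArn       = retryTopicArn,
        partitionKey   = log.id,
        message        = scheduled.toJson,
        entityType     = "EntityChangeLog"
      )
      Future.successful(scheduled)
  }
}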

Catching the rate limit and republishing to a separate retry topic, with the retry queue’s delivery delay set to something like 15 minutes (the most SQS allows), turns “we can’t process this right now” into “we’ll process this when we can,” without holding a Lambda thread or blocking the rest of the batch. The first time this saved us from a CI-level reprocessing job at 3am was the moment the abstraction paid for itself.
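Putting the three kinds of retry side by side, a sketch of the classification the opening paragraph alludes to: AWS throttling gets in-process backoff, third-party rate limits go to the retry topic, and everything else fails so maxReceiveCount and the DLQ can deal with it. RetryPolicy and both exception classes are hypothetical stand-ins for whatever the SDK and the accounting client actually throw.

```scala
import scala.util.control.NonFatal

object RetryPolicy {
  // Hypothetical exception types standing in for the real SDK/client errors
  final class ThrottlingException extends RuntimeException("AWS throttled the call")
  final class RateLimitException extends RuntimeException("third-party quota exhausted")

  sealed trait Handling
  // Transient: back off briefly inside the same invocation
  case object RetryInProcess extends Handling
  // Rate-limited: park the event on the retry topic and come back later
  case object RepublishToRetryTopic extends Handling
  // Broken: fail, let SQS redeliver up to maxReceiveCount, then the DLQ
  case object FailToDlq extends Handling

  def classify(error: Throwable): Handling = error match {
    case _: ThrottlingException => RetryInProcess
    case _: RateLimitException  => RepublishToRetryTopic
    case NonFatal(_)            => FailToDlq
    case fatal                  => throw fatal // OOM and friends are not ours to handle
  }
}
```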

A few things we learned the boring way

Idempotency isn’t optional. SQS is at-least-once, which means redelivery is normal, not exceptional. Every consumer that mutates state checks an idempotency key before doing the work. We’ve never regretted adding that check; we have regretted not having it.
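A minimal sketch of the check, assuming an in-memory key set. IdempotencyGuard is hypothetical, and a per-process set only illustrates the idea: redeliveries can land on a different Lambda container, so the production version needs a conditional write to a shared store.

```scala
import java.util.concurrent.ConcurrentHashMap

// In-memory stand-in for an idempotency table (illustration only)
final class IdempotencyGuard {
  private val seen = ConcurrentHashMap.newKeySet[String]()

  // Runs `work` only the first time `key` is seen and returns whether it ran.
  // If the work throws, the key is released so a redelivery can try again.
  def runOnce(key: String)(work: => Unit): Boolean =
    if (seen.add(key)) {
      try { work; true }
      catch { case e: Throwable => seen.remove(key); throw e }
    } else false
}
```

A natural key is the entity ID plus the version or change-log ID, so a genuinely new change to the same entity still gets processed.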

Lambda containers are reused, so connection pools belong in lazy vals at object scope, not inside the handler. The first invocation in each container pays for the pool; every later invocation that lands on that container gets the warm version.
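A small sketch of the placement, with a hypothetical ConnectionPool class standing in for a real JDBC or HTTP client pool and a counter added only to make the reuse visible.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a JDBC or HTTP connection pool
final class ConnectionPool(val maxConnections: Int)

object ConsumerHandler {
  // Counts pool constructions; only here to make the reuse visible
  val poolInitialisations = new AtomicInteger(0)

  // Object scope + lazy: built on first use in this container, then reused by
  // every later invocation that the runtime routes to the same container
  lazy val pool: ConnectionPool = {
    poolInitialisations.incrementAndGet()
    new ConnectionPool(maxConnections = 4)
  }

  def handleRequest(body: String): String =
    s"handled ${body.length} chars with a pool of ${pool.maxConnections}"
}
```

Five warm invocations build the pool once; moving the construction into handleRequest would build it five times.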

Logs need correlation IDs. Without one, “this entity got into a weird state” is hours of grep across three Lambdas and four queues. With one, it’s a single query.
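A sketch of the propagation rule, assuming the ID travels as a message attribute. Correlation and the "correlationId" attribute name are hypothetical; the point is to reuse the incoming ID on every hop and mint one only where a flow starts.

```scala
import java.util.UUID

object Correlation {
  // Hypothetical attribute name; pick one and use it on every hop
  val Header = "correlationId"

  // Reuse the ID that arrived with the message; mint one only at the edge
  def correlationId(attributes: Map[String, String]): String =
    attributes.getOrElse(Header, UUID.randomUUID().toString)

  // Carry the same ID onto anything this consumer publishes downstream
  def outgoingAttributes(incoming: Map[String, String],
                         extra: Map[String, String] = Map.empty): Map[String, String] =
    extra + (Header -> correlationId(incoming))

  // Prefix every log line so one query finds the whole flow
  def logLine(id: String, message: String): String = s"[$Header=$id] $message"
}
```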

When a consumer’s queue depth alarms fire, the answer is almost never “raise Lambda concurrency.” It’s “find the downstream that’s gating us.” Concurrency limits at the Lambda level are usually the symptom, not the bottleneck.

Where this pattern earns its keep

CDC fan-out — one database change, several consumers, each doing a different thing — is what this shape is for. Order preservation only inside groups, not globally. Failure handling that distinguishes “transient” from “rate-limited” from “broken.” Throughput high enough that you can’t process events serially but bounded enough that 25 concurrent calls per batch is a reasonable ceiling.

It’s the wrong shape if you need a synchronous response, sub-100ms tail latency (Lambda cold starts will eat you), or strict global ordering. For those, a single long-running consumer or a different transport is the honest choice. For everything else, the pattern’s been quiet and steady, which is roughly the highest compliment a piece of infrastructure can get.
