Functional programming patterns in Scala that earned their keep
The first time I had to read someone else’s flatMap pyramid in a Scala service (six levels deep, every step waiting on the last one even though most of them didn’t depend on each other), I spent an afternoon untangling it before I touched any logic. Very little of it had to be sequential. The author had chained one flatMap inside the next, which is exactly what the for-comprehensions in the tutorials desugar to, and the request took as long as all six calls’ latencies added together.
This post is about the small set of Future utilities we ended up writing to stop that from happening, and the rules we eventually settled on for when to reach for them.
The pyramid
Most Scala codebases I’ve worked in start with something shaped like this:
def processOrder(orderId: String): Future[OrderConfirmation] = {
  orderRepository.findById(orderId).flatMap { order =>
    userRepository.findById(order.userId).flatMap { user =>
      paymentService.charge(user, order.total).flatMap { payment =>
        inventoryService.reserve(order.items).flatMap { reservation =>
          emailService.sendConfirmation(user.email, order).map { _ =>
            OrderConfirmation(order.id, payment.id, reservation.id)
          }
        }
      }
    }
  }
}
Two things are wrong here. The shape is hard to read, which a for-comprehension fixes. The deeper problem is that the charge, the reservation, and the email sit serially behind one another even though none of them uses another’s result. Only the first two calls have a real ordering relationship: the user lookup needs order.userId. A for-comprehension doesn’t solve that; it just makes the serial chain look prettier.
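To see the difference between calls written inside the comprehension and calls hoisted out of it, without timing anything, here’s a small self-contained sketch. PendingCalls is a hypothetical stand-in for a downstream service whose calls stay pending until they’re completed by hand, so the only thing being measured is how many calls have started.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}

// Hypothetical fake downstream: every call returns a Future that stays pending
// until completeAll() runs, so "how many calls have started" is deterministic.
final class PendingCalls {
  private val promises = ArrayBuffer.empty[Promise[String]]

  def call(name: String): Future[String] = synchronized {
    val p = Promise[String]()
    promises += p
    p.future
  }

  def started: Int = synchronized(promises.size)

  def completeAll(): Unit = synchronized(promises.foreach(_.trySuccess("ok")))
}

object SequentialVsHoisted {
  // Calls written inside the for: each one is created only after the
  // previous Future completes, exactly like the flatMap pyramid.
  def inlineCalls(svc: PendingCalls)(implicit ec: ExecutionContext): Future[(String, String, String)] =
    for {
      a <- svc.call("charge")
      b <- svc.call("reserve")
      c <- svc.call("email")
    } yield (a, b, c)

  // Calls hoisted out of the for: all three Futures exist (and are running)
  // before the comprehension starts waiting on them.
  def hoistedCalls(svc: PendingCalls)(implicit ec: ExecutionContext): Future[(String, String, String)] = {
    val charge  = svc.call("charge")
    val reserve = svc.call("reserve")
    val email   = svc.call("email")
    for {
      a <- charge
      b <- reserve
      c <- email
    } yield (a, b, c)
  }
}
```

Right after inlineCalls returns, only the first call has started; hoistedCalls has started all three. That gap is what FutureUtil.join is built around.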
Parallel composition with join
The trick is to start the futures outside the comprehension so they fire concurrently, then collect their results. We wrapped this in a small FutureUtil.join family because writing it inline every time invites mistakes, particularly moving a call back inside the comprehension and ending up sequential again.
import scala.concurrent.{ExecutionContext, Future}
object FutureUtil {
  // The futures passed in are already running;
  // the for-comprehension only waits for their results.
  def join[A, B](
      futureA: Future[A],
      futureB: Future[B]
  )(implicit ec: ExecutionContext): Future[(A, B)] =
    for {
      a <- futureA
      b <- futureB
    } yield (a, b)
  def join[A, B, C](
      futureA: Future[A],
      futureB: Future[B],
      futureC: Future[C]
  )(implicit ec: ExecutionContext): Future[(A, B, C)] =
    for {
      a <- futureA
      b <- futureB
      c <- futureC
    } yield (a, b, c)
  // ...overloaded up through the arity we actually use
}
The futures are constructed at the call site, so they’re already running by the time join sees them, and the for only awaits them. With that in hand, the order pyramid flattens:
def processOrder(orderId: String): Future[OrderConfirmation] =
  for {
    order <- orderRepository.findById(orderId)
    user  <- userRepository.findById(order.userId)
    // All three arguments are evaluated before join runs, so the calls start together.
    (payment, reservation, _) <- FutureUtil.join(
      paymentService.charge(user, order.total),
      inventoryService.reserve(order.items),
      emailService.sendConfirmation(user.email, order)
    )
  } yield OrderConfirmation(order.id, payment.id, reservation.id)
The three independent calls now take about as long as the slowest of them rather than the sum of all three, so whether this is noticeably faster depends on their relative latencies: if the email is fast and the payment is slow, you save very little. The win we cared about was readability: the difference between “these run together” and “these run one after the other” is now visible in the source instead of hidden in the indentation. One caveat: because the email now runs alongside the charge, a confirmation can go out for a payment that then fails; if that matters, send the email in a step after the join.
You can do the same thing with Future.sequence over a list, or with Cats’ mapN/tupled syntax. The reason we kept a tupled join was type-checked positional results: (payment, reservation, _) destructures cleanly, and the compiler tells us when the arity drifts. Future.sequence returns a single collection whose element type is the common supertype of its inputs, so the per-position types are lost.
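For comparison, the standard library already covers the two-way case: scala.concurrent.Future has zip (no ExecutionContext needed) and, since Scala 2.12, zipWith. This sketch, with hypothetical names ZipJoin, join2, join3 and sequenced, builds a typed join on them and puts it next to Future.sequence, which keeps the concurrency but not the types.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ZipJoin {
  // Two-way join: zip waits for two already-running Futures and pairs the results.
  def join2[A, B](fa: Future[A], fb: Future[B]): Future[(A, B)] =
    fa.zip(fb)

  // Three-way join from zip + zipWith, flattening the nested pair into a triple.
  def join3[A, B, C](fa: Future[A], fb: Future[B], fc: Future[C])(
      implicit ec: ExecutionContext
  ): Future[(A, B, C)] =
    fa.zip(fb).zipWith(fc)((ab, c) => (ab._1, ab._2, c))

  // Future.sequence is still concurrent, but an Int and a String
  // come back as a List[Any] that has to be picked apart by hand.
  def sequenced(fa: Future[Int], fb: Future[String])(
      implicit ec: ExecutionContext
  ): Future[List[Any]] =
    Future.sequence(List[Future[Any]](fa, fb))
}
```

The tupled versions keep each position’s type, which is the property the join family was written for.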
Controlled concurrency with groupedSequence
join is for a fixed, small number of futures with different types. The other case is “I have N items of the same type, run them, but don’t blast the downstream service.” We hit this every time we did anything bulk — sending notifications, backfilling rows, calling a rate-limited third-party API.
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
object FutureUtil {
  private val DefaultConcurrentPoolSize = 10
  private val MaxConcurrentPoolSize = 25
  def groupedSequence[T, R](
      originSeq: Iterable[T],
      concurrentPoolSize: Int = DefaultConcurrentPoolSize
  )(futureCreator: T => Future[R])(implicit ec: ExecutionContext): Future[Seq[Try[R]]] = {
    require(
      concurrentPoolSize > 0 && concurrentPoolSize <= MaxConcurrentPoolSize,
      s"Pool size must be between 1 and $MaxConcurrentPoolSize"
    )
    // Split the input into batches of at most concurrentPoolSize items.
    val batches = originSeq.grouped(concurrentPoolSize)
    // doSequential and doConcurrent are internal helpers not shown here:
    // run one batch at a time, and every item within a batch at once.
    doSequential(batches) { batch =>
      doConcurrent(batch)(futureCreator)
    }
  }
}
Batches run sequentially; within a batch, items run concurrently. One consequence: a single slow item delays the start of the next batch, so each batch takes as long as its slowest item. The cap of 25 was a guardrail: if someone needed more parallelism than that, they should be reaching for a streaming library, not a Future helper. We set DefaultConcurrentPoolSize to 10 because that was about what most of our downstreams could absorb without a noticeable latency tail. It’s a tunable, not a law.
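doSequential and doConcurrent aren’t shown above. Here’s one way they could be written, as a sketch that assumes Future.sequence for the concurrent step and a foldLeft of flatMaps for the sequential one. GroupedSequenceSketch is a hypothetical name, and the 10/25 constants are inlined from FutureUtil. Failures, including a futureCreator that throws before it returns a Future, become Failure values instead of failing the batch.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Try}
import scala.util.control.NonFatal

object GroupedSequenceSketch {
  // Start every item in the batch at once, then wait for all of them.
  // Each result is lifted into Try, so one failure doesn't fail the batch.
  def doConcurrent[T, R](batch: Iterable[T])(futureCreator: T => Future[R])(
      implicit ec: ExecutionContext
  ): Future[Seq[Try[R]]] = {
    val lifted: Vector[Future[Try[R]]] = batch.iterator.map { item =>
      // A creator that throws synchronously becomes a failed Future too.
      val started =
        try futureCreator(item)
        catch { case NonFatal(e) => Future.failed[R](e) }
      started.transform(Success(_))
    }.toVector
    Future.sequence(lifted)
  }

  // Run one batch at a time: a batch's Futures are only created inside the
  // flatMap, after the previous batch has completed. Input order is preserved.
  def doSequential[B, R](batches: Iterator[B])(runBatch: B => Future[Seq[Try[R]]])(
      implicit ec: ExecutionContext
  ): Future[Seq[Try[R]]] =
    batches.foldLeft(Future.successful(Vector.empty[Try[R]])) { (acc, batch) =>
      acc.flatMap(done => runBatch(batch).map(done ++ _))
    }

  def groupedSequence[T, R](originSeq: Iterable[T], concurrentPoolSize: Int = 10)(
      futureCreator: T => Future[R]
  )(implicit ec: ExecutionContext): Future[Seq[Try[R]]] = {
    require(concurrentPoolSize > 0 && concurrentPoolSize <= 25, "Pool size must be between 1 and 25")
    doSequential(originSeq.grouped(concurrentPoolSize)) { batch =>
      doConcurrent(batch)(futureCreator)
    }
  }
}
```

grouped splits the input up front, but no Future for a batch exists until the previous batch has finished, which is what keeps in-flight calls at or below the pool size.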
Usage looked like this:
def sendBulkNotifications(userIds: Seq[String]): Future[Seq[Try[NotificationResult]]] =
  FutureUtil.groupedSequence(userIds, concurrentPoolSize = 10) { userId =>
    for {
      // user and prefs don't depend on each other, so FutureUtil.join would also fit here.
      user   <- userRepository.findById(userId)
      prefs  <- preferenceRepository.findByUserId(userId)
      result <-
        if (prefs.emailEnabled) emailService.send(user.email, generateMessage(user))
        else Future.successful(NotificationResult.Skipped)
    } yield result
  }
The return type matters: Seq[Try[R]], not Seq[R]. One bad row shouldn’t take out the batch. Which leads to the next thing.
Errors are values
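The problem with try/catch around a Future chain is that it never sees the failures that matter: they happen asynchronously, after the try block has already returned. Inside the chain, a single failed Future short-circuits everything downstream of it. recover and recoverWith help, but they encourage you to write recovery inline, where it’s easy to swallow an error silently.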
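A minimal illustration of both problems, assuming a hypothetical flakyLookup that fails asynchronously: the try/catch never fires, and the inline recover hands back a plausible-looking -1 that nothing downstream can tell apart from a real result. InlineRecoveryPitfalls and its methods are illustrative names only.

```scala
import scala.concurrent.{ExecutionContext, Future}

object InlineRecoveryPitfalls {
  // Hypothetical downstream call: fails asynchronously for the id "bad".
  def flakyLookup(id: String)(implicit ec: ExecutionContext): Future[Int] =
    Future {
      if (id == "bad") throw new IllegalArgumentException(s"bad id: $id")
      id.length
    }

  // The try only covers *creating* the Future, which doesn't throw here.
  // The failure happens later on another thread, so the catch never runs
  // and the caller still receives a failed Future.
  def withTryCatch(id: String)(implicit ec: ExecutionContext): Future[Int] =
    try flakyLookup(id)
    catch { case _: IllegalArgumentException => Future.successful(-1) }

  // Inline recover "handles" the error by inventing a value: -1 now looks
  // like any other result, and the failure is gone.
  def withInlineRecover(id: String)(implicit ec: ExecutionContext): Future[Int] =
    flakyLookup(id).recover { case _: IllegalArgumentException => -1 }
}
```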
We pushed errors to the type level instead. groupedSequence returns Seq[Try[R]], and the caller decides what “partial failure” means for them. Sometimes that’s “log and continue,” sometimes that’s “if any failed, fail the whole thing,” sometimes it’s “succeed if more than 90% succeeded.” Those are policy decisions the utility shouldn’t bake in.
import scala.util.Failure
def processOrdersBatch(orderIds: Seq[String]): Future[BatchResult] =
  FutureUtil.groupedSequence(orderIds, concurrentPoolSize = 10) { orderId =>
    processOrder(orderId)
  }.map { results =>
    // Every element is already a Success or a Failure, so nothing here throws.
    val (successes, failures) = results.partition(_.isSuccess)
    BatchResult(
      totalProcessed = results.size,
      successCount = successes.size,
      failureCount = failures.size,
      errors = failures.collect { case Failure(ex) => ex.getMessage }
    )
  }
The thing this gets you that a try/catch wrapper doesn’t: failures don’t stop the run. If 7 of 1,000 orders blow up because of a bad payload, the other 993 still process, and the 7 land in errors for whoever picks up the alert.
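The three policies listed earlier in this section fit in a few lines each once results are Seq[Try[R]]. This is a sketch; BatchPolicies and its method names are hypothetical, and the point is only that each policy is a small function the caller owns rather than something groupedSequence bakes in.

```scala
import scala.util.{Failure, Success, Try}

object BatchPolicies {
  // "If any failed, fail the whole thing": surface the first failure.
  def failIfAny[R](results: Seq[Try[R]]): Try[Seq[R]] =
    results.collectFirst { case Failure(e) => e } match {
      case Some(e) => Failure(e)
      case None    => Success(results.collect { case Success(r) => r })
    }

  // "Log and continue": keep the successes, hand each error to a callback.
  def logAndContinue[R](results: Seq[Try[R]])(log: Throwable => Unit): Seq[R] =
    results.flatMap {
      case Success(r) => Some(r)
      case Failure(e) => log(e); None
    }

  // "Succeed if more than `threshold` succeeded" (0.9 for 90%).
  // An empty batch counts as fully successful here; that's a choice, not a rule.
  def requireSuccessRatio[R](results: Seq[Try[R]], threshold: Double): Try[Seq[R]] = {
    val successes = results.collect { case Success(r) => r }
    val ratio = if (results.isEmpty) 1.0 else successes.size.toDouble / results.size
    if (ratio > threshold) Success(successes)
    else Failure(new IllegalStateException(s"only ${successes.size} of ${results.size} rows succeeded"))
  }
}
```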
What this doesn’t cover
Retries with backoff, timeouts, and circuit breakers are all reasonable next layers, but I’d reach for libraries before rolling my own. Akka’s CircuitBreaker (org.apache.pekko.pattern.CircuitBreaker if you’re on Pekko) is well tested. akka.pattern.after (or org.apache.pekko.pattern.after), raced against the real call with Future.firstCompletedOf, gives you a clean timeout. For retries, either Cats Retry or a small recoverWith + Scheduler.scheduleOnce loop is fine. The reason I’m not pasting my own circuit breaker into this post is that the obvious sketch (@volatile state, check-then-set on success/failure) has a race window between the check and the assignment: two threads can read the same state and both write, and one update is lost. It works in tests and bites you in incident review.
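A minimal sketch of the recoverWith-plus-scheduler option, with a plain JDK ScheduledExecutorService standing in for Akka/Pekko’s scheduler.scheduleOnce so it runs without dependencies. RetrySketch, delayed, retryWithBackoff and withTimeout are hypothetical names, not a library API. The timeout races the call against a scheduled failure with Future.firstCompletedOf; it doesn’t cancel the underlying work, because scala.concurrent.Future can’t be cancelled.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RetrySketch {
  // Turn a synchronous throw into a failed Future so callers only see Futures.
  private def safely[T](thunk: => Future[T]): Future[T] =
    try thunk
    catch { case NonFatal(e) => Future.failed(e) }

  // Run `thunk` after `delay` on a plain JDK scheduler (a stand-in for
  // scheduler.scheduleOnce) and expose its eventual result as a Future.
  def delayed[T](delay: FiniteDuration, scheduler: ScheduledExecutorService)(thunk: => Future[T]): Future[T] = {
    val promise = Promise[T]()
    scheduler.schedule(
      new Runnable { def run(): Unit = promise.completeWith(safely(thunk)) },
      delay.toMillis,
      TimeUnit.MILLISECONDS
    )
    promise.future
  }

  // Retry up to `retriesLeft` more times, doubling the delay after each failure.
  def retryWithBackoff[T](retriesLeft: Int, delay: FiniteDuration, scheduler: ScheduledExecutorService)(
      attempt: () => Future[T]
  )(implicit ec: ExecutionContext): Future[T] =
    safely(attempt()).recoverWith {
      case NonFatal(_) if retriesLeft > 0 =>
        delayed(delay, scheduler)(retryWithBackoff(retriesLeft - 1, delay * 2L, scheduler)(attempt))
    }

  // Fail with a TimeoutException if `f` hasn't completed within `limit`.
  // The original work keeps running in the background.
  def withTimeout[T](f: Future[T], limit: FiniteDuration, scheduler: ScheduledExecutorService)(
      implicit ec: ExecutionContext
  ): Future[T] = {
    val timeout = delayed(limit, scheduler)(Future.failed[T](new TimeoutException(s"timed out after $limit")))
    Future.firstCompletedOf(Seq(f, timeout))
  }
}
```

With three retries and a 10 ms starting delay, a call that keeps failing is attempted four times over roughly 70 ms of waiting before the last failure is returned.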
If you’re starting fresh on a service that’s going to lean heavily on async composition, look at Cats Effect or ZIO. The Future-based patterns above are what we used because we were already on Scala Future and didn’t want to rewrite half the codebase. They’re enough to keep the everyday cases legible. They’re not a substitute for a real effect system if you need cancellation, fiber-level resource safety, or precise execution-context control.
The actual rule
Use join when you have two or three futures of different types that don’t depend on each other. Use groupedSequence when you have a list and need to cap how many calls are in flight at once. Surface errors as Try at the boundary instead of catching them inline. Most of the rest is wallpaper over those three ideas.