Parallel Collections and Actors in Scala

28 Jun, 2024

A while ago, I wrote a small Python program that accesses the API provided by my utility company. The program collects data about my power usage, which allows me to create a graph. This works, but then I thought I might use this problem (read data from an API, store it in a database and run queries) to practice some Scala. I’ll be joining a team using Scala as their main language for services, so after not having used Scala in about a decade, I figured a refresher was in order.

I used a starter project from Typesafe to get going. The advantage of having solved the problem in a different language is that you don’t have to think about algorithms and solutions. All you have to do is translate Python into Scala. Easy.

What I ended up with was my Python program with Scala syntax, though I had to make many changes of course.

One of the things that bothered me was that the API of my utility company is really slow, just like the app they offer (Rant: finding out yesterday’s power consumption requires closing two ads, the same ones I’ve closed for over two years now, then clicking on a graph, which gives me a graph by year that I never want, so I can select a graph by day and then select yesterday. Sigh. EndOfRant). My Python program uses the same API as the website, so it is just as slow. If you want to load all data since 2016, you’re in for a serious wait. “Why would you want to do that more than once?” you’ll ask. Well, because you’ve removed the Docker volume that stored the data, in a misguided attempt to clean up junk from your laptop (sigh, again).

To speed things up, we need parallelism. That could be done in Python, but the last time I looked into that, it used operating system threads, so you can’t go faster than the number of threads in your CPU. That would be just fine for my use case, but I was beyond being practical by now. And besides, this was about learning new things, so practicality would only be a minor issue anyway.

I started out with a literal translation of my Python code. The algorithm is roughly like this:

  1. initialize a bunch of variables
  2. access the API to get new data
  3. transform and save the data, update a record count
  4. check if there may be more, if yes go to step 2

Here’s version 1:

def updateImperativeVersion(startDate: DateTime, endDate: DateTime): Future[UpdateResult] = {
  logger.info("imperative update")
  val token = pesReader.login()
  var count = 0
  var startOfInterval = startDate
  val endOfPeriod = minDate(endDate, DateTime.now().minusDays(1))
  while (shouldIContinue(startOfInterval, endOfPeriod)) {
    val newRecordCount = updateDataForInterval(startOfInterval, endOfPeriod, token)
    count = count + newRecordCount
    startOfInterval = startOfInterval.plusDays(14)
  }

  logger.info(s"updated $count records")
  val updateResult = UpdateResult("200", count)
  Future {
    updateResult
  }
}

Note the two vars:

  var count = 0
  var startOfInterval = startDate

Always a smell, of course. Even if we wished, this solution couldn’t be parallelized, because every pass through the loop reads and updates both vars.

Let’s move on. The problem can at least be solved in a somewhat more functional style, like so:

def update(startDate: DateTime, endDate: DateTime): Future[UpdateResult] = {
  logger.info("sequential update")
  val token = pesReader.login()
  val endOfPeriod = minDate(endDate, DateTime.now().minusDays(1))
  val days = Days.daysBetween(startDate, endDate).getDays
  val numberOfDaysUpdated = (0 to days by 14)
    .to(LazyList)
    .map(i => updateDataForInterval(startDate.plusDays(i), endOfPeriod, token))
    .sum

  logger.info(s"updated $numberOfDaysUpdated records")
  val updateResult = UpdateResult("200", numberOfDaysUpdated)
  Future {
    updateResult
  }
}

No more vars, and a nice and modern LazyList. But also no parallelism, so this solution is no faster than the first one. The intervals are still read one after the other:

reading data from 2018-01-01T00:00:00.000+01:00 found 360 records
reading data from 2018-01-15T00:00:00.000+01:00 found 360 records
reading data from 2018-01-29T00:00:00.000+01:00 found 96 records
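To make the (0 to days by 14) step concrete, here is a minimal sketch of how the interval start dates fall out of that range. It uses java.time.LocalDate instead of the DateTime type from the post so that it runs without extra dependencies. The object name IntervalStarts and the end date of 2018-02-01 are assumptions, picked so that the result matches the three start dates in the log above.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object IntervalStarts {
  // One start date per 14-day step between startDate and endDate (inclusive range of offsets).
  def starts(startDate: LocalDate, endDate: LocalDate): Seq[LocalDate] = {
    val days = ChronoUnit.DAYS.between(startDate, endDate).toInt
    (0 to days by 14).map(i => startDate.plusDays(i.toLong))
  }
}

// 31 days between the two dates gives offsets 0, 14 and 28:
// IntervalStarts.starts(LocalDate.of(2018, 1, 1), LocalDate.of(2018, 2, 1))
// → 2018-01-01, 2018-01-15, 2018-01-29
```

Each element of the range is independent of the others, which is what makes the next step possible.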

It’s really easy to parallelize this solution. All it takes is adding a simple .par in the right place:

import scala.collection.parallel.CollectionConverters.ImmutableIterableIsParallelizable

// ...

def updatePar(startDate: DateTime, endDate: DateTime): Future[UpdateResult] = {
  logger.info("par update")
  val token = pesReader.login()
  val endOfPeriod = minDate(endDate, DateTime.now().minusDays(1))
  val days = Days.daysBetween(startDate, endDate).getDays

  val numberOfDaysUpdated =
    (0 to days by 14)
      .to(LazyList)
      .par     // <------- Magic
      .map(i => updateDataForInterval(startDate.plusDays(i), endOfPeriod, token))
      .sum

  logger.info(s"updated $numberOfDaysUpdated records")
  val updateResult = UpdateResult("200", numberOfDaysUpdated)
  Future {
    updateResult
  }
}

Note that the difference between this solution and the previous one is just one line with .par and an import statement. Subtle.

This code will start threads. The number of threads should be configurable with a ForkJoinPool, but that was beyond my skills and I didn’t feel like spending a lot of time finding out how that works, because there’s an even fancier solution available.
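For readers who do want to cap the number of threads, here is a minimal sketch of one way to do it: assign a ForkJoinTaskSupport backed by your own ForkJoinPool to the parallel collection before mapping over it. It assumes Scala 2.13 with the scala-parallel-collections module on the classpath (the same module the import above comes from). BoundedPar and fakeUpdate are hypothetical names, and fakeUpdate merely stands in for updateDataForInterval.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.ForkJoinTaskSupport

object BoundedPar {
  // Hypothetical stand-in for updateDataForInterval: pretends every interval yields 10 records.
  def fakeUpdate(offsetInDays: Int): Int = {
    Thread.sleep(100) // simulate a slow API call
    10
  }

  def run(days: Int, parallelism: Int): Int = {
    val pool = new ForkJoinPool(parallelism) // at most `parallelism` worker threads
    try {
      val offsets = (0 to days by 14).par
      // The map below is scheduled on our pool instead of the default one.
      offsets.tasksupport = new ForkJoinTaskSupport(pool)
      offsets.map(fakeUpdate).sum
    } finally pool.shutdown()
  }
}
```

Calling BoundedPar.run(28, 3) processes the offsets 0, 14 and 28 on a pool of three threads and returns 30.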

Because I selected the Typesafe stack, I was already using Akka, so I thought I might as well use actors, in the form of Akka Streams. The code below was inspired by the streams cookbook in the Akka docs.

def updateActors(startDate: DateTime, endDate: DateTime): Future[UpdateResult] = {
  type Result = Int

  logger.info("actor update")
  val token = pesReader.login()
  val endOfPeriod = minDate(endDate, DateTime.now().minusDays(1))
  val days: Int = Days.daysBetween(startDate, endDate).getDays
  val data = Source(0 to days by 14)

  val worker = Flow[Int].map(i => {
    updateDataForInterval(startDate.plusDays(i), endOfPeriod, token)
  })

  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))

      for (_ <- 1 to workerCount) {
        balancer ~> worker.async ~> merge
      }

      FlowShape(balancer.in, merge.out)
    })
  }

  val processedJobs = data.via(balancer(worker, 3))
  val updateCounts = Await.result(processedJobs.runWith(Sink.seq), scala.concurrent.duration.Duration(50, TimeUnit.SECONDS))
  val updateResult = UpdateResult("200", updateCounts.sum)

  Future {
    updateResult
  }
}

There’s quite a bit more code, much of which involves setting up the balancer that configures the workers. It might be useful to move it somewhere else, so it can be reused and doesn’t clutter the logic too much.
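As a sketch of what moving it somewhere else could look like, the balancer can live in its own object and be imported wherever a pool of workers is needed. The object name WorkerPool is an assumption; the body is the same balancer as above. It needs the Akka Streams dependency that the project already uses.

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

object WorkerPool {
  // Fans incoming elements out over workerCount copies of worker and merges the results.
  // The merged output is not guaranteed to arrive in input order.
  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balance = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))

      for (_ <- 1 to workerCount) {
        // .async puts each worker copy behind its own asynchronous boundary
        balance ~> worker.async ~> merge
      }

      FlowShape(balance.in, merge.out)
    })
  }
}
```

With that in place, the update method would only need data.via(WorkerPool.balancer(worker, 3)), which keeps the graph wiring out of the business logic.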

The important parts are:

val data = Source(0 to days by 14)

val worker = Flow[Int].map(i => {
  updateDataForInterval(startDate.plusDays(i), endOfPeriod, token)
})

So we have a Source of work, integers in this case, that are handed to workers using the balancer:

//                           v-- distribute work
val processedJobs = data.via(balancer(worker, 3)) // <-- with 3 workers
//                  ^-- source        ^-- do the job

The code Awaits the result, which is a Seq of update counts: every interval a worker processes yields a count, so we get a list of counts. To return the total number of updated records, we need UpdateResult("200", updateCounts.sum). This is concise, but I can imagine it’s easy to overlook.
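Since the method returns a Future anyway, the Await could in principle be dropped. The following is a sketch of that alternative, not what the code in this post does: it folds the counts into a single sum inside the stream and maps the resulting Future to an UpdateResult. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream) and an UpdateResult shaped as a status string plus a count; NonBlockingTotal is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

// Assumed shape of the post's UpdateResult: a status string plus a record count.
final case class UpdateResult(status: String, count: Int)

object NonBlockingTotal {
  def total(counts: Source[Int, Any])(implicit system: ActorSystem): Future[UpdateResult] = {
    import system.dispatcher // execution context for the map below
    counts
      .runWith(Sink.fold[Int, Int](0)(_ + _)) // Future[Int] holding the sum of all counts
      .map(sum => UpdateResult("200", sum))
  }
}
```

In the method above, NonBlockingTotal.total(processedJobs) would take the place of the Await, the sum and the final Future block.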

The output of this latest version will look like this:

[PESActorSystem-akka.actor.default-dispatcher-10] actor update
[PESActorSystem-akka.actor.default-dispatcher-16] reading data from 2019-01-01T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-13] reading data from 2019-01-15T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-17] reading data from 2019-01-29T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-17]  found 360 records
[PESActorSystem-akka.actor.default-dispatcher-13]  found 360 records
[PESActorSystem-akka.actor.default-dispatcher-16]  found 360 records
[PESActorSystem-akka.actor.default-dispatcher-13] reading data from 2019-02-26T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-16] reading data from 2019-02-12T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-13]  found 96 records
[PESActorSystem-akka.actor.default-dispatcher-16]  found 360 records

Note there are three dispatcher threads processing batches: 13, 16, and 17 in this case. This is as expected because I configured the balancer like this:

val processedJobs: Source[Result, NotUsed] = data.via(balancer(worker, 3))

hence 3 workers.

I’m happy with the final solution, though for day-to-day usage, the Python version is way easier on the hardware it can run on. Python for the real world and Scala because why not.

References

Excellent introduction on streams by Aniefiok Akpan

Scala cookbook code that explains a pool of workers

Source code, see scala/src/main/scala/nl/vermeir/scala/controller/PESController.scala

Jan Vermeir
Developing software and infrastructure in teams, doing whatever it takes to get stable, safe and efficient systems in production.