
26 October 2011

Scala collections are awesome

This is just a small post to show how incredibly easy it is to use Scala's concurrency constructs to speed up your work.

Doing the job

This question occurred to me when a new enhancement was requested for specs2. In specs2, examples were processed in two strictly sequential phases:

  1. all the examples are executed concurrently
  2. then they are reported to the console

Because the examples are executed concurrently, this is generally fast enough that the results appear on screen quickly. However, if examples take a long time to execute, you might think that sbt is stuck, with nothing whatsoever being reported back to you. You might even think that the compilation is still going on!

I changed that in specs2 so that examples are now reported as soon as they are executed. But I also thought: "this must come up all the time, for all sorts of tasks". This is why I'm going to demonstrate how straightforward it is to speed up your computations or your scripts with just a few extra method calls and an import.

Let's say I have some jobs to execute, each of them taking a random time:

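  val random = new scala.util.Random

  // sleep for a random time, then return the input so that report can use it
  // (i must be positive: random.nextInt(0) throws)
  def job(i: Int) = {
    Thread.sleep(50 * random.nextInt(i))
    println("executed "+i)
    i
  }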

And I have a function to report the results:

  def report(i: Int) = println("  reported "+i) 

Naively

The naive approach for executing and reporting the jobs would be:

  def naive(n: Int) = (1 to n).map(job).foreach(report)

  scala>naive(3)
  executed 1
  executed 2
  executed 3
    reported 1
    reported 2
    reported 3

Unfortunately this is slow since all the jobs are executed in sequence, then reported.

Wrongly

If we want to make better use of our laptop's cores, we can write:

  def parallel(n: Int) = (1 to n).par.map(job).foreach(report)

This is better, because the jobs are now executed in parallel, but it is not satisfying: foreach on a parallel collection also runs in parallel, so the reporting now comes out of order:

  scala>parallel(3)
  executed 3
  executed 1
  executed 2
    reported 3
    reported 1
    reported 2

Fast and right

The trick to get the best of both worlds is to use futures and seq:

  import scala.actors.Futures._

  // start every job in its own future, then wait for the results in input order
  def best(n: Int) = (1 to n).par.map(i => future(job(i))).seq.foreach(f => report(f()))

In the code above, each job runs concurrently inside a Future. Then we turn the collection back into a sequential one with seq, so that foreach visits the futures in their original order, and we report each result with f(), using the Future.apply method to wait until the value is available:

  scala>best(3)
  executed 3
  executed 1
    reported 1
  executed 2
    reported 2
    reported 3
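On Scala versions where the scala.actors library is no longer available, the same idea can be sketched with scala.concurrent.Future from the standard library. This is a hypothetical translation, not the code used in specs2: the object name BestModern, the log buffer and the fixed per-job delays are illustrative assumptions, and the global ExecutionContext takes the place of par for running the jobs concurrently. As in best, every future is started by a strict map before any waiting happens, and the results are then awaited one by one in input order.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BestModern {
  // hypothetical event log, kept so the ordering can be checked and not only printed
  val log: ListBuffer[String] = ListBuffer.empty[String]

  private def record(msg: String): Unit = log.synchronized {
    log += msg
    println(msg)
  }

  // like the post's job, but sleeping for a given (illustrative) delay instead of a random one
  def job(i: Int, delayMillis: Int): Int = {
    blocking(Thread.sleep(delayMillis))
    record("executed " + i)
    i
  }

  def report(i: Int): Unit = record("  reported " + i)

  // start all the jobs at once (strict map over a List), then wait for each
  // result in input order and report it as soon as it is available
  def bestModern(delays: List[Int]): Unit = {
    val futures = delays.zipWithIndex.map { case (d, idx) => Future(job(idx + 1, d)) }
    futures.foreach(f => report(Await.result(f, 10.seconds)))
  }
}
```

For example, BestModern.bestModern(List(300, 50, 150)) starts three jobs at once; whatever order they finish in, the "reported" lines come out as 1, 2, 3, each one as soon as its own job and all the earlier ones are done. Await.result plays the role of f() in the original and blocks the calling thread, which is acceptable in a small script like this one.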

Conclusion

With a few minor modifications to the first version of the code, we managed to do exactly what we wanted. Having this kind of power in the toolbox feels great, and I hope this post contributes a bit more to showing that Scala is also a practical language (not that there's anything wrong with being academic :-)).

Update

Before I get flamed in the comments, here is an interesting update :-).

As soon as I published this post, I started wondering whether the behavior of best had anything to do with parallel collections at all. It turns out that using futures alone gives good results as well:

  def futures(n: Int) = (1 to n).map(i => future(job(i))).foreach(f => report(f()))

  scala>futures(3)
  executed 2
  executed 1
    reported 1
  executed 3
    reported 2
    reported 3

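However, if the input collection is a view, the results are completely different. A view's map is lazy, so each future is only created when foreach reaches it, which means each job starts only after the previous result has been reported: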

  def futures(n: Int) = (1 to n).view.map(i => future(job(i))).foreach(f => report(f()))

  scala>futures(3)
  executed 1
    reported 1
  executed 2
    reported 2
  executed 3
    reported 3

Hence the use of par is really necessary, in this scenario, to get a fully parallel execution.
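The difference between the two futures versions comes from evaluation order rather than from concurrency. A minimal, single-threaded sketch (assuming the Scala 2.13 collections, where view is still available) can record when each element is mapped and when it is reported. The names EvaluationOrder, trace, start and report are hypothetical stand-ins for "create the future" and "report its result".

```scala
import scala.collection.mutable.ListBuffer

object EvaluationOrder {
  // records, in order, when each element is mapped ("start") and visited by foreach ("report")
  def trace(useView: Boolean, n: Int): List[String] = {
    val events = ListBuffer.empty[String]
    def start(i: Int): Int = { events += s"start $i"; i }
    def report(i: Int): Unit = events += s"report $i"

    if (useView) (1 to n).view.map(start).foreach(report) // lazy: map runs element by element inside foreach
    else (1 to n).map(start).foreach(report)               // strict: map runs on every element before foreach
    events.toList
  }
}
```

With a strict Range, trace(useView = false, 3) gives start 1, start 2, start 3 and only then the three reports, which is why every future is already running before the first f() blocks. With a view, trace(useView = true, 3) interleaves start 1, report 1, start 2, report 2 and so on, mirroring the fully sequential output above. Materializing the view first, for instance with (1 to n).view.map(start).toList.foreach(report), brings back the strict order.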

9 comments:

  1. Why use Future at all? You should achieve the same effect by simply mapping in parallel and printing in sequence.

  2. What do you mean?

    If I write this:

    def parallel2(n: Int)= (1 to n).par.map(job).seq.foreach(report)

    I get:

    scala>parallel2(3)
    executed 1
    executed 2
    executed 3
    reported 1
    reported 2
    reported 3

  3. I'm not really sure why that would be. It does imply though that the only reason your article example is running in parallel is because of Future. So, you could remove the .par and the .seq and the results would be the same.

  4. I just added an update on that point. If your input collection is a view, you get surprising results.

    The alternative could then be to "force" the input collection and use Futures only.

  5. This comment has been removed by the author.

  6. job should return the argument for report to have something to work with.

  7. what is wrong with?:
    def parallel2(n: Int)= (1 to n).par.map(job).seq.foreach(report)

    When I use it, the executed messages are out of order and the reported messages are in order.

  8. > Ittay Dror said...
    job should return the argument for report to have something to work with.

    I fixed that; it was a mistake when I copied the code. Thanks.

  9. > what is wrong with?:
    def parallel2(n: Int)= (1 to n).par.map(job).seq.foreach(report)

    The problem with this version is that all the executions are done *then* all the reporting is done. What I want is to have things being reported as soon as executed. I couldn't find a way to do that with parallel collections only.
