09 December 2011

Pragmatic IO - part 3

A follow-up to my previous posts about IO.

Why types and laws matter

It's embarrassing to write post after post only to find out I keep being wrong :-). While the technique I described in my last post (using StreamT) works ok, I actually don't have to use it. The traverse technique explained in EIP works perfectly, provided that:

  • we write a proper Traverse instance using foldLeft (as explained in the first part of this post)

  • we trampoline the State to avoid Stack overflows due to a long chain of flatMaps during the traversal

  • we use the right types!

Get the types right

That's the point I want to insist on today. Last evening I read the revisited WordCount example in Scalaz-seven which is like the canonical example of using the EIP ideas. Two words in the comments struck my mind: "compose" and "fuse". Indeed, one very important thing about EIP is the ability to compose Applicatives so that their actions should "fuse" during a traversal. As if they were executed in the same "for" loop!

So, when I wrote in my previous post that I needed to traverse the stream of lines several times to get the results, something had to be wrong somewhere. The types were wrong!

  1. instead of traversing with State[S, *] where * =:= IO[B], I should traverse with State[S, IO[*]]
  2. what I get back is a State[S, IO[Seq[B]]], instead of a State[S, Seq[IO[B]]]
  3. this matters because passing in an initial state then returns IO[Seq[B]] instead of Seq[IO[B]]

Exactly what I want, without having to sequence anything.
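To make the difference in types concrete, here is a minimal sketch using only the standard library. The `IO` class, the `State` alias, `traverseStateIO` and `importLine` below are hypothetical stand-ins written for this illustration, not the Scalaz types. The point is only that a single left fold can thread the state and chain the IO actions at the same time, so the result is a State[S, IO[Seq[B]]]. Note that this naive `IO` is not stack-safe on long inputs, which is what the trampolining is for.

```scala
object FusedTraverseSketch {
  /** Hypothetical minimal IO: a deferred computation, run only by unsafePerformIO(). */
  final class IO[A](thunk: () => A) {
    def unsafePerformIO(): A = thunk()
    def map[B](f: A => B): IO[B] = new IO(() => f(thunk()))
    def flatMap[B](f: A => IO[B]): IO[B] = new IO(() => f(thunk()).unsafePerformIO())
  }
  object IO { def apply[A](a: => A): IO[A] = new IO(() => a) }

  /** Hypothetical minimal State: a function from a state to (next state, value). */
  type State[S, A] = S => (S, A)

  /** One left fold: thread the state AND chain the IO actions in the same pass. */
  def traverseStateIO[S, A, B](as: Seq[A])(f: A => State[S, IO[B]]): State[S, IO[Seq[B]]] =
    (init: S) =>
      as.foldLeft((init, IO(Seq.empty[B]))) { case ((s, acc), a) =>
        val (next, io) = f(a)(s)
        // append, so that the results keep the order of the input
        (next, acc.flatMap(bs => io.map(b => bs :+ b)))
      }

  // Hypothetical per-line action: record "<line number>:<line>", return the line number
  val log = scala.collection.mutable.ListBuffer.empty[String]
  val importLine: String => State[Int, IO[Int]] =
    (l: String) => (i: Int) => (i + 1, IO { log.append(s"$i:$l"); i })
}
```

Passing an initial state to the result gives the final state and one IO[Seq[B]]; nothing is executed until unsafePerformIO() is called on it.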

Use the laws

Not only do I get what I want, but it is also conceptually right, as a consequence of an important traverse law:

  traverse(f compose g) == traverse(f) compose traverse(g)

That "fusion" law guarantees that composing 2 effects can be fused, and executed in only one traversal. It's worth instantiating f and g to make that more concrete:

  // what to do with the line and line number
  def importLine(i: Int, l: String): (Int, IO[Unit]) =
    (i, storeLine(l) >>=| printLine("imported line "+i))

  // `g` keeps track of the line number
  val g : String => State[Int, String] =
    (s: String) => state((i: Int) => (i+1, s))

  // `f` takes the current line/line number and does the import/reporting
  val f : State[Int, String] => State[Int, IO[Unit]] =
    st => state((i: Int) => (importLine _).tupled(st(i)))

  // `f compose g` fuses both actions
  val f_g: String => State[Int, IO[Unit]] = f compose g

I'm really glad that I was able to converge on established principles. Why couldn't I see this earlier?

Scala and Scalaz-seven might help

In retrospect, I remember what led me astray. When I realized that I had to use a State transformer with a Trampoline, I just got scared by the Applicative instance I had to provide. Its type is:

  /**
   * Here we want M1 to be `Trampoline` and M2 to be `IO`
   */
  implicit def StateTMApplicative[M1[_]: Monad, S, M2[_] : Applicative] =
    Applicative.applicative[({type l[a] = StateT[M1, S, M2[a]]})#l]

I also need some Pure and Apply instances in scope:

  implicit def StateTMApply[M1[_]: Monad, S, M2[_] : Applicative] =
    new Apply[({type l[a]=StateT[M1, S, M2[a]]})#l] {
      def apply[A, B](f: StateT[M1, S, M2[A => B]], a: StateT[M1, S, M2[A]]) =
        f.flatMap(ff => a.map(aa => aa <*> ff))
    }

  implicit def StateTMPure[M1[_] : Pure, S, M2[_] : Pure] =
    new Pure[({type l[a]=StateT[M1, S, M2[a]]})#l] {
      def pure[A](a: => A) = stateT((s: S) => (s, a.pure[M2]).pure[M1])
    }

Once it's written, it may not seem so hard but I got very confused trying to get there. How can it be made easier? First, we could have better type annotations for partial type application, like:

  // notice the * instead of the "type l" trick
  implicit def StateTMApplicative[M1[_]: Monad, S, M2[_] : Applicative] =
    Applicative.applicative[StateT[M1, S, M2[*]]]

  implicit def StateTMApply[M1[_]: Monad, S, M2[_] : Applicative] =
    new Apply[StateT[M1, S, M2[*]]] {
      def apply[A, B](f: StateT[M1, S, M2[A => B]], a: StateT[M1, S, M2[A]]) =
        f.flatMap(ff => a.map(aa => aa <*> ff))
    }

  implicit def StateTMPure[M1[_] : Pure, S, M2[_] : Pure] =
    new Pure[StateT[M1, S, M2[*]]] {
      def pure[A](a: => A) = stateT((s: S) => (s, a.pure[M2]).pure[M1])
    }

And with better type inference, the first definition could even be (we can always dream :-)):

  implicit def StateTMApplicative[M1[_]: Monad, S, M2[_] : Applicative]:
    Applicative[StateT[M1, S, M2[*]]] = Applicative.applicative

Which means that it may even be removed: just import Applicative.applicative!

Actually Scalaz-seven might help by:

  • providing those instances out of the box

  • even better, providing combinators to create those instances easily. That's what the compose method does

  • giving even better type inference. Look at the traverseU method here, no type annotations Ma!

Now that the fundamentals are working ok for my application, I can go back to adding features, yay!

08 December 2011

Pragmatic IO - part 2

A follow-up to my previous post about IO.

Learning by being wrong, very wrong

I didn't think that diving into the functional IO world would have me think so hard about how to use some FP concepts.

Mind the law

The technique I described at the end of my previous post didn't really work. It was wrong on many counts. First of all, my Traverse instance was buggy because I was reversing the traversed Stream. This is why I had inverted messages in the console. During the traversal, starting from the left, I needed to append each traversed element to the result, not prepend it:

  /**
   * WRONG: because each new element `b` must be appended to the result.
   * it should be bs :+ b instead
   */
  def SeqLeftTraverse: Traverse[Seq] = new Traverse[Seq] {
    def traverse[F[_]: Applicative, A, B](f: A => F[B], as: Seq[A]): F[Seq[B]] =
      as.foldl[F[Seq[B]]](Seq[B]().pure) { (ys, x) =>
        implicitly[Apply[F]].apply(f(x) map ((b: B) => (bs: Seq[B]) => b +: bs), ys)
      }
  }

It is important to mention here that proper testing should have caught this. There are some laws that applicative traversals must satisfy (see EIP, section 5). One of them is that seq.traverse(identity) == seq. This is obviously not the case when I returned a reversed sequence.
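As an illustration of how a test would catch this, here is a small sketch with the traversal specialised to the Option applicative, using only the standard library. The names `traverseAppend` and `traversePrepend` are made up for this example, and using `Option(_)` as the effect-free function is only a stand-in for the identity applicative of the law.

```scala
object TraverseLawSketch {
  /** Left-fold traversal that appends each new element: keeps the input order. */
  def traverseAppend[A, B](as: Seq[A])(f: A => Option[B]): Option[Seq[B]] =
    as.foldLeft(Option(Seq.empty[B])) { (acc, a) =>
      for (bs <- acc; b <- f(a)) yield bs :+ b
    }

  /** Same traversal with the bug described above: prepending reverses the result. */
  def traversePrepend[A, B](as: Seq[A])(f: A => Option[B]): Option[Seq[B]] =
    as.foldLeft(Option(Seq.empty[B])) { (acc, a) =>
      for (bs <- acc; b <- f(a)) yield b +: bs
    }
}
```

Checking that traversing with an effect-free function gives back the original sequence passes for the first version and fails for the second.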

Not at the right time

Then I realized something else which was also worrying. On big files, my application would do a lot of work and only then report its activity to the console. This would definitely not have happened in a simple for loop with unrestricted effects!

So I set out to find a better implementation for my file reading / records saving. The solution I'm going to present here is just one way of doing it, given my way of constraining the problem. There are others, specifically the use of Iteratees.

My goal

This is what I want to do: I want to read lines from a file, transform them into "records" and store them in a database. Along the way, I want to inform the user of the progress of the task, based on the number of lines already imported.

More precisely, I want 2 classes with the following interfaces:

  • the Source class is capable of reading lines and doing something on each line, possibly based on some state, like the number of lines already read.
    The Source class should also be responsible for closing all the opened resources whether things go wrong or not

  • the Extractor class should provide a function declaring what to do with the current line and state. Ideally it should be possible to create that function by composing independent actions: storing records in the db and / or printing messages on the console

  • and, by the way, I don't want this to go out-of-memory or stack overflow at run-time just because I'm importing a big file :-)

Mind you, it took me quite a while to arrive there. In retrospect, I could have noticed that:

  • all the processing has to take place inside the Source.readLines method. Otherwise there's no way to close the resources properly (well unless you're using Iteratees, what Paul Chiusano refers to as "Consumer" processing)

  • the signature of the function passed to Source.readLines has to be String => State[S, IO[A]], meaning that for each line, we're doing an IO action (like saving it to the database) but based on a State

  • the end result of readLines has to be IO[Seq[A]] which is an action returning the sequence of all the return values when all the IO actions have been performed

Considering all that, I ended up with the following signatures:

  /**
   * @param  filePath the file to read from
   * @param  f        a function doing an IO action for each line, based on a State
   * @param  init     the initial value of the State
   * @return an IO action reading all the lines and returning the sequence of computed values
   */
   def readLinesIO[S, A](filePath: String)(f: String => State[S, IO[A]])(init: S): IO[Seq[A]]

and the Extractor code goes:

  // the initial state
  val counter = new LogarithmicCounter(level = 100, scale = 10)

  // a function to notify the user when we've reached a given level
  val onLevel            = (i: Int) => printTime("imported from "+filePath+": "+i+" lines")
  // a function to create and store each line
  def onCount(l: String) = (i: Int) => createAndStoreRecord(Line(file, l), creator)

  // for each line, apply the actions described as a State[LogarithmicCounter, IO[A]] object
  readLinesIO(file.getPath)(l => counter.asState(onLevel, onCount(l)))(counter)

I like that interface because it keeps things separated and generic enough:

  • reading the lines and opening/closing the resources goes to one class: Source
  • defining how the state evolves when an action is done goes to the LogarithmicCounter class
  • defining what to do exactly with each line goes in one class, the Extractor class

Now, the tricky implementation question is: how do you implement readLinesIO?

What not to do

Definitely the wrong way to do it is to use a "regular" traverse on the stream of input lines. Even with the trampolining trick described in my previous post.

Indeed traverse is going to fold everything once to go from Stream[String] to State[S, Seq[IO[B]]] then, with the initial State value, to Seq[IO[B]] which then has to be sequenced into IO[Seq[B]]. This is guaranteed to do more work than necessary. More than what a single for loop would do.

The other thing not to do is so silly that I'm ashamed to write it here. Ok, I'll write it. At least to spare someone else the same idea. I had an implicit conversion from A to IO[A],...

That's a very tempting thing to do and very convenient at first glance. Whenever a function was expecting an IO[A], I could just pass a without having to write a.pure[IO]. The trouble is, some actions were never executed! I don't have a specific example to show, but I'm pretty sure that, at some point, I had values of type IO[IO[A]]. In that case, doing unsafePerformIO, i.e. "executing" the IO action only returns another action to execute and doesn't do anything really!
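I can't reproduce the original situation, but the IO[IO[A]] problem itself is easy to sketch. The `IO` class below is a hypothetical minimal one written for this illustration, and `store` is a made-up action that just increments a counter:

```scala
object NestedIoSketch {
  /** Hypothetical minimal IO: a deferred computation, run only by unsafePerformIO(). */
  final class IO[A](thunk: () => A) { def unsafePerformIO(): A = thunk() }
  object IO { def apply[A](a: => A): IO[A] = new IO(() => a) }

  var stored = 0
  val store: IO[Unit] = IO { stored += 1 }

  // What an implicit A => IO[A] conversion can silently build:
  // an action whose result is another action
  val nested: IO[IO[Unit]] = IO(store)
}
```

Running `nested` once only returns the inner action, and the counter stays at 0. The inner action has to be run as well for anything to happen.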

Learn your FP classes

I must admit I've been reluctant to go into what was advised on the Scalaz mailing-list: "go with StreamT", "use Iteratees". Really? I just want to do IO! Can I learn all that stuff later? I finally decided to go with the first option and describe it here in detail.

StreamT is a "Stream transformer". It is the Scala version of the ListT done right in Haskell. That looks a bit scary at first but it is not so hard. It is a (possibly infinite) list of elements which are not values but computations. So it more or less has the type Seq[M[A]] where M is some kind of effect, like changing a State or doing IO.

My first practical question was: "how do I even build that thing from a Stream of elements?"

Unfolding a data structure

Unfolding is the way. In the Scala world we are used to "folding" data structures into elements: get the sum of the elements of a list, get the maximum element in a tree. We less often do the opposite: generate a data structure from one element and a function. One example of this is generating the list of Fibonacci numbers out of an initial pair (0, 1) and a function computing the next "Fibonacci step".
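Here is a small sketch of the Fibonacci example, with a hand-written `unfold` so that it doesn't depend on any particular library version. The function name and the stopping condition (values up to 50) are choices made for this illustration:

```scala
object UnfoldSketch {
  /** Build a list from a seed: `step` returns the next element and the next seed, or None to stop. */
  def unfold[S, A](init: S)(step: S => Option[(A, S)]): List[A] = {
    @annotation.tailrec
    def loop(s: S, acc: List[A]): List[A] = step(s) match {
      case Some((a, next)) => loop(next, a :: acc)
      case None            => acc.reverse
    }
    loop(init, Nil)
  }

  // Fibonacci numbers up to 50, unfolded from the initial pair (0, 1)
  val fibs: List[Int] = unfold((0, 1)) { case (a, b) =>
    if (a > 50) None else Some((a, (b, a + b)))
  }
  // fibs == List(0, 1, 1, 2, 3, 5, 8, 13, 21, 34)
}
```

The shape of `step` is the same as the function passed to unfoldM, except that unfoldM returns the Option inside an effect M.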

With the StreamT class in Scalaz, you do this:

  /** our record-creation function */
  def f[B]: String => M[B]

  /**
   * unfolding the initial Stream to a StreamT where we apply
   * a `State` function `f` to each element
   */
  StreamT.unfoldM[M, B, Seq[A]](seq: Seq[A]) { (ss: Seq[A]) =>
    if (ss.isEmpty)  (None: Option[(B, Seq[A])]).pure[M]
    else              f(ss.head).map((b: B) => Some((b, ss.tail)))
  }

The code above takes an initial sequence seq (the Stream of lines to read) and:

  • if there are no more elements, you return None.pure[M], there's nothing left to do. In my specific use case that'll be State[LogarithmicCounter, None]

  • if there is an element in the Stream, f is applied to that element, that is, we create a record from the line and store it in the database (that's the b:B parameter, which is going to be an IO action)

  • because M is the State monad, when we apply f, this also computes the next state to keep track of how many rows we've imported so far

  • then the rest of the stream of lines to process, ss.tail, is also returned and unfoldM will use that to compute the next "unfolding" step

One very important thing to notice here is that we haven't consumed any element at that stage. We've only declared what we plan to do with each element of the input stream.

Running the StreamT

In the StreamT object in Scalaz there is a method called runStreamT taking in:

  • a StreamT[State[S, *], A], so that's a StreamT where the state may change with each element of the Stream
  • an initial state s0
  • and returning a StreamT[Id, A]

Note: State[S, *] is a non-existing notation for ({type l[a]=State[S, a]})#l

That doesn't seem to be game-changing, but this does a useful thing for us. runStreamT "threads" the State through all elements, starting with the initial value. In our case that means that we're going to create IO actions, where each created action depends on the current state (the number of lines read so far).

The cool thing is that so far we've described transformations to apply: Stream[String] to StreamT[State, IO[B]] to StreamT[Id, IO[B]] but we still haven't executed anything!

UnsafePerformIO at last

Then, I coded an unsafePerformIO method specialized for StreamT[Id, IO[A]] to execute all the IO actions. The method itself is not difficult to write but we need to make sure it's tail-recursive:

  /** execute all the IO actions on a StreamT when it only contains IO actions */
  def unsafePerformIO: Seq[A] = toSeq(streamT, Seq[A]())

  /**
   * execute a StreamT containing `IO` actions.
   *
   * Each IO action is executed and the result goes into a list of results
   */
  @tailrec
  private def toSeq[A](streamT: StreamT[Id, IO[A]], result: Seq[A]): Seq[A] =
    streamT.uncons match {
      case Some((head, tail)) => toSeq(tail, result :+ head.unsafePerformIO)
      case None               => result
    }

Hiding everything under the rug

Let's assemble the pieces of the puzzle now. With an implicit conversion, it is possible to transform any Seq to a StreamT performing some action on its elements:

   /**
    * for this to work, M[_] has to be a "PointedFunctor", i.e. have "pure" and
    * "fmap" methods. It is implemented using the `unfoldM` method code seen above
    */
   seq.toStreamT(f: A => M[B]): StreamT[M, B]

Then, we can traverse a whole sequence with a composition of a State and IO actions:

   seq.traverseStateIO(f)

Where traverseStateIO is using the toStreamT transformation and is defined as:

  /**
   * traverse a stream with a State and IO actions by using a Stream transformer
   */
  def traverseStateIO[S, B](f: A => State[S, IO[B]])(init: S): Seq[B] =
    StreamT.runStreamT(seq.toStreamT[State[S, *], IO[B]](f), init).unsafePerformIO

[bonus points for anyone providing a Traverse[Stream] instance based on StreamT - if that exists]

Finally the Source.readLines method is implemented as:

  /**
   * this method reads the lines of a file and applies stateful actions.
   * @return an IO action doing all the reading and returning a value for each processed line
   */
  def readLinesIO[S, A](path: String)(f: String => State[S, IO[A]])(init: S): IO[Seq[A]] =
    readLines(path)(f)(init).pure[IO]

  private
  def readLines[S, A](path: String)(f: String => State[S, IO[A]])(init: S): Seq[A] = {
    // store the opened resource
    var source: Option[scala.io.Source] = None
    def getSourceLines(src: scala.io.Source) = { source = Some(src); getLines(src) }

    try {
      // read the lines and execute the actions
      getSourceLines(fromFile(path)).toSeq.traverseStateIO(f)(init)
    } finally { source.foreach(_.close()) }
  }

And the client code, the Extractor class, does:

  val counter = new LogarithmicCounter(level = 100, scale = 10)

  // notify the user when we've reached a given level
  val onLevel            = (i: Int) => printTime("imported from "+filePath+": "+i+" lines")
  // create and store a record for each new line
  def onCount(l: String) = (i: Int) => createAndStoreRecord(Line(file, l), creator)

  // an `IO` action doing the extraction
  readLinesIO(file.getPath)(l => counter.asState(onLevel, onCount(l)))(counter)

Conclusion

I've had more than one WTF moment when trying to mix-in State, IO and resources management together. I've been very tempted to go back to vars and unrestricted effects :-).

Yet, I got to learn useful abstractions like StreamT and I've seen that it was indeed possible to define generic, composable and functional interfaces between components. I also got the impression that there are lots of different situations where we are manually passing state around, which is error-prone and which can be done generically by traverse-like methods.

05 December 2011

Pragmatic IO

This post is my exploration of the use of the IO type in a small Scala application.

A short IO introduction

Scala is a pragmatic language when you start learning Functional Programming (FP). Indeed there are some times when you can't apply the proper FP techniques just because you don't know them yet. In those cases you can still resort to variables and side-effects to your heart's content. One of these situations is IO (input/output).

Let's take an example: you need to save data in a database (not a rare thing :-)). The signature of your method will certainly look like that:

  /** @return the unique database id of the saved user */
  def save(user: User): Int

This method is not referentially transparent since it changes the state of the "outside world": if you call it twice, you'll get a different result. This kind of thing is very troubling for a Functional Programmer and keeps him awake at night. And anyway, in a language like Haskell, where every function is pure, you can't implement it. So what do you do?

The neat trick is to return an action saying "I'm going to save the user" instead of actually saving it:

  def save(user: User): IO[Int]

At first, it seems difficult to do anything from there because we just have an IO[Int], not a real Int. If we want to display the result on the screen, what do we do?

We use IO as a Monad, with the flatMap operation, to sequence the 2 actions, into a new one:

  /** create an IO action to print a line on the screen */
  def printLine(line: Any): IO[Unit] = println(line).pure[IO]

  /** save a user and print the new id on the screen */
                                           // or save(user) >>= printLine
  def saveAndPrint(user: User): IO[Unit] = save(user).flatMap(id => printLine(id))

Then finally when we want to execute the actions for real, we call unsafePerformIO in the main method:

  def main(args: Array[String]) {
    saveAndPrint(User("Eric")).unsafePerformIO
  }
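To see why nothing happens before unsafePerformIO, here is a minimal sketch of what such an IO type could look like. This is not the Scalaz implementation: the `IO` class is a hypothetical stand-in, `User` is replaced by a plain String, and `log` stands in for both the database and the console.

```scala
object IoSketch {
  /** Hypothetical minimal IO: a deferred computation, run only by unsafePerformIO(). */
  final class IO[A](thunk: () => A) {
    def unsafePerformIO(): A = thunk()
    def map[B](f: A => B): IO[B] = new IO(() => f(thunk()))
    def flatMap[B](f: A => IO[B]): IO[B] = new IO(() => f(thunk()).unsafePerformIO())
  }
  object IO { def apply[A](a: => A): IO[A] = new IO(() => a) }

  // Stand-ins for the outside world: every effect is recorded in `log`
  val log = scala.collection.mutable.ListBuffer.empty[String]
  private var nextId = 0

  def save(user: String): IO[Int] = IO { nextId += 1; log.append("saved " + user); nextId }
  def printLine(line: Any): IO[Unit] = IO { log.append("printed " + line); () }

  /** Building the composite action performs no effect at all. */
  def saveAndPrint(user: String): IO[Unit] = save(user).flatMap(id => printLine(id))
}
```

Calling saveAndPrint only builds a description of the two steps. The log stays empty until unsafePerformIO() is called on the result.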

That's more or less the Haskell way to do IO in Scala but nothing forces us to do so. The natural question which arises is: what should we do?

IO or not IO

Before starting this experiment, I fully re-read this thread on the Scala mailing-list, and the answer is apparently not clear for many people. Martin Odersky doesn't seem convinced that this is the way to go. He thinks that there should be a more lightweight and polymorphic way to get effect checking, and Bob Harper doesn't see the point.

Surely the best way to form my own judgement was to try it out by myself (as encouraged by Runar). One thing I know for sure is that, the moment I started printing out files in specs2, I got an uneasy feeling that something could/would go wrong. Since then, IO has been on my radar of things to try out.

Luckily, these days, I'm developing an application which is just the perfect candidate for that. This application:

  • reads log files
  • stores the records in a MongoDB database
  • creates graphs to analyze the data (maximums, average, distribution of some variables)

Perfect but,... where do I start :-)?

Theory => Practice

That was indeed my first question. Even in a small Swing application like mine the effects were interleaved in many places:

  • when I opened the MongoDB connection, I was printing some messages on the application console to inform the user about the db name, the MongoDB version,...
  • the class responsible for creating the reports was fetching its own data, effectively doing IO
  • datastore queries are cached, and getting some records (an IO action) required to get other data (the date of the latest import, another IO action) to decide if we could reuse the cached ones instead

The other obvious question was: "where do I call unsafePerformIO?". This is an interactive application, so I can't just put one unsafePerformIO in the main method!

The last question was: "OMG, I started to put IO in a few places, now it's eating all of my application, what can I do?" :-))

Segregating the effects

Here's what I ended up doing. First of all let's draw a diagram of the main components of my application before refactoring:

   +-------+    extract/getReport       +------ IO +   store      +------ IO +
   + Gui   + <------------------------> + Process  + -----------> + Store    +
   +-------+     Query/LogReport        +----------+              +----------+
                                             | getReport               ^
                                             v                         |
                                        +------ IO +     getRecords    |
                                        + Reports  + ------------------+
                                        +----------+

Simple app, I told you.

The first issue was that the Reports class (actually a bit more than just one class) was fetching records and building reports at the same time. So if I decided to put IO types on the store I would drag them in any subsequent transformation (marked as IO on the diagram).

The next issue was that the Process class was also doing too much: reading files and storing records.

Those 2 things led me to this refactoring and "architectural" decision, IO annotations would only happen on the middle layer:

                                        +------- IO +            store
                               read     + Extractor + -----------------------------+
                               files    +-----------+          |                   |
                                             ^                 |                   |
                                             | extract         v                   v
   +-------+    extract/getReport       +----------+      +------ IO +        +----------+
   + Gui   + <------------------------> + Process  +      + Status   +        + Store    +
   +-------+     Query/LogReport        +----------+      +----------+        +----------+
                                             | getReport       ^                   ^
                                             v                 |                   |
                                        +------ IO +           |    getRecords     |
                                        + Reporter + ------------------------------+
                                        +----------+
                                             | createReport
                                             v
                                        +----------+
                                        + Reports  +
                                        +----------+

  • all the IO actions are segregated into special classes. For example, extracting lines from log files creates a big composite IO action to read the lines, to store each line as a record and to report the status of the extraction. This is handled by the Extractor class

  • the Status class handles all the printing on the console. It provides methods like:

      /**
       * print a message with a timestamp, execute an action,
       * and print an end message with a timestamp also.
       *
       * This method is used by the `Extractor`, `Reporter` and `Process` classes.
       */
       def statusIO[T](before: String, action: IO[T], after: String): IO[T]
    
  • the Reports class only deals with the aggregation of data from records coming from the Store. However it is completely ignorant of this fact, so testing becomes easier (true story, that really helped!)

  • the Store does not have any IO type. In that sense my approach is not very pure, since nothing prevents "someone" from directly calling the store from the middle layer without encapsulating the call in an IO action. I might argue that this is pragmatic. Instead of sprinkling IO everywhere I started by defining a layer isolating the IO world (the data store, the file system) from the non-IO world (the reports, the GUI). Then, if I want to, I can extend the IO marking to the rest of the application. That being said I didn't do it because I didn't really see the added value at that stage. A name for this approach could be "gradual effecting" (the equivalent of "gradual typing")

  • the Process class is not marked as IO because all the methods provided by that class are actually calling unsafePerformIO to really execute the actions. This is the only place in the application where this kind of call occurs and the GUI layer could be left unchanged

All of that was not too hard, but was not a walk in the park either. Some details of the implementation were hard to come up with.

Under the bonnet: syntactic tricks

First of all, what was easy?

"Monadic" sequencing

Sequencing IO actions is indeed easy. I've used the for comprehension:

  def statusIO[T](before: String, action: IO[T], after: String): IO[T] = {
    for {
      _      <- printTime(before)
      result <- action
      _      <- printTime(after)
    } yield result
  }

"Semi-colon" sequencing

But also the >>=| operator in Scalaz to just sequence 2 actions when you don't care about the first one:

  def statusIO[T](before: String)(action: IO[T]): IO[T] = printTime(before) >>=| action

"Conditional" sequencing

And, for the fun of it, I implemented a "conditional flatMap operator", >>=?:

  action1 >>=? (condition, action2)

In the code above the action1 is executed and, if the condition is true, we execute action2 and forget about the result of action1, otherwise we keep the result of action1.
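A possible shape for such an operator, sketched on a hypothetical minimal `IO` class rather than on the Scalaz one. I'm assuming here that the condition is a plain Boolean and that both actions return the same type, which is one reading of the description above:

```scala
object ConditionalSequencingSketch {
  /** Hypothetical minimal IO: a deferred computation, run only by unsafePerformIO(). */
  final class IO[A](thunk: () => A) {
    def unsafePerformIO(): A = thunk()
    def flatMap[B](f: A => IO[B]): IO[B] = new IO(() => f(thunk()).unsafePerformIO())

    /**
     * Run this action first. If `condition` holds, run `next` and return its result,
     * otherwise keep the result of this action.
     */
    def >>=?(condition: => Boolean, next: => IO[A]): IO[A] =
      flatMap(a => if (condition) next else IO(a))
  }
  object IO { def apply[A](a: => A): IO[A] = new IO(() => a) }
}
```

Both parameters are by-name, so the condition is only evaluated after the first action has run, and the second action is not even built when the condition is false.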

"if else" with a twist

Not completely related to IO I also liked the ?? operator. Say I want to execute an action only if a file exists:

   if (file.exists) createRecords(file)
   else             ()

The else line really feels ugly. This kind of "default value" should be inferred from somewhere. Indeed! This "default value" is the Zero of a given type in Scalaz, so it is possible to write:

   file.exists ?? createRecords(file)

It just works (tm) but it's worth knowing where that zero value really comes from:

  1. createRecords(file) returns IO[Int] (the last created id - that's a simplification of the real program)

  2. there is a way to create a Monoid from another Monoid + an Applicative:

    // from scalaz.Monoid
    /** A monoid for sequencing Applicative effects. */
    def liftMonoid[F[_], M](implicit m: Monoid[M], a: Applicative[F]): Monoid[F[M]] =
      new Monoid[F[M]] {
        val zero: F[M] = a.pure(m.zero)
        def append(x: F[M], y: => F[M]): F[M] =
          a.liftA2(x, y, (m1: M, m2: M) => m.append(m1, m2))
      }
    
  3. in this case IO has an Applicative, M is Int so it has a Monoid hence IO[Int] defines a Monoid where the zero is IO(0). I had to open up the debugger to check that precisely :-)
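The three steps above can be sketched without Scalaz. Everything below is a hypothetical stand-in: a minimal `IO`, a minimal `Monoid` type class, a lifting of a Monoid through `IO` written with flatMap instead of liftA2, and a `??` operator added to Boolean. The value 42 returned by `createRecords` is arbitrary.

```scala
object ZeroSketch {
  /** Hypothetical minimal IO: a deferred computation, run only by unsafePerformIO(). */
  final class IO[A](thunk: () => A) {
    def unsafePerformIO(): A = thunk()
    def map[B](f: A => B): IO[B] = new IO(() => f(thunk()))
    def flatMap[B](f: A => IO[B]): IO[B] = new IO(() => f(thunk()).unsafePerformIO())
  }
  object IO { def apply[A](a: => A): IO[A] = new IO(() => a) }

  trait Monoid[M] { def zero: M; def append(x: M, y: => M): M }

  implicit val intMonoid: Monoid[Int] = new Monoid[Int] {
    def zero: Int = 0
    def append(x: Int, y: => Int): Int = x + y
  }

  /** A Monoid for IO[M], built from the Monoid for M. */
  implicit def liftMonoid[M](implicit m: Monoid[M]): Monoid[IO[M]] = new Monoid[IO[M]] {
    def zero: IO[M] = IO(m.zero)
    def append(x: IO[M], y: => IO[M]): IO[M] = x.flatMap(a => y.map(b => m.append(a, b)))
  }

  /** `cond ?? value` is `value` when cond is true, the Monoid zero otherwise. */
  implicit class BooleanOps(b: Boolean) {
    def ??[A](a: => A)(implicit m: Monoid[A]): A = if (b) a else m.zero
  }

  var created = 0
  def createRecords(): IO[Int] = IO { created += 1; 42 }
  def importIfExists(exists: Boolean): IO[Int] = exists ?? createRecords()
}
```

With `exists == false` the result is the lifted zero, an IO returning 0, and `createRecords` is never called.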

Under the bonnet: it just works,... not

The next piece of implementation I was really happy with was the "logarithmic reporting". This is a feature I implemented using vars at first, which I wanted to make pure in my quest for IO.

What I want is to extract log lines and notify the user when a bunch of lines have been imported (so that he doesn't get too bored). But I don't know how many lines there are in a given file. It could be 100 but it could be 30.000 or 100.000. So I thought that a "logarithmic" counter would be nice. With that counter, I notify the user every 100, 1000, 10.000, 100.000 lines.
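The counting logic itself can be sketched as an immutable value. This is not the actual class from the application: the name `LogCounter`, the `tick` method and the starting count of 0 are assumptions made for this illustration.

```scala
object LogCounterSketch {
  /** Hypothetical immutable counter: reports when `count` reaches `level`, then raises the level. */
  final case class LogCounter(count: Int = 0, level: Int = 100, scale: Int = 10) {
    /** @return the next counter and, if a level was just reached, the number of lines so far */
    def tick: (LogCounter, Option[Int]) = {
      val n = count + 1
      if (n == level) (copy(count = n, level = level * scale), Some(n))
      else (copy(count = n), None)
    }
  }

  /** The line counts at which the user would be notified when importing `lines` lines. */
  def notifications(lines: Int): List[Int] = {
    val (_, reached) = (1 to lines).foldLeft((LogCounter(), List.empty[Int])) {
      case ((counter, acc), _) =>
        val (next, level) = counter.tick
        (next, level.fold(acc)(_ :: acc))
    }
    reached.reverse
  }
}
```

For a file of 1500 lines, `notifications(1500)` gives List(100, 1000). The fold passes the counter around by hand, which is exactly the plumbing that a State-based traversal takes care of.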

The LogarithmicCounter works by creating a State object encapsulating 2 actions to do, one on each tick, one when a level is reached:

    // create and store a line on each `tick`
    def onCount(line: String) = (i: Int) => createAndStoreRecord(line, creator)

    // and notify the user when we've reached a given level
    val onLevel = (i: Int) => printTime("imported from "+filePath+": "+i+" lines")

    /** @return a State[LogarithmicCounter, IO[Unit]] */
    val readLine = (line: String) => counter.asState(onLevel, onCount(line))

The readLine method is used in a traversal of the lines returned by a FileReader:

    (lines traverseState readLine): State[LogarithmicCounter, Stream[IO[Unit]]]

Pass it an initial counter and you get back a Stream of IO actions which you can then sequence to get an IO[Stream[Unit]]:

    // initial traversal with a function returning a `State`
    val traversed: State[LogarithmicCounter, Stream[IO[Unit]]] = lines traverseState readLine

    // feed in the initial values: count = 1, level = 100 to get the end `State`
    val traversedEndState: Stream[IO[Unit]] = traversed ! new LogarithmicCounter

    // finally get a global action which will execute a stream of
    // record-storing actions and printing actions
    val toStore: IO[Stream[Unit]] = traversedEndState.sequence

Some readers of this blog may recognize one usage of the Essence of the Iterator Pattern - EIP and that's indeed what it is (my first real use case, yes!). traverseState is just a way to use the more traditional traverse method but hiding the ugly type annotations.

This kind of type-directed development is nice. You add some type here and there and you let the compiler guide you to which method to apply in order to get the desired results:

  1. after the traversal I get back a State
  2. if I want to get the final state, the Stream of IO, I need to feed in some initial values, that's the '!' method
  3. if I want to get an action equivalent to the stream of actions, I need the sequence method which has exactly the type signature doing what I want

I was about to call it a day when I actually tried my application,... StackOverflow! What?!?

Trampolining to the rescue

It turns out that traversing a "big" sequence with a State is not so easy. First of all, State is an Applicative because it is also a Monad (please read the earlier EIP post for the details). So basically this amounts to chaining a lot of flatMap operations which blows up the stack.

Fortunately for me, Runar has implemented a generic solution for this kind of issue, like, less than 2 months ago! I leave you to his excellent post for a detailed explanation but the gist of it is to use continuations to describe computations and store them on the heap instead of letting calls happen on the stack. So instead of using State[S, A] I use StateT[Trampoline, S, A] where each computation (S, A) returned by the State monad is actually encapsulated in a Trampoline to be executed on the heap.
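To give a feeling for the idea, here is a deliberately tiny trampoline. It is far simpler than Runar's (there is no flatMap, for example) and the names `Done`, `More` and `run` are just the ones chosen for this sketch. Each step returns a description of the next step, and a tail-recursive loop executes them one after the other, so the stack doesn't grow.

```scala
object TrampolineSketch {
  sealed trait Trampoline[A]
  /** The computation is finished. */
  final case class Done[A](result: A) extends Trampoline[A]
  /** There is one more step to run; it is stored on the heap as a function. */
  final case class More[A](next: () => Trampoline[A]) extends Trampoline[A]

  /** Run the steps in a loop: constant stack depth, whatever the number of steps. */
  @annotation.tailrec
  def run[A](t: Trampoline[A]): A = t match {
    case Done(a) => a
    case More(k) => run(k())
  }

  // Mutual recursion which would overflow the stack with direct calls on a big n
  def isEven(n: Int): Trampoline[Boolean] =
    if (n == 0) Done(true) else More(() => isOdd(n - 1))
  def isOdd(n: Int): Trampoline[Boolean] =
    if (n == 0) Done(false) else More(() => isEven(n - 1))
}
```

`run(isEven(100000))` goes through 100000 steps without using more than a few stack frames.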

The application of this idea was not too easy at first and Runar helped me with a code snippet (thanks!). Eventually I managed to keep everything well hidden behind the traverseState function. The first thing I did was to "trampoline" the function passed to traverseState:

  /**
   * transform a function into its "trampolined" version
   * val f:             T => State[S, A]              = (t: T) => State[S, A]
   * val f_trampolined: T => StateT[Trampoline, S, A] = f.trampolined
   */
  implicit def liftToTrampoline[T, S, A](f: T => State[S, A]) = new LiftToTrampoline(f)

  class LiftToTrampoline[T, S, A](f: T => State[S, A]) {
    // apply f to get the State for t, then suspend its execution in a Trampoline
    def trampolined = (t: T) => stateT((s: S) => suspend(f(t).apply(s)))
  }

So the traverseState function definition becomes:

  // with the full ugly type annotation
  def traverseState(f: T => State[S, B]) =
    seq.traverse[({type l[a]=StateT[Trampoline, S, a]})#l, B](f.trampolined)

However I can't leave things like that because traverseState then returns a StateT[Trampoline, S, B] when the client of the function expects a State[S, B]. So I added an untrampolined method to recover a State from a "trampolined" one:

  /** @return a normal State from a "trampolined" one */
  implicit def fromTrampoline[S, A](st: StateT[Trampoline, S, A]) = new FromTrampoline(st)

  class FromTrampoline[S, A](st: StateT[Trampoline, S, A]) {
    def untrampolined: State[S, A] = state((s: S) => st(s).run)
  }

The end result is not so bad. The "trampoline" trick is hidden as an implementation detail and I don't get StackOverflows anymore. Really? Not really,...

The subtleties of foldRight and foldLeft

I was still getting a StackOverflow error but not in the same place as before (#&$^@!!!). It was in the traversal function itself, not in the chaining of flatMaps. The reason for that one was that the Traverse instance for a Stream in Scalaz is using foldRight (or foldr):

  implicit def StreamTraverse: Traverse[Stream] = new Traverse[Stream] {
    def traverse[F[_]: Applicative, A, B](f: A => F[B], as: Stream[A]): F[Stream[B]] =
      as.foldr[F[Stream[B]]](Stream.empty.pure) { (x, ys) =>
        implicitly[Apply[F]].apply(f(x) map ((a: B) => (b: Stream[B]) => a #:: b), ys)
      }
  }

and foldr is recursion-heavy. It basically says: foldRight(n) = f(n, foldRight(n-1)), so every element adds a stack frame, whereas foldl is implemented with a for loop and a variable to accumulate the result.
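The difference can be sketched on a plain List, independently of Scalaz. Both functions below are hypothetical illustrations of the two shapes, not the library's actual implementations:

```scala
object FoldSketch {
  // Right fold written as the recursive equation: the call to f can only
  // happen after the recursive call returns, so each element costs a frame.
  def foldRightNaive[A, B](as: List[A], z: B)(f: (A, B) => B): B = as match {
    case Nil     => z
    case x :: xs => f(x, foldRightNaive(xs, z)(f))
  }

  // Left fold written as a loop with an accumulator: constant stack depth.
  def foldLeftLoop[A, B](as: List[A], z: B)(f: (B, A) => B): B = {
    var acc  = z
    var rest = as
    while (rest.nonEmpty) {
      acc = f(acc, rest.head)
      rest = rest.tail
    }
    acc
  }
}
```

The loop version handles a list of any length in constant stack space, while the stack depth of the recursive version grows with the number of elements.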

The workaround for this situation is simple: just provide a Traverse instance using foldLeft. But then you can wonder: "why is traverse even using foldRight in the first place?". The answer is in my next bug! After doing the modifications above I didn't get a SOE anymore but the output in the console was like:

  imported from test.log: 10000 lines [12:33:30]
  imported from test.log: 1000 lines  [12:33:30]
  imported from test.log: 100 lines   [12:33:30]

Cool, I have invented a Time Machine, Marty! That one left me puzzled for a while but I found the solution if not the explanation. The "left-folding" Traverse instance I had left in scope was being used by the sequence method to transform a Stream[IO] into an IO[Stream]. Changing that to the standard "right-folding" behaviour for a Stream traversal was ok. So there is a difference (meaning that something is not associative somewhere,...)
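One way a left fold can reverse the order of effects while keeping the values in order can be sketched as follows. This is only an assumption about the kind of mechanism involved, not a confirmed diagnosis of what Scalaz was doing. IO, map2, sequenceRight and sequenceLeft are hypothetical stand-ins, and the sketch uses a List in place of a Stream:

```scala
object FoldOrderSketch {
  // Hypothetical minimal IO: a deferred side effect, run only on demand
  final case class IO[A](unsafePerformIO: () => A) {
    def map[B](f: A => B): IO[B] = IO(() => f(unsafePerformIO()))
    def flatMap[B](f: A => IO[B]): IO[B] =
      IO(() => f(unsafePerformIO()).unsafePerformIO())
  }

  // Combine two actions: the effects of `first` run before those of `second`
  def map2[A, B, C](first: IO[A], second: IO[B])(f: (A, B) => C): IO[C] =
    first.flatMap(a => second.map(b => f(a, b)))

  // Right fold: the current element goes first, the rest of the list second
  def sequenceRight[A](as: List[IO[A]]): IO[List[A]] =
    as.foldRight(IO(() => List.empty[A]))((x, rest) => map2(x, rest)(_ :: _))

  // Left fold keeping the same argument order: the current element still goes
  // first, but "second" is now everything accumulated so far
  def sequenceLeft[A](as: List[IO[A]]): IO[List[A]] =
    as.foldLeft(IO(() => List.empty[A]))((done, x) => map2(x, done)((a, acc) => acc :+ a))

  // Returns the sequenced values and the order in which the effects ran
  def run(sequence: List[IO[Int]] => IO[List[Int]]): (List[Int], List[Int]) = {
    val log     = scala.collection.mutable.ListBuffer.empty[Int]
    val actions = List(100, 1000, 10000).map(n => IO(() => { log += n; n }))
    val values  = sequence(actions).unsafePerformIO()
    (values, log.toList)
  }
}
```

With sequenceRight the effects are logged as 100, 1000, 10000. With sequenceLeft the values still come back in that order but the effects are logged as 10000, 1000, 100, which is the same shape as the inverted console output.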

Conclusion

The main conclusion from this experiment is that tagging methods with IO made me really think about where the effects of my application are. It also encouraged functional programming techniques such as traverse, sequence et al.

I must however say that I was surprised on more than one count:

  • I stumbled upon a whole new class of bugs: non-execution. My effects were not executed improperly, they were not executed at all because I forgot to call unsafePerformIO!

  • I was not expecting to need an optimisation which had just been added to Scalaz, for something which I thought was a casual traversal

  • there are still some FP mysteries to me. For example I don't know yet how to traverse a full Stream

  • I also don't get why my messages were inverted on the console. I tried different experiments, with a Seq instead of a Stream, with Identity or Option as an Applicative instead of IO and I could only reproduce this specific behavior with Stream and IO
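The non-execution bug from the first point is easy to reproduce with a toy IO type. The IO class and io function below are hypothetical stand-ins written for this sketch, not the Scalaz ones:

```scala
object NonExecutionSketch {
  // Hypothetical minimal IO: wraps a side effect without running it
  final class IO[A](thunk: () => A) {
    def unsafePerformIO(): A = thunk()
  }
  def io[A](a: => A): IO[A] = new IO(() => a)

  // Returns the counter before and after the action is actually performed
  def demo(): (Int, Int) = {
    var stored = 0
    // building the action does not run it
    val store: IO[Unit] = io { stored += 1 }
    val before = stored
    // only this call executes the effect
    store.unsafePerformIO()
    (before, stored)
  }
}
```

Until unsafePerformIO is called, store is just a value describing what to do, so forgetting that call means nothing happens and nothing fails either.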

Anyway, I would say that it was overall worth using the IO type, at least for the architectural clarification and the better testing. It also brought back the warm, fuzzy feeling that things are under control. On the other hand refactoring to IO took me longer than expected and required more knowledge than just using vars and regular I/O. But that should really be accounted for in the 'investment for the future' column.

PS

There are certainly many stones left unturned in that post for programmers new to Functional Programming and Scalaz. I do apologize for that (this post is certainly long enough :-)) and I encourage those readers to ask questions on the Scalaz mailing-list.