08 December 2011

Pragmatic IO - part 2

A follow-up to my previous post about IO.

Learning by being wrong, very wrong

I didn't think that diving into the functional IO world would have me think so hard about how to use some FP concepts.

Mind the law

The technique I described at the end of my previous post didn't really work. It was wrong on many accounts. First of all, my
Traverse instance was buggy because I was reversing the traversed Stream. This is why I had inverted messages in the console. During the traversal, starting from the left, I needed to append each traversed element to the result, not prepend it:

   * WRONG: because each new element `b` must be appended to the result.
   * it should be bs :+ b instead
  def SeqLeftTraverse: Traverse[Seq] = new Traverse[Seq] {
    def traverse[F[_]: Applicative, A, B](f: A => F[B], as: Seq[A]): F[Seq[B]] =
      as.foldl[F[Seq[B]]](Seq[B]().pure) { (ys, x) =>
        implicitly[Apply[F]].apply(f(x) map ((b: B) => (bs: Seq[B]) => b +: bs), ys)

It is important to mention here that proper testing should have caught this. There are some laws that applicative traversals must satisfy (see EIP, section 5. One of them is that seq.traverse(identity) == seq. This is obviously not the case when I returned a reverted sequence.

Not at the right time

Then I realized something also which was also worrying. On big files, my application would do a lot, then, report its activity to the console. This would definitely not have happened in a simple for loop with unrestricted effects!

So I set out to find a better implementation for my file reading / records saving. The solution I'm going to present here is just one way of doing it, given my way of constraining the problem. There are others, specifically the use of Iteratees.

My goal

This is what I want to do: I want to read lines from a file, transform them into "records" and store them in a database. Along the way, I want to inform the user of the progress of the task, based on the number of lines already imported.

More precisely, I want 2 classes with the following interfaces:

  • the Source class is capable of reading lines and do something on each line, possibly based on some state, like the number of lines already read.
    The Source class should also be responsible for closing all the opened resources whether things go wrong or not

  • the Extractor class should provide a function declaring what to do with the current line and state. Ideally it should be possible to create that function by composing independent actions: storing records in the db and / or printing messages on the console

  • and, by the way, I don't want this to go out-of-memory or stack overflow at run-time just because I'm importing a big file :-)

Mind you, it took me quite a while to arrive there. In retrospect, I could have noticed that:

  • all the processing has to take place inside the Source.readLines method. Otherwise there's no way to close the resources properly (well unless you're using Iteratees, what Paul Chiusano refers to as "Consumer" processing)

  • the signature of the function passed to Source.readLines has to be String => State[IO[A]], meaning that for each line, we're doing an IO action (like saving it to the database) but based on a State

  • the end result of readLines has to be IO[Seq[A]] which is an action returning the sequence of all the return values when all the IO actions have been performed

Considering all that, I ended up with the following signatures:

   * @param  filePath the file to read from
   * @f      a function doing an IO action for each line, based on a State
   * @init   the initial value of the State
   * @return an IO action reading all the lines and returning the sequence of computed values
   def readLinesIO[S, A](filePath: String)(f: String => State[S, IO[A]])(init: S): IO[Seq[A]]

and the Extractor code goes:

  // the initial state
  val counter = new LogarithmicCounter(level = 100, scale = 10)

  // a function to notify the user when we've reached a given level
  val onLevel            = (i: Int) => printTime("imported from "+filePath+": "+i+" lines")
  // a function to create and store each line
  def onCount(l: String) = (i: Int) => createAndStoreRecord(Line(file, l), creator)

  // for each line, apply the actions described as a State[LogarithmicCounter, IO[A]] object
  readLinesIO(file.getPath)(l => counter.asState(onLevel, onCount(l)))(counter)

I like that interface because it keeps things separated and generic enough:

  • reading the lines and opening/closing the resources goes to one class: Source
  • defining how the state evolves when an action is done goes to the LogarithmicCounter class
  • defining what to do exactly with each line goes in one class, the Extractor class

Now, the tricky implementation question is: how do you implement readLinesIO?

What not to do

Definitely the wrong way to do it is to use a "regular" traverse on the stream of input lines. Even with the trampolining trick described in my previous post.

Indeed traverse is going to fold everything once to go from Stream[String] to State[S, Seq[IO[B]]] then, with the initial State value, to Seq[IO[B]] which then has to be sequenced into IO[Seq[B]]. This is guaranteed to do more work than necessary. More than what a single for loop would do.

The other thing not to do is so silly that I'm ashamed to write it here. Ok, I'll write it. At least to avoid someone else the same idea. I had an implicit conversion from A to IO[A],...

That's a very tempting thing to do and very convenient at first glance. Whenever a function was expecting an IO[A], I could just pass a without having to write a.pure[IO]. The trouble is, some actions were never executed! I don't have a specific example to show, but I'm pretty sure that, at some point, I had values of type IO[IO[A]]. In that case, doing unsafePerformIO, i.e. "executing" the IO action only returns another action to execute and doesn't do anything really!

Learn your FP classes

I must admit I've been reluctant to go into what was advised on the Scalaz mailing-list: "go with StreamT", "use Iteratees". Really? I just want to do IO! Can I learn all that stuff later? I finally decided to go with the first option and describe it here in detail.

StreamT is a "Stream transformer". It is the Scala version of the ListT done right in Haskell. That looks a bit scary at first but it is not so hard. It is an infinite list of elements which are not values but computations. So it more or less has the type Seq[M[A]] where M is some kind of effect, like changing a State or doing IO.

My first practical question was: "how do I even build that thing from a Stream of elements?"

Unfolding a data structure

Unfolding is the way. In the Scala world we are used to "folding" data structures into elements: get the sum of the elements of a list, get the maximum element in a tree. We less often do the opposite: generate a data structure from one element and a function. One example of this is generating the list of Fibonacci numbers out of an initial pair (0, 1) and a function computing the next "Fibonacci step".

With the StreamT class in Scalaz, you do this:

  /** our record-creation function */
  def f[B]: String => M[B]

   * unfolding the initial Stream to a StreamT where we apply
   * a `State` function `f` to each element
  StreamT.unfoldM[M, B, Seq[A]](seq: Seq[A]) { (ss: Seq[A]) =>
    if (ss.isEmpty)  (None: Option[(B, Seq[A])]).pure[M]
    else              f(ss.head).map((b: B) => Some((b, ss.tail)))

The code above takes an initial sequence seq (the Stream of lines to read) and:

  • if there are no more elements, you return None.pure[M], there's nothing left to do. In my specific use case that'll be State[LogarithmicCounter, None]

  • if there is an element in the Stream, f is applied to that element, that is, we create a record from the line and store it in the database (that's the b:B parameter, which is going to be an IO action)

  • because M is the State monad, when we apply f, this also computes the next state to keep track of how many rows we've imported so far

  • then the rest of the stream of lines to process, ss.tail, is also returned and unfoldM will use that to compute the next "unfolding" step

One very important thing to notice here is that we haven't consumed any element at that stage. We've only declared what we plan to do with each element of the input stream.

Running the StreamT

In the StreamT object in Scalaz there is this method call runStreamT taking in:

  • a StreamT[State[S, *], A], so that's a StreamT where the state may change with each element of the Stream
  • an initial state s0
  • and returning a StreamT[Id, A]

Note: State[S, *] is a non-existing notation for ({type l[a]=State[S, a]})#l

That doesn't see to be game-changing, but this does a useful thing for us. runStreamT "threads" the State through all elements, starting with the initial value. In our case that means that we're going to create IO actions, where each created action depends on the current state (the number of lines read so far).

The cool thing is that so far we've described transformations to apply: Stream[String] to StreamT[State, IO[B]] to StreamT[Id, IO[B]] but we still haven't executed anything!

UnsafePerformIO at last

Then, I coded an unsafePerformIO method specialized for Stream[Id, IO[A]] to execute all the IO actions. The method itself is not difficult to write but we need to make sure it's tail-recursive:

  /** execute all the IO actions on a StreamT when it only contains IO actions */
  def unsafePerformIO: Seq[A] = toSeq(streamT, Seq[A]())

   * execute a StreamT containing `IO` actions.
   * Each IO action is executed and the result goes into a list of results
  private def toSeq[A](streamT: StreamT[Id, IO[A]], result: Seq[A]): Seq[A] =
    streamT.uncons match {
      case Some((head, tail)) => toSeq(tail, result :+ head.unsafePerformIO)
      case None               => result
Hidding everything under the rug

Let's assemble the pieces of the puzzle now. With an implicit conversion, it is possible to transform any Seq to a StreamT performing some action on its elements:

    * for this to work, M[_] has to be a "PointedFunctor", i.e. have "pure" and
    * "fmap" methods. It is implemented using the `unfoldM` method code seen above
   seq.toStreamT(f: A => M[B]): StreamT[M, B]

Then, we can traverse a whole sequence with a composition of a State and IO actions:


Where traverseStateIO is using the toStreamT transformation and is defined as:

   * traverse a stream with a State and IO actions by using a Stream transformer
  def traverseStateIO[S, B](f: A => State[S, IO[B]])(init: S): Seq[B] =
    StreamT.runStreamT(seq.toStreamT[State[S, *], IO[B]](f), init).unsafePerformIO

[you'll get bonus points for anyone providing a Traverse[Stream] instance based on StreamT - if that exists]

Finally the Source.readLines method is implemented as:

   * this method reads the lines of a file and apply stateful actions.
   * @return an IO action doing all the reading and return a value for each processed line
  def readLinesIO[S, A](path: String)(f: String => State[S, IO[A]])(init: S): IO[Seq[A]] =

  def readLines[S, A](path: String)(f: String => State[S, IO[A]])(init: S): Seq[A] = {
    // store the opened resource
    var source: Option[] = None
    def getSourceLines(src: = { source = Some(src); getLines(src) }

    try {
      // read the lines and execute the actions
    } finally { }

And the client code, the Extractor class does:

  val counter = new LogarithmicCounter(level = 100, scale = 10)

  // notify the user when we've reached a given level
  val onLevel            = (i: Int) => printTime("imported from "+filePath+": "+i+" lines")
  // create and store a record for each new line
  def onCount(l: String) = (i: Int) => createAndStoreRecord(Line(file, l), creator)

  // an `IO` action doing the extraction
  readLinesIO(file.getPath)(l => counter.asState(onLevel, onCount(l)))(counter)


I've had more than one WTF moment when trying to mix-in State, IO and resources management together. I've been very tempted to go back to vars and unrestricted effects :-).

Yet, I got to learn useful abstractions like StreamT and I've seen that it was indeed possible to define generic, composable and functional interfaces between components. I also got the impression that there lots of different situations where we are manually passing state around which is error-prone and which can be done generically by traverse-like methods.

No comments: