09 December 2011

Pragmatic IO - part 3

A follow-up to my previous posts about `IO`.

Why types and laws matter

It's embarrassing to write post after post only to find out I keep being wrong :-). While the technique I described in my last post (using `StreamT`) works ok, I actually don't have to use it. The `traverse` technique explained in EIP works perfectly, provided that:

• we write a proper `Traverse` instance using `foldLeft` (as explained in the first part of this post)

• we `trampoline` the `State` to avoid Stack overflows due to a long chain of `flatMaps` during the traversal

• we use the right types!

Get the types right

That's the point I want to insist on today. Last evening I read the revisited WordCount example in Scalaz-seven which is like the canonical example of using the EIP ideas. Two words in the comments struck my mind: "compose" and "fuse". Indeed, one very important thing about EIP is the ability to compose `Applicatives` so that their actions should "fuse" during a traversal. As if they were executed in the same "for" loop!

So, when I wrote in my previous post that I needed to traverse the stream of lines several times to get the results, something had to be wrong somewhere. The types were wrong!

1. instead of traversing with `State[S, *]` where `* =:= IO[B]`, I should traverse with `State[S, IO[*]]`
2. what I get back is a `State[S, IO[Seq[B]]]`, instead of a `State[S, Seq[IO[B]]`
3. this matters because passing in an initial state then returns `IO[Seq[B]]` instead of `Seq[IO[B]]`

Exactly what I want, without having to `sequence` anything.

Use the laws

Not only I get what I want, but also it is conceptually right as the result of an important `traverse` law:

``````  traverse(f compose g) == traverse(f) compose traverse(g)
``````

That "fusion" law guarantees that composing 2 effects can be fused, and executed in only one traversal. It's worth instantiating `f` and `g` to make that more concrete:

``````  // what to do with the line and line number
def importLine(i: Int, l: String): (Int, IO[Unit]) =
(i, storeLine(l) >>= println("imported line "+i))

// `g` keeps track of the line number
val g : String => State[Int, String] =
(s: String) => state((i: Int) => (i+1, s))

// `f` takes the current line/line number and do the import/reporting
val f : State[Int, String] => State[Int, IO[Unit]] =
st => state((i: Int) => importLine(st(i)))

// `f compose g` fuses both actions
val f_g: String => State[Int, IO[Unit]] = f compose g
``````

I'm really glad that I was able to converge on established principles. Why couldn't I see this earlier?

Scala and Scalaz-seven might help

In retrospect, I remember what led me astray. When I realized that I had to use a State transformer with a `Trampoline`, I just got scared by the `Applicative` instance I had to provide. Its type is:

``````  /**
* Here we want M1 to `Trampoline` and M2 to be `IO`
*/
implicit def StateTMApplicative[M1[_]: Monad, S, M2[_] : Applicative] =
Applicative.applicative[({type l[a] = StateT[M1, S, M2[a]]})#l]
``````

I also need some `Pure` and `Apply` instances in scope:

``````  implicit def StateTMApply[M1[_]: Monad, S, M2[_] : Applicative] =
new Apply[({type l[a]=StateT[M1, S, M2[a]]})#l] {
def apply[A, B](f: StateT[M1, S, M2[A => B]], a: StateT[M1, S, M2[A]]) =
f.flatMap(ff => a.map(aa => aa <*> ff))
}

implicit def StateTMPure[M1[_] : Pure, S, M2[_] : Pure] =
new Pure[({type l[a]=StateT[M1, S, M2[a]]})#l] {
def pure[A](a: => A) = stateT((s: S) => (s, a.pure[M2]).pure[M1])
}
``````

Once it's written, it may not seem so hard but I got very confused trying to get there. How can it be made easier? First, we could have better type annotations for partial type application, like:

``````  // notice the * instead of the "type l" trick
implicit def StateTMApplicative[M1[_]: Monad, S, M2[_] : Applicative] =
Applicative.applicative[StateT[M1, S, M2[*]]]

implicit def StateTMApply[M1[_]: Monad, S, M2[_] : Applicative] =
new Apply[StateT[M1, S, M2[*]]] {
def apply[A, B](f: StateT[M1, S, M2[A => B]], a: StateT[M1, S, M2[A]]) =
f.flatMap(ff => a.map(aa => aa <*> ff))
}

implicit def StateTMPure[M1[_] : Pure, S, M2[_] : Pure] =
new Pure[StateT[M1, S, M2[*]]] {
def pure[A](a: => A) = stateT((s: S) => (s, a.pure[M2]).pure[M1])
}
``````

And with a better type inference, the first definition could be even be (we can always dream :-)):

``````  implicit def StateTMApplicative[M1[_]: Monad, S, M2[_] : Applicative]:
Applicative[StateT[M1, S, M2[*]]] = Applicative.applicative
``````

Which means that it may even be removed, just import `Applicative.applicative`!

Actually Scalaz-seven might help by:

• providing those instances out-of-the box

• even better, provide combinators to create those instances easily. That's what the `compose` method does

• give even better type inference. Look at the `traverseU` method here, no type annotations Ma!

Now that the fundamentals are working ok for my application, I can go back to adding features, yay!

08 December 2011

Pragmatic IO - part 2

A follow-up to my previous post about `IO`.

Learning by being wrong, very wrong

I didn't think that diving into the functional `IO` world would have me think so hard about how to use some FP concepts.

Mind the law

The technique I described at the end of my previous post didn't really work. It was wrong on many accounts. First of all, my
`Traverse` instance was buggy because I was reversing the traversed `Stream`. This is why I had inverted messages in the console. During the traversal, starting from the left, I needed to append each traversed element to the result, not prepend it:

``````  /**
* WRONG: because each new element `b` must be appended to the result.
* it should be bs :+ b instead
*/
def SeqLeftTraverse: Traverse[Seq] = new Traverse[Seq] {
def traverse[F[_]: Applicative, A, B](f: A => F[B], as: Seq[A]): F[Seq[B]] =
as.foldl[F[Seq[B]]](Seq[B]().pure) { (ys, x) =>
implicitly[Apply[F]].apply(f(x) map ((b: B) => (bs: Seq[B]) => b +: bs), ys)
}
}
``````

It is important to mention here that proper testing should have caught this. There are some laws that applicative traversals must satisfy (see EIP, section 5. One of them is that `seq.traverse(identity) == seq`. This is obviously not the case when I returned a reverted sequence.

Not at the right time

Then I realized something also which was also worrying. On big files, my application would do a lot, then, report its activity to the console. This would definitely not have happened in a simple for loop with unrestricted effects!

So I set out to find a better implementation for my file reading / records saving. The solution I'm going to present here is just one way of doing it, given my way of constraining the problem. There are others, specifically the use of `Iteratees`.

My goal

This is what I want to do: I want to read lines from a file, transform them into "records" and store them in a database. Along the way, I want to inform the user of the progress of the task, based on the number of lines already imported.

More precisely, I want 2 classes with the following interfaces:

• the `Source` class is capable of reading lines and do something on each line, possibly based on some state, like the number of lines already read.
The `Source` class should also be responsible for closing all the opened resources whether things go wrong or not

• the `Extractor` class should provide a function declaring what to do with the current line and state. Ideally it should be possible to create that function by composing independent actions: storing records in the db and / or printing messages on the console

• and, by the way, I don't want this to go out-of-memory or stack overflow at run-time just because I'm importing a big file :-)

Mind you, it took me quite a while to arrive there. In retrospect, I could have noticed that:

• all the processing has to take place inside the `Source.readLines` method. Otherwise there's no way to close the resources properly (well unless you're using Iteratees, what Paul Chiusano refers to as "Consumer" processing)

• the signature of the function passed to `Source.readLines` has to be `String => State[IO[A]]`, meaning that for each line, we're doing an `IO` action (like saving it to the database) but based on a `State`

• the end result of `readLines` has to be `IO[Seq[A]]` which is an action returning the sequence of all the return values when all the `IO` actions have been performed

Considering all that, I ended up with the following signatures:

``````  /**
* @param  filePath the file to read from
* @f      a function doing an IO action for each line, based on a State
* @init   the initial value of the State
* @return an IO action reading all the lines and returning the sequence of computed values
*/
def readLinesIO[S, A](filePath: String)(f: String => State[S, IO[A]])(init: S): IO[Seq[A]]
``````

and the `Extractor` code goes:

``````  // the initial state
val counter = new LogarithmicCounter(level = 100, scale = 10)

// a function to notify the user when we've reached a given level
val onLevel            = (i: Int) => printTime("imported from "+filePath+": "+i+" lines")
// a function to create and store each line
def onCount(l: String) = (i: Int) => createAndStoreRecord(Line(file, l), creator)

// for each line, apply the actions described as a State[LogarithmicCounter, IO[A]] object
``````

I like that interface because it keeps things separated and generic enough:

• reading the lines and opening/closing the resources goes to one class: `Source`
• defining how the state evolves when an action is done goes to the `LogarithmicCounter` class
• defining what to do exactly with each line goes in one class, the `Extractor` class

Now, the tricky implementation question is: how do you implement `readLinesIO`?

What not to do

Definitely the wrong way to do it is to use a "regular" `traverse` on the stream of input lines. Even with the trampolining trick described in my previous post.

Indeed `traverse` is going to fold everything once to go from `Stream[String]` to `State[S, Seq[IO[B]]]` then, with the initial `State` value, to `Seq[IO[B]]` which then has to be `sequence`d into `IO[Seq[B]]`. This is guaranteed to do more work than necessary. More than what a single `for` loop would do.

The other thing not to do is so silly that I'm ashamed to write it here. Ok, I'll write it. At least to avoid someone else the same idea. I had an implicit conversion from `A` to `IO[A]`,...

That's a very tempting thing to do and very convenient at first glance. Whenever a function was expecting an `IO[A]`, I could just pass `a` without having to write `a.pure[IO]`. The trouble is, some actions were never executed! I don't have a specific example to show, but I'm pretty sure that, at some point, I had values of type `IO[IO[A]]`. In that case, doing `unsafePerformIO`, i.e. "executing" the `IO` action only returns another action to execute and doesn't do anything really!

I must admit I've been reluctant to go into what was advised on the Scalaz mailing-list: "go with StreamT", "use Iteratees". Really? I just want to do `IO`! Can I learn all that stuff later? I finally decided to go with the first option and describe it here in detail.

`StreamT` is a "Stream transformer". It is the Scala version of the `ListT` done right in Haskell. That looks a bit scary at first but it is not so hard. It is an infinite list of elements which are not values but computations. So it more or less has the type `Seq[M[A]]` where `M` is some kind of effect, like changing a `State` or doing `IO`.

My first practical question was: "how do I even build that thing from a Stream of elements?"

Unfolding a data structure

Unfolding is the way. In the Scala world we are used to "folding" data structures into elements: get the sum of the elements of a list, get the maximum element in a tree. We less often do the opposite: generate a data structure from one element and a function. One example of this is generating the list of Fibonacci numbers out of an initial pair `(0, 1)` and a function computing the next "Fibonacci step".

With the `StreamT` class in Scalaz, you do this:

``````  /** our record-creation function */
def f[B]: String => M[B]

/**
* unfolding the initial Stream to a StreamT where we apply
* a `State` function `f` to each element
*/
StreamT.unfoldM[M, B, Seq[A]](seq: Seq[A]) { (ss: Seq[A]) =>
if (ss.isEmpty)  (None: Option[(B, Seq[A])]).pure[M]
else              f(ss.head).map((b: B) => Some((b, ss.tail)))
}
``````

The code above takes an initial sequence `seq` (the Stream of lines to read) and:

• if there are no more elements, you return `None.pure[M]`, there's nothing left to do. In my specific use case that'll be `State[LogarithmicCounter, None]`

• if there is an element in the `Stream`, `f` is applied to that element, that is, we create a record from the line and store it in the database (that's the `b:B` parameter, which is going to be an `IO` action)

• because `M` is the `State` monad, when we apply `f`, this also computes the next state to keep track of how many rows we've imported so far

• then the rest of the stream of lines to process, `ss.tail`, is also returned and `unfoldM` will use that to compute the next "unfolding" step

One very important thing to notice here is that we haven't consumed any element at that stage. We've only declared what we plan to do with each element of the input stream.

Running the StreamT

In the `StreamT` object in Scalaz there is this method call `runStreamT` taking in:

• a `StreamT[State[S, *], A]`, so that's a `StreamT` where the state may change with each element of the `Stream`
• an initial state `s0`
• and returning a `StreamT[Id, A]`

Note: `State[S, *]` is a non-existing notation for `({type l[a]=State[S, a]})#l`

That doesn't see to be game-changing, but this does a useful thing for us. `runStreamT` "threads" the `State` through all elements, starting with the initial value. In our case that means that we're going to create IO actions, where each created action depends on the current state (the number of lines read so far).

The cool thing is that so far we've described transformations to apply: `Stream[String]` to `StreamT[State, IO[B]]` to `StreamT[Id, IO[B]]` but we still haven't executed anything!

UnsafePerformIO at last

Then, I coded an `unsafePerformIO` method specialized for `Stream[Id, IO[A]]` to execute all the `IO` actions. The method itself is not difficult to write but we need to make sure it's tail-recursive:

``````  /** execute all the IO actions on a StreamT when it only contains IO actions */
def unsafePerformIO: Seq[A] = toSeq(streamT, Seq[A]())

/**
* execute a StreamT containing `IO` actions.
*
* Each IO action is executed and the result goes into a list of results
*/
@tailrec
private def toSeq[A](streamT: StreamT[Id, IO[A]], result: Seq[A]): Seq[A] =
streamT.uncons match {
case None               => result
}
``````
Hidding everything under the rug

Let's assemble the pieces of the puzzle now. With an implicit conversion, it is possible to transform any `Seq` to a `StreamT` performing some action on its elements:

``````   /**
* for this to work, M[_] has to be a "PointedFunctor", i.e. have "pure" and
* "fmap" methods. It is implemented using the `unfoldM` method code seen above
*/
seq.toStreamT(f: A => M[B]): StreamT[M, B]
``````

Then, we can traverse a whole sequence with a composition of a `State` and `IO` actions:

``````   seq.traverseStateIO(f)
``````

Where `traverseStateIO` is using the `toStreamT` transformation and is defined as:

``````  /**
* traverse a stream with a State and IO actions by using a Stream transformer
*/
def traverseStateIO[S, B](f: A => State[S, IO[B]])(init: S): Seq[B] =
StreamT.runStreamT(seq.toStreamT[State[S, *], IO[B]](f), init).unsafePerformIO
``````

[you'll get bonus points for anyone providing a `Traverse[Stream]` instance based on `StreamT` - if that exists]

Finally the `Source.readLines` method is implemented as:

``````  /**
* this method reads the lines of a file and apply stateful actions.
* @return an IO action doing all the reading and return a value for each processed line
*/
def readLinesIO[S, A](path: String)(f: String => State[S, IO[A]])(init: S): IO[Seq[A]] =

private
def readLines[S, A](path: String)(f: String => State[S, IO[A]])(init: S): Seq[A] = {
// store the opened resource
var source: Option[scala.io.Source] = None
def getSourceLines(src: scala.io.Source) = { source = Some(src); getLines(src) }

try {
// read the lines and execute the actions
getSourceLines(fromFile(path)).toSeq.traverseStateIO(f)(init)
} finally { sources.map(_.close()) }
}
``````

And the client code, the `Extractor` class does:

``````  val counter = new LogarithmicCounter(level = 100, scale = 10)

// notify the user when we've reached a given level
val onLevel            = (i: Int) => printTime("imported from "+filePath+": "+i+" lines")
// create and store a record for each new line
def onCount(l: String) = (i: Int) => createAndStoreRecord(Line(file, l), creator)

// an `IO` action doing the extraction
``````

Conclusion

I've had more than one WTF moment when trying to mix-in `State`, `IO` and resources management together. I've been very tempted to go back to vars and unrestricted effects :-).

Yet, I got to learn useful abstractions like `StreamT` and I've seen that it was indeed possible to define generic, composable and functional interfaces between components. I also got the impression that there lots of different situations where we are manually passing state around which is error-prone and which can be done generically by `traverse`-like methods.

05 December 2011

Pragmatic IO

This post is my exploration of the use of the `IO` type in a small Scala application.

A short `IO` introduction

Scala is a pragmatic language when you start learning Functional Programming (FP). Indeed there are some times when you can't apply the proper FP techniques just because you don't know them yet. In those cases you can still resort to variables and side-effects to your heart's content. One of these situations is IO (input/output).

Let's take an example: you need to save data in a database (not a rare thing :-)). The signature of your method will certainly look like that:

``````  /** @return the unique database id of the saved user */
def save(user: User): Int
``````

This method is not referentially transparent since it changes the state of the "outside world": if you call it twice, you'll get a different result. This kind of thing is very troubling for a Functional Programmer, that doesn't make him sleep well at night. And anyway, in a language like Haskell, where each function is pure you can't implement it. So how do you do?

The neat trick is to return an action saying "I'm going to save the user" instead of actually saving it:

``````  def save(user: User): IO[Int]
``````

At first, it seems difficult to do anything from there because we just have an `IO[Int]`, not a real `Int`. If we want to display the result on the screen, what do we do?

We use `IO` as a `Monad`, with the `flatMap` operation, to sequence the 2 actions, into a new one:

``````  /** create an IO action to print a line on the screen */
def printLine(line: Any): IO[Unit] = println(line).pure[IO]

/** save a user and print the new id on the screen */
// or save(user) >>= printLine
def saveAndPrint(user: User): IO[Unit] = save(user).flatMap(id => printLine(id))
``````

Then finally when we want to execute the actions for real, we call `unsafePerformIO` in the `main` method:

``````  def main(args: Array[String]) {
saveAndPrint(User("Eric")).unsafePerformIO
}
``````

That's more or less the Haskell way to do IO in Scala but nothing forces us to do so. The natural question which arises is: what should we do?

`IO` or not `IO`

Before starting this experiment, I fully re-read this thread on the Scala mailing-list, and the answer is not clear for many people apparently. Martin Odersky doesn't seemed convinced that this is the way to go. He thinks that there should be a more lightweight and polymorphic way to get effect checking and Bob Harper doesn't see the point.

Surely the best way to form my own judgement was to try it out by myself (as encouraged by Runar). One thing I know for sure is that, the moment I started printing out files in specs2, I got an uneasy feeling that something could/would go wrong. Since then,
`IO` has been on my radar of things to try out.

Luckily, these days, I'm developing an application which is just the perfect candidate for that. This application:

• stores the records in a MongoDB database
• creates graphs to analyze the data (maximums, average, distribution of some variables)

Perfect but,... where do I start :-)?

Theory => Practice

That was indeed my first question. Even in a small Swing application like mine the effects were interleaved in many places:

• when I opened the MongoDB connection, I was printing some messages on the application console to inform the user about the db name, the MongoDB version,...
• the class responsible for creating the reports was fetching its own data, effectively doing IO
• datastore queries are cached, and getting some records (an IO action) required to get other data (the date of the latest import, another IO action) to decide if we could reuse the cached ones instead

The other obvious question was: "where do I call `unsafePerformIO`?". This in an interactive application, I can't put just one
`unsafePerformIO` in the main method!

The last question was: "OMG, I started to put IO in a few places, now it's eating all of my application, what can I do?" :-))

Segregating the effects

Here's what I ended up doing. First of all let's draw a diagram of the main components of my application before refactoring:

``````   +-------+    extract/getReport       +------ IO +   store      +------ IO +
+ Gui   + <------------------------> + Process  + -----------> + Store    +
+-------+     Query/LogReport        +----------+              +----------+
| getReport               ^
v                         |
+------ IO +     getRecords    |
+ Reports  + ------------------+
+----------+
``````

Simple app, I told you.

The first issue was that the `Reports` class (actually a bit more than just one class) was fetching records and building reports at the same time. So if I decided to put `IO` types on the store I would drag them in any subsequent transformation (marked as `IO` on the diagram).

The next issue was that the `Process` class was also doing too much: reading files and storing records.

Those 2 things led me to this refactoring and "architectural" decision, `IO` annotations would only happen on the middle layer:

``````                                        +------- IO +            store
files    +-----------+          |                   |
^                 |                   |
| extract         v                   v
+-------+    extract/getReport       +----------+      +------ IO +        +----------+
+ Gui   + <------------------------> + Process  +      + Status   +        + Store    +
+-------+     Query/LogReport        +----------+      +----------+        +----------+
| getReport       ^                   ^
v                 |                   |
+------ IO +           |    getRecords     |
+ Reporter + ------------------------------+
+----------+
| createReport
v
+----------+
+ Reports  +
+----------+
``````
• all the `IO` actions are being segregrated to special classes. For example extracting lines for log files creates a big composite `IO` action to read the lines, to store the lines as a record and to report the status of the extraction. This is handled by the `Extractor` class

• the `Status` class handles all the printing on the the console. It provides methods like:

``````  /**
* print a message with a timestamp, execute an action,
* and print an end message with a timestamp also.
*
* This method used by the `Extractor`, `Reporter` and `Process` classes.
*/
def statusIO[T](before: String, action: IO[T], after: String): IO[T]`
``````
• the `Reports` class only deals with the aggregation of data from records coming from the `Store`. However it is completely ignorant of this fact, so testing becomes easier (true story, that really helped!)

• the `Store` does not have any `IO` type. In that sense my approach is not very pure, since nothing prevents "someone" from directly calling the store from the middle layer without encapsulating the call in an `IO` action. I might argue that this is pragmatic. Instead of sprinkling `IO` everywhere I started by defining a layer isolating the `IO` world (the data store, the file system) from the non-`IO` world (the reports, the GUI. Then, if I want, I can extend the `IO` marking the rest of the application if I want to. That being said I didn't do it because I didn't really see the added-value at that stage. A name for this approach could be "gradual effecting" (the equivalent of "gradual typing")

• the `Process` class is not marked as `IO` because all the methods provided by that class are actually calling
`unsafePerformIO` to really execute the actions. This is the only place in the application where this kind of call occurs and the GUI layer could be left unchanged

All of that was not too hard, but was not a walk in the park either. Some details of the implementation were hard to come up with.

Under the bonnet: syntactic tricks

First of all, what was easy?

Sequencing `IO` actions is indeed easy. I've used the `for` comprehension:

``````  def statusIO[T](before: String, action: IO[T], after: String): IO[T] = {
for {
_      <- printTime(before)
result <- action
_      <- printTime(after)
} yield result
}
``````
"Semi-column" sequencing

But also the `>>=|` operator in Scalaz to just sequence 2 actions when you don't care about the first one:

``````  def statusIO[T](before: String)(action: IO[T]): IO[T] = printTime(before) >>=| action
``````
"Conditional" sequencing

And, for the fun of it, I implemented a "conditional flatMap operator", `>>=?`:

``````  action1 >>=? (condition, action2)
``````

In the code above the `action1` is executed and, if the `condition` is true, we execute `action2` and forget about the result of `action1`, otherwise we keep the result of `action1`.

"if else" with a twist

Not completely related to `IO` I also liked the `??` operator. Say I want to execute an action only if a file exists:

``````   if (file.exists) createRecords(file)
else             ()
``````

The `else` line really feels ugly. This kind of "default value" should be inferred from somewhere. Indeed! This "default value" is the `Zero` of a given type in Scalaz, so it is possible to write:

``````   file.exists ?? createRecords(file)
``````

It just works (tm) but it's worth knowing where that zero values really comes from:

1. `createRecords(file)` returns `IO[Int]` (the last created id - that's a simplification of the real program)

2. there is a way to create a `Monoid` from another `Monoid` + an `Applicative`:

``````// from scalaz.Monoid
/** A monoid for sequencing Applicative effects. */
def liftMonoid[F[_], M](implicit m: Monoid[M], a: Applicative[F]): Monoid[F[M]] =
new Monoid[F[M]] {
val zero: F[M] = a.pure(m.zero)
def append(x: F[M], y: => F[M]): F[M] =
a.liftA2(x, y, (m1: M, m2: M) => m.append(m1, m2))
}
``````
3. in this case `IO` has an `Applicative`, `M` is `Int` so it has a `Monoid` hence `IO[Int]` defines a `Monoid` where the zero is `IO(0)`. I had to open up the debugger to check that precisely :-)

Under the bonnet: it just works,... not

The next piece of implementation I was really happy with was the "logarithmic reporting". This is a feature I implemented using vars at first, which I wanted to make pure in my quest for `IO`.

What I want is to extract log lines and notify the user when a bunch of lines have been imported (so that he doesn't get too bored). But I don't know how many lines there are in a given file. It could be 100 but it could be 30.000 or 100.000. So I thought that a "logarithmic" counter would be nice. With that counter, I notify the user every 100, 1000, 10.000, 100.000 lines.

The `LogarithmicCounter` works by creating a `State` object encapsulating 2 actions to do, one on each `tick`, one when a
`level` is reached:

``````    // create and store a line on each `tick`
def onCount(line: String) = (i: Int) => createAndStoreRecord(line, creator)

// and notify the user when we've reached a given level
val onLevel = (i: Int) => printTime("imported from "+filePath+": "+i+" lines")

/** @return a State[LogarithmicCounter, IO[Unit]] */
val readLine = (line: String) => counter.asState(onLevel, onCount(line))
``````

The `readLine` method is used in a `traversal` of the lines returned by a `FileReader`:

``````    (lines traverseState readLine): State[LogarithmicCounter, Stream[IO[Unit]]]
``````

Pass it an initial counter and you get back a `Stream` of `IO` actions which you can then `sequence` to get an `IO[Stream[Unit]]`:

``````    // initial traversal with a function returning a `State`
val traversed: State[LogarithmicCounter, Stream[IO[Unit]]] = lines traverseState readLine

// feed in the initial values: count = 1, level = 100 to get the end `State`
val traversedEndState: Stream[IO[Unit]] = traversed ! new LogarithmicCounter

// finally get a global action which will execute a stream of
// record-storing actions and printing actions
val toStore: IO[Stream[Unit]] = traversedEndState.sequence
``````

Some readers of this blog may recognize one usage of the Essence of the Iterator Pattern - EIP and that's indeed what it is (my first real use case, yes!). `traverseState` is just a way to use the more traditional `traverse` method but hiding the ugly type annotations.

This kind of type-directed development is nice. You add some type here and there and you let the compiler guide you to which method to apply in order to get the desired results:

1. after the traversal I get back a `State`
2. if I want to get the final state, the `Stream` of `IO`, I need to feed in some initial values, that's the '!' method
3. if I want to get an action equivalent to the stream of actions, I need the `sequence` method which has exactly the type signature doing what I want

I was about to call it a day when I actually tried my application,... StackOverflow! What?!?

Trampolining to the rescue

It turns out that traversing a "big" sequence with a `State` is not so easy. First of all, `State` is an `Applicative` because it is also a `Monad` (please read the earlier EIP post for the details). So basically this amounts to chaining a lot of `flatMap` operations which blows up the stack.

Fortunately for me, Runar has implemented a generic solution for this kind of issue, like, less than 2 months ago! I leave you to his excellent post for a detailed explanation but the gist of it is to use continuations to describe computations and store them on the heap instead of letting calls happen on the stack. So instead of using `State[S, A]` I use `StateT[Trampoline, S, A]` where each computation `(S, A)` returned by the `State` monad is actually encapsulated in a `Trampoline` to be executed on the heap.

The application of this idea was not too easy at first and Runar helped me with a code snippet (thanks!). Eventually I managed to keep everything well hidden behind the `traverseState` function. The first thing I did was to "trampoline" the function passed to `traverseState`:

``````  /**
* transform a function into its "trampolined" version
* val f:             T => State[S, A]              = (t: T) => State[S, A]
* val f_trampolined: T => StateT[Trampoline, S, A] = f.trampolined
*/
implicit def liftToTrampoline[T, S, A](f: T => State[S, A]) = new LiftToTrampoline(f)

class LiftToTrampoline[T, S, A](f: T => State[S, A]) {
def trampolined = (t: T) => stateT((s: S) => suspend(st.apply(s)))
}
``````

So the `traverseState` function definition becomes:

``````  // with the full ugly type annotation
def traverseState(f: T => State[S, B]) =
seq.traverse[({type l[a]=StateT[Trampoline, S, a]})#l, B](f.trampolined)
``````

However I can't leave things that like that because `traverseState` then returns a `StateT[Trampoline, S, B]` when the client of the function expects a `State[S, B]`. So I added an `untrampolined` method to recover a `State` from a "trampolined" one:

``````  /** @return a normal State from a "trampolined" one */
implicit def fromTrampoline[S, A](st: StateT[Trampoline, S, A]) = new FromTrampoline(st)

class FromTrampoline[S, A](st: StateT[Trampoline, S, A]) {
def untrampolined: State[S, A] = state((s: S) => st(s).run)
}
``````

The end result is not so bad. The "trampoline" trick is hidden as an implementation detail and I don't get StackOverflows anymore. Really? Not really,...

The subtleties of `foldRight` and `foldLeft`

I was still getting a StackOverflow error but not in the same place as before (`#&\$^@!!!`). It was in the traversal function itself, not in the chaining of `flatMaps`. The reason for that one was that the `Traverse` instance for a `Stream` in Scalaz is using `foldRight` (or `foldr`):

``````  implicit def StreamTraverse: Traverse[Stream] = new Traverse[Stream] {
def traverse[F[_]: Applicative, A, B](f: A => F[B], as: Stream[A]): F[Stream[B]] =
as.foldr[F[Stream[B]]](Stream.empty.pure) { (x, ys) =>
implicitly[Apply[F]].apply(f(x) map ((a: B) => (b: Stream[B]) => a #:: b), ys)
}
}
``````

and `foldr` is recursively intensive. It basically says: `foldRight(n) = f(n, foldRight(n-1))` whereas `foldl` is implemented with a for loop and a variable to accumulate the result.

The workaround for this situation is simple: just provide a `Traverse` instance using `foldLeft`. But then you can wonder: "why is `traverse` even using `foldRight` in the first place?". The answer is in my next bug! After doing the modifications above I didn't get a SOE anymore but the output in the console was like:

``````  imported from test.log: 10000 lines [12:33:30]
imported from test.log: 1000 lines  [12:33:30]
imported from test.log: 100 lines   [12:33:30]
``````

Cool, I have invented a Time Machine, Marty! That one left me puzzled for a while but I found the solution if not the explanation. The "left-folding" `Traverse` instance I had left in scope was being used by the `sequence` method to transform a `Stream[IO]` into an `IO[Stream]`. Changing that to the standard "right-folding" behaviour for a `Stream` traversal was ok. So there is a difference (meaning that something is not associative somewhere,...)

Conclusion

The main conclusion from this experiment is that tagging methods with `IO` made me really think about where are the effects of my application. It also encouraged functional programming techniques such as `traverse`, `sequence` and al.

I must however say that I was surprised on more than one account:

• I stumbled upon a whole new class of bugs: non-execution. My effects were not executed improperly, they were not executed at all because I forgot to call `unsafePerformIO`!

• I was not expecting to be needing an optimisation which had just been added to Scalaz, for something which I thought was a casual traversal

• there are still some FP mysteries to me. For example I don't know yet how to traverse a full `Stream`

• I also don't get why my messages were inverted on the console. I tried different experiments, with a `Seq` instead of a `Stream`, with `Identity` or `Option` as an `Applicative` instead of `IO` and I could only reproduce this specific behavior with `Stream` and `IO`

Anyway, I would say that it was overall worth using the `IO` type, at least for the architectural clarification and the better testing. It also brought back the warm, fuzzy feeling that things are under control. On the other hand refactoring to `IO` took me longer than expected and required more knowledge than just using vars and regular I/O. But that should really be accounted for in the 'investment for the future' column.

PS

There are certainly many stones left unturned in that post for programmers new to Functional Programming and Scalaz. I do apologize for that (this post is certainly long enough :-)) and I encourage those readers to ask questions on the Scalaz mailing-list.