06 March 2014

Streaming with previous and next

The Scalaz streams library is very attractive but it might feel unfamiliar because this is not your standard collection library.

This short post shows how to produce a stream of elements from another stream so that we get a triplet with: the previous element, the current element, the next element.

With Scala collections

With regular Scala collections, this is not too hard. We first create a list of all the previous elements. We create them as options because there will not be a previous element for the first element of the list. Then we create a list of next elements (also a list of options) and we zip everything with the input list:

def withPreviousAndNext[T] = (list: List[T]) => {
  val previousElements = None +: list.map(Some(_)).dropRight(1)
  val nextElements     = list.drop(1).map(Some(_)) :+ None

  // plus some flattening of the triplet
  (previousElements zip list zip nextElements) map { case ((a, b), c) => (a, b, c) }
}

withPreviousAndNext(List(1, 2, 3))

> List((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))
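As a quick sanity check on the edge cases, here is a small sketch that applies the same zipping logic to an empty list and to a single-element list. It is written as a regular method with an explicit return type (a variant made for this illustration, named `withPreviousAndNextList` to avoid clashing with the definition above) and only uses the standard library:

```scala
// Illustration only: method-style variant of the definition above (hypothetical name).
def withPreviousAndNextList[T](list: List[T]): List[(Option[T], T, Option[T])] = {
  val previousElements = None +: list.map(Some(_)).dropRight(1)
  val nextElements     = list.drop(1).map(Some(_)) :+ None

  // zip truncates to the shortest list, so an empty input gives an empty output
  (previousElements zip list zip nextElements) map { case ((a, b), c) => (a, b, c) }
}

val empty  = withPreviousAndNextList(List.empty[Int]) // List()
val single = withPreviousAndNextList(List(1))         // List((None,1,None))
```

A single element has neither a previous nor a next element, so both options are None.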

And streams

The code above can be translated pretty straightforwardly to scalaz processes:

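// imports assumed by all the scalaz-stream snippets in this post
import scalaz.stream._
import Process._

def withPreviousAndNext[F[_], T] = (p: Process[F, T]) => {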
  val previousElements = emit(None) fby p.map(Some(_))
  val nextElements     = p.drop(1).map(Some(_)) fby emit(None)

  (previousElements zip p zip nextElements).map { case ((a, b), c) => (a, b, c) }
}

val p1 = emitAll((1 to 3).toSeq).toSource
withPreviousAndNext(p1).runLog.run

> Vector((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))

However, what we generally want with streams is combinators which we can pipe onto a given Process. We want to write:

def withPreviousAndNext[T]: Process1[T, T] = ???

val p1 = emitAll((1 to 3).toSeq).toSource
// produces the stream of (previous, current, next)
p1 |> withPreviousAndNext

How can we write this?

As a combinator

The trick is to use recursion to keep state and this is actually how many of the process1 combinators in the library are written. Let's see how this works on a simpler example. What happens if we just want a stream where elements are zipped with their previous value? Here is what we can write:

def withPrevious[T]: Process1[T, (Option[T], T)] = {

  def go(previous: Option[T]): Process1[T, (Option[T], T)] =
    await1[T].flatMap { current =>
      emit((previous, current)) fby go(Some(current))
    }

  go(None)
}

val p1 = emitAll((1 to 3).toSeq).toSource
(p1 |> withPrevious).runLog.run

> Vector((None,1), (Some(1),2), (Some(2),3))

Inside the withPrevious method we recursively call go with the state we need to track. In this case we want to keep track of each previous element (the first call is with None because there is no previous element for the first element of the stream). Then go awaits a new element. Each time there is a new element, we emit it together with the previous one, then recursively call go, which again waits for the next element, knowing that the new previous element is now current.
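The same state-threading recursion can be sketched without scalaz-stream, on a standard `LazyList`. This assumes Scala 2.13 or later, and `withPreviousLazy` is a name made up for this illustration, not part of any library. The `previous` argument of `go` plays the role of the state:

```scala
// Illustration only: a plain-Scala analogue of withPrevious, not scalaz-stream code.
def withPreviousLazy[T](input: LazyList[T]): LazyList[(Option[T], T)] = {
  // `previous` is the state carried from one recursive call to the next
  def go(previous: Option[T], rest: LazyList[T]): LazyList[(Option[T], T)] =
    if (rest.isEmpty) LazyList.empty
    else {
      val current = rest.head
      // emit the pair, then continue lazily with `current` as the new previous
      (previous, current) #:: go(Some(current), rest.tail)
    }

  go(None, input)
}

val pairs = withPreviousLazy(LazyList(1, 2, 3)).toList
// List((None,1), (Some(1),2), (Some(2),3))
```

Because the tail of `#::` is evaluated lazily, the recursion only advances when a consumer asks for more elements, which is roughly what `await1` gives us in the Process version.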

We can do something similar, but a bit more complex for withNext:

def withNext[T]: Process1[T, (T, Option[T])] = {
  def go(current: Option[T]): Process1[T, (T, Option[T])] =
    await1[T].flatMap { next =>
      current match {
        // accumulate the first element
        case None    => go(Some(next))
        // if we have a current element, emit it with the next
        // but when there's no more next, emit the last element with None
        case Some(c) => (emit((c, Some(next))) fby go(Some(next))).orElse(emit((next, None)))
      }
    }

  go(None)
}

val p1 = emitAll((1 to 3).toSeq).toSource
(p1 |> withNext).runLog.run

> Vector((1,Some(2)), (2,Some(3)), (3,None))

Here, we start by accumulating the first element of the stream, and then, when we get to the next one, we emit both of them. We then make a recursive call remembering what is now the current element. But the process we return in flatMap has an orElse clause. It says "by the way, if you don't have any more elements (no more next), just emit the last element you received with None".
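The look-ahead idea (hold one element back until we know whether something follows it) can also be sketched on a standard `LazyList`. Again this assumes Scala 2.13 or later, and `withNextLazy` is a hypothetical name used only for this illustration:

```scala
// Illustration only: a plain-Scala analogue of withNext, not scalaz-stream code.
def withNextLazy[T](input: LazyList[T]): LazyList[(T, Option[T])] = {
  // `current` has been read but not emitted yet; `rest` is the remaining input
  def go(current: T, rest: LazyList[T]): LazyList[(T, Option[T])] =
    if (rest.isEmpty) LazyList((current, None)) // no more input: close with None
    else (current, Some(rest.head)) #:: go(rest.head, rest.tail)

  // an empty input has no element to hold back, so nothing is emitted
  if (input.isEmpty) LazyList.empty else go(input.head, input.tail)
}

val nexts = withNextLazy(LazyList(1, 2, 3)).toList
// List((1,Some(2)), (2,Some(3)), (3,None))
```

The first branch of `go` corresponds to the orElse clause: it is only reached when the input is exhausted.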

Now with both withPrevious and withNext we can create a withPreviousAndNext process:

def withPreviousAndNext[T]: Process1[T, (Option[T], T, Option[T])] = {
  def go(previous: Option[T], current: Option[T]): Process1[T, (Option[T], T, Option[T])] =
    await1[T].flatMap { next =>
      current.map { c =>
        emit((previous, c, Some(next))) fby go(Some(c), Some(next))
      }.getOrElse(
          go(previous, Some(next))
        ).orElse(emit((current, next, None)))
    }
  go(None, None)
}

val p1 = emitAll((1 to 3).toSeq).toSource
(p1 |> withPreviousAndNext).runLog.run

> Vector((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))

The code is pretty similar but this time we keep track of both the "previous" element and the "current" one.
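To see how the three combinators relate, here is a sketch on standard `LazyList`s (Scala 2.13 or later assumed) where the triplet is obtained by applying a "with next" step to the output of a "with previous" step and then flattening. All three names are hypothetical stand-ins made up for this illustration. I have not checked whether the equivalent pipeline behaves the same way with scalaz-stream processes.

```scala
// Illustration only: plain-Scala stand-ins, not scalaz-stream code.
def withPreviousLazy[T](xs: LazyList[T]): LazyList[(Option[T], T)] =
  (Option.empty[T] #:: xs.map(x => Some(x): Option[T])).zip(xs)

def withNextLazy[T](xs: LazyList[T]): LazyList[(T, Option[T])] =
  xs.zip(xs.drop(1).map(x => Some(x): Option[T]) ++ LazyList(Option.empty[T]))

// pair each (previous, current) with the following pair, then keep only
// the "current" part of that following pair as the next element
def withPreviousAndNextLazy[T](xs: LazyList[T]): LazyList[(Option[T], T, Option[T])] =
  withNextLazy(withPreviousLazy(xs)).map {
    case ((previous, current), next) => (previous, current, next.map(_._2))
  }

val triplets = withPreviousAndNextLazy(LazyList(1, 2, 3)).toList
// List((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))
```

The intermediate stream carries each previous element twice, so this composition trades a little extra allocation for not having to write the two-state recursion by hand.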

emit(last paragraph)

I hope this will help beginners like me to get started with scalaz-stream, and I'd be happy if scalaz-stream experts out there left comments if there's anything that can be improved (is there an effective way to combine withPrevious and withNext to get withPreviousAndNext?).

Finally, I need to add that, in order to get proper performance/side-effect control for the withNext and withPreviousAndNext processes, you need to use the lazy branch of scalaz-stream. It contains a fix for orElse which prevents it from being evaluated more than necessary.

2 comments:

Mortimer said...

Hello,
you say "what we generally want with streams is combinators which you can pipe onto a given Process", why is that? Is it because zip would load the whole stream in memory?

Eric said...

Hi Mortimer,

I think that this is even better because you have less to assume about the initial process. In particular, you don't care about the `F` context in `Process[F, A]`.