The scalaz-stream library is very attractive, but it can feel unfamiliar because it is not your standard collection library.
This short post shows how to transform a stream so that each element becomes a triplet: the previous element, the current element, and the next element.
With Scala collections
With regular Scala collections, this is not too hard. We first create a list of all the previous elements. We create them as options because there will not be a previous element for the first element of the list. Then we create a list of next elements (also a list of options) and we zip everything with the input list:
def withPreviousAndNext[T](list: List[T]) = {
  // no previous element for the first element, no next element for the last one
  val previousElements = None +: list.map(Some(_)).dropRight(1)
  val nextElements = list.drop(1).map(Some(_)) :+ None
  // plus some flattening of the triplet
  (previousElements zip list zip nextElements) map { case ((a, b), c) => (a, b, c) }
}
withPreviousAndNext(List(1, 2, 3))
> List((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))
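As a cross-check of the zipping approach, here is a minimal alternative sketch that uses only the standard library: pad the list with None on both sides and slide a window of three over it. The name withPreviousAndNextSliding is made up for this illustration.

```scala
// Hypothetical alternative: same result as the zip-based version, via `sliding`.
def withPreviousAndNextSliding[T](list: List[T]): List[(Option[T], T, Option[T])] = {
  // None marks "no previous" before the first element and "no next" after the last
  val padded: List[Option[T]] = (None :: list.map(Some(_))) :+ None
  padded
    .sliding(3)
    // a window shorter than 3 only occurs for an empty input and is skipped
    .collect { case List(prev, Some(cur), next) => (prev, cur, next) }
    .toList
}

withPreviousAndNextSliding(List(1, 2, 3))
// List((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))
```

The padding plays the same role as the explicit None elements in the two zipped lists.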
And streams
The code above can be translated pretty straightforwardly to scalaz processes:
import scalaz.stream._
import Process._

def withPreviousAndNext[F[_], T](p: Process[F, T]) = {
  val previousElements = emit(None) fby p.map(Some(_))
  val nextElements = p.drop(1).map(Some(_)) fby emit(None)
  (previousElements zip p zip nextElements).map { case ((a, b), c) => (a, b, c) }
}
val p1 = emitAll((1 to 3).toSeq).toSource
withPreviousAndNext(p1).runLog.run
> Vector((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))
However, what we generally want with streams is combinators that can be piped onto a given Process. We want to write:
def withPreviousAndNext[T]: Process1[T, (Option[T], T, Option[T])] = ???
val p1 = emitAll((1 to 3).toSeq).toSource
// produces the stream of (previous, current, next)
p1 |> withPreviousAndNext
How can we write this?
As a combinator
The trick is to use recursion to keep state, and this is actually how many of the process1 combinators in the library are written. Let's see how this works on a simpler example. What happens if we just want a stream where elements are zipped with their previous value? Here is what we can write:
def withPrevious[T]: Process1[T, (Option[T], T)] = {
  def go(previous: Option[T]): Process1[T, (Option[T], T)] =
    await1[T].flatMap { current =>
      emit((previous, current)) fby go(Some(current))
    }
  go(None)
}
val p1 = emitAll((1 to 3).toSeq).toSource
(p1 |> withPrevious).runLog.run
> Vector((None,1), (Some(1),2), (Some(2),3))
Inside the withPrevious method we recursively call go with the state we need to track. In this case we want to keep track of each previous element (and the first call is with None because there is no previous element for the first element of the stream). Then go awaits a new element. Each time there is a new element, we emit it together with the previous one, then recursively call go, which is again going to wait for the next element, knowing that the new previous element is now current.
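To see the "recursion carries the state" idea in isolation, here is a minimal sketch of the same shape using the standard library's LazyList (Scala 2.13+) instead of scalaz-stream. It is only a model: withPreviousLazy is a made-up name, and the input is passed explicitly where a Process1 would await it.

```scala
// Standard-library model of the recursive `go` pattern; not scalaz-stream code.
def withPreviousLazy[T](input: LazyList[T]): LazyList[(Option[T], T)] = {
  // `previous` is the state threaded through the recursion
  def go(previous: Option[T], rest: LazyList[T]): LazyList[(Option[T], T)] =
    if (rest.isEmpty) LazyList.empty
    else {
      val current = rest.head
      // emit the pair, then recurse lazily with `current` as the new previous
      (previous, current) #:: go(Some(current), rest.tail)
    }
  go(None, input)
}

withPreviousLazy(LazyList(1, 2, 3)).toList
// List((None,1), (Some(1),2), (Some(2),3))
```

Because the tail is built lazily, each recursive call only happens when the next element is demanded, which loosely mirrors a process waiting on await1.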
We can do something similar, but a bit more complex, for withNext:
def withNext[T]: Process1[T, (T, Option[T])] = {
  def go(current: Option[T]): Process1[T, (T, Option[T])] =
    await1[T].flatMap { next =>
      // when there's no more next, the orElse emits the last element received with None
      (current match {
        // accumulate the first element
        case None => go(Some(next))
        // if we have a current element, emit it with the next
        case Some(c) => emit((c, Some(next))) fby go(Some(next))
      }).orElse(emit((next, None)))
    }
  go(None)
}
val p1 = emitAll((1 to 3).toSeq).toSource
(p1 |> withNext).runLog.run
> Vector((1,Some(2)), (2,Some(3)), (3,None))
Here, we start by accumulating the first element of the stream, and then, when we get the next one, we emit both of them and make a recursive call remembering what is now the current element. But the process we return in flatMap has an orElse clause. It says "by the way, if you don't have any more elements (no more next), just emit the last element you received with None".
Using the ideas behind both withPrevious and withNext we can create a withPreviousAndNext process:
def withPreviousAndNext[T]: Process1[T, (Option[T], T, Option[T])] = {
  def go(previous: Option[T], current: Option[T]): Process1[T, (Option[T], T, Option[T])] =
    await1[T].flatMap { next =>
      current.map { c =>
        emit((previous, c, Some(next))) fby go(Some(c), Some(next))
      }.getOrElse(
        go(previous, Some(next))
      ).orElse(emit((current, next, None)))
    }
  go(None, None)
}
val p1 = emitAll((1 to 3).toSeq).toSource
(p1 |> withPreviousAndNext).runLog.run
> Vector((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))
The code is pretty similar but this time we keep track of both the "previous" element and the "current" one.
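The two pieces of state and the end-of-input case can be modelled with a standard-library LazyList (Scala 2.13+). This is a sketch under that assumption, with the made-up name withPreviousAndNextLazy; the empty-tail branch stands in for what the orElse fallback does in the process version.

```scala
// Standard-library model of tracking both `previous` and `current`; not scalaz-stream code.
def withPreviousAndNextLazy[T](input: LazyList[T]): LazyList[(Option[T], T, Option[T])] = {
  // `current` has been read but not emitted yet, because its `next` is still unknown
  def go(previous: Option[T], current: T, rest: LazyList[T]): LazyList[(Option[T], T, Option[T])] =
    if (rest.isEmpty) LazyList((previous, current, None)) // end of input: emit with no next
    else (previous, current, Some(rest.head)) #:: go(Some(current), rest.head, rest.tail)

  if (input.isEmpty) LazyList.empty else go(None, input.head, input.tail)
}

withPreviousAndNextLazy(LazyList(1, 2, 3)).toList
// List((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))
```

The one-element delay is the important part: an element can only be emitted once we know whether something follows it.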
emit(last paragraph)
I hope this will help beginners like me to get started with scalaz-stream, and I'd be happy if scalaz-stream experts out there leave comments if there's anything which can be improved (is there an effective way to combine withPrevious and withNext to get withPreviousAndNext?).
Finally, I need to add that, in order to get proper performance/side-effect control for the withNext and withPreviousAndNext processes, you need to use the lazy branch of scalaz-stream. It contains a fix for orElse which prevents it from being evaluated more than necessary.
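On the question of combining withPrevious and withNext: one possibility is to apply withNext to the output of withPrevious and then flatten the result. The sketch below checks that idea on standard-library LazyList models (Scala 2.13+), with made-up names. In scalaz-stream terms this would presumably be withPrevious piped into withNext followed by a map, but that translation is an untested assumption.

```scala
// Standard-library models of the two combinators, so the block stands alone.
def withPreviousLazy[T](input: LazyList[T]): LazyList[(Option[T], T)] = {
  def go(previous: Option[T], rest: LazyList[T]): LazyList[(Option[T], T)] =
    if (rest.isEmpty) LazyList.empty
    else (previous, rest.head) #:: go(Some(rest.head), rest.tail)
  go(None, input)
}

def withNextLazy[T](input: LazyList[T]): LazyList[(T, Option[T])] = {
  def go(current: T, rest: LazyList[T]): LazyList[(T, Option[T])] =
    if (rest.isEmpty) LazyList((current, None)) // last element: nothing follows it
    else (current, Some(rest.head)) #:: go(rest.head, rest.tail)
  if (input.isEmpty) LazyList.empty else go(input.head, input.tail)
}

// Compose them: the "next" of a (previous, current) pair is itself a pair,
// so we keep only its current element.
def withPreviousAndNextComposed[T](input: LazyList[T]): LazyList[(Option[T], T, Option[T])] =
  withNextLazy(withPreviousLazy(input)).map {
    case ((previous, current), next) => (previous, current, next.map(_._2))
  }

withPreviousAndNextComposed(LazyList(1, 2, 3)).toList
// List((None,1,Some(2)), (Some(1),2,Some(3)), (Some(2),3,None))
```

The composition avoids writing a third recursive function, at the cost of building an intermediate pair for every element.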