10 December 2013

The revenge of the chunks

This series of posts is turning into a whole saga for something which should have been a quick and easy way to demonstrate the obvious superiority of functional programming over a simple loop. In the first post I tried to implement a special runState method to validate a file's structure. Then the second post was about defining proper scalaz-stream combinators to do the same thing, and in particular about how to "chunk" the processing in order to get good performance.

However, as I was writing unit tests for my requirements, I realized that the problem was harder than I thought. In particular, the files I'm processing can have several sections made of HEADERs and TRAILERs. When you create chunks of lines to process, this results in a number of combinations that need to be analysed. A chunk can:

  • start with a HEADER but finish without a TRAILER, which ends up in another chunk
  • contain lines only
  • contain lines + a TRAILER + a new HEADER
  • and so on...

For each of these cases it is necessary to use the current state and the contents of the lines to determine if the file is malformed or not. This is a lot less easy than previously.
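To make these combinations concrete, here is a small sketch (the file content and the chunk size are invented for illustration) showing how section markers can straddle chunk boundaries:

```scala
// a hypothetical two-section file, chunked into groups of 4 lines
val lines = Vector(
  "HEADER|section1", "col1|col2", "a|1", "b|2",
  "TRAILER|2", "HEADER|section2", "col1|col2", "c|3",
  "d|4", "TRAILER|2")

val chunks = lines.grouped(4).toVector
// chunks(0) starts with a HEADER, but its TRAILER falls into the next chunk
// chunks(1) starts with a TRAILER and contains a new HEADER
// chunks(2) contains plain lines plus the final TRAILER
```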

All the combinations

This is what I came up with:

  def process(path: String, targetName: String, chunkSize: Int = 10000): String \/ File = {

    val targetPath = path.replace(".DAT", "")+targetName

    val read =
      linesRChunk(path, chunkSize) |>
      validateLines.map(lines => lines.mkString("\n"))

    val task =
      ((read |> process1.intersperse("\n") |>
        process1.utf8Encode) to io.fileChunkW(targetPath)).run

    task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
  }

  /**
   * validate that the lines have the right sequence of HEADER/column names/lines/TRAILER
   * and the right number of lines
   */
  def validateLines: Process1[Vector[String], Vector[String]] = {

    // feed lines into the lines parser with a given state
    // when it's done, follow by parsing with a new state
    def parse(lines: Vector[String], state: LineState, newState: LineState) =
      (emit(lines) |> linesParser(state)) fby linesParser(newState)

    // parse chunks of lines
    def linesParser(state: LineState): Process1[Vector[String], Vector[String]] = {
      receive1[Vector[String], Vector[String]] { lines =>
        lines match {
          case first +: rest if isHeader(first) =>
            if (state.openedSection) fail("A trailer is missing")
            else
              // skip the column names line and parse the rest of the chunk;
              // the new state records whether a section is still open
              // once the whole chunk has been emitted
              parse(rest.drop(1),
                    LineState(openedSection = true, lineCount = 0),
                    LineState(lines.count(isHeader) > lines.count(isTrailer),
                              rest.drop(1).count(l => !isHeader(l) && !isTrailer(l))))

          case first +: rest if isTrailer(first) =>
            val expected = "\\d+".r.findFirstIn(first).map(_.toInt).getOrElse(0)

            if (!state.openedSection)
              fail("A header is missing")
            else if (state.lineCount != expected)
              fail(s"expected $expected lines, got ${state.lineCount}")
            else {
              val dropped = lines.drop(1)
              parse(dropped,
                    LineState(),
                    LineState(dropped.count(isHeader) > dropped.count(isTrailer),
                              dropped.count(l => !isHeader(l) && !isTrailer(l))))
            }

          case first +: rest =>
            if (!state.openedSection) fail("A header is missing")
            else {
              val (first, rest) = lines.span(line => !isTrailer(line))
              emit(first) fby
              parse(rest, state.addLines(first.size), state.addLines(lines.size))
            }

          case Vector() => halt
        }
      }
    }
    // initialise the parsing expecting a HEADER
    linesParser(LineState())
  }

  // the state threaded through the parser: is a section currently open,
  // and how many data lines have been seen in it?
  case class LineState(openedSection: Boolean = false, lineCount: Int = 0) {
    def addLines(n: Int) = copy(lineCount = lineCount + n)
  }

  private def fail(message: String)   = Halt(new Exception(message))
  private def isHeader(line: String)  = line.startsWith("HEADER|")
  private def isTrailer(line: String) = line.startsWith("TRAILER|")

The bulk of the code is the validateLines process which verifies the file structure:

  • if the first line of this chunk is a HEADER, the next line (the column names) needs to be skipped, we know we opened a new section, and we feed the rest to the lines parser again. However, we fail the process if we were not expecting a HEADER there

  • if the first line of this chunk is a TRAILER, we do something similar but we also check the expected number of lines

  • otherwise we try to emit as many lines as possible until the next HEADER or TRAILER and we recurse

This is a bit complex because we need to analyse the first element of the chunk, then emit the rest, and calculate the new state we will have once this whole chunk is emitted. On the other hand, the processor is easy to test because I don't have to read or write files to check it. This would be a bit more difficult to do with the loop version.
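As a rough illustration of that testability, the state bookkeeping can be checked without any file IO. This is a minimal, hypothetical sketch (stillOpen is an invented helper mirroring the openedSection computation, not the actual validateLines code):

```scala
// hypothetical helpers mirroring the chunk state computation
def isHeader(line: String)  = line.startsWith("HEADER|")
def isTrailer(line: String) = line.startsWith("TRAILER|")

// a section is still open after a chunk when it contains more HEADERs
// than TRAILERs (assuming no section was open before the chunk)
def stillOpen(chunk: Vector[String]): Boolean =
  chunk.count(isHeader) > chunk.count(isTrailer)

stillOpen(Vector("HEADER|s1", "col1|col2", "a|1")) // true: TRAILER yet to come
stillOpen(Vector("a|1", "TRAILER|1"))              // false: the section closed
```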

But unfortunately not all the tests are green. One is still not passing. What if there is no ending TRAILER in the file? How can I raise an exception? There's no process to run, because there are no more lines to process! My test is pending for now, and I'll post the solution once I have it (maybe there's a smarter way to rewrite all of this?).

Is it worth it?

This was definitely worth it for me in terms of learning the scalaz-stream library. However, in terms of pure programmer "productivity", for this kind of requirement, it feels like overkill. The imperative solution is very easy to come up with and has no performance problems. This should change once streaming parsing is available (see the roadmap); this use case will probably just be expressed as a one-liner. In the light of this post I'm just curious how the implementation will deal with chunking.

09 December 2013

`runState` 0 - combinators 1

In my previous blog post I was trying to implement a runState method with scalaz-stream to process a file and validate its internal structure. That was however not a good solution because:

  • it doesn't use combinators but a special purpose runState method
  • it stackoverflows on large files!

It turns out that there is a much better way of dealing with this use case.


First of all it is possible to propagate some state with scalaz-stream without having to write a special runState method. The following uses only combinators to do the job:

def process(path: String, targetName: String): String \/ File = {

  val HEADER  = "HEADER(.*)".r
  val TRAILER = "TRAILER\\|(\\d+)".r

  val lineOrTrailer: Process1[String, String] = {
    def go(lines: Int): Process1[String, String] =
      receive1[String, String] {
        case TRAILER(count) =>
          if (count.toInt == lines) halt
          else Halt(new Exception(s"Expected $count lines, but got $lines"))
        case HEADER(h)      =>
          Halt(new Exception(s"Didn't expect a HEADER here: $h"))
        case s              =>
          emit(s) fby go(lines + 1)
      }
    go(0)
  }

  val linesStructure =
    discardRegex("HEADER.*") fby
    discardLine              fby
    lineOrTrailer

  val read       = io.linesR(path) |> linesStructure
  val targetPath = path.replace(".DAT", "")+targetName
  val task       =
    ((read |> process1.intersperse("\n") |>
      process1.utf8Encode) to io.fileChunkW(targetPath)).run

  task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
}

/** discard a line, whatever it contains */
val discardLine = receive1[String, String] { _ => halt }

/** discard a line if it matches the expected pattern */
def discardRegex(pattern: String): Process1[String, String] = {
  val compiled = Pattern.compile(pattern)
  receive1[String, String] { line =>
    if (compiled.matcher(line).matches) halt
    else Halt(new Exception(s"Failed to parse $line, does not match regex: $pattern"))
  }
}

With the code above, processing a file amounts to:

  • reading the lines
  • analysing them with linesStructure, which propagates the current state (the number of lines already processed) through a recursive method (go) calling itself
  • writing the lines to a new file

The linesStructure method almost reads like a parser-combinator expression, with parsers sequenced by the fby ("followed by") method.
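As a side note, the HEADER and TRAILER patterns above are ordinary Scala regex extractors. This standalone sketch (the describe helper is invented for illustration) shows how they classify a line:

```scala
val HEADER  = "HEADER(.*)".r
val TRAILER = "TRAILER\\|(\\d+)".r

// classify a line using the regex extractors, as the go method does
def describe(line: String): String = line match {
  case TRAILER(count) => s"trailer expecting $count lines"
  case HEADER(rest)   => s"header$rest"
  case other          => other
}

describe("TRAILER|42") // "trailer expecting 42 lines"
```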

That looks pretty good but... it performs horribly! With the good old "loop school", it took 8 seconds to process a 700M file:

def processLoop(path: String, targetName: String): String \/ File = {

  val targetPath = path.replace(".DAT", "")+targetName
  val writer = new FileWriter(targetPath)
  val source = Source.fromFile(new File(path))
  var count = 0
  var skipNextLine = false
  try {
    source.getLines().foreach { line =>
      if (line.startsWith("HEADER")) skipNextLine = true
      else if (skipNextLine) { skipNextLine = false }
      else if (line.startsWith("TRAILER")) {
        val expected = line.drop(8).toInt
        if (expected != count) throw new Exception(s"expected $expected, got $count")
      }
      else {
        count = count + 1
        writer.write(line+"\n")
      }
    }
    new File(targetPath).right
  } catch {
    case t: Throwable => t.getMessage.left
  } finally {
    source.close()
    writer.close()
  }
}

With the nice "no variables, no loops" method it took almost... 8 minutes!

Chunky streaming

Fortunately, it is possible to recover decent performance by "chunking" the lines before processing them. To do this, we need a new combinator, very close to the io.linesR combinator in scalaz-stream:

// read a file, returning one "chunk" of lines at a time
def linesRChunk(filename: String, chunkSize: Int = 10000): Process[Task, Vector[String]] =
  io.resource(Task.delay(Source.fromFile(filename)))(src => Task.delay(src.close)) { src =>
    lazy val lines = src.getLines.sliding(chunkSize, chunkSize) // a stateful iterator
    Task.delay {
      if (lines.hasNext) lines.next.toVector
      else throw End
    }
  }
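The chunking itself relies on Iterator.sliding: with a step equal to the window size it produces non-overlapping groups, keeping a smaller final group. A quick standalone check:

```scala
// sliding(size, step) with size == step yields non-overlapping chunks,
// including the (possibly smaller) final group
val chunks = (1 to 7).iterator.sliding(3, 3).toList
// 3 chunks: (1, 2, 3), (4, 5, 6), (7)
```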

Now we can process each chunk with:

def process(path: String, targetName: String, bufferSize: Int = 1): String \/ File = {

  val HEADER  = "HEADER(.*)".r
  val TRAILER = "TRAILER\\|(\\d+)".r

  def linesParser(state: LineState): Process1[Vector[String], Vector[String]] = {

    def onHeader(rest: Vector[String]) =
      (emit(rest) |> linesParser(ExpectLineOrTrailer(0))) fby
      linesParser(ExpectLineOrTrailer(rest.size))

    def onLines(ls: Vector[String], actual: Int) =
      emit(ls) fby linesParser(ExpectLineOrTrailer(actual + ls.size))

    def onTrailer(ls: Vector[String], count: Int, actual: Int) =
      if ((actual + ls.size) == count) emit(ls)
      else fail(new Exception(s"expected $count lines, got $actual"))

    receive1[Vector[String], Vector[String]] { lines =>
      (lines, state) match {
        case (Vector(),                  _)                      =>
          halt
        case (HEADER(_) +: cols +: rest, ExpectHeader)           =>
          onHeader(rest)
        case (_,                         ExpectHeader)           =>
          fail(new Exception("expected a header"))
        case (ls :+ TRAILER(count),      ExpectLineOrTrailer(n)) =>
          onTrailer(ls, count.toInt, n)
        case (ls,                        ExpectLineOrTrailer(n)) =>
          onLines(ls, n)
      }
    }
  }

  val targetPath = path.replace(".DAT", "")+targetName

  val read = linesRChunk(path, bufferSize) |>
             linesParser(ExpectHeader).map(lines => lines.mkString("\n"))
  val task =
    ((read |> process1.intersperse("\n") |>
      process1.utf8Encode) to io.fileChunkW(targetPath)).run

  task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
}

The linesParser method uses receive1 to analyse:

  • the current state: are we expecting a HEADER, or some lines followed by a TRAILER?
  • the current chunk of lines

When we expect a HEADER and we have one, we skip the row containing the column names (see onHeader), we emit the rest of the lines to the linesParser (this is the recursive call) and we change the state to ExpectLineOrTrailer. If we get some lines with no TRAILER, we emit those lines and make a recursive call to linesParser with an incremented count to signal how many lines we've emitted so far (in the onLines method). Finally, if we get some lines and a TRAILER we check that the expected number of lines is equal to the actual one before emitting the lines and stopping the processing (no more recursive call in onTrailer).
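The trailer case relies on Scala's :+ vector pattern combined with a regex extractor to peel the TRAILER off the end of a chunk. A standalone sketch of just that pattern (the sample data is invented):

```scala
val TRAILER = "TRAILER\\|(\\d+)".r

// ls :+ TRAILER(count) matches when the chunk's last element is a trailer,
// binding ls to everything before it and count to the declared line count
val parsed = Vector("a", "b", "TRAILER|2") match {
  case ls :+ TRAILER(count) => Some((ls, count.toInt))
  case _                    => None
}
// parsed == Some((Vector("a", "b"), 2))
```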

For reference, here are the state objects used to track the current processing state:

sealed trait LineState

case object ExpectHeader                            extends LineState
case class  ExpectLineOrTrailer(lineCount: Int = 0) extends LineState

This new way of processing lines gets us:

  • a readable state machine with clear transitions, which was my first objective
  • adequate performance: around 10 seconds to process a 700M file, which is slightly more than the processLoop version but acceptable

One other explored avenue

It took me a loooooooooooong time to get there. I think I hit this issue when trying to use the built-in chunk combinator. When using chunk, my parser was being fed the same lines several times: for a chunk of 10 lines, I first got the first line, then the first 2, then the first 3,... Even with a modified version of chunk the performance was still very bad. This is why I wrote my own linesRChunk.

Now that I have something working, I hope that this will boost others' development time and show that it is possible to avoid loops + variables in this case!