09 December 2013

`runState` 0 - combinators 1

In my previous blog post I tried to implement a runState method with scalaz-stream to process a file and validate its internal structure. That was not a good solution, however, because:

  • it doesn't use combinators but a special purpose runState method
  • it overflows the stack on large files!

It turns out that there is a much better way of dealing with this use case.


First of all it is possible to propagate some state with scalaz-stream without having to write a special runState method. The following uses only combinators to do the job:

def process(path: String, targetName: String): String \/ File = {

  val HEADER  = "HEADER(.*)".r
  val TRAILER = "TRAILER\\|(\\d+)".r

  // emits each data line while counting them; the TRAILER must announce the same count
  val lineOrTrailer: Process1[String, String]  = {
    def go(lines: Int): Process1[String, String] =
      receive1[String, String] {
        case TRAILER(count) =>
          if (count.toInt == lines) halt
          else Halt(new Exception(s"Expected $count lines, but got $lines"))
        case HEADER(h)      =>
          Halt(new Exception(s"Didn't expect a HEADER here: $h"))
        case s              =>
          emit(s) fby go(lines + 1)
      }
    go(0)
  }

  // a HEADER line, then one line to skip, then data lines up to the TRAILER
  val linesStructure =
    discardRegex("HEADER.*") fby
    discardLine              fby
    lineOrTrailer

  val read       = io.linesR(path) |> linesStructure
  val targetPath = path.replace(".DAT", "")+targetName
  val task       =
    ((read |> process1.intersperse("\n") |>
     process1.utf8Encode) to io.fileChunkW(targetPath)).run

  task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
}

import java.util.regex.Pattern

/** discard one line, whatever it contains */
val discardLine = receive1[String, String] { _ => halt }

/** discard a line if it matches the expected pattern, fail otherwise */
def discardRegex(pattern: String): Process1[String,String] = {
  val compiled = Pattern.compile(pattern)
  receive1[String, String] { line =>
    if (compiled.matcher(line).matches) halt
    else Halt(new Exception(s"Failed to parse $line, does not match regex: $pattern"))
  }
}

With the code above, processing a file amounts to:

  • reading the lines
  • analysing them with linesStructure which propagates the current state, the number of lines already processed, with a recursive method (go) calling itself
  • writing the lines to a new file

The linesStructure definition reads almost like a parser combinators expression, with parsers sequenced by the fby ("followed by") method.
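To make the idea of "state propagated by a recursive call" concrete without any streaming library, here is a minimal sketch in plain Scala. It is not scalaz-stream code: LineCountSketch, validate and the "Missing TRAILER" error are hypothetical names and choices of mine, and the whole input is assumed to fit in a List. The recursive go function carries the lines seen so far as a parameter, in the same spirit as the go(lines) method above.

```scala
import scala.annotation.tailrec

object LineCountSketch {
  private val Header  = "HEADER(.*)".r
  private val Trailer = "TRAILER\\|(\\d+)".r

  /** Checks HEADER, one skipped line, data lines, TRAILER|n; returns the data lines. */
  def validate(lines: List[String]): Either[String, Vector[String]] = {
    // `seen` is the state: it is passed along on each recursive call
    @tailrec
    def go(rest: List[String], seen: Vector[String]): Either[String, Vector[String]] =
      rest match {
        case Trailer(count) :: _ =>
          if (count.toInt == seen.size) Right(seen)
          else Left(s"Expected $count lines, but got ${seen.size}")
        case Header(h) :: _ => Left(s"Didn't expect a HEADER here: $h")
        case line :: tail   => go(tail, seen :+ line)
        // assumption of this sketch: a missing trailer is reported as an error
        case Nil            => Left("Missing TRAILER")
      }

    lines match {
      // skip the HEADER and the line right after it, then count the data lines
      case Header(_) :: _ :: body => go(body, Vector.empty)
      case _                      => Left("Expected a HEADER followed by one more line")
    }
  }
}
```

Calling LineCountSketch.validate on a small list of lines returns either the data lines or the first structural error, with no mutable variable involved.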

That looks pretty good but... it performs horribly! With the good old "loop school" approach, it took 8 seconds to process a 700 MB file:

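import java.io.{File, FileWriter}
import scala.io.Source

def processLoop(path: String, targetName: String): String \/ File = {

  val targetPath = path.replace(".DAT", "")+targetName
  val writer = new FileWriter(targetPath)
  val source = Source.fromFile(new File(path))
  var count = 0
  var skipNextLine = false
  try {
    source.getLines().foreach { line =>
      if (line.startsWith("HEADER")) skipNextLine = true
      else if (skipNextLine) { skipNextLine = false }
      else if (line.startsWith("TRAILER")) {
        // drop the "TRAILER|" prefix to read the announced line count
        val expected = line.drop(8).trim.toInt
        if (expected != count) throw new Exception(s"expected $expected, got $count")
      }
      else {
        count = count + 1
        // write the data line to the target file
        writer.write(line + "\n")
      }
    }
    // the try block is the result of the method, so a failure is not lost
    new File(targetPath).right
  } catch {
    case t: Throwable => t.getMessage.left
  } finally {
    source.close()
    writer.close()
  }
}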

With the nice "no variables, no loop" method it took almost... 8 minutes!

Chunky streaming

Fortunately, it is possible to recover decent performance by "chunking" the lines before processing them. To do this, we need a new combinator, very close to the io.linesR combinator in scalaz-stream:

// read a file, returning one "chunk" of lines at a time
def linesRChunk(filename: String, chunkSize: Int = 10000): Process[Task, Vector[String]] =
  io.resource(Task.delay(scala.io.Source.fromFile(filename)))(src => Task.delay(src.close)) { src =>
    lazy val lines = src.getLines.sliding(chunkSize, chunkSize) // A stateful iterator
    Task.delay {
      if (lines.hasNext) lines.next.toVector
      else throw End
    }
  }
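The chunking itself relies only on the standard library: sliding(size, step) with a step equal to the size cuts an iterator into consecutive, non-overlapping groups, and the last group may be shorter. Here is a small sketch of just that part, outside of any Process; ChunkSketch and chunked are hypothetical names used for illustration.

```scala
object ChunkSketch {
  /** Groups an iterator of lines into Vectors of at most `chunkSize` lines. */
  def chunked(lines: Iterator[String], chunkSize: Int): Iterator[Vector[String]] =
    // step == size: the windows do not overlap, the last one may be partial
    lines.sliding(chunkSize, chunkSize).map(_.toVector)
}
```

The result is still a lazy, stateful iterator, so only one chunk of lines needs to be held in memory at a time.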

Now we can process each chunk with:

def process(path: String, targetName: String, bufferSize: Int = 1): String \/ File = {

  val HEADER  = "HEADER(.*)".r
  val TRAILER = "TRAILER\\|(\\d+)".r

  def linesParser(state: LineState): Process1[Vector[String], Vector[String]] = {

    // parse the rest of the header chunk, then carry on with the following chunks
    def onHeader(rest: Vector[String]) =
      (emit(rest) |> linesParser(ExpectLineOrTrailer(0))) fby
      linesParser(ExpectLineOrTrailer(rest.size))

    def onLines(ls: Vector[String], actual: Int) =
      emit(ls) fby linesParser(ExpectLineOrTrailer(actual + ls.size))

    def onTrailer(ls: Vector[String], count: Int, actual: Int) = {
      val total = actual + ls.size
      if (total == count) emit(ls)
      else fail(new Exception(s"expected $count lines, got $total"))
    }

    receive1[Vector[String], Vector[String]] { case lines =>
      (lines, state) match {
        case (Vector(),                  _)                      =>
          halt
        case (HEADER(_) +: cols +: rest, ExpectHeader)           =>
          onHeader(rest)
        case (_,                         ExpectHeader)           =>
          fail(new Exception("expected a header"))
        case (ls :+ TRAILER(count),      ExpectLineOrTrailer(n)) =>
          onTrailer(ls, count.toInt, n)
        case (ls,                        ExpectLineOrTrailer(n)) =>
          onLines(ls, n)
      }
    }
  }

  val targetPath = path.replace(".DAT", "")+targetName

  val read = linesRChunk(path, bufferSize) |>
             linesParser(ExpectHeader).map(lines => lines.mkString("\n"))
  val task =
    ((read |> process1.intersperse("\n") |>
    process1.utf8Encode) to io.fileChunkW(targetPath)).run

  task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
}

The linesParser method uses receive1 to analyse:

  • the current state: are we expecting a HEADER, or some lines followed by a TRAILER?
  • the current chunk of lines

When we expect a HEADER and we have one, we skip the row containing the column names (see onHeader), we emit the rest of the lines to the linesParser (this is the recursive call) and we change the state to ExpectLineOrTrailer. If we get some lines with no TRAILER, we emit those lines and make a recursive call to linesParser with an incremented count to signal how many lines we've emitted so far (in the onLines method). Finally, if we get some lines and a TRAILER we check that the expected number of lines is equal to the actual one before emitting the lines and stopping the processing (no more recursive call in onTrailer).

For reference, here are the state objects used to track the current processing state:

sealed trait LineState

case object ExpectHeader                            extends LineState
case class  ExpectLineOrTrailer(lineCount: Int = 0) extends LineState
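To see the transitions in isolation, here is a rough sketch of the same state machine written as a pure function over chunks, in plain Scala with no scalaz-stream involved. The names ChunkStateSketch, step and run are hypothetical, as is the extra Done state, which I add here only because a pure fold cannot halt the way a Process does.

```scala
object ChunkStateSketch {
  sealed trait State
  case object ExpectHeader                         extends State
  final case class ExpectLineOrTrailer(count: Int) extends State
  case object Done                                 extends State // sketch-only terminal state

  private val Header  = "HEADER(.*)".r
  private val Trailer = "TRAILER\\|(\\d+)".r

  /** One transition: the next state and the lines to emit, or an error message. */
  def step(state: State, chunk: Vector[String]): Either[String, (State, Vector[String])] =
    (chunk, state) match {
      case (_, Done)                                 => Right((Done, Vector.empty))
      case (Vector(), s)                             => Right((s, Vector.empty))
      case (Header(_) +: _ +: rest, ExpectHeader)    =>
        // skip the header and the column names, then parse the rest of the chunk
        step(ExpectLineOrTrailer(0), rest)
      case (_, ExpectHeader)                         => Left("expected a header")
      case (ls :+ Trailer(n), ExpectLineOrTrailer(c)) =>
        val total = c + ls.size
        if (total == n.toInt) Right((Done, ls))
        else Left(s"expected $n lines, got $total")
      case (ls, ExpectLineOrTrailer(c))              =>
        Right((ExpectLineOrTrailer(c + ls.size), ls))
    }

  /** Threads the state through all the chunks and collects the emitted lines. */
  def run(chunks: Iterator[Vector[String]]): Either[String, Vector[String]] = {
    val start: Either[String, (State, Vector[String])] = Right((ExpectHeader, Vector.empty))
    val end = chunks.foldLeft(start) {
      case (Right((state, out)), chunk) =>
        step(state, chunk).map { case (next, emitted) => (next, out ++ emitted) }
      case (error, _) => error
    }
    end.map(_._2)
  }
}
```

Unlike the Process version, run accumulates every emitted line in memory, so it is only meant to illustrate the transitions, not to process a large file.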

This new way of processing lines gets us:

  • a readable state machine with clear transitions, which was my first objective
  • adequate performance: it takes around 10 seconds to process a 700M file, which is slightly more than the processLoop version but acceptable

One other explored avenue

It took me a loooooooooooong time to get there. I think I hit this issue when trying to use the built-in chunk combinator. When using chunk, my parser was being fed the same lines several times. For a chunk of 10 lines, I first got the first line, then the first 2, then the first 3,... Even with a modified version of chunk the performance was still very bad. This is why I wrote my own linesRChunk.

Now that I have something working, I hope this will save others some development time and show that it is possible to avoid loops + variables in that case!
