This series of posts feels like a whole saga for something which should have a quick an easy way to demonstrate the obvious superiority of functional programming over a simple loop. In the first post. Then the second post was about defining proper scalaz-stream
combinators to do the same thing, and particularly how to "chunk" the processing in order to get good performances.
However as I was writing unit tests for my requirements I realized that the problem was harder than I thought. In particular, the files I'm processing can have several sections made of HEADERs
and TRAILERs
. When you create chunks of lines to process this results in a number of combinations that need to be analysed. A chunk can:
- start with a
HEADER
but not finish with aTRAILER
which is in another chunk - contain lines only
- contains lines + a
TRAILER
+ a newHEADER
- and so on...
For each of these cases it is necessary to use the current state and the contents of the lines to determine if the file is malformed or not. This is a lot less easy that previously.
All the combinations
This is what I came up with:
def process(path: String, targetName: String, chunkSize: Int = 10000): String \/ File = {
val targetPath = path.replace(".DAT", "")+targetName
val read =
linesRChunk(path, chunkSize) |>
validateLines.map(lines => lines.mkString("\n"))
val task =
((read |> process1.intersperse("\n") |>
process1.utf8Encode) to io.fileChunkW(targetPath)).run
task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
}
/**
* validate that the lines have the right sequence of HEADER/column names/lines/TRAILER
* and the right number of lines
*/
def validateLines: Process1[Vector[String], Vector[String]] = {
// feed lines into the lines parser with a given state
// when it's done, follow by parsing with a new state
def parse(lines: Vector[String], state: LineState, newState: LineState) =
emit(lines) |> linesParser(state) fby linesParser(newState)
// parse chunks of lines
def linesParser(state: LineState): Process1[Vector[String], Vector[String]] = {
receive1[Vector[String], Vector[String]] { case lines =>
lines match {
case first +: rest if isHeader(first) =>
if (state.openedSection) fail("A trailer is missing")
else
parse(lines.drop(2),
state.open,
LineState(lines.count(isHeader) > lines.count(isTrailer),
lines.drop(2).size))
case first +: rest if isTrailer(first) =>
val expected = "\\d+".r.findFirstIn(first).map(_.toInt).getOrElse(0)
if (!state.openedSection)
fail("A header is missing")
else if (state.lineCount != expected)
fail(s"expected $expected lines, got ${state.lineCount}")
else {
val dropped = lines.drop(1)
parse(dropped,
state.restart,
LineState(dropped.count(isHeader) > dropped.count(isTrailer),
dropped.size))
}
case first +: rest =>
if (!state.openedSection) fail("A header is missing")
else {
val (first, rest) = lines.span(line => !isTrailer(line))
emit(first) fby
parse(rest, state.addLines(first.size), state.addLines(lines.size))
}
case Vector() => halt
}
}
}
// initialise the parsing expecting a HEADER
linesParser(LineState())
}
private def fail(message: String) = Halt(new Exception(message))
private def isHeader(line: String) = line.startsWith("HEADER|")
private def isTrailer(line: String) = line.startsWith("TRAILER|")
The bulk of the code is the validateLines
process which verifies the file structure:
if the first line of this chunk is a
HEADER
the next line needs to be skipped, we know we opened a new section, and we feed the rest to the lines parser again. However wefail
the process if we were not expecting aHEADER
thereif the first line of this chunk is a
TRAILER
we do something similar but we also check the expected number of linesotherwise we try to emit as many lines as possible until the next
HEADER
orTRAILER
and we recurse
This is a bit complex because we need to analyse the first element of the chunk, then emit the rest and calculate the new state we will have when this whole chunk is emitted. On the other hand the processor is easy to test because I don't have to read or write files to check it. This would be a bit more difficult to do with the loop version.
But unfortunately not all the tests are green. One is still not passing. What if there is no ending TRAILER
in the file? How can I raise an exception? There's no process to run, because there are no more lines to process! My test is pending for now, and I'll post the solution once I have it (maybe there's a smarter way to rewrite all of this?).
Is it worth it?
This was definitely worth it for me in terms of learning the scalaz-stream
library. However in terms of pure programmer "productivity", for this kind of requirements, it feels like an overkill. The imperative solution is very easy to come up with and there is no problems with performances. This should change once streaming parsing is available (see the roadmap). Probably this use case will just be expressed as a one-liner. In the light of this post I'm just curious how the implementation will deal with chunking.