This series of posts feels like a whole saga for something that should have been, in the first post, a quick and easy way to demonstrate the obvious superiority of functional programming over a simple loop. The second post was then about defining proper scalaz-stream combinators to do the same thing, and particularly how to "chunk" the processing in order to get good performance.
However, as I was writing unit tests for my requirements, I realized that the problem was harder than I thought. In particular, the files I'm processing can have several sections made of HEADERs and TRAILERs. When you create chunks of lines to process, this results in a number of combinations that need to be analysed. A chunk can:
- start with a HEADER but not finish with a TRAILER, which is in another chunk
- contain lines only
- contain lines + a TRAILER + a new HEADER
- and so on...
For each of these cases it is necessary to use the current state and the contents of the lines to determine if the file is malformed or not. This is a lot less easy than previously.
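To make these combinations concrete, here is a small sketch in plain Scala (no scalaz-stream) that cuts a made-up file into chunks of 3 lines and prints the "shape" of each chunk. The sample lines, the `ChunkShapes` object and the `shape` helper are illustrative assumptions, not part of the code in this series; only the `HEADER|` and `TRAILER|` prefixes come from the actual processing code.

```scala
object ChunkShapes {
  def isHeader(line: String): Boolean = line.startsWith("HEADER|")
  def isTrailer(line: String): Boolean = line.startsWith("TRAILER|")

  // a made-up file with two sections (HEADER, column names, lines, TRAILER)
  val lines: Vector[String] = Vector(
    "HEADER|1", "col1|col2", "a|b", "c|d", "TRAILER|2",
    "HEADER|2", "col1|col2", "e|f", "TRAILER|1")

  // summarise a chunk: H for a HEADER, T for a TRAILER, L for any other line
  def shape(chunk: Vector[String]): String =
    chunk.map { line =>
      if (isHeader(line)) "H" else if (isTrailer(line)) "T" else "L"
    }.mkString

  // chunks of 3 lines, as a chunked reader would deliver them
  val shapes: Vector[String] = lines.grouped(3).map(shape).toVector

  def main(args: Array[String]): Unit = shapes.foreach(println)
}
```

With this input the second chunk contains a line, a TRAILER and a new HEADER, which is exactly the kind of chunk that forces the validation to carry state from one chunk to the next.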
All the combinations
This is what I came up with:
```scala
def process(path: String, targetName: String, chunkSize: Int = 10000): String \/ File = {
  val targetPath = path.replace(".DAT", "")+targetName

  val read =
    linesRChunk(path, chunkSize) |>
    validateLines.map(lines => lines.mkString("\n"))

  val task =
    ((read |> process1.intersperse("\n") |>
      process1.utf8Encode) to io.fileChunkW(targetPath)).run

  task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
}

/**
 * validate that the lines have the right sequence of HEADER/column names/lines/TRAILER
 * and the right number of lines
 */
def validateLines: Process1[Vector[String], Vector[String]] = {

  // feed lines into the lines parser with a given state
  // when it's done, follow by parsing with a new state
  def parse(lines: Vector[String], state: LineState, newState: LineState) =
    emit(lines) |> linesParser(state) fby linesParser(newState)

  // parse chunks of lines
  def linesParser(state: LineState): Process1[Vector[String], Vector[String]] = {
    receive1[Vector[String], Vector[String]] { case lines =>
      lines match {
        case first +: rest if isHeader(first) =>
          if (state.openedSection) fail("A trailer is missing")
          else
            parse(lines.drop(2),
                  state.open,
                  LineState(lines.count(isHeader) > lines.count(isTrailer),
                            lines.drop(2).size))

        case first +: rest if isTrailer(first) =>
          val expected = "\\d+".r.findFirstIn(first).map(_.toInt).getOrElse(0)

          if (!state.openedSection)
            fail("A header is missing")
          else if (state.lineCount != expected)
            fail(s"expected $expected lines, got ${state.lineCount}")
          else {
            val dropped = lines.drop(1)
            parse(dropped,
                  state.restart,
                  LineState(dropped.count(isHeader) > dropped.count(isTrailer),
                            dropped.size))
          }

        case first +: rest =>
          if (!state.openedSection) fail("A header is missing")
          else {
            val (first, rest) = lines.span(line => !isTrailer(line))
            emit(first) fby
              parse(rest, state.addLines(first.size), state.addLines(lines.size))
          }

        case Vector() => halt
      }
    }
  }

  // initialise the parsing expecting a HEADER
  linesParser(LineState())
}

private def fail(message: String) = Halt(new Exception(message))
private def isHeader(line: String) = line.startsWith("HEADER|")
private def isTrailer(line: String) = line.startsWith("TRAILER|")
```
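The code above relies on a `LineState` class that is not shown in this post. The sketch below is only a guess at its shape, reconstructed from the way it is used (`LineState()`, `LineState(flag, count)`, `openedSection`, `lineCount`, `open`, `restart`, `addLines`); the real definition may differ, in particular in what `open` and `restart` do to the line count.

```scala
// Hypothetical reconstruction: inferred from usage, not the original definition
case class LineState(openedSection: Boolean = false, lineCount: Int = 0) {
  // a HEADER was seen: a section is now open and no line has been counted yet (assumption)
  def open: LineState = LineState(openedSection = true, lineCount = 0)

  // a TRAILER was seen: back to the initial state, expecting a HEADER (assumption)
  def restart: LineState = LineState()

  // n more lines were seen in the current section
  def addLines(n: Int): LineState = copy(lineCount = lineCount + n)
}
```

Keeping the state immutable is what makes it possible to compute, before a chunk is fully emitted, the state that the next chunk will start from.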
The bulk of the code is the validateLines process which verifies the file structure:
- if the first line of this chunk is a HEADER, the next line needs to be skipped, we know we opened a new section, and we feed the rest to the lines parser again. However we fail the process if we were not expecting a HEADER there
- if the first line of this chunk is a TRAILER we do something similar but we also check the expected number of lines
- otherwise we try to emit as many lines as possible until the next HEADER or TRAILER and we recurse
This is a bit complex because we need to analyse the first element of the chunk, then emit the rest and calculate the new state we will have when this whole chunk is emitted. On the other hand the processor is easy to test because I don't have to read or write files to check it. This would be a bit more difficult to do with the loop version.
But unfortunately not all the tests are green. One is still not passing. What if there is no ending TRAILER in the file? How can I raise an exception? There's no process to run, because there are no more lines to process! My test is pending for now, and I'll post the solution once I have it (maybe there's a smarter way to rewrite all of this?).
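To show what the missing check looks like outside of scalaz-stream, here is a plain Scala fold over lines. This is not a solution for the `Process1` version, only an illustration that the check has to look at the final state once the input is exhausted. The `EndOfInput` object, the `Section` class and the `step` and `validate` functions are names made up for this sketch, and it ignores the column names line for brevity.

```scala
object EndOfInput {
  // minimal state: are we inside a section, and how many lines were seen in it
  final case class Section(open: Boolean = false, count: Int = 0)

  def isHeader(line: String): Boolean = line.startsWith("HEADER|")
  def isTrailer(line: String): Boolean = line.startsWith("TRAILER|")

  // validate one line against the current state
  def step(state: Section, line: String): Either[String, Section] =
    if (isHeader(line)) {
      if (state.open) Left("A trailer is missing") else Right(Section(open = true))
    } else if (isTrailer(line)) {
      val expected = "\\d+".r.findFirstIn(line).map(_.toInt).getOrElse(0)
      if (!state.open) Left("A header is missing")
      else if (state.count != expected) Left(s"expected $expected lines, got ${state.count}")
      else Right(Section())
    } else {
      if (!state.open) Left("A header is missing")
      else Right(state.copy(count = state.count + 1))
    }

  def validate(lines: Iterator[String]): Either[String, Unit] = {
    val start: Either[String, Section] = Right(Section())
    val end = lines.foldLeft(start)((acc, line) => acc.flatMap(step(_, line)))
    // the extra check: a section still open after the last line means no final TRAILER
    end.flatMap(s => if (s.open) Left("A trailer is missing") else Right(()))
  }
}
```

For example, `EndOfInput.validate(Iterator("HEADER|1", "a", "b"))` returns `Left("A trailer is missing")`, because the fold ends with an open section. The streaming version needs an equivalent hook that runs when the input terminates.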
Is it worth it?
This was definitely worth it for me in terms of learning the scalaz-stream library. However, in terms of pure programmer "productivity", for this kind of requirement it feels like overkill. The imperative solution is very easy to come up with and there are no performance problems. This should change once streaming parsing is available (see the roadmap). This use case will then probably be expressed as a one-liner. In light of this post, I'm just curious how the implementation will deal with chunking.