In my previous blog post I tried to implement a runState method with scalaz-stream to process a file and validate its internal structure. That was however not a good solution because:

- it doesn't use combinators but a special-purpose runState method
- it stack-overflows on large files!

It turns out that there is a much better way of dealing with this use case.
Combinators
First of all, it is possible to propagate some state with scalaz-stream without having to write a special runState method. The following uses only combinators to do the job:
import java.io.File
import java.util.regex.Pattern
import scalaz.\/
import scalaz.stream._
import scalaz.stream.Process._

def process(path: String, targetName: String): String \/ File = {
  val HEADER  = "HEADER(.*)".r
  val TRAILER = "TRAILER\\|(\\d+)".r

  val lineOrTrailer: Process1[String, String] = {
    def go(lines: Int): Process1[String, String] =
      receive1[String, String] {
        case TRAILER(count) =>
          if (count.toInt == lines) halt
          else Halt(new Exception(s"Expected $count lines, but got $lines"))
        case HEADER(h) =>
          Halt(new Exception(s"Didn't expect a HEADER here: $h"))
        case s =>
          emit(s) fby go(lines + 1)
      }
    go(0)
  }

  val linesStructure =
    discardRegex("HEADER.*") fby
    discardLine              fby
    lineOrTrailer

  val read       = io.linesR(path) |> linesStructure
  val targetPath = path.replace(".DAT", "") + targetName
  val task =
    ((read |> process1.intersperse("\n") |>
      process1.utf8Encode) to io.fileChunkW(targetPath)).run

  task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
}

/** discard a single line, whatever it contains */
val discardLine = receive1[String, String] { _ => halt }

/** discard a line if it matches the expected pattern */
def discardRegex(pattern: String): Process1[String, String] = {
  val compiled = Pattern.compile(pattern)
  receive1[String, String] { line =>
    if (compiled.matcher(line).matches) halt
    else Halt(new Exception(s"Failed to parse $line, does not match regex: $pattern"))
  }
}
With the code above, processing a file amounts to:

- reading the lines
- analysing them with linesStructure, which propagates the current state (the number of lines already processed) through a recursive method (go) calling itself
- writing the lines to a new file
The linesStructure definition almost reads like a parser-combinators expression, with parsers sequenced by the fby ("followed by") method.
That looks pretty good but... it performs horribly! With the good-old "loop school", it took 8 seconds to process a 700M file:
import java.io.{File, FileWriter}
import scalaz.syntax.either._ // provides .left and .right

def processLoop(path: String, targetName: String): String \/ File = {
  val targetPath = path.replace(".DAT", "") + targetName
  val writer = new FileWriter(targetPath)
  val source = scala.io.Source.fromFile(new File(path))
  var count = 0
  var skipNextLine = false
  try {
    source.getLines().foreach { line =>
      if (line.startsWith("HEADER")) skipNextLine = true
      else if (skipNextLine) { skipNextLine = false }
      else if (line.startsWith("TRAILER")) {
        // drop the "TRAILER|" prefix and parse the rest as the expected line count
        val expected = scala.util.Try(line.drop(8).toInt).getOrElse(0)
        if (expected != count) throw new Exception(s"expected $expected, got $count")
      }
      else {
        count = count + 1
        writer.write(line)
      }
    }
    new File(targetPath).right
  } catch {
    case t: Throwable => t.getMessage.left
  } finally {
    source.close
    writer.close
  }
}
With the nice "no variables, no loops" method it took almost... 8 minutes!
Chunky streaming
Fortunately, it is possible to recover decent performance by "chunking" the lines before processing them. To do this, we need a new combinator, very close to the io.linesR combinator in scalaz-stream:
import scalaz.concurrent.Task

// read a file, returning one "chunk" of lines at a time
def linesRChunk(filename: String, chunkSize: Int = 10000): Process[Task, Vector[String]] =
  io.resource(Task.delay(scala.io.Source.fromFile(filename)))(src => Task.delay(src.close)) { src =>
    lazy val lines = src.getLines.sliding(chunkSize, chunkSize) // a stateful iterator
    Task.delay {
      if (lines.hasNext) lines.next.toVector
      else throw End
    }
  }
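As a quick sanity check of this combinator on its own, one might count the lines it reads chunk by chunk (the file name is made up, and process1.sum emits a running total):

// hypothetical check: total number of lines read, in chunks of 10000
val totalLines: Option[Int] =
  (linesRChunk("data.DAT", chunkSize = 10000).map(_.size) |> process1.sum).runLast.run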
Now we can process each chunk with:
def process(path: String, targetName: String, bufferSize: Int = 1): String \/ File = {
  val HEADER  = "HEADER(.*)".r
  val TRAILER = "TRAILER\\|(\\d+)".r

  def linesParser(state: LineState): Process1[Vector[String], Vector[String]] = {
    def onHeader(rest: Vector[String]) =
      (emit(rest) |> linesParser(ExpectLineOrTrailer(0))) fby
      linesParser(ExpectLineOrTrailer(rest.size))

    def onLines(ls: Vector[String], actual: Int) =
      emit(ls) fby linesParser(ExpectLineOrTrailer(actual + ls.size))

    def onTrailer(ls: Vector[String], count: Int, actual: Int) =
      if ((actual + ls.size) == count) emit(ls)
      else fail(new Exception(s"expected $count lines, got ${actual + ls.size}"))

    receive1[Vector[String], Vector[String]] { lines =>
      (lines, state) match {
        case (Vector(), _)                                  => halt
        case (HEADER(_) +: cols +: rest, ExpectHeader)      => onHeader(rest)
        case (_, ExpectHeader)                              => fail(new Exception("expected a header"))
        case (ls :+ TRAILER(count), ExpectLineOrTrailer(n)) => onTrailer(ls, count.toInt, n)
        case (ls, ExpectLineOrTrailer(n))                   => onLines(ls, n)
      }
    }
  }

  val targetPath = path.replace(".DAT", "") + targetName
  val read = linesRChunk(path, bufferSize) |>
             linesParser(ExpectHeader).map(lines => lines.mkString("\n"))
  val task =
    ((read |> process1.intersperse("\n") |>
      process1.utf8Encode) to io.fileChunkW(targetPath)).run

  task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
}
The linesParser method uses receive1 to analyse:

- the current state: are we expecting a HEADER, or some lines followed by a TRAILER?
- the current chunk of lines
When we expect a HEADER and we have one, we skip the row containing the column names (see onHeader), we emit the rest of the lines to the linesParser (this is the recursive call) and we change the state to ExpectLineOrTrailer. If we get some lines with no TRAILER, we emit those lines and make a recursive call to linesParser with an incremented count to signal how many lines we've emitted so far (in the onLines method). Finally, if we get some lines and a TRAILER, we check that the expected number of lines is equal to the actual one before emitting the lines and stopping the processing (there is no further recursive call in onTrailer).
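To make these transitions concrete, here is a small hypothetical trace, with made-up lines and chunks of 3:

// chunk Vector("HEADER|v1", "col1|col2", "l1"), state ExpectHeader:
//   onHeader  -> emits Vector("l1"), continues in ExpectLineOrTrailer(1)
// chunk Vector("l2", "l3", "l4"), state ExpectLineOrTrailer(1):
//   onLines   -> emits the 3 lines, continues in ExpectLineOrTrailer(4)
// chunk Vector("l5", "TRAILER|5"), state ExpectLineOrTrailer(4):
//   onTrailer -> checks 4 + 1 == 5, emits Vector("l5") and stops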
For reference, here are the state objects used to track the current processing state:
sealed trait LineState
case object ExpectHeader extends LineState
case class ExpectLineOrTrailer(lineCount: Int = 0) extends LineState
This new way of processing lines gets us:

- a readable state machine with clear transitions, which was my first objective
- adequate performance; it takes around 10 seconds to process a 700M file, which is slightly more than the processLoop version but acceptable
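For completeness, this is how the chunked process method might be invoked (the file name and target suffix are made up):

import scalaz.{-\/, \/-}

// hypothetical invocation of the chunked version
process("data.DAT", "-processed.txt", bufferSize = 10000) match {
  case \/-(file)  => println(s"wrote ${file.getPath}")
  case -\/(error) => println(s"processing failed: $error")
}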
One other explored avenue
It took me a loooooooooooong time to get there. I think I hit this issue when trying to use the built-in chunk combinator. When using chunk, my parser was being fed the same lines several times. For a chunk of 10 lines, I first got the first line, then the first 2, then the first 3,... Even with a modified version of chunk the performance was still very bad. This is why I wrote my own linesRChunk.
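A quick way to see what a chunking combinator actually emits is to pipe a small in-memory Process through it; this is only a sketch, and the exact output of process1.chunk depends on the scalaz-stream version:

// inspect what a chunking Process1 emits for a tiny input
val observed = (Process("l1", "l2", "l3", "l4") |> process1.chunk(2)).toList
// a well-behaved chunk yields List(Vector("l1", "l2"), Vector("l3", "l4"))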
Now that I have something working, I hope that this will boost others' development time and show that it is possible to avoid loops + variables in this case!