Initial check-in of new "performance" subcommand of Daffodil CLI,
which allows multi-threaded parsing of multiple files
DFDL-888
diff --git a/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala b/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala
index a218f17..8c96985 100644
--- a/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala
+++ b/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala
@@ -36,6 +36,7 @@
import java.io.BufferedWriter
import java.io.OutputStreamWriter
import java.io.FileInputStream
+import java.io.ByteArrayInputStream
import java.io.File
import scala.xml.SAXParseException
import org.rogach.scallop
@@ -60,6 +61,7 @@
import edu.illinois.ncsa.daffodil.externalvars.ExternalVariablesLoader
import edu.illinois.ncsa.daffodil.configuration.ConfigurationLoader
import edu.illinois.ncsa.daffodil.api.ValidationMode
+import scala.actors.Futures
class CommandLineXMLLoaderErrorHandler() extends org.xml.sax.ErrorHandler with Logging {
@@ -287,6 +289,48 @@
}
}
+
+ // Performance Subcommand Options
+ //
+ // Declares the command line accepted by "daffodil performance". A parser is
+ // obtained either from one or more DFDL schemas (--schema, optionally with
+ // --root/--namespace) or from a previously saved parser (--parser); the
+ // validateOpt block at the bottom rejects inconsistent combinations.
+ val performance = new scallop.Subcommand("performance") {
+   banner("""|Usage: daffodil performance (-s <schema>... [-r <root> [-n <namespace>]] [-p <path>] |
+             | -P <parser>)
+             | [--validate [mode]]
+             | [-N <number to parse> -t <threadcount>]
+             | [-D[{namespace}]<variable>=<value>...]
+             | [-c <file>] <infile>
+             |
+             |Run a performance test, using either a DFDL schema or a saved parser
+             |
+             |Performance Options:""".stripMargin)
+
+   descr("run performance test")
+   helpWidth(76)
+
+   // --- where the parser comes from ---
+   val schemas = opt[List[String]](
+     "schema",
+     argName = "file",
+     descr = "the annotated DFDL schema to use to create the parser. May be supplied multiple times for multi-schema support."
+   )(singleListArgConverter[String](schemaArg => schemaArg))
+   val root = opt[String](
+     argName = "node",
+     descr = "the root element of the XML file to use. This needs to be one of the top-level elements of the DFDL schema defined with --schema. Requires --schema. If not supplied uses the first element of the first schema"
+   )
+   val namespace = opt[String](
+     argName = "ns",
+     descr = "the namespace of the root element. Requires --root."
+   )
+
+   // --- workload size ---
+   val number = opt[Int](
+     short = 'N',
+     argName = "number",
+     default = Some(1),
+     descr = "The total number of files to parse."
+   )
+   // Declared as a String (not an Int): the performance command handler
+   // passes this value to System.setProperty, which takes a String.
+   val threads = opt[String](
+     short = 't',
+     argName = "threads",
+     default = Some("1"),
+     descr = "The number of threads to use."
+   )
+
+   val path = opt[String](
+     argName = "path",
+     descr = "path to the node to create parser."
+   )
+   val parser = opt[String](
+     short = 'P',
+     argName = "file",
+     descr = "use a previously saved parser."
+   )
+
+   // Option absent          -> ValidationMode.Off (the declared default).
+   // Option given, no mode  -> ValidationMode.Full (the converter yields None).
+   // Option given with mode -> whatever validateConverter returns for it.
+   val validate = opt[ValidationMode.Type](
+     short = 'V',
+     default = Some(ValidationMode.Off),
+     argName = "mode",
+     descr = "the validation mode. 'on', 'limited' or 'off'. Defaults to 'on' if mode is not supplied."
+   )(optionalValueConverter[ValidationMode.Type](mode => validateConverter(mode))
+       .map(_.getOrElse(ValidationMode.Full)))
+
+   val vars = props[String](
+     'D',
+     keyName = "variable",
+     valueName = "value",
+     descr = "variables to be used when parsing. An option namespace may be provided."
+   )
+   val config = opt[String](
+     short = 'c',
+     argName = "file",
+     descr = "path to file containing configuration items."
+   )
+   val infile = trailArg[String](
+     required = true,
+     descr = "input file or directory containing files to parse."
+   )
+
+   // Cross-option consistency rules, checked in order; the first matching
+   // case decides. Left carries the error message, Right means "accepted".
+   validateOpt(schemas, parser, root, namespace) {
+     // neither a schema nor a saved parser
+     case (Some(Nil), None, _, _) => Left("One of --schema or --parser must be defined")
+     // both a schema and a saved parser
+     case (Some(_ :: _), Some(_), _, _) => Left("Only one of --parser and --schema may be defined")
+     // schema with a namespace but no root
+     case (Some(_ :: _), None, None, Some(_)) => Left("--root must be defined if --namespace is defined")
+     // saved parser combined with schema-only options
+     case (None, Some(_), Some(_), _) => Left("--root cannot be defined with --parser")
+     case (None, Some(_), _, Some(_)) => Left("--namespace cannot be defined with --parser")
+     case _ => Right(Unit)
+   }
+
+ }
// Unparse Subcommand Options
val unparse = new scallop.Subcommand("unparse") {
@@ -643,6 +687,111 @@
}
rc
}
+
+ case Some(conf.performance) => {
+   // Handler for the "performance" subcommand.
+   //
+   // Builds a processor (from schemas or a saved parser), loads every input
+   // file fully into memory, then parses the requested number of in-memory
+   // copies concurrently using scala.actors futures, timing each parse and
+   // the batch as a whole. Prints total time plus min/max/avg rates.
+   //
+   // Evaluates to the command's return code: 0 when every run completed,
+   // 1 when the processor could not be built, the arguments were unusable,
+   // or at least one run did not finish before the await timeout.
+   val performanceOpts = conf.performance
+
+   // The option declares a default, so apply() always yields a value.
+   val validate = performanceOpts.validate()
+
+   val cfgFileNode = performanceOpts.config.get.map(pathToConfig => this.loadConfigurationFile(pathToConfig))
+   val extVarsBindings = retrieveExternalVariables(performanceOpts.vars, cfgFileNode)
+
+   val processor = {
+     if (performanceOpts.parser.isDefined) {
+       createProcessorFromParser(performanceOpts.parser(), performanceOpts.path.get, validate)
+     } else {
+       val files: List[File] = performanceOpts.schemas().map(s => new File(s))
+       createProcessorFromSchemas(files, performanceOpts.root.get, performanceOpts.namespace.get, performanceOpts.path.get, extVarsBindings, validate)
+     }
+   }
+
+   val rc = processor match {
+     case Some(proc) if (proc.canProceed) => {
+       val infile = new java.io.File(performanceOpts.infile())
+
+       // A directory contributes its immediate non-directory children; a
+       // plain path contributes itself. File.listFiles may return null on
+       // an I/O error, hence the Option wrapper.
+       val files: Array[File] = {
+         if (infile.isDirectory()) {
+           Option(infile.listFiles).getOrElse(Array[File]()).filter(!_.isDirectory)
+         } else {
+           Array(infile)
+         }
+       }
+
+       val totalRuns = performanceOpts.number()
+
+       // The thread count arrives as a String; make sure it really is a
+       // positive integer before handing it to the actors thread pool.
+       val threadCount: Option[Int] =
+         try {
+           Some(performanceOpts.threads().trim.toInt).filter(_ >= 1)
+         } catch {
+           case e: NumberFormatException => None
+         }
+
+       // Reject inputs that would otherwise surface as obscure exceptions
+       // further down (modulo by zero, min/max of an empty collection).
+       val problem: Option[String] = {
+         if (files.isEmpty) Some("No input files found in " + infile.getPath)
+         else if (totalRuns < 1) Some("The number of files to parse (-N) must be at least 1")
+         else if (threadCount.isEmpty) Some("The number of threads (-t) must be a positive integer")
+         else None
+       }
+
+       problem match {
+         case Some(message) => {
+           Console.err.println(message)
+           1
+         }
+         case None => {
+           // Read each input completely up front so that disk I/O is not
+           // part of the timed section. Each entry is
+           // (file, contents, length in bits).
+           val dataSeq = files.map { filePath =>
+             val dataSize = filePath.length()
+             require(dataSize <= Int.MaxValue, "input file too large to load into memory: " + filePath)
+             val fileContent = new Array[Byte](dataSize.toInt)
+             val input = new java.io.DataInputStream(new FileInputStream(filePath))
+             try {
+               // readFully keeps reading until the buffer is full; a single
+               // read() call is allowed to return fewer bytes.
+               input.readFully(fileContent)
+             } finally {
+               input.close()
+             }
+             (filePath, fileContent, dataSize * 8)
+           }
+
+           // One independent channel per run, cycling through the inputs
+           // when more runs than files were requested.
+           val channels = (0 until totalRuns).map { n =>
+             val index = n % dataSeq.length
+             val (path, data, dataLen) = dataSeq(index)
+             val newArr: Array[Byte] = data.clone()
+             val bais: ByteArrayInputStream = new ByteArrayInputStream(newArr)
+             val inChannel = java.nio.channels.Channels.newChannel(bais)
+             (path, inChannel, dataLen)
+           }
+           val channelsWithIndex = channels.zipWithIndex
+
+           proc.setValidationMode(validate)
+
+           //set number of threads in thread pool based on user input
+           //the method for doing this will change to the commented-out method
+           //below, once we upgrade to Scala 2.10 (DFDL-943)
+           System.setProperty("actors.corePoolSize", threadCount.getOrElse(1).toString)
+
+           /*implicit val executionContext = new ExecutionContext {
+             val threadPool = Executors.newFixedThreadPool(performanceOpts.threads())
+             def execute(runnable: Runnable) {
+               threadPool.submit(runnable)
+             }
+           }*/
+
+           val NSConvert = 1000000000.0 // nanoseconds per second
+           val awaitTimeoutMs = 10000000L // upper bound on waiting for the whole batch
+
+           val (totalTime, results) = Timer.getTimeResult({
+             val tasks = channelsWithIndex.map { case (c, n) =>
+               Futures.future[(Int, Long)] {
+                 val (path, channel, len) = c
+                 // NOTE(review): the parse result itself is not inspected, so
+                 // a parse that ends in error is still counted in the rates.
+                 val (time, _) = Timer.getTimeResult({ proc.parse(channel, len) })
+                 (n, time)
+               }
+             }
+             Futures.awaitAll(awaitTimeoutMs, tasks: _*)
+           })
+
+           // awaitAll yields None for every future that had not completed
+           // when the timeout expired; keep only the finished runs.
+           val timings = results.collect {
+             case Some((runNum: Int, nsTime: Long)) => (runNum, nsTime)
+           }
+           val unfinished = results.length - timings.length
+
+           val rates = timings.map { case (runNum, nsTime) =>
+             val seconds = nsTime / NSConvert
+             val rate = 1 / seconds
+             log(LogLevel.Info, "\nrun: %d\nseconds: %f\nrate: %f\n", runNum, seconds, rate)
+             rate
+           }
+
+           val sec = totalTime / NSConvert
+           printf("total parse time (sec): %f\n", sec)
+           if (!rates.isEmpty) {
+             printf("min rate (files/sec): %f\n", rates.min)
+             printf("max rate (files/sec): %f\n", rates.max)
+           }
+           // Equals number()/sec whenever every run completed.
+           printf("avg rate (files/sec): %f\n", (timings.length / sec))
+
+           if (unfinished > 0) {
+             Console.err.println("%d of %d runs did not complete".format(unfinished, results.length))
+             1
+           } else {
+             0
+           }
+         }
+       }
+     }
+     // Processor missing or unable to proceed.
+     case _ => 1
+   }
+   rc
+ }
case Some(conf.unparse) => {
val unparseOpts = conf.unparse