Initial check-in of new "performance" subcommand of Daffodil CLI,
which allows multi-threaded parsing of multiple files

DFDL-888
diff --git a/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala b/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala
index a218f17..8c96985 100644
--- a/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala
+++ b/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala
@@ -36,6 +36,7 @@
 import java.io.BufferedWriter
 import java.io.OutputStreamWriter
 import java.io.FileInputStream
+import java.io.ByteArrayInputStream
 import java.io.File
 import scala.xml.SAXParseException
 import org.rogach.scallop
@@ -60,6 +61,7 @@
 import edu.illinois.ncsa.daffodil.externalvars.ExternalVariablesLoader
 import edu.illinois.ncsa.daffodil.configuration.ConfigurationLoader
 import edu.illinois.ncsa.daffodil.api.ValidationMode
+import scala.actors.Futures
 
 class CommandLineXMLLoaderErrorHandler() extends org.xml.sax.ErrorHandler with Logging {
 
@@ -287,6 +289,48 @@
     }
 
   }
+  
+  // Performance Subcommand Options
+  val performance = new scallop.Subcommand("performance") {
+    banner("""|Usage: daffodil performance (-s <schema>... [-r <root> [-n <namespace>]] [-p <path>] |
+              |                       -P <parser>)
+              |                      [--validate [mode]]
+              |                      [-N <number to parse> -t <threadcount>]
+              |                      [-D[{namespace}]<variable>=<value>...]
+              |                      [-c <file>] <infile>
+              |
+              |Run a performance test, using either a DFDL schema or a saved parser
+              |
+              |Performance Options:""".stripMargin)
+
+    descr("run performance test")
+    helpWidth(76)
+
+    val schemas = opt[List[String]]("schema", argName = "file", descr = "the annotated DFDL schema to use to create the parser. May be supplied multiple times for multi-schema support.")(singleListArgConverter[String](a => a))
+    val root = opt[String](argName = "node", descr = "the root element of the XML file to use. This needs to be one of the top-level elements of the DFDL schema defined with --schema. Requires --schema. If not supplied uses the first element of the first schema")
+    val namespace = opt[String](argName = "ns", descr = "the namespace of the root element. Requires --root.")
+    val number = opt[Int](short = 'N', argName = "number", default = Some(1), descr = "The total number of files to parse.")
+    val threads = opt[String](short = 't', argName = "threads", default = Some("1"), descr = "The number of threads to use.")
+    val path = opt[String](argName = "path", descr = "path to the node to create parser.")
+    val parser = opt[String](short = 'P', argName = "file", descr = "use a previously saved parser.")
+    val validate = opt[ValidationMode.Type](short = 'V', default = Some(ValidationMode.Off), argName = "mode", descr = "the validation mode. 'on', 'limited' or 'off'. Defaults to 'on' if mode is not supplied.")(optionalValueConverter[ValidationMode.Type](a => validateConverter(a)).map {
+      case None => ValidationMode.Full
+      case Some(mode) => mode
+    })
+    val vars = props[String]('D', keyName = "variable", valueName = "value", descr = "variables to be used when parsing. An option namespace may be provided.")
+    val config = opt[String](short = 'c', argName = "file", descr = "path to file containing configuration items.")
+    val infile = trailArg[String](required = true, descr = "input file or directory containing files to parse.")
+
+    validateOpt(schemas, parser, root, namespace) {
+      case (Some(Nil), None, _, _) => Left("One of --schema or --parser must be defined")
+      case (Some(_ :: _), Some(_), _, _) => Left("Only one of --parser and --schema may be defined")
+      case (Some(_ :: _), None, None, Some(_)) => Left("--root must be defined if --namespace is defined")
+      case (None, Some(_), Some(_), _) => Left("--root cannot be defined with --parser")
+      case (None, Some(_), _, Some(_)) => Left("--namespace cannot be defined with --parser")
+      case _ => Right(Unit)
+    }
+
+  }
 
   // Unparse Subcommand Options
   val unparse = new scallop.Subcommand("unparse") {
@@ -643,6 +687,111 @@
         }
         rc
       }
+      
+      case Some(conf.performance) => {
+        val performanceOpts = conf.performance
+
+        val validate = performanceOpts.validate.get.get
+
+        val cfgFileNode = performanceOpts.config.get match {
+          case None => None
+          case Some(pathToConfig) => Some(this.loadConfigurationFile(pathToConfig))
+        }
+        val extVarsBindings = retrieveExternalVariables(performanceOpts.vars, cfgFileNode)
+
+        val processor = {
+          if (performanceOpts.parser.isDefined) {
+            createProcessorFromParser(performanceOpts.parser(), performanceOpts.path.get, validate)
+          } else {
+            val files: List[File] = performanceOpts.schemas().map(s => new File(s))
+            createProcessorFromSchemas(files, performanceOpts.root.get, performanceOpts.namespace.get, performanceOpts.path.get, extVarsBindings, validate)
+          }
+        }
+
+        def displayDiagnostics(lvl: LogLevel.Type, pr: WithDiagnostics) {
+          pr.getDiagnostics.foreach { d =>
+            log(lvl, "%s", d.getMessage())
+          }
+        }
+
+        val rc = processor match {
+          case Some(processor) if (processor.canProceed) => {
+            val infile = new java.io.File(performanceOpts.infile())
+
+            val files = {
+              if (infile.isDirectory()){
+                infile.listFiles.filter(!_.isDirectory)
+              } else {
+                Array(infile)
+              }
+            }
+
+            val dataSeq = files.map { filePath =>
+              val input = (new FileInputStream(filePath))
+              val dataSize = filePath.length()
+              var fileContent = new Array[Byte](dataSize.toInt)
+              input.read(fileContent)
+              (filePath, fileContent, dataSize*8)
+            }
+
+            val channels = (0 until performanceOpts.number()).map { n =>
+              val index = n % dataSeq.length
+              val (path, data, dataLen) = dataSeq(index)
+              val newArr:Array[Byte] = data.clone()
+              val bais:ByteArrayInputStream = new ByteArrayInputStream(newArr)
+              val inChannel = java.nio.channels.Channels.newChannel(bais);
+              (path, inChannel, dataLen)
+            }
+            val channelsWithIndex = channels.zipWithIndex
+            
+            processor.setValidationMode(validate)
+
+            //set number of threads in thread pool based on user input
+            //the method for doing this will change to the commented-out method
+            //below, once we upgrade to Scala 2.10 (DFDL-943)
+            System.setProperty("actors.corePoolSize", performanceOpts.threads())
+
+            /*implicit val executionContext = new ExecutionContext {
+              val threadPool = Executors.newFixedThreadPool(performanceOpts.threads())
+              def execute(runnable: Runnable) {
+                threadPool.submit(runnable)
+              }
+            }*/
+
+            val NSConvert = 1000000000.0
+
+            val (totalTime, results) = Timer.getTimeResult({
+              val tasks = channelsWithIndex.map { case (c, n) =>
+                val task = Futures.future[(Int, Long)] {
+                  val (path, channel, len) = c
+                  val (time, parseResult) = Timer.getTimeResult({processor.parse(channel, len)})
+                  (n, time)
+                }
+                task
+              }
+              val results = Futures.awaitAll(10000000L, tasks: _*)
+              results
+            })
+
+            val rates = results.map { results =>
+                val (runNum: Int, nsTime: Long) = results.get
+                val rate = 1/(nsTime/NSConvert)
+                log(LogLevel.Info, "\nrun: %d\nseconds: %f\nrate: %f\n", runNum, nsTime/NSConvert, rate)
+                rate
+            }
+
+            val sec = totalTime/NSConvert
+            printf("total parse time (sec): %f\n", sec)
+            printf("min rate (files/sec): %f\n", rates.min) 
+            printf("max rate (files/sec): %f\n", rates.max) 
+            printf("avg rate (files/sec): %f\n", (performanceOpts.number()/sec)) 
+            0
+          }
+          case Some(processor) => 1
+          case None => 1
+        }
+        rc
+      }
 
       case Some(conf.unparse) => {
         val unparseOpts = conf.unparse