Use scala 2.10 method for setting number of performance threads
This uses an execution context which is specific to the futures created
in Main.scala, rather than modifing a JVM parameter which would be
global, and potentially affect other things running in the JVM.
DFDL-943
diff --git a/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala b/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala
index d34f0f4..a0748cc 100644
--- a/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala
+++ b/daffodil-cli/src/main/scala/edu/illinois/ncsa/daffodil/Main.scala
@@ -63,6 +63,8 @@
import edu.illinois.ncsa.daffodil.api.ValidationMode
import scala.language.reflectiveCalls
import scala.concurrent.Future
+import java.util.concurrent.Executors
+import scala.concurrent.ExecutionContext
import scala.reflect.runtime.universe._
import org.rogach.scallop.ScallopOption
import scala.concurrent.duration.Duration
@@ -323,7 +325,7 @@
val root = opt[String](argName = "node", descr = "the root element of the XML file to use. This needs to be one of the top-level elements of the DFDL schema defined with --schema. Requires --schema. If not supplied uses the first element of the first schema")
val namespace = opt[String](argName = "ns", descr = "the namespace of the root element. Requires --root.")
val number = opt[Int](short = 'N', argName = "number", default = Some(1), descr = "The total number of files to parse.")
- val threads = opt[String](short = 't', argName = "threads", default = Some("1"), descr = "The number of threads to use.")
+ val threads = opt[Int](short = 't', argName = "threads", default = Some(1), descr = "The number of threads to use.")
val path = opt[String](argName = "path", descr = "path to the node to create parser.")
val parser = opt[String](short = 'P', argName = "file", descr = "use a previously saved parser.")
val validate: ScallopOption[ValidationMode.Type] = opt[ValidationMode.Type](short = 'V', default = Some(ValidationMode.Off), argName = "mode", descr = "the validation mode. 'on', 'limited' or 'off'. Defaults to 'on' if mode is not supplied.")(optionalValueConverter[ValidationMode.Type](a => validateConverter(a)).map {
@@ -759,46 +761,49 @@
processor.setValidationMode(validate)
- //set number of threads in thread pool based on user input
- //the method for doing this will change to the commented-out method
- //below, once we upgrade to Scala 2.10 (DFDL-943)
- System.setProperty("actors.corePoolSize", performanceOpts.threads())
-
- /*implicit val executionContext = new ExecutionContext {
+ implicit val executionContext = new ExecutionContext {
val threadPool = Executors.newFixedThreadPool(performanceOpts.threads())
+
def execute(runnable: Runnable) {
threadPool.submit(runnable)
}
- }*/
+
+ def reportFailure(t: Throwable) {}
+ }
val NSConvert = 1000000000.0
- import scala.concurrent.ExecutionContext.Implicits.global
val (totalTime, results) = Timer.getTimeResult({
val tasks = channelsWithIndex.map { case (c, n) =>
- val task: Future[(Int, Long)] = Future {
+ val task: Future[(Int, Long, Boolean)] = Future {
val (path, channel, len) = c
val (time, parseResult) = Timer.getTimeResult({processor.parse(channel, len)})
- (n, time)
+ (n, time, parseResult.isError)
}
task
}
- val results = tasks.map{ Await.result(_, Duration.fromNanos(10000000L)) }
+ val results = tasks.map{ Await.result(_, Duration(10, "seconds")) }
results
})
val rates = results.map { results =>
- val (runNum: Int, nsTime: Long) = results
+ val (runNum: Int, nsTime: Long, error: Boolean) = results
val rate = 1/(nsTime/NSConvert)
- log(LogLevel.Info, "\nrun: %d\nseconds: %f\nrate: %f\n", runNum, nsTime/NSConvert, rate)
+ log(LogLevel.Info, "run: %d, seconds: %f, rate: %f, status: %s", runNum, nsTime/NSConvert, rate, if (error) "fail" else "pass" )
rate
}
+ val numFailures = results.map { _._3 }.filter { e => e }.length
+ if (numFailures > 0) {
+ log(LogLevel.Error, "%d failures found\n", numFailures)
+ }
+
val sec = totalTime / NSConvert
printf("total parse time (sec): %f\n", sec)
printf("min rate (files/sec): %f\n", rates.min)
printf("max rate (files/sec): %f\n", rates.max)
printf("avg rate (files/sec): %f\n", (performanceOpts.number()/sec))
- 0
+
+ numFailures
}
case Some(processor) => 1
case None => 1