Merge remote-tracking branch 'upstream/master' into fixJavaDocErr
diff --git a/scenarios/src/main/java/quarks/samples/scenarios/iotf/IotfFullScenario.java b/scenarios/src/main/java/quarks/samples/scenarios/iotf/IotfFullScenario.java
index 5944b16..f1cd408 100644
--- a/scenarios/src/main/java/quarks/samples/scenarios/iotf/IotfFullScenario.java
+++ b/scenarios/src/main/java/quarks/samples/scenarios/iotf/IotfFullScenario.java
@@ -1,3 +1,21 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
 package quarks.samples.scenarios.iotf;
 
 
diff --git a/topology/src/main/java/quarks/samples/topology/JobEventsSample.java b/topology/src/main/java/quarks/samples/topology/JobEventsSample.java
index ea879bb..df4e221 100644
--- a/topology/src/main/java/quarks/samples/topology/JobEventsSample.java
+++ b/topology/src/main/java/quarks/samples/topology/JobEventsSample.java
@@ -28,7 +28,7 @@
 import com.google.gson.JsonObject;
 
 import quarks.execution.Job;
-import quarks.execution.services.job.JobRegistryService;
+import quarks.execution.services.JobRegistryService;
 import quarks.providers.direct.DirectProvider;
 import quarks.runtime.jobregistry.JobEvents;
 import quarks.runtime.jobregistry.JobRegistry;
@@ -62,7 +62,7 @@
         ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
 
         // Monitoring app
-        sample.startMonitorApp();
+        sample.startJobMonitorApp();
 
         // Asynchronously start two applications
         executor.schedule(sample.runMonitoredApp("MonitoredApp1"), 300, TimeUnit.MILLISECONDS);
@@ -108,8 +108,8 @@
      * Monitoring application generates tuples on job registrations, removals, 
      * and on registered job updates.
      */
-    Job startMonitorApp() throws InterruptedException, ExecutionException {
-        Topology topology = dp.newTopology("MonitorApp");
+    Job startJobMonitorApp() throws InterruptedException, ExecutionException {
+        Topology topology = dp.newTopology("JobMonitorApp");
 
         TStream<JsonObject> jobEvents = JobEvents.source(
                 topology, 
diff --git a/topology/src/main/java/quarks/samples/topology/TerminateAfterNTuples.java b/topology/src/main/java/quarks/samples/topology/TerminateAfterNTuples.java
new file mode 100644
index 0000000..bffee69
--- /dev/null
+++ b/topology/src/main/java/quarks/samples/topology/TerminateAfterNTuples.java
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.samples.topology;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import quarks.providers.direct.DirectProvider;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * This application simulates a crash and terminates the JVM after processing
+ * a preset number of tuples. This application is used in conjunction with a 
+ * monitoring script to demonstrate the restart of a JVM which has terminated
+ * because of a Quarks application crash.
+ */
+public class TerminateAfterNTuples {
+    /** The application will terminate the JVM after this tuple count */
+    public final static int TERMINATE_COUNT = 15;
+    
+    public static void main(String[] args) throws Exception {
+
+        DirectProvider tp = new DirectProvider();
+
+        Topology t = tp.newTopology("PeriodicSource");
+
+        // Since this is the Direct provider the graph can access
+        // objects created while the topology is being defined
+        // (in this case the Random object r).
+        Random r = new Random();
+        TStream<Double> gaussian = t.poll(() -> r.nextGaussian(), 1, TimeUnit.SECONDS);
+
+        // Program termination
+        AtomicInteger count = new AtomicInteger(0);
+        gaussian = gaussian.peek(g -> {
+            if (count.incrementAndGet() >= TERMINATE_COUNT) {
+                System.err.println("The JVM terminates after processing " + 
+                        TERMINATE_COUNT + " tuples");
+                System.exit(1);
+            }
+        });
+
+        // Peek at the value on the Stream printing it to System.out
+        gaussian = gaussian.peek(g -> System.out.println("R:" + g));
+
+        tp.submit(t);
+    }
+}
diff --git a/utils/build.xml b/utils/build.xml
index c411e40..74f3560 100644
--- a/utils/build.xml
+++ b/utils/build.xml
@@ -25,7 +25,8 @@
   <path id="compile.classpath">
     <path refid="quarks.samples.classpath"/>
     <pathelement location="${quarks.utils}/metrics/lib/quarks.utils.metrics.jar"/>
-    <pathelement location="${quarks.analytics}/math3/ext/commons-math3-3.4.1/commons-math3-3.4.1.jar"/>
+    <pathelement location="${quarks.analytics}/math3/lib/quarks.analytics.math3.jar"/>
+    <pathelement location="${quarks.analytics}/sensors/lib/quarks.analytics.sensors.jar"/>
     <path refid="quarks.ext.classpath"/>
   </path>
 
diff --git a/utils/src/main/java/quarks/samples/utils/sensor/SimpleSimulatedSensor.java b/utils/src/main/java/quarks/samples/utils/sensor/SimpleSimulatedSensor.java
new file mode 100644
index 0000000..2ff3479
--- /dev/null
+++ b/utils/src/main/java/quarks/samples/utils/sensor/SimpleSimulatedSensor.java
@@ -0,0 +1,170 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.samples.utils.sensor;
+
+import java.text.DecimalFormat;
+import java.util.Random;
+
+import quarks.analytics.sensors.Range;
+import quarks.function.Supplier;
+
+/**
+ * A simple simulated sensor.
+ * <p>
+ * The sensor starts off with an initial value.
+ * Each call to {@link #get()} changes the current value by
+ * a random amount between plus/minus {@code deltaFactor}.
+ * The new current value is limited to a {@code range}
+ * and then rounded to 1 fractional digit. 
+ * See {@link #setNumberFractionalDigits(int)}.
+ * </p><p>
+ * Sample use:
+ * <pre>{@code
+ * Topology t = ...;
+ * // a miles-per-gallon sensor
+ * SimpleSimulatedSensor avgMpgSensor = new SimpleSimulatedSensor(10.5, 0.4,
+ *                                          Ranges<Double>.closed(7.0,14.0));
+ * TStream<Double> avgMpg = t.poll(avgMpgSensor, 1, TimeUnit.SECONDS);
+ * 
+ * // an integer valued sensor
+ * SimpleSimulatedSensor doubleSensor = new SimpleSimulatedSensor();
+ * TStream<Integer> intSensor = t.poll(() -> doubleSensor.get().intValue(),
+ *                                          1, TimeUnit.SECONDS);
+ * }</pre>
+ * </p>
+ */
+public class SimpleSimulatedSensor implements Supplier<Double> {
+    private static final long serialVersionUID = 1L;
+    private int numFracDigits;
+    private volatile DecimalFormat df;
+    private Random r = new Random();
+    private final Range<Double> range;
+    private final double deltaFactor;
+    private double currentValue;
+   
+    /**
+     * Create a sensor.
+     * <p>
+     * Same as {@code SimpleSimulatedSensor(0.0, 1.0, null)};
+     * </p>
+     */
+    public SimpleSimulatedSensor() {
+        this(0.0, 1.0, null);
+    }
+    
+    /**
+     * Create a sensor.
+     * <p>
+     * Same as {@code SimpleSimulatedSensor(initialValue, 1.0, null)};
+     * </p>
+     * @param initialValue the initial value
+     */
+    public SimpleSimulatedSensor(double initialValue) {
+        this(initialValue, 1.0, null);
+    }
+    
+    /**
+     * Create a sensor.
+     * 
+     * <p>
+     * Same as {@code SimpleSimulatedSensor(initialValue, deltaFactor, null)};
+     * </p>
+     * @param initialValue the initial value.
+     * @param deltaFactor maximum plus/minus change on each {@code get()}.
+     *              e.g., 1.0 to limit change to +/- 1.0.
+     *              Must be > 0.0
+     */
+    public SimpleSimulatedSensor(double initialValue, double deltaFactor) {
+        this(initialValue, deltaFactor, null);
+    }
+    
+    /**
+     * Create a sensor.
+     * 
+     * @param initialValue the initial value.  Must be within range.
+     * @param deltaFactor maximum plus/minus change on each {@link #get()}.
+     *              e.g., 1.0 to limit change to +/- 1.0.
+     *              Must be > 0.0
+     * @param range maximum sensor value range. Unlimited if null.
+     */
+    public SimpleSimulatedSensor(double initialValue,
+            double deltaFactor, Range<Double> range) {
+        if (range!=null && !range.contains(initialValue))
+            throw new IllegalArgumentException("initialValue");
+        if (deltaFactor <= 0.0)
+            throw new IllegalArgumentException("deltaFactor");
+        this.currentValue = initialValue;
+        this.deltaFactor = deltaFactor;
+        this.range = range;
+        setNumberFractionalDigits(1);
+    }
+    
+    /**
+     * Set number of fractional digits to round sensor values to.
+     * <p>
+     * This class offers rounding as a convenience and because
+     * ancestors of this implementation had such a scheme.
+     * </p><p>
+     * @param numFracDigits  if <= 0, no rounding will be performed
+     */
+    public void setNumberFractionalDigits(int numFracDigits) {
+        this.numFracDigits = numFracDigits;
+        if (numFracDigits <= 0) {
+            df = null;
+        }
+        else {
+            String fracPattern = "";
+            for (int i = 0; i < numFracDigits; i++)
+                fracPattern += "#";
+            df = new DecimalFormat("#."+fracPattern);
+        }
+    }
+    
+    /** Get the number of fractional digits setting */
+    public int getNumberFractionalDigits() {
+        return numFracDigits;
+    }
+    
+    /** Get the range setting */
+    public Range<Double> getRange() {
+        return range;
+    }
+    
+    /** Get the deltaFactor setting */
+    public double getDeltaFactor() {
+        return deltaFactor;
+    }
+    
+    /** Get the next sensor value as described in the class documentation. */
+    @Override
+    public Double get() {
+        double delta = 2 * r.nextDouble() - 1.0; // between -1.0 and 1.0
+        double nextValue = currentValue + delta * deltaFactor;
+        if (range!=null && !range.contains(nextValue)) {
+            nextValue = nextValue > currentValue
+                        ? range.upperEndpoint()
+                        : range.lowerEndpoint();
+        }
+        if (df != null)
+            nextValue = Double.valueOf(df.format(nextValue));
+        currentValue = nextValue;
+        return currentValue;
+    }
+}
diff --git a/utils/src/main/java/quarks/samples/utils/sensor/SimulatedTemperatureSensor.java b/utils/src/main/java/quarks/samples/utils/sensor/SimulatedTemperatureSensor.java
new file mode 100644
index 0000000..f398f87
--- /dev/null
+++ b/utils/src/main/java/quarks/samples/utils/sensor/SimulatedTemperatureSensor.java
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.samples.utils.sensor;
+
+import java.util.Objects;
+
+import quarks.analytics.sensors.Range;
+import quarks.analytics.sensors.Ranges;
+import quarks.function.Supplier;
+
+/**
+ * A Simulated temperature sensor.
+ * <p>
+ * The sensor starts off with an initial value.
+ * Each call to {@link #get()} changes the current value by
+ * a random amount between plus/minus a {@code deltaFactor}.
+ * The new current value is limited to a {@code tempRange}
+ * and then rounded to 1 fractional digit.
+ * </p><p>
+ * No temperature scale is implied (e.g., Fahrenheit, Kelvin, ...).
+ * The {@code double} temperature values are simply generated as described.
+ * The user of the class decides how to interpret them.
+ * </p><p>
+ * Sample use:
+ * <pre>{@code
+ * Topology t = ...;
+ * SimulatedTemperatureSensor tempSensor = new SimulatedTemperatureSensor();
+ * TStream<Double> temp = t.poll(tempSensor, 1, TimeUnit.SECONDS);
+ * }</pre>
+ * </p>
+ * @see SimpleSimulatedSensor
+ */
+public class SimulatedTemperatureSensor implements Supplier<Double> {
+    private static final long serialVersionUID = 1L;
+    private final SimpleSimulatedSensor sensor;
+   
+    /**
+     * Create a temperature sensor.
+     * <p>
+     * Same as {@code SimulatedTemperatureSensor(80.0, 
+     *              Ranges.closed(28.0, 112.0), 1.0)}
+     * </p><p>
+     * These default values roughly correspond to normal air temperature
+     * in the Fahrenheit scale.
+     * </p>
+     */
+    public SimulatedTemperatureSensor() {
+        this(80.0, Ranges.closed(28.0, 112.0), 1.0);
+    }
+    
+    /**
+     * Create a temperature sensor.
+     * <p>
+     * No temperature scale is implied.
+     * </p> 
+     * @param initialTemp the initial temperature.  Must be within tempRange.
+     * @param tempRange maximum sensor value range
+     * @param deltaFactor maximum plus/minus change on each {@code get()}.
+     *              e.g., 1.0 to limit change to +/- 1.0.
+     *              Must be > 0.0
+     */
+    public SimulatedTemperatureSensor(double initialTemp,
+            Range<Double> tempRange, double deltaFactor) {
+        Objects.requireNonNull(tempRange, "tempRange");
+        if (!tempRange.contains(initialTemp))
+            throw new IllegalArgumentException("initialTemp");
+        if (deltaFactor <= 0.0)
+            throw new IllegalArgumentException("deltaFactor");
+        sensor = new SimpleSimulatedSensor(initialTemp, deltaFactor, tempRange);
+    }
+    
+    /** Get the tempRange setting */
+    public Range<Double> getTempRange() {
+        return sensor.getRange();
+    }
+    
+    /** Get the deltaFactor setting */
+    public double getDeltaFactor() {
+        return sensor.getDeltaFactor();
+    }
+    
+    /** Get the next sensor value. */
+    @Override
+    public Double get() {
+        return sensor.get();
+    }
+}