Merge remote-tracking branch 'upstream/master' into fixJavaDocErr
diff --git a/scenarios/src/main/java/quarks/samples/scenarios/iotf/IotfFullScenario.java b/scenarios/src/main/java/quarks/samples/scenarios/iotf/IotfFullScenario.java
index 5944b16..f1cd408 100644
--- a/scenarios/src/main/java/quarks/samples/scenarios/iotf/IotfFullScenario.java
+++ b/scenarios/src/main/java/quarks/samples/scenarios/iotf/IotfFullScenario.java
@@ -1,3 +1,21 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
package quarks.samples.scenarios.iotf;
diff --git a/topology/src/main/java/quarks/samples/topology/JobEventsSample.java b/topology/src/main/java/quarks/samples/topology/JobEventsSample.java
index ea879bb..df4e221 100644
--- a/topology/src/main/java/quarks/samples/topology/JobEventsSample.java
+++ b/topology/src/main/java/quarks/samples/topology/JobEventsSample.java
@@ -28,7 +28,7 @@
import com.google.gson.JsonObject;
import quarks.execution.Job;
-import quarks.execution.services.job.JobRegistryService;
+import quarks.execution.services.JobRegistryService;
import quarks.providers.direct.DirectProvider;
import quarks.runtime.jobregistry.JobEvents;
import quarks.runtime.jobregistry.JobRegistry;
@@ -62,7 +62,7 @@
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// Monitoring app
- sample.startMonitorApp();
+ sample.startJobMonitorApp();
// Asynchronously start two applications
executor.schedule(sample.runMonitoredApp("MonitoredApp1"), 300, TimeUnit.MILLISECONDS);
@@ -108,8 +108,8 @@
* Monitoring application generates tuples on job registrations, removals,
* and on registered job updates.
*/
- Job startMonitorApp() throws InterruptedException, ExecutionException {
- Topology topology = dp.newTopology("MonitorApp");
+ Job startJobMonitorApp() throws InterruptedException, ExecutionException {
+ Topology topology = dp.newTopology("JobMonitorApp");
TStream<JsonObject> jobEvents = JobEvents.source(
topology,
diff --git a/topology/src/main/java/quarks/samples/topology/TerminateAfterNTuples.java b/topology/src/main/java/quarks/samples/topology/TerminateAfterNTuples.java
new file mode 100644
index 0000000..bffee69
--- /dev/null
+++ b/topology/src/main/java/quarks/samples/topology/TerminateAfterNTuples.java
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.samples.topology;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import quarks.providers.direct.DirectProvider;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+
+/**
+ * This application simulates a crash and terminates the JVM after processing
+ * a preset number of tuples. This application is used in conjunction with a
+ * monitoring script to demonstrate the restart of a JVM which has terminated
+ * because of a Quarks application crash.
+ */
+public class TerminateAfterNTuples {
+ /** The application will terminate the JVM after this tuple count */
+ public final static int TERMINATE_COUNT = 15;
+
+ public static void main(String[] args) throws Exception {
+
+ DirectProvider tp = new DirectProvider();
+
+ Topology t = tp.newTopology("PeriodicSource");
+
+ // Since this is the Direct provider the graph can access
+ // objects created while the topology is being defined
+ // (in this case the Random object r).
+ Random r = new Random();
+ TStream<Double> gaussian = t.poll(() -> r.nextGaussian(), 1, TimeUnit.SECONDS);
+
+ // Program termination
+ AtomicInteger count = new AtomicInteger(0);
+ gaussian = gaussian.peek(g -> {
+ if (count.incrementAndGet() >= TERMINATE_COUNT) {
+ System.err.println("The JVM terminates after processing " +
+ TERMINATE_COUNT + " tuples");
+ System.exit(1);
+ }
+ });
+
+ // Peek at the value on the Stream printing it to System.out
+ gaussian = gaussian.peek(g -> System.out.println("R:" + g));
+
+ tp.submit(t);
+ }
+}
diff --git a/utils/build.xml b/utils/build.xml
index c411e40..74f3560 100644
--- a/utils/build.xml
+++ b/utils/build.xml
@@ -25,7 +25,8 @@
<path id="compile.classpath">
<path refid="quarks.samples.classpath"/>
<pathelement location="${quarks.utils}/metrics/lib/quarks.utils.metrics.jar"/>
- <pathelement location="${quarks.analytics}/math3/ext/commons-math3-3.4.1/commons-math3-3.4.1.jar"/>
+ <pathelement location="${quarks.analytics}/math3/lib/quarks.analytics.math3.jar"/>
+ <pathelement location="${quarks.analytics}/sensors/lib/quarks.analytics.sensors.jar"/>
<path refid="quarks.ext.classpath"/>
</path>
diff --git a/utils/src/main/java/quarks/samples/utils/sensor/SimpleSimulatedSensor.java b/utils/src/main/java/quarks/samples/utils/sensor/SimpleSimulatedSensor.java
new file mode 100644
index 0000000..2ff3479
--- /dev/null
+++ b/utils/src/main/java/quarks/samples/utils/sensor/SimpleSimulatedSensor.java
@@ -0,0 +1,170 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.samples.utils.sensor;
+
+import java.text.DecimalFormat;
+import java.util.Random;
+
+import quarks.analytics.sensors.Range;
+import quarks.function.Supplier;
+
+/**
+ * A simple simulated sensor.
+ * <p>
+ * The sensor starts off with an initial value.
+ * Each call to {@link #get()} changes the current value by
+ * a random amount between plus/minus {@code deltaFactor}.
+ * The new current value is limited to a {@code range}
+ * and then rounded to 1 fractional digit.
+ * See {@link #setNumberFractionalDigits(int)}.
+ * </p><p>
+ * Sample use:
+ * <pre>{@code
+ * Topology t = ...;
+ * // a miles-per-gallon sensor
+ * SimpleSimulatedSensor avgMpgSensor = new SimpleSimulatedSensor(10.5, 0.4,
+ * Ranges<Double>.closed(7.0,14.0));
+ * TStream<Double> avgMpg = t.poll(avgMpgSensor, 1, TimeUnit.SECONDS);
+ *
+ * // an integer valued sensor
+ * SimpleSimulatedSensor doubleSensor = new SimpleSimulatedSensor();
+ * TStream<Integer> intSensor = t.poll(() -> doubleSensor.get().intValue(),
+ * 1, TimeUnit.SECONDS);
+ * }</pre>
+ * </p>
+ */
+public class SimpleSimulatedSensor implements Supplier<Double> {
+ private static final long serialVersionUID = 1L;
+ private int numFracDigits;
+ private volatile DecimalFormat df;
+ private Random r = new Random();
+ private final Range<Double> range;
+ private final double deltaFactor;
+ private double currentValue;
+
+ /**
+ * Create a sensor.
+ * <p>
+ * Same as {@code SimpleSimulatedSensor(0.0, 1.0, null)};
+ * </p>
+ */
+ public SimpleSimulatedSensor() {
+ this(0.0, 1.0, null);
+ }
+
+ /**
+ * Create a sensor.
+ * <p>
+ * Same as {@code SimpleSimulatedSensor(initialValue, 1.0, null)};
+ * </p>
+ * @param initialValue the initial value
+ */
+ public SimpleSimulatedSensor(double initialValue) {
+ this(initialValue, 1.0, null);
+ }
+
+ /**
+ * Create a sensor.
+ *
+ * <p>
+ * Same as {@code SimpleSimulatedSensor(initialValue, deltaFactor, null)};
+ * </p>
+ * @param initialValue the initial value.
+ * @param deltaFactor maximum plus/minus change on each {@code get()}.
+ * e.g., 1.0 to limit change to +/- 1.0.
+ * Must be > 0.0
+ */
+ public SimpleSimulatedSensor(double initialValue, double deltaFactor) {
+ this(initialValue, deltaFactor, null);
+ }
+
+ /**
+ * Create a sensor.
+ *
+ * @param initialValue the initial value. Must be within range.
+ * @param deltaFactor maximum plus/minus change on each {@link #get()}.
+ * e.g., 1.0 to limit change to +/- 1.0.
+ * Must be > 0.0
+ * @param range maximum sensor value range. Unlimited if null.
+ */
+ public SimpleSimulatedSensor(double initialValue,
+ double deltaFactor, Range<Double> range) {
+ if (range!=null && !range.contains(initialValue))
+ throw new IllegalArgumentException("initialValue");
+ if (deltaFactor <= 0.0)
+ throw new IllegalArgumentException("deltaFactor");
+ this.currentValue = initialValue;
+ this.deltaFactor = deltaFactor;
+ this.range = range;
+ setNumberFractionalDigits(1);
+ }
+
+ /**
+ * Set number of fractional digits to round sensor values to.
+ * <p>
+ * This class offers rounding as a convenience and because
+ * ancestors of this implementation had such a scheme.
+ * </p><p>
+ * @param numFracDigits if <= 0, no rounding will be performed
+ */
+ public void setNumberFractionalDigits(int numFracDigits) {
+ this.numFracDigits = numFracDigits;
+ if (numFracDigits <= 0) {
+ df = null;
+ }
+ else {
+ String fracPattern = "";
+ for (int i = 0; i < numFracDigits; i++)
+ fracPattern += "#";
+ df = new DecimalFormat("#."+fracPattern);
+ }
+ }
+
+ /** Get the number of fractional digits setting */
+ public int getNumberFractionalDigits() {
+ return numFracDigits;
+ }
+
+ /** Get the range setting */
+ public Range<Double> getRange() {
+ return range;
+ }
+
+ /** Get the deltaFactor setting */
+ public double getDeltaFactor() {
+ return deltaFactor;
+ }
+
+ /** Get the next sensor value as described in the class documentation. */
+ @Override
+ public Double get() {
+ double delta = 2 * r.nextDouble() - 1.0; // between -1.0 and 1.0
+ double nextValue = currentValue + delta * deltaFactor;
+ if (range!=null && !range.contains(nextValue)) {
+ nextValue = nextValue > currentValue
+ ? range.upperEndpoint()
+ : range.lowerEndpoint();
+ }
+ if (df != null)
+ nextValue = Double.valueOf(df.format(nextValue));
+ currentValue = nextValue;
+ return currentValue;
+ }
+}
diff --git a/utils/src/main/java/quarks/samples/utils/sensor/SimulatedTemperatureSensor.java b/utils/src/main/java/quarks/samples/utils/sensor/SimulatedTemperatureSensor.java
new file mode 100644
index 0000000..f398f87
--- /dev/null
+++ b/utils/src/main/java/quarks/samples/utils/sensor/SimulatedTemperatureSensor.java
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.samples.utils.sensor;
+
+import java.util.Objects;
+
+import quarks.analytics.sensors.Range;
+import quarks.analytics.sensors.Ranges;
+import quarks.function.Supplier;
+
+/**
+ * A Simulated temperature sensor.
+ * <p>
+ * The sensor starts off with an initial value.
+ * Each call to {@link #get()} changes the current value by
+ * a random amount between plus/minus a {@code deltaFactor}.
+ * The new current value is limited to a {@code tempRange}
+ * and then rounded to 1 fractional digit.
+ * </p><p>
+ * No temperature scale is implied (e.g., Fahrenheit, Kelvin, ...).
+ * The {@code double} temperature values are simply generated as described.
+ * The user of the class decides how to interpret them.
+ * </p><p>
+ * Sample use:
+ * <pre>{@code
+ * Topology t = ...;
+ * SimulatedTemperatureSensor tempSensor = new SimulatedTemperatureSensor();
+ * TStream<Double> temp = t.poll(tempSensor, 1, TimeUnit.SECONDS);
+ * }</pre>
+ * </p>
+ * @see SimpleSimulatedSensor
+ */
+public class SimulatedTemperatureSensor implements Supplier<Double> {
+ private static final long serialVersionUID = 1L;
+ private final SimpleSimulatedSensor sensor;
+
+ /**
+ * Create a temperature sensor.
+ * <p>
+ * Same as {@code SimulatedTemperatureSensor(80.0,
+ * Ranges.closed(28.0, 112.0), 1.0)}
+ * </p><p>
+ * These default values roughly correspond to normal air temperature
+ * in the Fahrenheit scale.
+ * </p>
+ */
+ public SimulatedTemperatureSensor() {
+ this(80.0, Ranges.closed(28.0, 112.0), 1.0);
+ }
+
+ /**
+ * Create a temperature sensor.
+ * <p>
+ * No temperature scale is implied.
+ * </p>
+ * @param initialTemp the initial temperature. Must be within tempRange.
+ * @param tempRange maximum sensor value range
+ * @param deltaFactor maximum plus/minus change on each {@code get()}.
+ * e.g., 1.0 to limit change to +/- 1.0.
+ * Must be > 0.0
+ */
+ public SimulatedTemperatureSensor(double initialTemp,
+ Range<Double> tempRange, double deltaFactor) {
+ Objects.requireNonNull(tempRange, "tempRange");
+ if (!tempRange.contains(initialTemp))
+ throw new IllegalArgumentException("initialTemp");
+ if (deltaFactor <= 0.0)
+ throw new IllegalArgumentException("deltaFactor");
+ sensor = new SimpleSimulatedSensor(initialTemp, deltaFactor, tempRange);
+ }
+
+ /** Get the tempRange setting */
+ public Range<Double> getTempRange() {
+ return sensor.getRange();
+ }
+
+ /** Get the deltaFactor setting */
+ public double getDeltaFactor() {
+ return sensor.getDeltaFactor();
+ }
+
+ /** Get the next sensor value. */
+ @Override
+ public Double get() {
+ return sensor.get();
+ }
+}