blob: b75be33b75a67fb77bc2a69354afdaddffc9c856 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.samples.scenarios.iotp.range.sensor;
import static org.apache.edgent.analytics.math3.stat.Statistic.MAX;
import static org.apache.edgent.analytics.math3.stat.Statistic.MEAN;
import static org.apache.edgent.analytics.math3.stat.Statistic.MIN;
import static org.apache.edgent.analytics.math3.stat.Statistic.STDDEV;
import java.io.File;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.edgent.analytics.math3.json.JsonAnalytics;
import org.apache.edgent.connectors.iot.IotDevice;
import org.apache.edgent.connectors.iot.QoS;
import org.apache.edgent.connectors.iotp.IotpDevice;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.providers.direct.DirectTopology;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.TWindow;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.pi4j.io.gpio.Pin;
import com.pi4j.io.gpio.RaspiPin;
public class IotpRangeSensor {
private static final Pin echoPin = RaspiPin.GPIO_05; // PI4J custom
// numbering (pin 18 on
// RPi2)
private static final Pin trigPin = RaspiPin.GPIO_04; // PI4J custom
// numbering (pin 16 on
// RPi2)
private static final Pin ledPin = RaspiPin.GPIO_01; // PI4J custom numbering
// (pin 12 on RPi2)
public static void main(String[] args) {
if (args.length != 3) {
System.out.println("Proper Usage is:\n " + " java program device.cfg sensorIsSimulated LEDIsSimulated\n"
+ "Example: \n"
+ " java -cp $EDGENT/build/distributions/java8/samples/lib/edgent.samples.scenarios.jar org.apache.edgent.samples.scenarios.iotp.range.sensor.IotpRangeSensor device.cfg false true");
System.exit(0);
}
String deviceCfg = args[0];
Boolean simulatedRange = Boolean.parseBoolean(args[1]);
Boolean simulatedLED = Boolean.parseBoolean(args[2]);
DirectProvider tp = new DirectProvider();
DirectTopology topology = tp.newTopology("IotpRangeSensor");
IotDevice device = getIotDevice(deviceCfg, topology);
// HC-SR04 Range sensor for this device.
rangeSensor(device, simulatedRange, true);
// In addition create a heart beat event to
// ensure there is some immediate output and
// the connection to IoTF happens as soon as possible.
TStream<Date> hb = topology.poll(() -> new Date(), 1, TimeUnit.MINUTES);
// Convert to JSON
TStream<JsonObject> hbj = hb.map(d -> {
JsonObject j = new JsonObject();
j.addProperty("when", d.toString());
j.addProperty("hearbeat", d.getTime());
return j;
});
hbj.print();
device.events(hbj, "heartbeat", QoS.FIRE_AND_FORGET);
// Subscribe to commands of id "display" for this
// device and print them to standard out
TStream<String> statusMsgs = displayMessages(device);
statusMsgs.print();
// Flash an LED for 1second when we receive commands from IoTF
if (!simulatedLED) {
LED led = new LED(ledPin);
statusMsgs.sink(j -> led.flash(1000));
} else {
statusMsgs.sink(j -> System.out.println("*******Simulated LED Flash!*******"));
}
tp.submit(topology);
}
/*
* Returns an IotDevice based on the device config parameter. If the type is
* "quickstart" then we also output the URL to view the data.
*/
private static IotDevice getIotDevice(String deviceCfg, DirectTopology topology) {
IotDevice device;
if (deviceCfg.equalsIgnoreCase("quickstart")) {
// Declare a connection to IoTF Quickstart service
String deviceId = "qs" + Long.toHexString(new Random().nextLong());
device = IotpDevice.quickstart(topology, deviceId);
System.out.println("Quickstart device type:" + IotpDevice.QUICKSTART_DEVICE_TYPE);
System.out.println("Quickstart device id :" + deviceId);
System.out.println("https://quickstart.internetofthings.ibmcloud.com/#/device/" + deviceId);
} else {
// Declare a connection to IoTF
device = new IotpDevice(topology, new File(deviceCfg));
}
return device;
}
/**
* Connect to an HC-SR04 Range Sensor
*
* @param device
* IoTF device
* @param print
* True if the data submitted as events should also be printed to
* standard out.
* @param simulated
* boolean flag
*/
public static void rangeSensor(IotDevice device, boolean simulated, boolean print) {
Supplier<Double> sensor;
if (simulated) {
sensor = new SimulatedRangeSensor();
} else {
sensor = new RangeSensor(echoPin, trigPin);
}
TStream<Double> distanceReadings = device.topology().poll(sensor, 1, TimeUnit.SECONDS);
distanceReadings.print();
// filter out bad readings that are out of the sensor's 4m range
distanceReadings = distanceReadings.filter(j -> j < 400.0);
TStream<JsonObject> sensorJSON = distanceReadings.map(v -> {
JsonObject j = new JsonObject();
j.addProperty("name", "rangeSensor");
j.addProperty("reading", v);
return j;
});
// Create a window on the stream of the last 10 readings partitioned
// by sensor name. In this case we only have one range sensor so there
// will be one partition.
TWindow<JsonObject, JsonElement> sensorWindow = sensorJSON.last(10, j -> j.get("name"));
// Aggregate the windows calculating the min, max, mean and standard
// deviation
// across each window independently.
sensorJSON = JsonAnalytics.aggregate(sensorWindow, "name", "reading", MIN, MAX, MEAN, STDDEV);
// Filter so that only when the mean sensor reading is that an object is
// closer than 30cm send data.
sensorJSON = sensorJSON
.filter(j -> Math.abs(j.get("reading").getAsJsonObject().get("MEAN").getAsDouble()) < 30.0);
if (print)
sensorJSON.print();
// Send the device streams as IoTF device events
// with event identifier "sensors".
device.events(sensorJSON, "sensors", QoS.FIRE_AND_FORGET);
}
/**
* Subscribe to IoTF device commands with identifier {@code display}.
* Subscribing to device commands returns a stream of JSON objects that
* include a timestamp ({@code tsms}), command identifier ({@code command})
* and payload ({@code payload}). Payload is the application specific
* portion of the command. <BR>
* In this case the payload is expected to be a JSON object containing a
* {@code msg} key with a string display message. <BR>
* The returned stream consists of the display message string extracted from
* the JSON payload.
* <P>
* Note to receive commands a analytic application must exist that generates
* them through IBM Watson IoT Platform.
* </P>
*
* @param device IoTF device
* @return JSON object includes tsms(timestamp) and payload.msg(status)
*
* @see IotDevice#commands(String...)
*/
public static TStream<String> displayMessages(IotDevice device) {
// Subscribe to commands of id "status" for this device
TStream<JsonObject> statusMsgs = device.commands("display");
// The returned JSON object includes several fields
// tsms - Timestamp in milliseconds (this is generic to a command)
// payload.msg - Status message (this is specific to this application)
// Map to a String object containing the message
return statusMsgs.map(j -> j.getAsJsonObject("payload").getAsJsonPrimitive("msg").getAsString());
}
}