blob: e2f4b128917e1ca60d60f5de993bf36c8f7272c2 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.samples.connectors.iotp;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.apache.edgent.connectors.iot.HeartBeat;
import org.apache.edgent.connectors.iot.IotDevice;
import org.apache.edgent.connectors.iot.QoS;
import org.apache.edgent.connectors.iotp.IotpDevice;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.providers.direct.DirectTopology;
import org.apache.edgent.samples.topology.SensorsAggregates;
import org.apache.edgent.topology.TStream;
import com.google.gson.JsonObject;
/**
* Sample sending sensor device events to IBM Watson IoT Platform. <BR>
* Simulates a couple of bursty sensors and sends the readings from the sensors
* to IBM Watson IoT Platform as device events with id {@code sensors}. <BR>
* Subscribes to device commands with identifier {@code display}.
* <P>
* In addition, a device event with id {@code heartbeat} is sent
* every minute. This ensures a connection attempt to IBM Watson IoT Platform
* is made immediately rather than waiting for a bursty sensor to become
* active.
* <P>
* This sample requires an IBM Watson IoT Platform service and a device configuration.<BR>
* In order to see commands sent from IBM Watson IoT Platform
* there must be an analytic application
* that sends commands with the identifier {@code display}.
* </P>
*
* <p>See {@code scripts/connectors/iotp/README} for information about a
* prototype device configuration file and running the sample.
*/
public class IotpSensors {

    /**
     * Run the IotpSensors application.
     *
     * Takes a single argument that is the path to the
     * device configuration file containing the connection
     * authentication information. Any additional arguments are ignored.
     *
     * @param args Must contain the path to the device configuration file
     *             as its first element.
     * @throws IllegalArgumentException if no device configuration file
     *             path is supplied.
     *
     * @see IotpDevice#IotpDevice(org.apache.edgent.topology.Topology, File)
     */
    public static void main(String[] args) {
        // Fail with an actionable message instead of a bare
        // ArrayIndexOutOfBoundsException when the path is missing.
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException(
                    "Usage: IotpSensors <device-configuration-file>");
        }
        String deviceCfg = args[0];

        DirectProvider tp = new DirectProvider();
        DirectTopology topology = tp.newTopology("IotpSensors");

        // Declare a connection to IBM Watson IoT Platform
        IotDevice device = new IotpDevice(topology, new File(deviceCfg));

        // Simulated sensors for this device.
        simulatedSensors(device, true);

        // Heartbeat
        heartBeat(device, true);

        // Subscribe to commands of id "display" for this
        // device and print them to standard out
        displayMessages(device, true);

        tp.submit(topology);
    }

    /**
     * Simulate two bursty sensors and send the readings as
     * IBM Watson IoT Platform device events
     * with an identifier of {@code sensors}.
     *
     * @param device
     *            IoT device
     * @param print
     *            True if the data submitted as events should also be printed to
     *            standard out.
     */
    public static void simulatedSensors(IotDevice device, boolean print) {
        TStream<JsonObject> sensors = SensorsAggregates.sensorsAB(device.topology());
        if (print) {
            sensors.print();
        }

        // Send the device streams as device events
        // with event identifier "sensors".
        device.events(sensors, "sensors", QoS.FIRE_AND_FORGET);
    }

    /**
     * Create a heart beat device event with
     * identifier {@code heartbeat}, generated once a minute, to
     * ensure there is some immediate output and
     * the connection to IBM Watson IoT Platform happens as soon as possible.
     *
     * @param device IoT device
     * @param print true to print generated heartbeat tuples to System.out.
     */
    public static void heartBeat(IotDevice device, boolean print) {
        TStream<JsonObject> hbs =
                HeartBeat.addHeartBeat(device, 1, TimeUnit.MINUTES, "heartbeat");
        if (print) {
            hbs.print();
        }
    }

    /**
     * Subscribe to IoTP device commands with identifier {@code display}.
     * Subscribing to device commands returns a stream of JSON objects that
     * include a timestamp ({@code tsms}), command identifier ({@code command})
     * and payload ({@code payload}). Payload is the application specific
     * portion of the command. <BR>
     * In this case the payload is expected to be a JSON object containing a
     * {@code msg} key with a string display message. <BR>
     * The returned stream consists of the display message string extracted from
     * the JSON payload. Commands that do not carry a JSON object
     * {@code payload} with a primitive {@code msg} member produce no tuple
     * on the returned stream.
     * <P>
     * Note to receive commands an analytic application must exist that generates
     * them through IBM Watson IoT Platform.
     * </P>
     *
     * @param device the device
     * @param print true to print the received command's payload to System.out.
     * @return the stream
     * @see IotDevice#commands(String...)
     */
    public static TStream<String> displayMessages(IotDevice device, boolean print) {
        // Subscribe to commands of id "display" for this device
        TStream<JsonObject> statusMsgs = device.commands("display");

        // The returned JSON object includes several fields
        // tsms - Timestamp in milliseconds (this is generic to a command)
        // payload.msg - Status message (this is specific to this application)
        // Map to a String object containing the message. A null mapping
        // result means no tuple is submitted for that command.
        TStream<String> messages = statusMsgs.map(IotpSensors::extractDisplayMessage);
        if (print) {
            messages.print();
        }
        return messages;
    }

    /**
     * Extract {@code payload.msg} from a device command.
     *
     * @param command the command JSON object received from the device subscription
     * @return the display message, or {@code null} when the command has no
     *         JSON object {@code payload} or no primitive {@code msg} member
     */
    private static String extractDisplayMessage(JsonObject command) {
        if (command == null
                || !command.has("payload")
                || !command.get("payload").isJsonObject()) {
            return null;
        }
        JsonObject payload = command.getAsJsonObject("payload");
        // Guard the type as well as presence: a JSON null or nested object
        // under "msg" would otherwise fail the primitive cast.
        if (!payload.has("msg") || !payload.get("msg").isJsonPrimitive()) {
            return null;
        }
        return payload.getAsJsonPrimitive("msg").getAsString();
    }
}