blob: 7a9929ea374387a429c15d703243802427987fe1 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.iot.aws;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.edgent.connectors.iot.IotDevice;
import org.apache.edgent.connectors.iot.IotGateway;
import org.apache.edgent.connectors.iot.aws.help.Command;
import org.apache.edgent.connectors.iot.aws.runtime.IotAwsGatewayConnector;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.UnaryOperator;
import org.apache.edgent.topology.TSink;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import java.io.File;
import java.util.*;
public class IotAwsGateway implements IotGateway {
private final IotAwsGatewayConnector connector;
private final Topology topology;
private TStream<Command> commandStream;
public IotAwsGateway(Topology topology, Properties options) {
this.topology = topology;
this.connector = new IotAwsGatewayConnector(options);
}
public IotAwsGateway(Topology topology, File optionsFile) {
this.topology = topology;
this.connector = new IotAwsGatewayConnector(optionsFile);
}
@Override
public String getDeviceType() {
return connector.getDeviceType();
}
@Override
public String getDeviceId() {
return connector.getDeviceId();
}
@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId, UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
return null;
}
@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
return null;
}
@Override
public TStream<JsonObject> commands(String... commands) {
return null;
}
private TStream<Command> allCommands() {
if (commandStream == null)
commandStream = topology.events(new IotAwsGatewayCommands(connector));
return commandStream;
}
@Override
public Topology topology() {
return topology;
}
@Override
public String getIotDeviceId(Map<String, String> deviceIdAttrs) {
return connector.getIotDeviceId(deviceIdAttrs);
}
@Override
public IotDevice getIotDevice(Map<String, String> deviceIdAttrs) {
return getIotDevice(getIotDeviceId(deviceIdAttrs));
}
@Override
public IotDevice getIotDevice(String deviceId) {
return new IotAwsGatewayDevice(this, connector, topology, deviceId);
}
@Override
public TSink<JsonObject> eventsForDevice(Function<JsonObject, String> deviceId, TStream<JsonObject> stream, Function<JsonObject, String> eventId, UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
return stream.sink(new IotAwsGatewayDeviceEventsFunction(connector, deviceId, eventId, payload, qos));
}
@Override
public TSink<JsonObject> eventsForDevice(String deviceId, TStream<JsonObject> stream, String eventId, int qos) {
return stream.sink(new IotAwsGatewayDeviceEventsFixed(connector, deviceId, eventId, qos));
}
@Override
public TStream<JsonObject> commandsForDevice(Set<String> deviceIds, String... commands) {
TStream<Command> all = allCommands();
if (deviceIds.size() != 0) {
// support "all devices of type T" - fqDeviceId of typeId and "*" for the simple deviceId
boolean allDevicesOfType = deviceIds.size() == 1
&& IotAwsGatewayConnector.splitDeviceId(deviceIds.iterator().next())[1].equals("*");
all = all.filter(cmd -> {
String fqDeviceId = IotAwsGatewayConnector.toDeviceId(cmd.getDeviceType(),
allDevicesOfType ? "*" : cmd.getDeviceId());
return deviceIds.contains(fqDeviceId);
});
}
if (commands.length != 0) {
Set<String> uniqueCommands = new HashSet<>(Arrays.asList(commands));
all = all.filter(cmd -> uniqueCommands.contains(cmd.getCommand()));
}
return all.map(cmd -> {
JsonObject full = new JsonObject();
full.addProperty(IotDevice.CMD_DEVICE,
IotAwsGatewayConnector.toDeviceId(cmd.getDeviceType(), cmd.getDeviceId()));
full.addProperty(IotDevice.CMD_ID, cmd.getCommand());
full.addProperty(IotDevice.CMD_TS, System.currentTimeMillis());
full.addProperty(IotDevice.CMD_FORMAT, cmd.getFormat());
if ("json".equalsIgnoreCase(cmd.getFormat())) {
JsonParser parser = new JsonParser();
// iot-java 0.2.2 bug https://github.com/ibm-watson-iot/iot-java/issues/81
// cmd.getData() returns byte[] instead of JsonObject (or String).
// Must continue to use the deprecated method until that's fixed.
// final JsonObject jsonPayload = (JsonObject) cmd.getData();
// final JsonObject jsonPayload = (JsonObject) parser.parse((String)cmd.getData());
@SuppressWarnings("deprecation")
final JsonObject jsonPayload = (JsonObject) parser.parse(cmd.getPayload());
final JsonObject cmdData;
// wiotp java client API >= 0.2.1 (other clients earlier?)
// A json fmt command's msg payload may or may not have "d" wrapping of
// the actual command data.
// The wiotp client API doesn't mask that from clients
// so deal with that here.
if (jsonPayload.has("d")) {
cmdData = jsonPayload.getAsJsonObject("d");
} else {
cmdData = jsonPayload;
}
full.add(IotDevice.CMD_PAYLOAD, cmdData);
} else {
full.addProperty(IotDevice.CMD_PAYLOAD, cmd.getData().toString());
}
return full;
}); }
@Override
public TStream<JsonObject> commandsForDevice(String deviceId, String... commands) {
return commandsForDevice(Collections.singleton(deviceId), commands);
}
@Override
public TStream<JsonObject> commandsForType(String deviceTypeId, String... commands) {
return commandsForDevice(
Collections.singleton(IotAwsGatewayConnector.toDeviceId(deviceTypeId, "*")), commands);
}
}