| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| |
| package org.apache.edgent.connectors.iotp; |
| |
| import java.io.File; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| |
| import org.apache.edgent.connectors.iot.IotDevice; |
| import org.apache.edgent.connectors.iot.IotGateway; |
| import org.apache.edgent.connectors.iotp.runtime.IotpGWCommands; |
| import org.apache.edgent.connectors.iotp.runtime.IotpGWConnector; |
| import org.apache.edgent.connectors.iotp.runtime.IotpGWDeviceEventsFixed; |
| import org.apache.edgent.connectors.iotp.runtime.IotpGWDeviceEventsFunction; |
| import org.apache.edgent.connectors.iotp.runtime.IotpGWEventsFixed; |
| import org.apache.edgent.connectors.iotp.runtime.IotpGWEventsFunction; |
| import org.apache.edgent.function.Function; |
| import org.apache.edgent.function.UnaryOperator; |
| import org.apache.edgent.topology.TSink; |
| import org.apache.edgent.topology.TStream; |
| import org.apache.edgent.topology.Topology; |
| |
| import com.google.gson.JsonObject; |
| import com.google.gson.JsonParser; |
| import com.ibm.iotf.client.gateway.Command; |
| import com.ibm.iotf.client.gateway.GatewayCallback; |
| import com.ibm.iotf.client.gateway.GatewayClient; |
| |
| /** |
| * An IoT Gateway Device Connector to IBM Watson IoT Platform (WIoTP). |
| * <p> |
| * <b>This class is incubating and is subject to change.</b> |
| * <p> |
| * See {@link IotpDevice} for common WIoTP documentation references. |
| * <p> |
| * {@code IotpGateway} establishes its own WIoTP {@link GatewayCallback} |
| * handler in its embedded or the passed in WIoTP {@link GatewayClient}. |
| * An application can use |
| * {@link #setExternalCallbackHandler(GatewayCallback) setExternalCallbackHandler} |
| * to also receive and process callbacks. |
| * <p> |
| * Limitations: |
| * <ul> |
| * <li>{@code IotProvider} and {@code PublishSubscribeService} do not yet support |
| * the gateway model. An IotProvider may be initialized with an IotpGateway |
| * but the PublishSubscribeService mechanism will only publish events and receive |
| * commands for the gateway device, not any of its connected devices. |
| * </li> |
| * </ul> |
| */ |
| public class IotpGateway implements IotGateway { |
| |
| private final IotpGWConnector connector; |
| private final Topology topology; |
| private TStream<Command> commandStream; |
| |
| /** |
| * Create a connector for the IoT gateway device specified by {@code options}. |
| * <BR> |
| * These properties must be set in {@code options}. |
| * |
| * <UL> |
| * <LI>{@code org=}<em>organization identifier</em></LI> |
| * <LI>{@code type=}<em>gateway device type</em></LI> |
| * <LI>{@code id=}<em>gateway device identifier</em></LI> |
| * <LI>{@code auth-method=token}</LI> |
| * <LI>{@code auth-token=}<em>authorization token</em></LI> |
| * </UL> |
| * For example: |
| * <pre> |
| * <code> |
| * Properties options = new Properties(); |
| * options.setProperty("org", "uguhsp"); |
| * options.setProperty("type", "iotsample-gateway"); |
| * options.setProperty("id", "00aabbccde03"); |
| * options.setProperty("auth-method", "token"); |
| * options.setProperty("auth-token", "AJfKQV@&bBo@VX6Dcg"); |
| * |
| * IotDevice iotDevice = new IotpDevice(options); |
| * </code> |
| * </pre> |
| * <p> |
| * Connecting to the server occurs when the topology is submitted for |
| * execution. |
| * </p> |
| * <p> |
| * See the IBM Watson IoT Platform documentation for additional properties. |
| * </p> |
| * |
| * @param options control options |
| * @param topology |
| * the connector's associated {@code Topology}. |
| */ |
| public IotpGateway(Topology topology, Properties options) { |
| this.topology = topology; |
| this.connector = new IotpGWConnector(options); |
| } |
| |
| /** |
| * Create a connector for the IoT gateway device specified by {@code optionsFile}. |
| * <BR> |
| * The format of the file is: |
| * <pre> |
| * <code> |
| * [device] |
| * org = <em>organization identifier</em> |
| * type = <em>gateway device type</em> |
| * id = <em>gateway device identifier</em> |
| * auth-method = token |
| * auth-token = <em>authorization token</em> |
| * </code> |
| * </pre> |
| * For example: |
| * <pre> |
| * <code> |
| * [device] |
| * org = uguhsp |
| * type = iotsample-gateway |
| * id = 00aabbccde03 |
| * auth-method = token |
| * auth-token = AJfKQV@&bBo@VX6Dcg |
| * </code> |
| * </pre> |
| * <p> |
| * Connecting to the server occurs when the topology is submitted for |
| * execution. |
| * </p> |
| * <p> |
| * See the IBM Watson IoT Platform documentation for additional properties. |
| * </p> |
| * @param topology the connector's associated {@code Topology}. |
| * @param optionsFile File containing connection information. |
| */ |
| public IotpGateway(Topology topology, File optionsFile) { |
| this.topology = topology; |
| this.connector = new IotpGWConnector(optionsFile); |
| } |
| |
| /** |
| * Create a connector using the supplied WIoTP {@code DeviceClient}. |
| * @param topology the connector's associated {@code Topology}. |
| * @param gatewayClient a WIoTP device client API object. |
| */ |
| public IotpGateway(Topology topology, GatewayClient gatewayClient) { |
| this.topology = topology; |
| this.connector = new IotpGWConnector(gatewayClient); |
| } |
| |
| @Override |
| public String getDeviceType() { |
| return connector.getDeviceType(); |
| } |
| |
| @Override |
| public String getDeviceId() { |
| return connector.getFqDeviceId(); |
| } |
| |
| @Override |
| public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId, |
| UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) { |
| return stream.sink(new IotpGWEventsFunction(connector, eventId, payload, qos)); |
| } |
| |
| @Override |
| public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) { |
| return stream.sink(new IotpGWEventsFixed(connector, eventId, qos)); |
| } |
| |
| @Override |
| public TStream<JsonObject> commands(String... commands) { |
| return commandsForDevice(Collections.singleton(connector.getFqDeviceId()), commands); |
| } |
| |
| private TStream<Command> allCommands() { |
| if (commandStream == null) |
| commandStream = topology.events(new IotpGWCommands(connector)); |
| return commandStream; |
| } |
| |
| @Override |
| public Topology topology() { |
| return topology; |
| } |
| |
| /** |
| * WIoTP Device Type identifier key. |
| * Key is {@value}. |
| * |
| * @see #getIotDevice(Map) |
| */ |
| public static final String ATTR_DEVICE_TYPE = "deviceType"; |
| |
| /** |
| * WIoTP Device Id identifier key. |
| * Key is {@value}. |
| * |
| * @see #getIotDevice(Map) |
| */ |
| public static final String ATTR_DEVICE_ID = "deviceId"; |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * The device's WIoTP deviceType and deviceId must be supplied |
| * using the {@link #ATTR_DEVICE_TYPE} and {@link #ATTR_DEVICE_ID} |
| * keys respectively. |
| */ |
| @Override |
| public String getIotDeviceId(Map<String, String> deviceIdAttrs) { |
| return connector.getIotDeviceId(deviceIdAttrs); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * See {@link #getIotDeviceId(Map)} for the required attribute keys. |
| */ |
| @Override |
| public IotDevice getIotDevice(Map<String, String> deviceIdAttrs) { |
| return getIotDevice(getIotDeviceId(deviceIdAttrs)); |
| } |
| |
| @Override |
| public IotDevice getIotDevice(String fqDeviceId) { |
| return new IotpGWDevice(this, connector, topology, fqDeviceId); |
| } |
| |
| @Override |
| public TSink<JsonObject> eventsForDevice(Function<JsonObject, String> fqDeviceId, |
| TStream<JsonObject> stream, Function<JsonObject, String> eventId, |
| UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) { |
| return stream.sink(new IotpGWDeviceEventsFunction(connector, fqDeviceId, eventId, payload, qos)); |
| } |
| |
| @Override |
| public TSink<JsonObject> eventsForDevice(String fqDeviceId, TStream<JsonObject> stream, |
| String eventId, int qos) { |
| return stream.sink(new IotpGWDeviceEventsFixed(connector, fqDeviceId, eventId, qos)); |
| } |
| |
| @Override |
| public TStream<JsonObject> commandsForDevice(Set<String> fqDeviceIds, String... commands) { |
| TStream<Command> all = allCommands(); |
| |
| if (fqDeviceIds.size() != 0) { |
| // support "all devices of type T" - fqDeviceId of typeId and "*" for the simple deviceId |
| boolean allDevicesOfType = fqDeviceIds.size() == 1 |
| && IotpGWConnector.splitFqDeviceId(fqDeviceIds.iterator().next())[1].equals("*"); |
| |
| all = all.filter(cmd -> { |
| String fqDeviceId = IotpGWConnector.toFqDeviceId(cmd.getDeviceType(), |
| allDevicesOfType ? "*" : cmd.getDeviceId()); |
| return fqDeviceIds.contains(fqDeviceId); |
| }); |
| } |
| |
| if (commands.length != 0) { |
| Set<String> uniqueCommands = new HashSet<>(); |
| uniqueCommands.addAll(Arrays.asList(commands)); |
| all = all.filter(cmd -> uniqueCommands.contains(cmd.getCommand())); |
| } |
| |
| return all.map(cmd -> { |
| JsonObject full = new JsonObject(); |
| full.addProperty(IotDevice.CMD_DEVICE, |
| IotpGWConnector.toFqDeviceId(cmd.getDeviceType(), cmd.getDeviceId())); |
| full.addProperty(IotDevice.CMD_ID, cmd.getCommand()); |
| full.addProperty(IotDevice.CMD_TS, System.currentTimeMillis()); |
| full.addProperty(IotDevice.CMD_FORMAT, cmd.getFormat()); |
| if ("json".equalsIgnoreCase(cmd.getFormat())) { |
| JsonParser parser = new JsonParser(); |
| // iot-java 0.2.2 bug https://github.com/ibm-watson-iot/iot-java/issues/81 |
| // cmd.getData() returns byte[] instead of JsonObject (or String). |
| // Must continue to use the deprecated method until that's fixed. |
| // final JsonObject jsonPayload = (JsonObject) cmd.getData(); |
| // final JsonObject jsonPayload = (JsonObject) parser.parse((String)cmd.getData()); |
| @SuppressWarnings("deprecation") |
| final JsonObject jsonPayload = (JsonObject) parser.parse(cmd.getPayload()); |
| final JsonObject cmdData; |
| // wiotp java client API >= 0.2.1 (other clients earlier?) |
| // A json fmt command's msg payload may or may not have "d" wrapping of |
| // the actual command data. |
| // The wiotp client API doesn't mask that from clients |
| // so deal with that here. |
| if (jsonPayload.has("d")) { |
| cmdData = jsonPayload.getAsJsonObject("d"); |
| } else { |
| cmdData = jsonPayload; |
| } |
| full.add(IotDevice.CMD_PAYLOAD, cmdData); |
| } else { |
| full.addProperty(IotDevice.CMD_PAYLOAD, cmd.getData().toString()); |
| } |
| return full; |
| }); |
| } |
| |
| @Override |
| public TStream<JsonObject> commandsForDevice(String fqDeviceId, String... commands) { |
| return commandsForDevice(Collections.singleton(fqDeviceId), commands); |
| } |
| |
| @Override |
| public TStream<JsonObject> commandsForType(String deviceTypeId, String... commands) { |
| return commandsForDevice( |
| Collections.singleton(IotpGWConnector.toFqDeviceId(deviceTypeId, "*")), commands); |
| } |
| |
| /** |
| * Set an external WIoTP {@link GatewayCallback} handler. |
| * |
| * @param handler the handler to call. May be null. |
| * @return the previously set handler. May be null. |
| */ |
| public GatewayCallback setExternalCallbackHandler(GatewayCallback handler) { |
| return connector.setExternalCallbackHandler(handler); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("IotpGateway %s", getDeviceId()); |
| } |
| |
| } |