blob: ccefd72af2978af53bcfd177dbe613fd49ff0dcb [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.iotp.runtime;
import static org.apache.edgent.connectors.iotp.IotpGateway.ATTR_DEVICE_ID;
import static org.apache.edgent.connectors.iotp.IotpGateway.ATTR_DEVICE_TYPE;
import java.io.File;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.edgent.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.JsonObject;
import com.ibm.iotf.client.device.DeviceClient;
import com.ibm.iotf.client.gateway.Command;
import com.ibm.iotf.client.gateway.GatewayCallback;
import com.ibm.iotf.client.gateway.GatewayClient;
import com.ibm.iotf.client.gateway.Notification;
/**
* Gateway Device connector for IoTf.
*/
public class IotpGWConnector implements Serializable, AutoCloseable {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(IotpGWConnector.class);
private Properties options;
private File optionsFile;
private transient GatewayClient client;
private boolean disconnectOnClose = true;
private boolean isInitialConnect = true;
private GatewayCallback externalCallbackHandler;
private String deviceType; // for the gateway device
private String deviceId; // raw WIoTP deviceId for the gateway device
private String fqDeviceId; // for the gateway device
/**
* Create a new connector to the specified MQTT server.
*
* @param options connector options
*/
public IotpGWConnector(Properties options) {
this.options = options;
init();
}
public IotpGWConnector(File optionsFile) {
this.optionsFile = optionsFile;
init();
}
public IotpGWConnector(GatewayClient iotpGatewayDeviceClient) {
this.client = iotpGatewayDeviceClient;
this.disconnectOnClose = false;
init();
}
private void init() {
try {
GatewayClient client = getClient();
this.deviceType = client.getGWDeviceType();
this.deviceId = client.getGWDeviceId();
this.fqDeviceId = toFqDeviceId(deviceType, deviceId);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to create GatewayClient", e);
}
}
synchronized GatewayClient connect() {
GatewayClient client;
try {
client = getClient();
if (!client.isConnected()) {
client.connect();
}
if (isInitialConnect) {
// We need this for a passed in GatewayClient that was already
// connected, not just when we initiate the connect...
//
// GatewayClient pre-subscribes to cmds for the GW device
// but not for its connected devices so do that now.
//
// N.B. in the face of overlapping subscriptions,
// our GatewayCallback.processCommand(), established
// by our subscribeCommands(), gets called multiple times - once for
// each matching subscription. They are separate Command instances
// for the "duplicate" cmds so we can't filter them out.
// The net result is that the same cmd gets added to a stream
// multiple times.
//
// In combination with the GatewayClient's auto-subscription
// of the GW device's cmds, our desire to receive cmds for
// all of the GW's connected devices sets up this overlapping
// subscriptions condition.
// i.e., simply adding a "all deviceType and all deviceIDs" subscription
// results in duplicate GW device cmd callbacks/tuples.
//
// Unsubscribe the GW device auto-subscription to avoid the dups.
client.unsubscribeFromDeviceCommands(client.getGWDeviceType(), client.getGWDeviceId());
client.subscribeToDeviceCommands("+", "+");
isInitialConnect = false;
}
return client;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
synchronized GatewayClient getClient() throws Exception {
if (client == null) {
if (options == null)
options = DeviceClient.parsePropertiesFile(optionsFile);
client = new GatewayClient(options);
}
return client;
}
public synchronized GatewayCallback setExternalCallbackHandler(GatewayCallback handler) {
GatewayCallback prev = externalCallbackHandler;
externalCallbackHandler = handler;
return prev;
}
synchronized void subscribeCommands(Consumer<Command> tupleSubmitter) throws Exception {
GatewayClient client = getClient();
// N.B. See commentary in connect() above re "dup cmds".
client.setGatewayCallback(new GatewayCallback() {
@Override
public void processCommand(Command cmd) {
if (externalCallbackHandler != null) {
externalCallbackHandler.processCommand(cmd);
}
tupleSubmitter.accept(cmd);
}
@Override
public void processNotification(Notification notification) {
if (externalCallbackHandler != null) {
externalCallbackHandler.processNotification(notification);
}
// Edgent doesn't currently handle notifications.
}
});
connect();
}
void publishGWEvent(String eventId, JsonObject event, int qos) {
GatewayClient client;
try {
client = connect();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (!client.publishGatewayEvent(eventId, event, qos)) {
logger.error("Publish event failed for eventId {}", eventId);
}
}
void publishDeviceEvent(String fqDeviceId, String eventId, JsonObject event, int qos) {
String[] devIdToks = splitFqDeviceId(fqDeviceId);
publishDeviceEvent(devIdToks[0], devIdToks[1], eventId, event, qos);
}
void publishDeviceEvent(String deviceType, String deviceId, String eventId, JsonObject event, int qos) {
GatewayClient client;
try {
client = connect();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (!client.publishDeviceEvent(deviceType, deviceId, eventId, event, qos)) {
logger.error("Publish event failed for eventId {}", eventId);
}
}
// See https://github.com/ibm-watson-iot/iot-java/issues/83
// void publishHttpDeviceEvent(String eventId, JsonObject event) {
// try {
// APIClient api = getClient().api();
// if (!api.publishDeviceEventOverHTTP(eventId, event, ContentType.json)) {
// logger.error("HTTP publish event failed for eventId {}", eventId);
// }
// } catch (Exception e) {
// // throw new RuntimeException(e);
// // If the publish throws, a RuntimeException will cause
// // everything to unwind and the app/org.apache.edgent.org.apache.edgent.topology can terminate.
// // See the commentary/impl of MqttPublisher.accept().
// // See EDGENT-382
// logger.error("Unable to publish event for eventId {}", eventId, e);
// }
// }
@Override
public void close() throws Exception {
if (client == null)
return;
if (disconnectOnClose)
client.disconnect();
client = null;
}
public String getDeviceType() {
return deviceType;
}
public String getFqDeviceId() {
return fqDeviceId;
}
public String getIotDeviceId(Map<String, String> deviceIdAttrs) {
Objects.requireNonNull(deviceIdAttrs.get(ATTR_DEVICE_TYPE), ATTR_DEVICE_TYPE);
Objects.requireNonNull(deviceIdAttrs.get(ATTR_DEVICE_ID), ATTR_DEVICE_ID);
return toFqDeviceId(deviceIdAttrs.get(ATTR_DEVICE_TYPE), deviceIdAttrs.get(ATTR_DEVICE_ID));
}
public static String toFqDeviceId(String deviceType, String deviceId) {
return String.format("D/%s/%s", deviceType, deviceId);
}
public static String[] splitFqDeviceId(String fqDeviceId) {
String[] tokens = fqDeviceId.split("/");
if (tokens.length != 3 || !tokens[0].equals("D")) {
throw new IllegalArgumentException("bad fqDeviceId " + fqDeviceId);
}
return new String[] { tokens[1], tokens[2] };
}
}