blob: 747e85cbac439b3362900723e7e2b08684d4faa5 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.connectors.iot.aws.runtime;
import com.amazonaws.services.iot.client.AWSIotConnectionStatus;
import com.amazonaws.services.iot.client.AWSIotMessage;
import com.amazonaws.services.iot.client.AWSIotMqttClient;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.security.KeyStore;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
public class IotAwsGatewayConnector implements Serializable, AutoCloseable {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(IotAwsGatewayConnector.class);
private static final String OPTION_CLIENT_ENDPOINT = "clientEndpoint";
private static final String OPTION_CLIENT_ID = "clientId";
private static final String OPTION_KEYSTORE_PASSWORD = "keystorePassword";
private static final String OPTION_KEYSTORE_FILE = "keystoreFile";
private static final String OPTION_KEYSTORE_TYPE = "keystoreType";
private static final String OPTION_PRIVATE_KEY_PASSWORD = "privateKeyPassword";
private static final String OPTION_AWS_ACCESS_KEY_ID = "awsAccessKeyId";
private static final String OPTION_AWS_SECRET_ACCESS_KEY = "awsSecretAccessKey";
private static final String OPTION_SESSION_TOKEN = "sessionToken";
private Properties options;
private String deviceType; // for the gateway device
private String deviceId; // raw WIoTP deviceId for the gateway device
private String fqDeviceId; // for the gateway device
private AWSIotMqttClient client;
public IotAwsGatewayConnector(Properties options) {
this.options = options;
init();
}
public IotAwsGatewayConnector(File optionsFile) {
try {
this.options = new Properties();
options.load(new InputStreamReader(new FileInputStream(optionsFile)));
init();
} catch (Exception e) {
throw new IllegalArgumentException("Unable to create GatewayClient", e);
}
}
private void init() {
try {
AWSIotMqttClient client = getClient();
this.deviceType = client.getGWDeviceType();
this.deviceId = client.getGWDeviceId();
this.fqDeviceId = toFqDeviceId(deviceType, deviceId);
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException("Unable to create GatewayClient", e);
}
}
synchronized AWSIotMqttClient getClient() throws Exception {
if (client == null) {
// Initialize a client instance depending on the properties provided.
String clientEndpoint = (String) options.get(OPTION_CLIENT_ENDPOINT);
String clientId = (String) options.get(OPTION_CLIENT_ID);
// Configure an MQTT over TLS connection
if(options.containsKey(OPTION_PRIVATE_KEY_PASSWORD) && options.containsKey(OPTION_KEYSTORE_FILE)) {
String keystoreType = KeyStore.getDefaultType();
if(options.containsKey(OPTION_KEYSTORE_TYPE)) {
keystoreType = options.getProperty(OPTION_KEYSTORE_TYPE);
}
String keystorePassword = (String) options.get(OPTION_KEYSTORE_PASSWORD);
FileInputStream keystoreFile = new FileInputStream(options.getProperty(OPTION_KEYSTORE_FILE));
// Initialize the keystore
KeyStore keystore = KeyStore.getInstance(keystoreType);
keystore.load(keystoreFile, (keystorePassword != null) ? keystorePassword.toCharArray() : null);
String privateKeyPassword = (String) options.get(OPTION_PRIVATE_KEY_PASSWORD);
client = new AWSIotMqttClient(clientEndpoint, clientId, keystore, privateKeyPassword);
}
// Configure an MQTT over Websocket connection.
else if(options.containsKey(OPTION_AWS_ACCESS_KEY_ID) &&
options.containsKey(OPTION_AWS_SECRET_ACCESS_KEY)) {
String awsAccessKeyId = (String) options.get(OPTION_AWS_ACCESS_KEY_ID);
String awsSecretAccessKey = (String) options.get(OPTION_AWS_SECRET_ACCESS_KEY);
String sessionToken = (String) options.get(OPTION_SESSION_TOKEN);
client = new AWSIotMqttClient(
clientEndpoint, clientId, awsAccessKeyId, awsSecretAccessKey, sessionToken);
}
// Well if it's not one of these, give up.
else {
throw new IllegalArgumentException("Unable to create GatewayClient. Missing properties.");
}
}
return client;
}
synchronized AWSIotMqttClient connect() {
AWSIotMqttClient client;
try {
client = getClient();
if (client.getConnectionStatus() == AWSIotConnectionStatus.DISCONNECTED) {
client.connect();
}
return client;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws Exception {
if (client == null) {
return;
}
client.disconnect();
client = null;
}
void publishGWEvent(String eventId, JsonObject event, int qos) {
AWSIotMqttClient client;
try {
client = connect();
} catch (Exception e) {
throw new RuntimeException(e);
}
AWSIotMessage
if (!client.publishGatewayEvent(eventId, event, qos)) {
logger.error("Publish event failed for eventId {}", eventId);
}
}
void publishDeviceEvent(String fqDeviceId, String eventId, JsonObject event, int qos) {
String[] devIdToks = splitFqDeviceId(fqDeviceId);
publishDeviceEvent(devIdToks[0], devIdToks[1], eventId, event, qos);
}
void publishDeviceEvent(String deviceType, String deviceId, String eventId, JsonObject event, int qos) {
AWSIotMqttClient client;
try {
client = connect();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (!client.publishDeviceEvent(deviceType, deviceId, eventId, event, qos)) {
logger.error("Publish event failed for eventId {}", eventId);
}
}
public String getDeviceType() {
return deviceType;
}
public String getFqDeviceId() {
return fqDeviceId;
}
public String getIotDeviceId(Map<String, String> deviceIdAttrs) {
Objects.requireNonNull(deviceIdAttrs.get(ATTR_DEVICE_TYPE), ATTR_DEVICE_TYPE);
Objects.requireNonNull(deviceIdAttrs.get(ATTR_DEVICE_ID), ATTR_DEVICE_ID);
return toFqDeviceId(deviceIdAttrs.get(ATTR_DEVICE_TYPE), deviceIdAttrs.get(ATTR_DEVICE_ID));
}
public static String toFqDeviceId(String deviceType, String deviceId) {
return String.format("D/%s/%s", deviceType, deviceId);
}
public static String[] splitFqDeviceId(String fqDeviceId) {
String[] tokens = fqDeviceId.split("/");
if (tokens.length != 3 || !tokens[0].equals("D")) {
throw new IllegalArgumentException("bad fqDeviceId " + fqDeviceId);
}
return new String[] { tokens[1], tokens[2] };
}
}