Initial support for Sync RPC on top of consul
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index b654e21..3329092 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -25,10 +25,9 @@
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.SessionClient;
import com.orbitz.consul.model.kv.Value;
-import org.apache.airavata.mft.admin.models.AgentInfo;
-import org.apache.airavata.mft.admin.models.TransferCommand;
-import org.apache.airavata.mft.admin.models.TransferRequest;
-import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.admin.models.*;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +56,8 @@
public static final String TRANSFER_STATE_PATH = "mft/transfer/state/";
public static final String CONTROLLER_TRANSFER_MESSAGE_PATH = "mft/controller/messages/transfers/";
public static final String CONTROLLER_STATE_MESSAGE_PATH = "mft/controller/messages/states/";
- public static final String AGENTS_MESSAGE_PATH = "mft/agents/messages/";
+ public static final String AGENTS_TRANSFER_REQUEST_MESSAGE_PATH = "mft/agents/transfermessages/";
+ public static final String AGENTS_RPC_REQUEST_MESSAGE_PATH = "mft/agents/rpcmessages/";
public static final String AGENTS_SCHEDULED_PATH = "mft/agents/scheduled/";
public static final String AGENTS_INFO_PATH = "mft/agents/info/";
public static final String LIVE_AGENTS_PATH = "mft/agent/live/";
@@ -106,13 +106,31 @@
.setPublisher("controller")
.setDescription("Initializing the transfer"));
String asString = mapper.writeValueAsString(transferCommand);
- kvClient.putValue(AGENTS_MESSAGE_PATH + agentId + "/" + transferCommand.getTransferId(), asString);
+ kvClient.putValue(AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId + "/" + transferCommand.getTransferId(), asString);
} catch (JsonProcessingException e) {
throw new MFTConsulClientException("Error in serializing transfer request", e);
}
}
+ public void sendSyncRPCToAgent(String agentId, SyncRPCRequest rpcRequest) throws MFTConsulClientException {
+ try {
+ String asString = mapper.writeValueAsString(rpcRequest);
+ kvClient.putValue(AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId + "/" + rpcRequest.getMessageId(), asString);
+ } catch (JsonProcessingException e) {
+ throw new MFTConsulClientException("Error in serializing rpc request", e);
+ }
+ }
+
+ public void sendSyncRPCResponseFromAgent(String returnAddress, SyncRPCResponse rpcResponse) throws MFTConsulClientException {
+ try {
+ String asString = mapper.writeValueAsString(rpcResponse);
+ kvClient.putValue(returnAddress, asString);
+ } catch (JsonProcessingException e) {
+ throw new MFTConsulClientException("Error in serializing rpc response", e);
+ }
+ }
+
/**
* List all currently registered agents.
*
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java b/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
new file mode 100644
index 0000000..19fd30e
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.airavata.mft.admin;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.orbitz.consul.cache.ConsulCache;
+import com.orbitz.consul.cache.KVCache;
+import com.orbitz.consul.model.kv.Value;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is the client implementation to provide RPC (Request / Response) based communication to Agents through the Consul
+ * Key Value store. Using this mechanism, we can invoke methods of the Agents which are living in both private and public
+ * networks.
+ */
+public class SyncRPCClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(MFTConsulClient.class);
+
+ private String baseResponsePath;
+
+ private ConsulCache.Listener<String, Value> syncResponseCacheListener;
+ private KVCache syncResponseCache;
+ private MFTConsulClient mftConsulClient;
+
+ private ObjectMapper mapper = new ObjectMapper();
+ private Map<String, ArrayBlockingQueue<SyncRPCResponse>> responseQueueMap = new ConcurrentHashMap<>();
+
+ public SyncRPCClient(String agentId, MFTConsulClient mftConsulClient) {
+ this.baseResponsePath = "mft/sync/response/" + agentId + "/";
+ this.mftConsulClient = mftConsulClient;
+ }
+
+ public void init() {
+ syncResponseCache = KVCache.newCache(mftConsulClient.getKvClient(), baseResponsePath);
+ listenToResponses();
+ }
+
+ public void disconnectClient() {
+ if (syncResponseCacheListener != null) {
+ syncResponseCache.removeListener(syncResponseCacheListener);
+ }
+ }
+
+ private void listenToResponses() {
+ syncResponseCacheListener = newValues -> {
+ newValues.values().forEach(value -> {
+ Optional<String> decodedValue = value.getValueAsString();
+ decodedValue.ifPresent(v -> {
+ try {
+ SyncRPCResponse response = mapper.readValue(v, SyncRPCResponse.class);
+ if (responseQueueMap.containsKey(response.getMessageId())) {
+ responseQueueMap.get(response.getMessageId()).put(response);
+ }
+ } catch (Throwable e) {
+ logger.error("Errored while processing sync response", e);
+ } finally {
+ mftConsulClient.getKvClient().deleteKey(value.getKey());
+ }
+ });
+ });
+ };
+
+ syncResponseCache.addListener(syncResponseCacheListener);
+ syncResponseCache.start();
+ }
+
+ public SyncRPCResponse sendSyncRequest(SyncRPCRequest request, long waitMs) throws MFTConsulClientException, InterruptedException {
+ request.setReturnAddress(this.baseResponsePath + request.getMessageId());
+ ArrayBlockingQueue<SyncRPCResponse> queue = new ArrayBlockingQueue<>(1);
+
+ this.responseQueueMap.put(request.getMessageId(), queue);
+
+ try {
+ this.mftConsulClient.sendSyncRPCToAgent(request.getAgentId(), request);
+ SyncRPCResponse response = queue.poll(waitMs, TimeUnit.MILLISECONDS);
+ if (response == null) {
+ throw new MFTConsulClientException("Timed out waiting for the response");
+ }
+ return response;
+ } finally {
+ this.responseQueueMap.remove(request.getMessageId());
+ }
+ }
+
+ public SyncRPCResponse sendSyncRequest(SyncRPCRequest request) throws MFTConsulClientException, InterruptedException {
+ return sendSyncRequest(request, 10000);
+ }
+}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
index 570fc6f..db350f7 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
@@ -1,7 +1,3 @@
-package org.apache.airavata.mft.admin.models;
-
-import java.util.List;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -19,6 +15,10 @@
* limitations under the License.
*/
+package org.apache.airavata.mft.admin.models;
+
+import java.util.List;
+
public class AgentInfo {
private String id;
private String host;
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
new file mode 100644
index 0000000..2c0f2ff
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.airavata.mft.admin.models.rpc;
+
+import java.util.Map;
+
+public class SyncRPCRequest {
+ private String agentId;
+ private String method;
+ private Map<String, String> parameters;
+ private String returnAddress;
+ private String messageId;
+
+ public String getAgentId() {
+ return agentId;
+ }
+
+ public SyncRPCRequest setAgentId(String agentId) {
+ this.agentId = agentId;
+ return this;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+
+ public SyncRPCRequest setMethod(String method) {
+ this.method = method;
+ return this;
+ }
+
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ public SyncRPCRequest setParameters(Map<String, String> parameters) {
+ this.parameters = parameters;
+ return this;
+ }
+
+ public String getReturnAddress() {
+ return returnAddress;
+ }
+
+ public SyncRPCRequest setReturnAddress(String returnAddress) {
+ this.returnAddress = returnAddress;
+ return this;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public SyncRPCRequest setMessageId(String messageId) {
+ this.messageId = messageId;
+ return this;
+ }
+}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCResponse.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCResponse.java
new file mode 100644
index 0000000..048f2ac
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.airavata.mft.admin.models.rpc;
+
+public class SyncRPCResponse {
+ public enum ResponseStatus {
+ SUCCESS, FAIL
+ }
+
+ private String messageId;
+ private String responseAsStr;
+ private ResponseStatus responseStatus;
+ private String errorAsStr;
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public SyncRPCResponse setMessageId(String messageId) {
+ this.messageId = messageId;
+ return this;
+ }
+
+ public String getResponseAsStr() {
+ return responseAsStr;
+ }
+
+ public SyncRPCResponse setResponseAsStr(String responseAsStr) {
+ this.responseAsStr = responseAsStr;
+ return this;
+ }
+
+ public ResponseStatus getResponseStatus() {
+ return responseStatus;
+ }
+
+ public SyncRPCResponse setResponseStatus(ResponseStatus responseStatus) {
+ this.responseStatus = responseStatus;
+ return this;
+ }
+
+ public String getErrorAsStr() {
+ return errorAsStr;
+ }
+
+ public SyncRPCResponse setErrorAsStr(String errorAsStr) {
+ this.errorAsStr = errorAsStr;
+ return this;
+ }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index aabd4a6..15c8442 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -29,9 +29,10 @@
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferCommand;
import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.ResourceMetadata;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -81,8 +82,11 @@
private final Semaphore mainHold = new Semaphore(0);
- private KVCache messageCache;
- private ConsulCache.Listener<String, Value> cacheListener;
+ private KVCache transferMessageCache;
+ private KVCache rpcMessageCache;
+
+ private ConsulCache.Listener<String, Value> transferCacheListener;
+ private ConsulCache.Listener<String, Value> rpcCacheListener;
private final ScheduledExecutorService sessionRenewPool = Executors.newSingleThreadScheduledExecutor();
private long sessionRenewSeconds = 4;
@@ -95,12 +99,39 @@
private MFTConsulClient mftConsulClient;
public void init() {
- messageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_MESSAGE_PATH + agentId );
+ transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId );
+ rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId );
}
- private void acceptRequests() {
+ private void acceptRPCRequests() {
+ rpcCacheListener = newValues -> {
+ newValues.values().forEach(value -> {
+ Optional<String> decodedValue = value.getValueAsString();
+ decodedValue.ifPresent(v -> {
+ try {
+ SyncRPCRequest rpcRequest = mapper.readValue(v, SyncRPCRequest.class);
+ mftConsulClient.sendSyncRPCResponseFromAgent(rpcRequest.getReturnAddress(), processRPCRequest(rpcRequest));
+ } catch (Throwable e) {
+ logger.error("Error processing the RPC request {}", value.getKey(), e);
+ } finally {
+ mftConsulClient.getKvClient().deleteKey(value.getKey());
+ }
+ });
+ });
+ };
- cacheListener = newValues -> {
+ rpcMessageCache.addListener(rpcCacheListener);
+ rpcMessageCache.start();
+ }
+
+ private SyncRPCResponse processRPCRequest(SyncRPCRequest request) {
+ // TODO implement using the reflection
+ return null;
+ }
+
+ private void acceptTransferRequests() {
+
+ transferCacheListener = newValues -> {
newValues.values().forEach(value -> {
Optional<String> decodedValue = value.getValueAsString();
decodedValue.ifPresent(v -> {
@@ -188,8 +219,8 @@
});
};
- messageCache.addListener(cacheListener);
- messageCache.start();
+ transferMessageCache.addListener(transferCacheListener);
+ transferMessageCache.start();
}
private boolean connectAgent() throws MFTConsulClientException {
@@ -252,8 +283,12 @@
public void disconnectAgent() {
sessionRenewPool.shutdown();
- if (cacheListener != null) {
- messageCache.removeListener(cacheListener);
+ if (transferCacheListener != null) {
+ transferMessageCache.removeListener(transferCacheListener);
+ }
+
+ if (rpcCacheListener != null) {
+ rpcMessageCache.removeListener(rpcCacheListener);
}
}
@@ -281,7 +316,8 @@
}
}
- acceptRequests();
+ acceptTransferRequests();
+ acceptRPCRequests();
}
@Override