Initial support for Sync RPC on top of consul
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java b/admin/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index b654e21..3329092 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -25,10 +25,9 @@
 import com.orbitz.consul.KeyValueClient;
 import com.orbitz.consul.SessionClient;
 import com.orbitz.consul.model.kv.Value;
-import org.apache.airavata.mft.admin.models.AgentInfo;
-import org.apache.airavata.mft.admin.models.TransferCommand;
-import org.apache.airavata.mft.admin.models.TransferRequest;
-import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.admin.models.*;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +56,8 @@
     public static final String TRANSFER_STATE_PATH = "mft/transfer/state/";
     public static final String CONTROLLER_TRANSFER_MESSAGE_PATH = "mft/controller/messages/transfers/";
     public static final String CONTROLLER_STATE_MESSAGE_PATH = "mft/controller/messages/states/";
-    public static final String AGENTS_MESSAGE_PATH = "mft/agents/messages/";
+    public static final String AGENTS_TRANSFER_REQUEST_MESSAGE_PATH = "mft/agents/transfermessages/";
+    public static final String AGENTS_RPC_REQUEST_MESSAGE_PATH = "mft/agents/rpcmessages/";
     public static final String AGENTS_SCHEDULED_PATH = "mft/agents/scheduled/";
     public static final String AGENTS_INFO_PATH = "mft/agents/info/";
     public static final String LIVE_AGENTS_PATH = "mft/agent/live/";
@@ -106,13 +106,31 @@
             .setPublisher("controller")
             .setDescription("Initializing the transfer"));
             String asString = mapper.writeValueAsString(transferCommand);
-            kvClient.putValue(AGENTS_MESSAGE_PATH  + agentId + "/" + transferCommand.getTransferId(), asString);
+            kvClient.putValue(AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId + "/" + transferCommand.getTransferId(), asString);
 
         } catch (JsonProcessingException e) {
             throw new MFTConsulClientException("Error in serializing transfer request", e);
         }
     }
 
+    public void sendSyncRPCToAgent(String agentId, SyncRPCRequest rpcRequest) throws MFTConsulClientException {
+        try {
+            String asString = mapper.writeValueAsString(rpcRequest);
+            kvClient.putValue(AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId + "/" + rpcRequest.getMessageId(), asString);
+        } catch (JsonProcessingException e) {
+            throw new MFTConsulClientException("Error in serializing rpc request", e);
+        }
+    }
+
+    public void sendSyncRPCResponseFromAgent(String returnAddress, SyncRPCResponse rpcResponse) throws MFTConsulClientException {
+        try {
+            String asString = mapper.writeValueAsString(rpcResponse);
+            kvClient.putValue(returnAddress, asString);
+        } catch (JsonProcessingException e) {
+            throw new MFTConsulClientException("Error in serializing rpc response", e);
+        }
+    }
+
     /**
      * List all currently registered agents.
      *
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java b/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
new file mode 100644
index 0000000..19fd30e
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.airavata.mft.admin;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.orbitz.consul.cache.ConsulCache;
+import com.orbitz.consul.cache.KVCache;
+import com.orbitz.consul.model.kv.Value;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is the client implementation to provide RPC (Request / Response) based communication to Agents through the Consul
+ * Key Value store. Using this mechanism, we can invoke methods of the Agents which are living in both private and public
+ * networks.
+ */
+public class SyncRPCClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(MFTConsulClient.class);
+
+    private String baseResponsePath;
+
+    private ConsulCache.Listener<String, Value> syncResponseCacheListener;
+    private KVCache syncResponseCache;
+    private MFTConsulClient mftConsulClient;
+
+    private ObjectMapper mapper = new ObjectMapper();
+    private Map<String, ArrayBlockingQueue<SyncRPCResponse>> responseQueueMap = new ConcurrentHashMap<>();
+
+    public SyncRPCClient(String agentId, MFTConsulClient mftConsulClient) {
+        this.baseResponsePath = "mft/sync/response/" + agentId + "/";
+        this.mftConsulClient = mftConsulClient;
+    }
+
+    public void init() {
+        syncResponseCache = KVCache.newCache(mftConsulClient.getKvClient(), baseResponsePath);
+        listenToResponses();
+    }
+
+    public void disconnectClient() {
+        if (syncResponseCacheListener != null) {
+            syncResponseCache.removeListener(syncResponseCacheListener);
+        }
+    }
+
+    private void listenToResponses() {
+        syncResponseCacheListener = newValues -> {
+            newValues.values().forEach(value -> {
+                Optional<String> decodedValue = value.getValueAsString();
+                decodedValue.ifPresent(v -> {
+                    try {
+                        SyncRPCResponse response = mapper.readValue(v, SyncRPCResponse.class);
+                        if (responseQueueMap.containsKey(response.getMessageId())) {
+                            responseQueueMap.get(response.getMessageId()).put(response);
+                        }
+                    } catch (Throwable e) {
+                        logger.error("Errored while processing sync response", e);
+                    } finally {
+                        mftConsulClient.getKvClient().deleteKey(value.getKey());
+                    }
+                });
+            });
+        };
+
+        syncResponseCache.addListener(syncResponseCacheListener);
+        syncResponseCache.start();
+    }
+
+    public SyncRPCResponse sendSyncRequest(SyncRPCRequest request, long waitMs) throws MFTConsulClientException, InterruptedException {
+        request.setReturnAddress(this.baseResponsePath + request.getMessageId());
+        ArrayBlockingQueue<SyncRPCResponse> queue = new ArrayBlockingQueue<>(1);
+
+        this.responseQueueMap.put(request.getMessageId(), queue);
+
+        try {
+            this.mftConsulClient.sendSyncRPCToAgent(request.getAgentId(), request);
+            SyncRPCResponse response = queue.poll(waitMs, TimeUnit.MILLISECONDS);
+            if (response == null) {
+                throw new MFTConsulClientException("Timed out waiting for the response");
+            }
+            return response;
+        } finally {
+            this.responseQueueMap.remove(request.getMessageId());
+        }
+    }
+
+    public SyncRPCResponse sendSyncRequest(SyncRPCRequest request) throws MFTConsulClientException, InterruptedException {
+        return sendSyncRequest(request, 10000);
+    }
+}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
index 570fc6f..db350f7 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
@@ -1,7 +1,3 @@
-package org.apache.airavata.mft.admin.models;
-
-import java.util.List;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,6 +15,10 @@
  * limitations under the License.
  */
 
+package org.apache.airavata.mft.admin.models;
+
+import java.util.List;
+
 public class AgentInfo {
     private String id;
     private String host;
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
new file mode 100644
index 0000000..2c0f2ff
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.airavata.mft.admin.models.rpc;
+
+import java.util.Map;
+
+public class SyncRPCRequest {
+    private String agentId;
+    private String method;
+    private Map<String, String> parameters;
+    private String returnAddress;
+    private String messageId;
+
+    public String getAgentId() {
+        return agentId;
+    }
+
+    public SyncRPCRequest setAgentId(String agentId) {
+        this.agentId = agentId;
+        return this;
+    }
+
+    public String getMethod() {
+        return method;
+    }
+
+    public SyncRPCRequest setMethod(String method) {
+        this.method = method;
+        return this;
+    }
+
+    public Map<String, String> getParameters() {
+        return parameters;
+    }
+
+    public SyncRPCRequest setParameters(Map<String, String> parameters) {
+        this.parameters = parameters;
+        return this;
+    }
+
+    public String getReturnAddress() {
+        return returnAddress;
+    }
+
+    public SyncRPCRequest setReturnAddress(String returnAddress) {
+        this.returnAddress = returnAddress;
+        return this;
+    }
+
+    public String getMessageId() {
+        return messageId;
+    }
+
+    public SyncRPCRequest setMessageId(String messageId) {
+        this.messageId = messageId;
+        return this;
+    }
+}
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCResponse.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCResponse.java
new file mode 100644
index 0000000..048f2ac
--- /dev/null
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.airavata.mft.admin.models.rpc;
+
+public class SyncRPCResponse {
+    public enum ResponseStatus {
+        SUCCESS, FAIL
+    }
+
+    private String messageId;
+    private String responseAsStr;
+    private ResponseStatus responseStatus;
+    private String errorAsStr;
+
+    public String getMessageId() {
+        return messageId;
+    }
+
+    public SyncRPCResponse setMessageId(String messageId) {
+        this.messageId = messageId;
+        return this;
+    }
+
+    public String getResponseAsStr() {
+        return responseAsStr;
+    }
+
+    public SyncRPCResponse setResponseAsStr(String responseAsStr) {
+        this.responseAsStr = responseAsStr;
+        return this;
+    }
+
+    public ResponseStatus getResponseStatus() {
+        return responseStatus;
+    }
+
+    public SyncRPCResponse setResponseStatus(ResponseStatus responseStatus) {
+        this.responseStatus = responseStatus;
+        return this;
+    }
+
+    public String getErrorAsStr() {
+        return errorAsStr;
+    }
+
+    public SyncRPCResponse setErrorAsStr(String errorAsStr) {
+        this.errorAsStr = errorAsStr;
+        return this;
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index aabd4a6..15c8442 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -29,9 +29,10 @@
 import org.apache.airavata.mft.admin.models.AgentInfo;
 import org.apache.airavata.mft.admin.models.TransferCommand;
 import org.apache.airavata.mft.admin.models.TransferState;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
 import org.apache.airavata.mft.core.ConnectorResolver;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.ResourceMetadata;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -81,8 +82,11 @@
 
     private final Semaphore mainHold = new Semaphore(0);
 
-    private KVCache messageCache;
-    private ConsulCache.Listener<String, Value> cacheListener;
+    private KVCache transferMessageCache;
+    private KVCache rpcMessageCache;
+
+    private ConsulCache.Listener<String, Value> transferCacheListener;
+    private ConsulCache.Listener<String, Value> rpcCacheListener;
 
     private final ScheduledExecutorService sessionRenewPool = Executors.newSingleThreadScheduledExecutor();
     private long sessionRenewSeconds = 4;
@@ -95,12 +99,39 @@
     private MFTConsulClient mftConsulClient;
 
     public void init() {
-        messageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_MESSAGE_PATH + agentId );
+        transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId );
+        rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId );
     }
 
-    private void acceptRequests() {
+    private void acceptRPCRequests() {
+        rpcCacheListener = newValues -> {
+            newValues.values().forEach(value -> {
+                Optional<String> decodedValue = value.getValueAsString();
+                decodedValue.ifPresent(v -> {
+                    try {
+                        SyncRPCRequest rpcRequest = mapper.readValue(v, SyncRPCRequest.class);
+                        mftConsulClient.sendSyncRPCResponseFromAgent(rpcRequest.getReturnAddress(), processRPCRequest(rpcRequest));
+                    } catch (Throwable e) {
+                        logger.error("Error processing the RPC request {}", value.getKey(), e);
+                    } finally {
+                        mftConsulClient.getKvClient().deleteKey(value.getKey());
+                    }
+                });
+            });
+        };
 
-        cacheListener = newValues -> {
+        rpcMessageCache.addListener(rpcCacheListener);
+        rpcMessageCache.start();
+    }
+
+    private SyncRPCResponse processRPCRequest(SyncRPCRequest request) {
+        // TODO implement using the reflection
+        return null;
+    }
+
+    private void acceptTransferRequests() {
+
+        transferCacheListener = newValues -> {
             newValues.values().forEach(value -> {
                 Optional<String> decodedValue = value.getValueAsString();
                 decodedValue.ifPresent(v -> {
@@ -188,8 +219,8 @@
 
             });
         };
-        messageCache.addListener(cacheListener);
-        messageCache.start();
+        transferMessageCache.addListener(transferCacheListener);
+        transferMessageCache.start();
     }
 
     private boolean connectAgent() throws MFTConsulClientException {
@@ -252,8 +283,12 @@
 
     public void disconnectAgent() {
         sessionRenewPool.shutdown();
-        if (cacheListener != null) {
-            messageCache.removeListener(cacheListener);
+        if (transferCacheListener != null) {
+            transferMessageCache.removeListener(transferCacheListener);
+        }
+
+        if (rpcCacheListener != null) {
+            rpcMessageCache.removeListener(rpcCacheListener);
         }
     }
 
@@ -281,7 +316,8 @@
             }
         }
 
-        acceptRequests();
+        acceptTransferRequests();
+        acceptRPCRequests();
     }
 
     @Override