Load balancing transfer requests across over agents
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 18e7ba6..9490225 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -366,6 +366,7 @@
                     .setId(agentId)
                     .setHost(agentHost)
                     .setUser(agentUser)
+                    .setSessionId(this.session)
                     .setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
                     .setLocalStorages(new ArrayList<>()));
         }
diff --git a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index e46fc9b..8fdeaaf 100644
--- a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++ b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -208,6 +208,27 @@
     }
 
     /**
+     * Lists all currently processing transfer id for the given agent
+     *
+     * @param agentInfo
+     * @return
+     * @throws MFTConsulClientException
+     */
+    public List<String> getAgentActiveTransferIds(AgentInfo agentInfo) throws MFTConsulClientException {
+        try {
+            List<String> keys = kvClient.getKeys(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentInfo.getId() + "/" + agentInfo.getSessionId());
+            return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
+        } catch (ConsulException e) {
+            if (e.getCode() == 404) {
+                return Collections.emptyList();
+            }
+            throw new MFTConsulClientException("Error in fetching active transfers for agent " + agentInfo.getId(), e);
+        } catch (Exception e) {
+            throw new MFTConsulClientException("Error in fetching active transfers for agent " + agentInfo.getId(), e);
+        }
+    }
+
+    /**
      * Agents should call this method to submit {@link TransferState}. These status are received by the controller and reorder
      * status messages and put in the final status array.
      *
@@ -237,12 +258,8 @@
      */
     public void saveTransferState(String transferId, TransferState transferState) throws MFTConsulClientException {
         try {
-            List<TransferState> allStates = getTransferStates(transferId);
-            // TODO implement sequence consistency
-            allStates.add(transferState);
-            String asStr = mapper.writeValueAsString(allStates);
-            kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);
-
+            String asStr = mapper.writeValueAsString(transferState);
+            kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + UUID.randomUUID().toString(), asStr);
             logger.info("Saved transfer status " + asStr);
 
         } catch (Exception e) {
@@ -287,15 +304,20 @@
      * @throws IOException
      */
     public List<TransferState> getTransferStates(String transferId) throws IOException {
-        Optional<Value> valueOp = kvClient.getValue(TRANSFER_STATE_PATH + transferId);
-        List<TransferState> allStates;
-        if (valueOp.isPresent()) {
-            String prevStates = valueOp.get().getValueAsString().get();
-            allStates = new ArrayList<>(Arrays.asList(mapper.readValue(prevStates, TransferState[].class)));
-        } else {
-            allStates = new ArrayList<>();
+        List<String> keys = kvClient.getKeys(TRANSFER_STATE_PATH + transferId);
+
+        List<TransferState> allStates = new ArrayList<>();
+
+        for (String key: keys) {
+            Optional<Value> valueOp = kvClient.getValue(key);
+            String stateAsStr = valueOp.get().getValueAsString().get();
+            TransferState transferState = mapper.readValue(stateAsStr, TransferState.class);
+            allStates.add(transferState);
         }
-        return allStates;
+        List<TransferState> sortedStates = allStates.stream().sorted((o1, o2) ->
+                (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) < 0 ? -1 :
+                (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) == 0 ? 0 : 1).collect(Collectors.toList());
+        return sortedStates;
     }
 
     public List<AgentInfo> getLiveAgentInfos() throws MFTConsulClientException {
diff --git a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
index db350f7..e015702 100644
--- a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
+++ b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
@@ -24,6 +24,7 @@
     private String host;
     private String user;
     private boolean sudo;
+    private String sessionId;
     private List<String> supportedProtocols;
     private List<String> localStorages;
 
@@ -80,4 +81,13 @@
         this.localStorages = localStorages;
         return this;
     }
+
+    public String getSessionId() {
+        return sessionId;
+    }
+
+    public AgentInfo setSessionId(String sessionId) {
+        this.sessionId = sessionId;
+        return this;
+    }
 }
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
index 2576cee..77fb123 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -25,6 +25,7 @@
 import com.orbitz.consul.model.kv.Value;
 import org.apache.airavata.mft.admin.MFTConsulClient;
 import org.apache.airavata.mft.admin.MFTConsulClientException;
+import org.apache.airavata.mft.admin.models.AgentInfo;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.api.service.TransferApiRequest;
 import org.slf4j.Logger;
@@ -40,10 +41,12 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 @SpringBootApplication()
 @ComponentScan(basePackages = {"org.apache.airavata.mft"})
@@ -247,7 +250,19 @@
                 selectedAgent = possibleAgent.get();
             }
         } else if (!transferRequest.getAffinityTransfer()){
-            selectedAgent = liveAgentIds.get(0);
+            List<Optional<AgentInfo>> agentInfos = liveAgentIds.stream().map(id -> mftConsulClient.getAgentInfo(id)).collect(Collectors.toList());
+            int transferCount = -1;
+            for (Optional<AgentInfo> agentInfo : agentInfos) {
+                if (agentInfo.isPresent()) {
+                    if (transferCount == -1) {
+                        transferCount = mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
+                        selectedAgent = agentInfo.get().getId();
+                    } else if (transferCount > mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size()) {
+                        transferCount = mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
+                        selectedAgent = agentInfo.get().getId();
+                    }
+                }
+            }
         }
 
         if (selectedAgent == null) {