Load balancing transfer requests across over agents
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 18e7ba6..9490225 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -366,6 +366,7 @@
.setId(agentId)
.setHost(agentHost)
.setUser(agentUser)
+ .setSessionId(this.session)
.setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
.setLocalStorages(new ArrayList<>()));
}
diff --git a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
index e46fc9b..8fdeaaf 100644
--- a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
+++ b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
@@ -208,6 +208,27 @@
}
/**
+ * Lists all currently processing transfer id for the given agent
+ *
+ * @param agentInfo
+ * @return
+ * @throws MFTConsulClientException
+ */
+ public List<String> getAgentActiveTransferIds(AgentInfo agentInfo) throws MFTConsulClientException {
+ try {
+ List<String> keys = kvClient.getKeys(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentInfo.getId() + "/" + agentInfo.getSessionId());
+ return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
+ } catch (ConsulException e) {
+ if (e.getCode() == 404) {
+ return Collections.emptyList();
+ }
+ throw new MFTConsulClientException("Error in fetching active transfers for agent " + agentInfo.getId(), e);
+ } catch (Exception e) {
+ throw new MFTConsulClientException("Error in fetching active transfers for agent " + agentInfo.getId(), e);
+ }
+ }
+
+ /**
* Agents should call this method to submit {@link TransferState}. These status are received by the controller and reorder
* status messages and put in the final status array.
*
@@ -237,12 +258,8 @@
*/
public void saveTransferState(String transferId, TransferState transferState) throws MFTConsulClientException {
try {
- List<TransferState> allStates = getTransferStates(transferId);
- // TODO implement sequence consistency
- allStates.add(transferState);
- String asStr = mapper.writeValueAsString(allStates);
- kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);
-
+ String asStr = mapper.writeValueAsString(transferState);
+ kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + UUID.randomUUID().toString(), asStr);
logger.info("Saved transfer status " + asStr);
} catch (Exception e) {
@@ -287,15 +304,20 @@
* @throws IOException
*/
public List<TransferState> getTransferStates(String transferId) throws IOException {
- Optional<Value> valueOp = kvClient.getValue(TRANSFER_STATE_PATH + transferId);
- List<TransferState> allStates;
- if (valueOp.isPresent()) {
- String prevStates = valueOp.get().getValueAsString().get();
- allStates = new ArrayList<>(Arrays.asList(mapper.readValue(prevStates, TransferState[].class)));
- } else {
- allStates = new ArrayList<>();
+ List<String> keys = kvClient.getKeys(TRANSFER_STATE_PATH + transferId);
+
+ List<TransferState> allStates = new ArrayList<>();
+
+ for (String key: keys) {
+ Optional<Value> valueOp = kvClient.getValue(key);
+ String stateAsStr = valueOp.get().getValueAsString().get();
+ TransferState transferState = mapper.readValue(stateAsStr, TransferState.class);
+ allStates.add(transferState);
}
- return allStates;
+ List<TransferState> sortedStates = allStates.stream().sorted((o1, o2) ->
+ (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) < 0 ? -1 :
+ (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) == 0 ? 0 : 1).collect(Collectors.toList());
+ return sortedStates;
}
public List<AgentInfo> getLiveAgentInfos() throws MFTConsulClientException {
diff --git a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
index db350f7..e015702 100644
--- a/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
+++ b/common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
@@ -24,6 +24,7 @@
private String host;
private String user;
private boolean sudo;
+ private String sessionId;
private List<String> supportedProtocols;
private List<String> localStorages;
@@ -80,4 +81,13 @@
this.localStorages = localStorages;
return this;
}
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public AgentInfo setSessionId(String sessionId) {
+ this.sessionId = sessionId;
+ return this;
+ }
}
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
index 2576cee..77fb123 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -25,6 +25,7 @@
import com.orbitz.consul.model.kv.Value;
import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.MFTConsulClientException;
+import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.slf4j.Logger;
@@ -40,10 +41,12 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
@SpringBootApplication()
@ComponentScan(basePackages = {"org.apache.airavata.mft"})
@@ -247,7 +250,19 @@
selectedAgent = possibleAgent.get();
}
} else if (!transferRequest.getAffinityTransfer()){
- selectedAgent = liveAgentIds.get(0);
+ List<Optional<AgentInfo>> agentInfos = liveAgentIds.stream().map(id -> mftConsulClient.getAgentInfo(id)).collect(Collectors.toList());
+ int transferCount = -1;
+ for (Optional<AgentInfo> agentInfo : agentInfos) {
+ if (agentInfo.isPresent()) {
+ if (transferCount == -1) {
+ transferCount = mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
+ selectedAgent = agentInfo.get().getId();
+ } else if (transferCount > mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size()) {
+ transferCount = mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
+ selectedAgent = agentInfo.get().getId();
+ }
+ }
+ }
}
if (selectedAgent == null) {