blob: bf3b15377468451d5c3ef3312be5832bd033f26e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.airavata.mft.controller;
import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
import org.apache.airavata.mft.api.service.EndpointPaths;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.apache.airavata.mft.controller.spawner.AgentOrchestrator;
import org.apache.airavata.mft.controller.spawner.AgentSpawner;
import org.apache.airavata.mft.controller.spawner.SpawnerSelector;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class TransferDispatcher {
private static final Logger logger = LoggerFactory.getLogger(TransferDispatcher.class);
private AgentOrchestrator agentOrchestrator;
@Autowired
private MFTConsulClient mftConsulClient;
public void init() {
agentOrchestrator = new AgentOrchestrator(this);
agentOrchestrator.init();
}
public void submitTransferToAgent(List<String> filteredAgents, String transferId,
TransferApiRequest transferRequest,
AgentTransferRequest agentTransferRequest, String consulKey) {
try {
if (filteredAgents.isEmpty()) {
mftConsulClient.saveTransferState(transferId, null, new TransferState()
.setUpdateTimeMils(System.currentTimeMillis())
.setState("FAILED").setPercentage(0)
.setPublisher("controller")
.setDescription("No qualifying agent was found to orchestrate the transfer"));
return;
}
mftConsulClient.saveTransferState(transferId,null, new TransferState()
.setState("STARTING")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setPublisher("controller")
.setDescription("Initializing the transfer"));
// TODO use a better way to select the right agent
mftConsulClient.commandTransferToAgent(filteredAgents.get(0), transferId, agentTransferRequest);
mftConsulClient.markTransferAsProcessed(transferId, transferRequest);
logger.info("Marked transfer {} as processed", transferId);
} catch (Exception e) {
logger.error("Failed to submit the transfer {} to agent", transferId, e);
try {
mftConsulClient.saveTransferState(transferId, null, new TransferState()
.setUpdateTimeMils(System.currentTimeMillis())
.setState("FAILED").setPercentage(0)
.setPublisher("controller")
.setDescription("Failed to submit the transfer to agent. Error: " + ExceptionUtils.getRootCauseMessage(e)));
} catch (Exception e2) {
// Ignore
logger.warn("Failed to update the failed transfer state for transfer id {}", transferId, e);
}
} finally {
mftConsulClient.getKvClient().deleteKey(consulKey);
}
}
public void handleTransferRequest(String transferId,
TransferApiRequest transferRequest,
AgentTransferRequest.Builder agentTransferRequestTemplate,
String consulKey) throws Exception{
if (this.agentOrchestrator.isAnAgentDeploying(consulKey)) {
logger.info("Ignoring handling transfer id {} as it is already in optimizing stage", transferId);
return;
}
logger.info("Handling transfer id {} with consul key {}", transferId, consulKey);
List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
Map<String, Integer> targetAgentsMap = transferRequest.getTargetAgentsMap();
List<String> userProvidedAgents = liveAgentIds.stream().filter(targetAgentsMap::containsKey).collect(Collectors.toList());
AgentTransferRequest.Builder agentTransferRequestBuilder = agentTransferRequestTemplate.clone();
agentTransferRequestBuilder.setRequestId(UUID.randomUUID().toString());
for (EndpointPaths ep : transferRequest.getEndpointPathsList()) {
agentTransferRequestBuilder.addEndpointPaths(org.apache.airavata.mft.agent.stub.EndpointPaths.newBuilder()
.setSourcePath(ep.getSourcePath())
.setDestinationPath(ep.getDestinationPath()).buildPartial());
}
AgentTransferRequest agentTransferRequest = agentTransferRequestBuilder.build();
if (transferRequest.getOptimizeTransferPath()) {
boolean agentLaunching = agentOrchestrator.tryLaunchingAgent(
transferId, transferRequest,
agentTransferRequest,
consulKey);
if (!agentLaunching) {
logger.warn("No optimizing path is available. Moving user provided agents");
submitTransferToAgent(userProvidedAgents, transferId,
transferRequest,
agentTransferRequest,
consulKey);
}
} else if (userProvidedAgents.isEmpty()) {
if (liveAgentIds.isEmpty()) {
logger.warn("No live agent available to perform the transfer.");
return;
}
logger.info("No agent selection criteria was provided. Going with the local agent");
// TODO select the local agent
submitTransferToAgent(liveAgentIds, transferId,
transferRequest,
agentTransferRequest,
consulKey);
} else {
submitTransferToAgent(userProvidedAgents, transferId,
transferRequest,
agentTransferRequest,
consulKey);
}
}
public MFTConsulClient getMftConsulClient() {
return mftConsulClient;
}
}