blob: 77fb1231ca1136261536424e8abe1319a63f5b9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.airavata.mft.controller;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.orbitz.consul.cache.ConsulCache;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;
import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.MFTConsulClientException;
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.TransferState;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Spring Boot command line application acting as the MFT controller.
 *
 * <p>On start-up it watches three Consul KV prefixes through {@link KVCache}s (incoming transfer
 * requests, transfer state messages and live agents) and schedules a periodic task that forwards
 * pending transfer requests to a selected live agent.
 */
@SpringBootApplication()
@ComponentScan(basePackages = {"org.apache.airavata.mft"})
@EntityScan("org.apache.airavata.mft.api.db.entities")
public class MFTController implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(MFTController.class);

    /** Never released; keeps the thread that invoked {@link #run(String...)} parked. */
    private final Semaphore mainHold = new Semaphore(0);

    /** Used to deserialize {@link TransferState} JSON payloads. */
    private final ObjectMapper mapper = new ObjectMapper();

    private KVCache messageCache;
    private KVCache stateCache;
    private KVCache liveAgentCache;
    private ScheduledExecutorService pendingMonitor;

    @Autowired
    private MFTConsulClient mftConsulClient;

    /**
     * Creates the Consul KV caches and schedules {@link #processPending()} to run on a single
     * thread, first after 2000 ms and then with a 4000 ms delay between consecutive runs.
     * The caches are only created here; they are started by the {@code accept*} methods.
     */
    public void init() {
        logger.info("Initializing the Controller");
        messageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.CONTROLLER_TRANSFER_MESSAGE_PATH);
        stateCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.CONTROLLER_STATE_MESSAGE_PATH);
        liveAgentCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.LIVE_AGENTS_PATH);
        pendingMonitor = Executors.newSingleThreadScheduledExecutor();
        pendingMonitor.scheduleWithFixedDelay(this::processPending, 2000, 4000, TimeUnit.MILLISECONDS);
        logger.info("Controller initialized");
    }

    /**
     * Shuts down the pending-transfer monitor and stops the Consul KV caches so that no listener
     * keeps firing while the application context is being torn down. Every step is best effort:
     * a failure is logged and the remaining steps still run.
     */
    @PreDestroy
    public void destroy() {
        logger.info("Destroying the Controller");
        try {
            if (this.pendingMonitor != null) {
                this.pendingMonitor.shutdown();
            }
        } catch (Exception e) {
            logger.warn("Errored while shutting down the pending monitor", e);
        }
        stopCache(messageCache, "transfer message");
        stopCache(stateCache, "state message");
        stopCache(liveAgentCache, "live agent");
        logger.info("Controller destroyed");
    }

    /**
     * Stops the given cache if it was created, logging instead of propagating any failure.
     *
     * @param cache cache to stop; may be {@code null} when {@link #init()} never ran
     * @param name  human readable cache name used in the warning message
     */
    private void stopCache(KVCache cache, String name) {
        if (cache == null) {
            return;
        }
        try {
            cache.stop();
        } catch (Exception e) {
            logger.warn("Errored while stopping the {} cache", name, e);
        }
    }

    /**
     * Accepts transfer requests coming from the API and put it into the pending queue
     */
    private void acceptRequests() {
        ConsulCache.Listener<String, Value> messageCacheListener = newValues -> newValues.forEach((key, value) -> {
            // The transfer id is the last path segment of the key
            String transferId = key.substring(key.lastIndexOf("/") + 1);
            Optional<String> decodedValue = value.getValueAsString();
            decodedValue.ifPresent(v -> {
                logger.info("Received transfer request : {} with id {}", v, transferId);
                TransferApiRequest transferRequest;
                try {
                    TransferApiRequest.Builder builder = TransferApiRequest.newBuilder();
                    JsonFormat.parser().merge(v, builder);
                    transferRequest = builder.build();
                } catch (IOException e) {
                    // NOTE(review): the unparsable key is left in Consul here (it is only deleted
                    // on the path below) - confirm whether it should be removed as well.
                    logger.error("Failed to parse the transfer request {}", v, e);
                    return;
                }

                try {
                    markAsPending(transferId, transferRequest);
                    logger.info("Marked transfer {} as pending", transferId);
                } catch (Exception e) {
                    logger.error("Failed to store transfer request {}", transferId, e);
                } finally {
                    logger.info("Deleting key {}", value.getKey());
                    // Due to bug in consul https://github.com/hashicorp/consul/issues/571
                    mftConsulClient.getKvClient().deleteKey(value.getKey());
                }
            });
        });
        messageCache.addListener(messageCacheListener);
        messageCache.start();
    }

    /**
     * Listens for transfer state messages and saves each one through
     * {@link MFTConsulClient#saveTransferState}. The state key is expected to consist of exactly
     * three {@code /}-separated parts, the first being the transfer id. The key is always deleted
     * after it has been looked at, whether or not it could be processed.
     */
    private void acceptStates() {
        ConsulCache.Listener<String, Value> stateCacheListener = newValues -> newValues.forEach((key, value) -> {
            try {
                Optional<String> decodedValue = value.getValueAsString();
                if (decodedValue.isPresent()) {
                    String valAsStr = decodedValue.get();
                    logger.info("Received state Key {} val {}", key, valAsStr);

                    String[] parts = key.split("/");
                    if (parts.length != 3) {
                        logger.error("Invalid status key {}", key);
                        // Do not index into a malformed key; the finally block still deletes it
                        return;
                    }
                    String transferId = parts[0];

                    TransferState transferState = mapper.readValue(valAsStr, TransferState.class);
                    mftConsulClient.saveTransferState(transferId, transferState);
                }
            } catch (Exception e) {
                logger.error("Error while processing the state message", e);
            } finally {
                logger.info("Deleting key {}", value.getKey());
                // Due to bug in consul https://github.com/hashicorp/consul/issues/571
                mftConsulClient.getKvClient().deleteKey(value.getKey());
            }
        });
        stateCache.addListener(stateCacheListener);
        stateCache.start();
    }

    /**
     * Listens for live agent entries. For every agent whose key currently has a session, the
     * transfers stored under {@code AGENTS_SCHEDULED_PATH + agentId} are inspected and those that
     * were scheduled under a different session are commanded to the agent again, after which
     * their scheduled key is deleted.
     */
    private void acceptLiveAgents() {
        ConsulCache.Listener<String, Value> liveAgentCacheListener = newValues -> newValues.forEach((agentId, value) -> {
            try {
                Optional<String> session = mftConsulClient.getKvClient().getSession(value.getKey());
                if (!session.isPresent()) {
                    return;
                }
                String sessionId = session.get();
                logger.info("Agent connected in path {} agent id {} session {}", value.getKey(), agentId, sessionId);

                // Key = AGENTS_SCHEDULED_PATH agent id / session id / transfer id
                int expectedParts = MFTConsulClient.AGENTS_SCHEDULED_PATH.split("/").length + 3;
                List<Value> scheduledTransfers = mftConsulClient.getKvClient().getValues(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId);

                for (Value v : scheduledTransfers) {
                    logger.info("Found scheduled key {}", v.getKey());
                    try {
                        String[] parts = v.getKey().split("/");
                        // Make sure right amount of data is available
                        if (parts.length != expectedParts) {
                            logger.warn("Invalid schedule key {}", v.getKey());
                            continue;
                        }

                        String scheduledSession = parts[parts.length - 2];
                        String scheduledTransfer = parts[parts.length - 1];
                        logger.info("Scheduled session {} transfer {}", scheduledSession, scheduledTransfer);

                        if (scheduledSession.equals(sessionId)) {
                            logger.info("Session {} is already active so skipping scheduled transfer", scheduledSession);
                            continue;
                        }

                        logger.info("Old transfer session found. Re scheduling to agent {}", agentId);
                        Optional<String> scheduledRequest = v.getValueAsString();
                        if (!scheduledRequest.isPresent()) {
                            logger.warn("Scheduled key {} has no value. Skipping", v.getKey());
                            continue;
                        }
                        TransferApiRequest.Builder builder = TransferApiRequest.newBuilder();
                        JsonFormat.parser().merge(scheduledRequest.get(), builder);
                        mftConsulClient.commandTransferToAgent(agentId, scheduledTransfer, builder.build());
                        // Delete the key as it is already processed
                        mftConsulClient.getKvClient().deleteKey(v.getKey());
                    } catch (Exception e) {
                        logger.error("Failed to process schedule key {}", v.getKey(), e);
                    }
                }
            } catch (Exception e) {
                logger.error("Errored while processing live agent cache key {}", agentId, e);
            }
        });
        liveAgentCache.addListener(liveAgentCacheListener);
        liveAgentCache.start();
    }

    /**
     * Stores the JSON form of the request under {@code TRANSFER_PROCESSED_PATH + transferId}.
     *
     * @throws InvalidProtocolBufferException if the request cannot be printed as JSON
     */
    private void markAsProcessed(String transferId, TransferApiRequest transferRequest) throws InvalidProtocolBufferException {
        mftConsulClient.getKvClient().putValue(MFTConsulClient.TRANSFER_PROCESSED_PATH + transferId,
                JsonFormat.printer().print(transferRequest));
    }

    /**
     * Stores the JSON form of the request under {@code TRANSFER_PENDING_PATH + transferId}.
     *
     * @throws InvalidProtocolBufferException if the request cannot be printed as JSON
     */
    private void markAsPending(String transferId, TransferApiRequest transferRequest) throws InvalidProtocolBufferException {
        mftConsulClient.getKvClient().putValue(MFTConsulClient.TRANSFER_PENDING_PATH + transferId,
                JsonFormat.printer().print(transferRequest));
    }

    /**
     * Picks the agent that should execute the given transfer.
     *
     * <ul>
     *   <li>If the request names target agents, the first named agent that is currently live is used.</li>
     *   <li>Otherwise, unless the request is an affinity transfer, the live agent with the fewest
     *       active transfers is used (the earliest one wins on a tie).</li>
     * </ul>
     *
     * @param transferId      id of the transfer, used for logging only
     * @param transferRequest request describing the transfer
     * @return the selected agent id, or empty when no live agent satisfies the request
     * @throws MFTConsulClientException if the Consul client fails while being queried
     */
    private Optional<String> selectAgent(String transferId, TransferApiRequest transferRequest) throws MFTConsulClientException {

        List<String> liveAgentIds = mftConsulClient.getLiveAgentIds();
        if (liveAgentIds.isEmpty()) {
            logger.error("Live agents are not available. Skipping for now");
            return Optional.empty();
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Processing transfer request {} with target agents {}", transferId, transferRequest.getTargetAgents());
            logger.debug("Printing live agents");
            liveAgentIds.forEach(a -> logger.debug("Agent {} is live", a));
        }

        String selectedAgent = null;
        if (transferRequest.getTargetAgents() != null && !transferRequest.getTargetAgents().isEmpty()) {
            selectedAgent = transferRequest.getTargetAgents().keySet().stream()
                    .filter(liveAgentIds::contains)
                    .findFirst()
                    .orElse(null);
        } else if (!transferRequest.getAffinityTransfer()) {
            List<Optional<AgentInfo>> agentInfos = liveAgentIds.stream()
                    .map(id -> mftConsulClient.getAgentInfo(id))
                    .collect(Collectors.toList());

            int minTransferCount = -1; // -1 means no candidate has been seen yet
            for (Optional<AgentInfo> agentInfo : agentInfos) {
                if (!agentInfo.isPresent()) {
                    continue;
                }
                // Query the active transfers once per agent and reuse the result
                int activeTransfers = mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
                if (minTransferCount == -1 || activeTransfers < minTransferCount) {
                    minTransferCount = activeTransfers;
                    selectedAgent = agentInfo.get().getId();
                }
            }
        }

        if (selectedAgent == null) {
            logger.warn("Couldn't find an Agent that meet transfer requirements");
            return Optional.empty();
        }
        return Optional.of(selectedAgent);
    }

    /**
     * Fetch pending transfer requests and check for available agents. If an agent is found, forwards the transfer request
     * to that agent and mark transfer state as processed.
     *
     * <p>This method runs as a periodic task of a {@link ScheduledExecutorService}, which cancels
     * all future runs once a run throws. Therefore no exception is allowed to escape: failures
     * are logged and the next scheduled run retries.
     */
    private void processPending() {
        List<Value> values;
        try {
            values = mftConsulClient.getKvClient().getValues(MFTConsulClient.TRANSFER_PENDING_PATH);
        } catch (Exception e) {
            logger.error("Failed to fetch pending transfers", e);
            return;
        }

        logger.debug("Scanning pending transfers");
        for (Value value : values) {
            Optional<String> pendingRequest = value.getValueAsString();
            if (!pendingRequest.isPresent()) {
                continue;
            }
            logger.debug("Pending {} : {}", value.getKey(), pendingRequest.get());
            try {
                TransferApiRequest.Builder builder = TransferApiRequest.newBuilder();
                JsonFormat.parser().merge(pendingRequest.get(), builder);
                TransferApiRequest transferRequest = builder.build();

                // The transfer id is the last path segment of the key
                String transferId = value.getKey().substring(value.getKey().lastIndexOf("/") + 1);
                Optional<String> agent = selectAgent(transferId, transferRequest);

                if (agent.isPresent()) {
                    logger.info("Found agent {} to initiate the transfer {}", agent.get(), transferId);
                    mftConsulClient.commandTransferToAgent(agent.get(), transferId, transferRequest);
                    markAsProcessed(transferId, transferRequest);
                    mftConsulClient.getKvClient().deleteKey(value.getKey());
                    logger.info("Marked transfer {} as processed", transferId);
                }
            } catch (Exception e) {
                logger.error("Failed to process pending transfer in key {}", value.getKey(), e);
            }
        }
    }

    /**
     * Initializes the controller, registers all Consul listeners and then blocks the calling
     * thread on a semaphore that is never released.
     *
     * @param args command line arguments; not used
     * @throws Exception if the thread is interrupted while blocked or start-up fails
     */
    @Override
    public void run(String... args) throws Exception {
        init();
        acceptRequests();
        acceptStates();
        acceptLiveAgents();
        mainHold.acquire();
    }

    /**
     * Application entry point.
     *
     * @param args command line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MFTController.class, args);
    }
}