blob: fba770f60f20c3f3d05e47ebe372e5e3a8333a58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.airavata.mft.agent;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.orbitz.consul.Consul;
import com.orbitz.consul.ConsulException;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.cache.ConsulCache;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;
import com.orbitz.consul.model.session.ImmutableSession;
import com.orbitz.consul.model.session.SessionCreatedResponse;
import org.apache.airavata.mft.admin.models.TransferRequest;
import org.apache.airavata.mft.core.ResourceMetadata;
import org.apache.airavata.mft.core.TransportMediator;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.transport.local.LocalMetadataCollector;
import org.apache.airavata.mft.transport.local.LocalReceiver;
import org.apache.airavata.mft.transport.local.LocalSender;
import org.apache.airavata.mft.transport.scp.SCPMetadataCollector;
import org.apache.airavata.mft.transport.scp.SCPReceiver;
import org.apache.airavata.mft.transport.scp.SCPSender;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class MFTAgent {
private final TransportMediator mediator = new TransportMediator();
private String agentId = "agent0";
private final Semaphore mainHold = new Semaphore(0);
private Consul client;
private KeyValueClient kvClient;
private KVCache messageCache;
private ConsulCache.Listener<String, Value> cacheListener;
private final ScheduledExecutorService sessionRenewPool = Executors.newSingleThreadScheduledExecutor();
private long sessionRenewSeconds = 4;
private long sessionTTLSeconds = 10;
public void init() {
client = Consul.builder().build();
kvClient = client.keyValueClient();
messageCache = KVCache.newCache(kvClient, "mft/agents/messages/" + agentId );
}
private void acceptRequests() {
cacheListener = newValues -> {
// Cache notifies all paths with "foo" the root path
// If you want to watch only "foo" value, you must filter other paths
newValues.values().forEach(value -> {
// Values are encoded in key/value store, decode it if needed
Optional<String> decodedValue = value.getValueAsString();
decodedValue.ifPresent(v -> {
System.out.println(String.format("Value is: %s", v));
ObjectMapper mapper = new ObjectMapper();
try {
TransferRequest request = mapper.readValue(v, TransferRequest.class);
System.out.println("Received request " + request.getTransferId());
Connector inConnector = MFTAgent.this.resolveConnector(request.getSourceType(), "IN");
inConnector.init(request.getSourceId(), request.getSourceToken());
Connector outConnector = MFTAgent.this.resolveConnector(request.getDestinationType(), "OUT");
outConnector.init(request.getDestinationId(), request.getDestinationToken());
MetadataCollector metadataCollector = MFTAgent.this.resolveMetadataCollector(request.getSourceType());
ResourceMetadata metadata = metadataCollector.getGetResourceMetadata(request.getSourceId(), request.getSourceToken());
System.out.println("File size " + metadata.getResourceSize());
String transferId = mediator.transfer(inConnector, outConnector, metadata);
System.out.println("Submitted transfer " + transferId);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("Deleting key " + value.getKey());
kvClient.deleteKey(value.getKey()); // Due to bug in consul https://github.com/hashicorp/consul/issues/571
}
});
});
};
messageCache.addListener(cacheListener);
messageCache.start();
}
private boolean connectAgent() {
ImmutableSession session = ImmutableSession.builder().name(agentId).behavior("delete").ttl(sessionTTLSeconds + "s").build();
SessionCreatedResponse sessResp = client.sessionClient().createSession(session);
String lockPath = "mft/agent/live/" + agentId;
boolean acquired = kvClient.acquireLock(lockPath, sessResp.getId());
if (acquired) {
sessionRenewPool.scheduleAtFixedRate(() -> {
try {
client.sessionClient().renewSession(sessResp.getId());
} catch (ConsulException e) {
if (e.getCode() == 404) {
stop();
}
e.printStackTrace();
} catch (Exception e) {
try {
boolean status = kvClient.acquireLock(lockPath, sessResp.getId());
if (!status) {
stop();
}
} catch (Exception ex) {
stop();
}
}
}, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
}
System.out.println("Lock status " + acquired);
return acquired;
}
public void disconnectAgent() {
sessionRenewPool.shutdown();
if (cacheListener != null) {
messageCache.removeListener(cacheListener);
}
}
public void stop() {
System.out.println("Stopping Agent " + agentId);
disconnectAgent();
mainHold.release();
}
public void start() throws Exception {
System.out.println("Starting Agent");
init();
boolean connected = connectAgent();
if (!connected) {
throw new Exception("Failed to connect to the cluster");
}
acceptRequests();
}
public static void main(String args[]) throws Exception {
MFTAgent agent = new MFTAgent();
agent.start();
agent.mainHold.acquire();
System.out.println("Agent exited");
}
// TODO load from reflection to avoid dependencies
private Connector resolveConnector(String type, String direction) {
switch (type) {
case "SCP":
switch (direction) {
case "IN":
return new SCPReceiver();
case "OUT":
return new SCPSender();
}
break;
case "LOCAL":
switch (direction) {
case "IN":
return new LocalReceiver();
case "OUT":
return new LocalSender();
}
}
return null;
}
// TODO load from reflection to avoid dependencies
private MetadataCollector resolveMetadataCollector(String type) {
switch (type) {
case "SCP":
return new SCPMetadataCollector();
case "LOCAL":
return new LocalMetadataCollector();
}
return null;
}
}