blob: 2906f0964c467bc240a97a2e2f34a9564a9dce54 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.airavata.mft.agent.ingress;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.InvalidProtocolBufferException;
import com.orbitz.consul.ConsulException;
import com.orbitz.consul.cache.ConsulCache;
import com.orbitz.consul.cache.KVCache;
import com.orbitz.consul.model.kv.Value;
import com.orbitz.consul.model.session.ImmutableSession;
import com.orbitz.consul.model.session.SessionCreatedResponse;
import com.orbitz.consul.option.PutOptions;
import org.apache.airavata.mft.admin.MFTConsulClient;
import org.apache.airavata.mft.admin.MFTConsulClientException;
import org.apache.airavata.mft.admin.models.AgentInfo;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
import org.apache.airavata.mft.agent.AgentUtil;
import org.apache.airavata.mft.agent.TransferOrchestrator;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.agent.stub.AgentTransferRequest;
import org.apache.airavata.mft.agent.transport.TransportClassLoaderCache;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ConsulIngressHandler {
private static final Logger logger = LoggerFactory.getLogger(ConsulIngressHandler.class);
private KVCache transferMessageCache;
private KVCache rpcMessageCache;
private ConsulCache.Listener<String, Value> transferCacheListener;
private ConsulCache.Listener<String, Value> rpcCacheListener;
@Autowired
private MFTConsulClient mftConsulClient;
@Autowired
private RPCParser rpcParser;
private String session;
private final ScheduledExecutorService sessionRenewPool = Executors.newSingleThreadScheduledExecutor();
private long sessionRenewSeconds = 4;
private long sessionTTLSeconds = 10;
private ObjectMapper mapper = new ObjectMapper();
@org.springframework.beans.factory.annotation.Value("${agent.id}")
private String agentId;
@org.springframework.beans.factory.annotation.Value("${agent.secret}")
private String agentSecret;
@org.springframework.beans.factory.annotation.Value("${agent.host}")
private String agentHost;
@org.springframework.beans.factory.annotation.Value("${agent.user}")
private String agentUser;
@org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
private String supportedProtocols;
@Autowired
TransportClassLoaderCache transportCache;
@Autowired
private TransferOrchestrator transferOrchestrator;
private void acceptTransferRequests() {
transferCacheListener = newValues -> {
newValues.values().forEach(value -> {
Optional<byte[]> decodedValue = value.getValueAsBytes();
String[] partsOfKey = value.getKey().split("/");
String agentTransferRequestId = partsOfKey[partsOfKey.length - 1];
String transferId = partsOfKey[partsOfKey.length - 2];
decodedValue.ifPresent(reqBytes -> {
mftConsulClient.getKvClient().deleteKey(value.getKey());
AgentTransferRequest.Builder builder = null;
try {
builder = AgentTransferRequest.newBuilder().mergeFrom(reqBytes);
} catch (InvalidProtocolBufferException e) {
logger.error("Failed to merge transfer request {} for transfer {} from bytes", agentTransferRequestId, transferId, e);
return;
}
AgentTransferRequest request = builder.build();
transferOrchestrator.submitTransferToProcess(transferId, request, transportCache,
AgentUtil.throwingBiConsumerWrapper((endPointPath, st) -> {
mftConsulClient.submitFileTransferStateToProcess(transferId, request.getRequestId(), endPointPath, agentId, st.setPublisher(agentId));
}),
AgentUtil.throwingBiConsumerWrapper((endpointPath, create) -> {
if (create) {
mftConsulClient.createEndpointHookForAgent(agentId, session, transferId, agentTransferRequestId, endpointPath);
} else {
mftConsulClient.deleteEndpointHookForAgent(agentId, session, transferId, agentTransferRequestId, endpointPath);
}
}));
});
});
};
transferMessageCache.addListener(transferCacheListener);
transferMessageCache.start();
}
private void acceptRPCRequests() {
rpcCacheListener = newValues -> {
newValues.values().forEach(value -> {
Optional<String> decodedValue = value.getValueAsString();
decodedValue.ifPresent(v -> {
try {
SyncRPCRequest rpcRequest = mapper.readValue(v, SyncRPCRequest.class);
SyncRPCResponse syncRPCResponse = rpcParser.processRPCRequest(rpcRequest, transportCache);
mftConsulClient.sendSyncRPCResponseFromAgent(rpcRequest.getReturnAddress(), syncRPCResponse);
} catch (Throwable e) {
logger.error("Error processing the RPC request {}", value.getKey(), e);
} finally {
mftConsulClient.getKvClient().deleteKey(value.getKey());
}
});
});
};
rpcMessageCache.addListener(rpcCacheListener);
rpcMessageCache.start();
}
private boolean connectAgent() throws MFTConsulClientException {
final ImmutableSession session = ImmutableSession.builder()
.name(agentId)
.behavior("delete")
.ttl(sessionTTLSeconds + "s").build();
final SessionCreatedResponse sessResp = mftConsulClient.getSessionClient().createSession(session);
final String lockPath = MFTConsulClient.LIVE_AGENTS_PATH + agentId;
boolean acquired = mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());
if (acquired) {
this.session = sessResp.getId();
sessionRenewPool.scheduleAtFixedRate(() -> {
try {
mftConsulClient.getSessionClient().renewSession(sessResp.getId());
} catch (ConsulException e) {
if (e.getCode() == 404) {
logger.error("Can not renew session as it is expired");
destroy();
}
logger.warn("Errored while renewing the session", e);
try {
boolean status = mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());
if (!status) {
logger.error("Can not renew session as it is expired");
destroy();
}
} catch (Exception ex) {
logger.error("Can not renew session as it is expired");
destroy();
}
} catch (Exception e) {
try {
boolean status = mftConsulClient.getKvClient().acquireLock(lockPath, sessResp.getId());
if (!status) {
logger.error("Can not renew session as it is expired");
destroy();
}
} catch (Exception ex) {
logger.error("Can not renew session as it is expired");
destroy();
}
}
}, sessionRenewSeconds, sessionRenewSeconds, TimeUnit.SECONDS);
this.mftConsulClient.registerAgent(new AgentInfo()
.setId(agentId)
.setHost(agentHost)
.setUser(agentUser)
.setSessionId(this.session)
.setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
.setLocalStorages(new ArrayList<>()));
}
logger.info("Acquired lock " + acquired);
return acquired;
}
@PostConstruct
public void init() throws Exception {
transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId, 9);
rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId, 9);
boolean connected = false;
int connectionRetries = 0;
while (!connected) {
connected = connectAgent();
if (connected) {
logger.info("Successfully connected to consul with session id {}", session);
} else {
logger.info("Retrying to connect to consul");
Thread.sleep(5000);
connectionRetries++;
if (connectionRetries > 10) {
throw new Exception("Failed to connect to the cluster");
}
}
}
acceptTransferRequests();
acceptRPCRequests();
logger.info("Consul ingress handler initialized");
}
@PreDestroy
public void destroy() {
if (!sessionRenewPool.isShutdown())
sessionRenewPool.shutdown();
if (transferCacheListener != null) {
transferMessageCache.removeListener(transferCacheListener);
}
if (rpcCacheListener != null) {
rpcMessageCache.removeListener(rpcCacheListener);
}
logger.info("Consul ingress handler turned off");
}
}