blob: 33290928d068ced50f26564ec7801e7cfd4e93b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.airavata.mft.admin;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.Consul;
import com.orbitz.consul.ConsulException;
import com.orbitz.consul.KeyValueClient;
import com.orbitz.consul.SessionClient;
import com.orbitz.consul.model.kv.Value;
import org.apache.airavata.mft.admin.models.*;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import static com.google.common.net.HostAndPort.*;
/*
mft/agents/messages/{agent-id} -> message
mft/agent/info/{agent-id} -> agent infos
mft/agent/live/{agent-id} -> live agent
mft/transfer/state/{transfer-id} -> transfer state
*/
public class MFTConsulClient {
private static final Logger logger = LoggerFactory.getLogger(MFTConsulClient.class);
private Consul client;
private KeyValueClient kvClient;
private SessionClient sessionClient;
private ObjectMapper mapper = new ObjectMapper();
public static final String TRANSFER_STATE_PATH = "mft/transfer/state/";
public static final String CONTROLLER_TRANSFER_MESSAGE_PATH = "mft/controller/messages/transfers/";
public static final String CONTROLLER_STATE_MESSAGE_PATH = "mft/controller/messages/states/";
public static final String AGENTS_TRANSFER_REQUEST_MESSAGE_PATH = "mft/agents/transfermessages/";
public static final String AGENTS_RPC_REQUEST_MESSAGE_PATH = "mft/agents/rpcmessages/";
public static final String AGENTS_SCHEDULED_PATH = "mft/agents/scheduled/";
public static final String AGENTS_INFO_PATH = "mft/agents/info/";
public static final String LIVE_AGENTS_PATH = "mft/agent/live/";
public static final String TRANSFER_PROCESSED_PATH = "mft/transfer/processed/";
public static final String TRANSFER_PENDING_PATH = "mft/transfer/pending/";
public MFTConsulClient(Map<String, Integer> consulHostPorts) {
List<HostAndPort> hostAndPorts = consulHostPorts.entrySet().stream()
.map(entry -> fromParts(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
this.client = Consul.builder().withMultipleHostAndPort(hostAndPorts, 100000).build();
this.kvClient = client.keyValueClient();
this.sessionClient = client.sessionClient();
}
public MFTConsulClient(String host, int port) {
this.client = Consul.builder().withHostAndPort(HostAndPort.fromParts(host, port)).build();
this.kvClient = client.keyValueClient();
this.sessionClient = client.sessionClient();
}
public String submitTransfer(TransferRequest transferRequest) throws MFTConsulClientException {
try {
String asStr = mapper.writeValueAsString(transferRequest);
String transferId = UUID.randomUUID().toString();
kvClient.putValue(CONTROLLER_TRANSFER_MESSAGE_PATH + transferId, asStr);
return transferId;
} catch (JsonProcessingException e) {
throw new MFTConsulClientException("Error in serializing transfer request", e);
}
}
/**
* Submits a {@link TransferCommand} to a target agent
*
* @param agentId Agent Id
* @param transferCommand Target transfer command
* @throws MFTConsulClientException If {@link TransferCommand} can not be delivered to consul store
*/
public void commandTransferToAgent(String agentId, TransferCommand transferCommand) throws MFTConsulClientException {
try {
submitTransferStateToProcess(transferCommand.getTransferId(), "controller", new TransferState()
.setState("INITIALIZING")
.setPercentage(0)
.setUpdateTimeMils(System.currentTimeMillis())
.setPublisher("controller")
.setDescription("Initializing the transfer"));
String asString = mapper.writeValueAsString(transferCommand);
kvClient.putValue(AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId + "/" + transferCommand.getTransferId(), asString);
} catch (JsonProcessingException e) {
throw new MFTConsulClientException("Error in serializing transfer request", e);
}
}
public void sendSyncRPCToAgent(String agentId, SyncRPCRequest rpcRequest) throws MFTConsulClientException {
try {
String asString = mapper.writeValueAsString(rpcRequest);
kvClient.putValue(AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId + "/" + rpcRequest.getMessageId(), asString);
} catch (JsonProcessingException e) {
throw new MFTConsulClientException("Error in serializing rpc request", e);
}
}
public void sendSyncRPCResponseFromAgent(String returnAddress, SyncRPCResponse rpcResponse) throws MFTConsulClientException {
try {
String asString = mapper.writeValueAsString(rpcResponse);
kvClient.putValue(returnAddress, asString);
} catch (JsonProcessingException e) {
throw new MFTConsulClientException("Error in serializing rpc response", e);
}
}
/**
* List all currently registered agents.
*
* @return A list of {@link AgentInfo}
*/
public List<AgentInfo> listAgents() {
List<AgentInfo> agents = new ArrayList<>();
List<String> keys = kvClient.getKeys(AGENTS_INFO_PATH);
for (String key : keys) {
Optional<AgentInfo> agentInfo = getAgentInfo(key.substring(key.lastIndexOf("/") + 1));
agentInfo.ifPresent(agents::add);
}
return agents;
}
/**
* Get the {@link AgentInfo} for a given agent id
* @param agentId Agent Id
* @return AgentInfo if such agent is available
*/
public Optional<AgentInfo> getAgentInfo(String agentId) {
Optional<Value> value = kvClient.getValue(AGENTS_INFO_PATH + agentId);
if (value.isPresent()) {
Value absVal = value.get();
if (absVal.getValue().isPresent()) {
String asStr = absVal.getValueAsString().get();
try {
return Optional.of(mapper.readValue(asStr, AgentInfo.class));
} catch (IOException e) {
logger.error("Errored while fetching agent {} info", agentId, e);
}
}
}
return Optional.empty();
}
/**
* Agents are supposed to register themselves in MFT using this method
*
* @param agentInfo {@link AgentInfo} of the source Agents
* @throws MFTConsulClientException If {@link AgentInfo} can not be saved in consul store
*/
public void registerAgent(AgentInfo agentInfo) throws MFTConsulClientException {
try {
String asString = mapper.writeValueAsString(agentInfo);
kvClient.putValue(AGENTS_INFO_PATH + agentInfo.getId(), asString);
} catch (JsonProcessingException e) {
throw new MFTConsulClientException("Error in serializing agent information", e);
}
}
/**
* List all currently live agents
*
* @return A list of live agent ids
* @throws MFTConsulClientException If live agents can not be fetched from consul store
*/
public List<String> getLiveAgentIds() throws MFTConsulClientException {
try {
List<String> keys = kvClient.getKeys(LIVE_AGENTS_PATH);
return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
} catch (ConsulException e) {
if (e.getCode() == 404) {
return Collections.emptyList();
}
throw new MFTConsulClientException("Error in fetching live agents", e);
} catch (Exception e) {
throw new MFTConsulClientException("Error in fetching live agents", e);
}
}
/**
* Agents should call this method to submit {@link TransferState}. These status are received by the controller and reorder
* status messages and put in the final status array.
*
* @param transferId
* @param agentId
* @param transferState
* @throws MFTConsulClientException
*/
public void submitTransferStateToProcess(String transferId, String agentId, TransferState transferState) throws MFTConsulClientException {
try {
kvClient.putValue(CONTROLLER_STATE_MESSAGE_PATH + transferId + "/" + agentId + "/" + transferState.getUpdateTimeMils(),
mapper.writeValueAsString(transferState));
} catch (Exception e) {
logger.error("Error in submitting transfer status to process for transfer {} and agent {}", transferId, agentId, e);
throw new MFTConsulClientException("Error in submitting transfer status", e);
}
}
/**
* Add the {@link TransferState} to the aggregated state array. This method should only be called by the
* Controller and API server once the transfer is accepted. Agents should NEVER call this method as it would corrupt
* state array when multiple clients are writing at the same time
*
* @param transferId
* @param transferState
* @throws MFTConsulClientException
*/
public void saveTransferState(String transferId, TransferState transferState) throws MFTConsulClientException {
try {
List<TransferState> allStates = getTransferStates(transferId);
// TODO implement sequence consistency
allStates.add(transferState);
String asStr = mapper.writeValueAsString(allStates);
kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);
logger.info("Saved transfer status " + asStr);
} catch (Exception e) {
throw new MFTConsulClientException("Error in serializing transfer status", e);
}
}
/**
* Get the latest {@link TransferState} for given transfer id
*
* @param transferId Transfer Id
* @return Optional {@link TransferState } is there is any
* @throws MFTConsulClientException
*/
public Optional<TransferState> getTransferState(String transferId) throws MFTConsulClientException {
try {
List<TransferState> states = getTransferStates(transferId);
Optional<TransferState> lastStatusOp = states.stream().min((o1, o2) -> {
if (o1.getUpdateTimeMils() == o2.getUpdateTimeMils()) {
return 0;
} else {
return o1.getUpdateTimeMils() - o2.getUpdateTimeMils() < 0 ? 1 : -1;
}
});
return lastStatusOp;
} catch (ConsulException e) {
throw new MFTConsulClientException("Error in fetching transfer status " + transferId, e);
} catch (Exception e) {
throw new MFTConsulClientException("Error in fetching transfer status " + transferId, e);
}
}
/**
* Provide all {@link TransferState} for given transfer id
*
* @param transferId Transfer Id
* @return The list of all {@link TransferState}
* @throws IOException
*/
public List<TransferState> getTransferStates(String transferId) throws IOException {
Optional<Value> valueOp = kvClient.getValue(TRANSFER_STATE_PATH + transferId);
List<TransferState> allStates;
if (valueOp.isPresent()) {
String prevStates = valueOp.get().getValueAsString().get();
allStates = new ArrayList<>(Arrays.asList(mapper.readValue(prevStates, TransferState[].class)));
} else {
allStates = new ArrayList<>();
}
return allStates;
}
public List<AgentInfo> getLiveAgentInfos() throws MFTConsulClientException {
List<String> liveAgentIds = getLiveAgentIds();
return liveAgentIds.stream().map(id -> getAgentInfo(id).get()).collect(Collectors.toList());
}
public KeyValueClient getKvClient() {
return kvClient;
}
public SessionClient getSessionClient() {
return sessionClient;
}
}