blob: 154fc1e58cc7b723d70d872008c6ff51d7ce713c [file] [log] [blame]
package org.apache.airavata.metascheduler.core.utils;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftClientPool;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Type;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessIdentifier;
import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.airavata.registry.api.RegistryService.Client;
import org.apache.airavata.registry.api.exception.RegistryServiceException;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.TException;
/**
* This class contains all utility methods across scheduler sub projects
*/
public class Utils {
private static ThriftClientPool<RegistryService.Client> registryClientPool;
private static Publisher statusPublisher;
/**
* Provides registry client to access databases
*
* @return RegistryService.Client
*/
public static synchronized ThriftClientPool<RegistryService.Client> getRegistryServiceClientPool() {
if (registryClientPool != null) {
return registryClientPool;
}
try {
// final int serverPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
// final String serverHost = ServerSettings.getRegistryServerHost();
registryClientPool = new ThriftClientPool<RegistryService.Client>(
tProtocol -> new RegistryService.Client(tProtocol),
Utils.<RegistryService.Client>createGenericObjectPoolConfig(),
ServerSettings.getRegistryServerHost(),
Integer.parseInt(ServerSettings.getRegistryServerPort()));
return registryClientPool;
} catch (Exception e) {
throw new RuntimeException("Unable to create registry client...", e);
}
}
private static <T> GenericObjectPoolConfig<T> createGenericObjectPoolConfig() {
GenericObjectPoolConfig<T> poolConfig = new GenericObjectPoolConfig<T>();
poolConfig.setMaxTotal(100);
poolConfig.setMinIdle(5);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestWhileIdle(true);
// must set timeBetweenEvictionRunsMillis since eviction doesn't run unless that is positive
poolConfig.setTimeBetweenEvictionRunsMillis(5L * 60L * 1000L);
poolConfig.setNumTestsPerEvictionRun(10);
poolConfig.setMaxWaitMillis(3000);
return poolConfig;
}
public static void saveAndPublishProcessStatus(ProcessState processState, String processId,
String experimentId, String gatewayId)
throws RegistryServiceException, TException, AiravataException {
RegistryService.Client registryClient = null;
try {
registryClient = registryClientPool.getResource();
ProcessStatus processStatus = new ProcessStatus(processState);
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
registryClientPool.getResource().addProcessStatus(processStatus, processId);
ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
getStatusPublisher().publish(msgCtx);
} catch (Exception ex){
if (registryClient != null) {
registryClientPool.returnBrokenResource(registryClient);
registryClient = null;
}
} finally {
if (registryClient != null) {
registryClientPool.returnResource(registryClient);
}
}
}
public static void updateProcessStatusAndPublishStatus(ProcessState processState, String processId,
String experimentId, String gatewayId)
throws RegistryServiceException, TException, AiravataException {
RegistryService.Client registryClient = null;
try {
ProcessStatus processStatus = new ProcessStatus(processState);
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
registryClientPool.getResource().updateProcessStatus(processStatus, processId);
ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
getStatusPublisher().publish(msgCtx);
} catch (Exception ex) {
if (registryClient != null) {
registryClientPool.returnBrokenResource(registryClient);
registryClient = null;
}
} finally {
if (registryClient != null) {
registryClientPool.returnResource(registryClient);
}
}
}
public static synchronized Publisher getStatusPublisher() throws AiravataException {
if (statusPublisher == null) {
statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
}
return statusPublisher;
}
}