Merge pull request #414 from isururanawaka/metaschedular
Bug fix: return resource to pool
diff --git a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
index 466abae..90ff631 100644
--- a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
+++ b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
@@ -71,33 +71,63 @@
public static void saveAndPublishProcessStatus(ProcessState processState, String processId,
String experimentId, String gatewayId)
throws RegistryServiceException, TException, AiravataException {
+ RegistryService.Client registryClient = null;
+ try {
+ registryClient = registryClientPool.getResource();
+ ProcessStatus processStatus = new ProcessStatus(processState);
+ processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- ProcessStatus processStatus = new ProcessStatus(processState);
- processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
- registryClientPool.getResource().addProcessStatus(processStatus, processId);
- ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
- ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
- MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
- AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
+ // Use the client borrowed above; calling getResource() again would borrow a second client that is never returned.
+ registryClient.addProcessStatus(processStatus, processId);
+ ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
+ ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
+ MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+ AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ getStatusPublisher().publish(msgCtx);
+ } catch (Exception ex){
+ if (registryClient != null) {
+ registryClientPool.returnBrokenResource(registryClient);
+ registryClient = null;
+ }
+ // Rethrow so callers still see the failure, as they did before the try block was introduced.
+ throw ex;
+ } finally {
+ if (registryClient != null) {
+ registryClientPool.returnResource(registryClient);
+ }
+ }
}
public static void updateProcessStatusAndPublishStatus(ProcessState processState, String processId,
String experimentId, String gatewayId)
throws RegistryServiceException, TException, AiravataException {
+ RegistryService.Client registryClient = null;
+ try {
+ registryClient = registryClientPool.getResource();
+ ProcessStatus processStatus = new ProcessStatus(processState);
+ processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- ProcessStatus processStatus = new ProcessStatus(processState);
- processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
- registryClientPool.getResource().updateProcessStatus(processStatus, processId);
- ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
- ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
- MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
- AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
+ // Use the tracked client so the catch/finally below can actually return it to the pool.
+ registryClient.updateProcessStatus(processStatus, processId);
+ ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
+ ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
+ MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+ AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ getStatusPublisher().publish(msgCtx);
+ } catch (Exception ex) {
+ if (registryClient != null) {
+ registryClientPool.returnBrokenResource(registryClient);
+ registryClient = null;
+ }
+ // Rethrow so callers still see the failure, as they did before the try block was introduced.
+ throw ex;
+ } finally {
+ if (registryClient != null) {
+ registryClientPool.returnResource(registryClient);
+ }
+ }
}
public static synchronized Publisher getStatusPublisher() throws AiravataException {
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
index 015c3b7..631542f 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
@@ -12,6 +12,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Computational Resource Monitoring Service
@@ -89,7 +90,9 @@
@Override
public void stop() throws Exception {
- scheduler.unscheduleJobs(new ArrayList(jobTriggerMap.values()));
+ scheduler.unscheduleJobs(jobTriggerMap.values().stream()
+ .map(trigger -> trigger.getKey())
+ .collect(Collectors.toList()));
}
@Override