Merge pull request #414 from isururanawaka/metaschedular

bug fix:return resource to pool
diff --git a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
index 466abae..90ff631 100644
--- a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
+++ b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
@@ -71,33 +71,56 @@
     public static void saveAndPublishProcessStatus(ProcessState processState, String processId,
                                                    String experimentId, String gatewayId)
             throws RegistryServiceException, TException, AiravataException {
+         RegistryService.Client registryClient = null;
+        try {
+            registryClient = registryClientPool.getResource();
+            ProcessStatus processStatus = new ProcessStatus(processState);
+            processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 
-        ProcessStatus processStatus = new ProcessStatus(processState);
-        processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
-        registryClientPool.getResource().addProcessStatus(processStatus, processId);
-        ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
-        ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
-        MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
-                AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
-        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-        getStatusPublisher().publish(msgCtx);
+            registryClientPool.getResource().addProcessStatus(processStatus, processId);
+            ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
+            ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
+            MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+                    AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            getStatusPublisher().publish(msgCtx);
+        } catch (Exception ex){
+            if (registryClient != null) {
+                registryClientPool.returnBrokenResource(registryClient);
+                registryClient = null;
+            }
+        } finally {
+            if (registryClient != null) {
+                registryClientPool.returnResource(registryClient);
+            }
+        }
     }
 
     public static void updateProcessStatusAndPublishStatus(ProcessState processState, String processId,
                                                    String experimentId, String gatewayId)
             throws RegistryServiceException, TException, AiravataException {
+        RegistryService.Client registryClient = null;
+        try {
+            ProcessStatus processStatus = new ProcessStatus(processState);
+            processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 
-        ProcessStatus processStatus = new ProcessStatus(processState);
-        processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
-        registryClientPool.getResource().updateProcessStatus(processStatus, processId);
-        ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
-        ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
-        MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
-                AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
-        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-        getStatusPublisher().publish(msgCtx);
+            registryClientPool.getResource().updateProcessStatus(processStatus, processId);
+            ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId);
+            ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(processState, identifier);
+            MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+                    AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            getStatusPublisher().publish(msgCtx);
+        } catch (Exception ex) {
+            if (registryClient != null) {
+                registryClientPool.returnBrokenResource(registryClient);
+                registryClient = null;
+            }
+        } finally {
+            if (registryClient != null) {
+                registryClientPool.returnResource(registryClient);
+            }
+        }
     }
 
     public static synchronized Publisher getStatusPublisher() throws AiravataException {
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
index 015c3b7..631542f 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
@@ -12,6 +12,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Computational Resource Monitoring Service
@@ -89,7 +90,9 @@
 
     @Override
     public void stop() throws Exception {
-        scheduler.unscheduleJobs(new ArrayList(jobTriggerMap.values()));
+        scheduler.unscheduleJobs(jobTriggerMap.values().stream().map(trigger -> {
+           return trigger.getKey();
+        }).collect(Collectors.toList()));
     }
 
     @Override