Improve error handling at API start/stop phases
diff --git a/airavata-api/src/main/java/org/apache/airavata/Main.java b/airavata-api/src/main/java/org/apache/airavata/Main.java
index 770c569..44f8d73 100644
--- a/airavata-api/src/main/java/org/apache/airavata/Main.java
+++ b/airavata-api/src/main/java/org/apache/airavata/Main.java
@@ -71,6 +71,7 @@
         String logo = getLogo();
         System.out.println(logo);
         Thread.sleep(1000);
+        logger.info("Starting Airavata Services...");
 
         logger.info("Starting Airavata API Server .......");
         var airavataApiServer = new AiravataAPIServer();
@@ -137,8 +138,10 @@
             var monitoringServer = new MonitoringServer(
                     ServerSettings.getSetting("monitor.prometheus.host"),
                     ServerSettings.getIntSetting("monitor.prometheus.port"));
-            monitoringServer.start();
-            Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop));
+            var thread = new Thread(monitoringServer::start, "MonitoringServer");
+            thread.setDaemon(true);
+            thread.start();
+            Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop, "MonitoringServer.ShutdownHook"));
         }
 
         postInit();
@@ -146,9 +149,10 @@
         try {
             Thread.currentThread().join();
         } catch (InterruptedException ex) {
-            logger.info("Main thread is interrupted! reason: " + ex);
+            logger.info("Main thread is interrupted. Shutting down Airavata Services.");
             ServerSettings.setStopAllThreads(true);
         }
+        logger.info("Airavata Services stopped.");
     }
 
     public static void postInit() {
diff --git a/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java b/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java
index 9e6af03..2a26026 100644
--- a/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java
+++ b/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java
@@ -197,6 +197,7 @@
         if (server != null && server.isServing()) {
             setStatus(ServerStatus.STOPING);
             server.stop();
+            logger.info("Airavata Thrift API Stopped.");
         }
     }
 
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
index 954b11f..e375952 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
@@ -212,11 +212,10 @@
             try {
                 Thread.currentThread().join();
             } catch (InterruptedException ex) {
-                logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+                logger.info("Participant: " + participantName + ", is interrupted. Shutting down.");
             }
-
         } catch (InterruptedException ex) {
-            logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+            logger.info("Participant: " + participantName + ", is interrupted. Shutting down.");
         } catch (Exception ex) {
             logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
         } finally {
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index d18cc37..83db90b 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -319,6 +319,7 @@
     }
 
     public void startServer() {
+        logger.info("PostWorkflowManager started.");
         try {
             init();
         } catch (Exception e) {
@@ -332,9 +333,16 @@
             return;
         }
         try {
-            while (true) {
-                final ConsumerRecords<String, JobStatusResult> consumerRecords =
-                        consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+            while (!Thread.currentThread().isInterrupted()) {
+                final ConsumerRecords<String, JobStatusResult> consumerRecords;
+                try {
+                    consumerRecords = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+                } catch (Exception e) {
+                    if (Thread.currentThread().isInterrupted()) {
+                        throw new InterruptedException("PostWorkflowManager is interrupted. Shutting down.");
+                    }
+                    throw e;
+                }
                 var executorCompletionService = new ExecutorCompletionService<>(processingPool);
                 var processingFutures = new ArrayList<>();
 
@@ -379,15 +387,26 @@
                 logger.info("All messages processed. Moving to next round");
 
                 if (Thread.currentThread().isInterrupted()) {
-                    throw new InterruptedException("PostWorkflowManager is interrupted!");
+                    throw new InterruptedException("PostWorkflowManager is interrupted. Shutting down.");
                 }
             }
         } catch (InterruptedException ex) {
-            logger.error("PostWorkflowManager is interrupted! reason: " + ex, ex);
+            logger.info("PostWorkflowManager is interrupted. Shutting down.");
         } finally {
-            consumer.close();
-            processingPool.shutdown();
+            try {
+                consumer.unsubscribe();
+            } catch (Exception ignored) {
+            }
+            try {
+                consumer.close();
+            } catch (Exception ignored) {
+            }
+            try {
+                processingPool.shutdown();
+            } catch (Exception ignored) {
+            }
         }
+        logger.info("PostWorkflowManager stopped.");
     }
 
     private void saveAndPublishJobStatus(
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 5d13153..851ffd4 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -76,15 +76,19 @@
     }
 
     public void startServer() {
+        logger.info("PreWorkflowManager started.");
         try {
             super.initComponents();
             initLaunchSubscriber();
             Thread.currentThread().join();
         } catch (InterruptedException ex) {
-            logger.error("PreWorkflowManager is interrupted! reason: " + ex, ex);
+            logger.info("PreWorkflowManager is interrupted. Shutting down.");
+            // join() cleared the interrupt flag when it threw; restore it so the caller can still observe it.
+            Thread.currentThread().interrupt();
         } catch (Exception e) {
-            logger.error("Error starting PreWorkflowManager", e);
+            logger.error("Error running PreWorkflowManager", e);
         }
+        logger.info("PreWorkflowManager stopped.");
     }
 
     public void stopServer() {}
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
index e2137be..e0d3203 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
@@ -173,7 +173,13 @@
     private void runEmailMonitor() {
         Session session = null;
         SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false);
-        while (!ServerSettings.isStopAllThreads()) {
+        long sleepTime = Duration.ofMinutes(1).toMillis();
+        try {
+            sleepTime = ServerSettings.getEmailMonitorPeriod();
+        } catch (Exception e) {
+            log.warn("Email monitor period was not found. Using default (1 minute)", e);
+        }
+        while (!ServerSettings.isStopAllThreads() && !Thread.currentThread().isInterrupted()) {
             try {
                 if (session == null) {
                     session = Session.getDefaultInstance(properties);
@@ -214,11 +220,6 @@
                     }
                     emailFolder.close(false);
                 }
-                if (Thread.currentThread().isInterrupted()) {
-                    throw new InterruptedException("EmailBasedMonitor is interrupted!");
-                }
-            } catch (InterruptedException ex) {
-                log.error("EmailBasedMonitor is interrupted! reason: " + ex, ex);
             } catch (MessagingException e) {
                 log.error("Couldn't connect to the store ", e);
             } catch (Throwable e) {
@@ -234,16 +235,16 @@
                 } catch (MessagingException e) {
                     log.error("Store close operation failed, couldn't close store", e);
                 }
-                try {
-                    Thread.sleep(ServerSettings.getEmailMonitorPeriod());
-                } catch (InterruptedException e) {
-                    log.error("interrupted while sleeping ", e);
-                } catch (Exception e) {
-                    log.error("exception thrown when attempting to sleep ", e);
-                }
+            }
+            try {
+                Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {
+                log.info("interrupt received. shutting down email monitor.");
+                // Thread.sleep cleared the interrupt flag; restore it so the loop condition sees it and exits.
+                Thread.currentThread().interrupt();
             }
         }
-        log.info("Email monitoring daemon stopped");
+        log.info("Email monitor stopped");
     }
 
     @Override
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java b/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
index 1cde8f5..8b1e6f0 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
@@ -40,6 +40,7 @@
         try {
             logger.info("Starting the monitoring server");
             httpServer = new HTTPServer(host, port, true);
+            logger.info("Monitoring server started on host {} and port {}", host, port);
         } catch (Exception e) {
             logger.error("Failed to start the monitoring server on host {} na port {}", host, port, e);
         }
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
index 72ab558..7325421 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
@@ -31,7 +31,6 @@
 import org.apache.airavata.registry.api.RegistryService;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
@@ -66,36 +65,61 @@
         return consumer;
     }
 
+    /**
+     * Best-effort shutdown of the Kafka consumer. Unsubscribe and close are attempted
+     * independently so that a failure to unsubscribe cannot leave the consumer unclosed.
+     */
+    private void closeConsumer(Consumer<String, String> consumer) {
+        if (consumer == null) {
+            return;
+        }
+        try {
+            consumer.unsubscribe();
+        } catch (Exception e) {
+            logger.debug("Ignoring error while unsubscribing consumer during shutdown", e);
+        }
+        try {
+            consumer.close();
+        } catch (Exception e) {
+            logger.debug("Ignoring error while closing consumer during shutdown", e);
+        }
+    }
+
     private void runConsumer() {
+        logger.info("RealtimeMonitor started.");
         final Consumer<String, String> consumer;
         try {
             consumer = createConsumer();
         } catch (ApplicationSettingsException e) {
             logger.error("Error while creating consumer", e);
             return;
         }
-
         try {
-            while (true) {
-                final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
-                RegistryService.Iface registry = getRegistry();
-                consumerRecords.forEach(record -> {
-                    try {
-                        process(record.key(), record.value(), registry);
-                    } catch (Exception e) {
-                        logger.error("Error while processing message {}", record.value(), e);
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    var consumerRecords = consumer.poll(Duration.ofSeconds(1));
+                    var registry = getRegistry();
+                    consumerRecords.forEach(record -> {
+                        try {
+                            process(record.key(), record.value(), registry);
+                        } catch (Exception e) {
+                            logger.error("Error while processing message {}", record.value(), e);
+                        }
+                    });
+                    consumer.commitAsync();
+                } catch (Exception e) {
+                    if (Thread.currentThread().isInterrupted()) {
+                        throw new InterruptedException("RealtimeMonitor is interrupted.");
                     }
-                });
-                consumer.commitAsync();
-                if (Thread.currentThread().isInterrupted()) {
-                    throw new InterruptedException("RealtimeMonitor is interrupted!");
+                    logger.error("Error while polling consumer", e);
                 }
             }
-        } catch (InterruptedException ex) {
-            logger.error("RealtimeMonitor is interrupted! reason: " + ex, ex);
+        } catch (InterruptedException e) {
+            logger.info("RealtimeMonitor is interrupted. Shutting down.");
         } finally {
-            consumer.close();
+            closeConsumer(consumer);
         }
+        logger.info("RealtimeMonitor stopped.");
     }
 
     private void process(String key, String value, RegistryService.Iface registry) throws MonitoringException {