Improve error handling in API start/stop phases
diff --git a/airavata-api/src/main/java/org/apache/airavata/Main.java b/airavata-api/src/main/java/org/apache/airavata/Main.java
index 770c569..44f8d73 100644
--- a/airavata-api/src/main/java/org/apache/airavata/Main.java
+++ b/airavata-api/src/main/java/org/apache/airavata/Main.java
@@ -71,6 +71,7 @@
String logo = getLogo();
System.out.println(logo);
Thread.sleep(1000);
+ logger.info("Starting Airavata Services...");
logger.info("Starting Airavata API Server .......");
var airavataApiServer = new AiravataAPIServer();
@@ -137,8 +138,10 @@
var monitoringServer = new MonitoringServer(
ServerSettings.getSetting("monitor.prometheus.host"),
ServerSettings.getIntSetting("monitor.prometheus.port"));
- monitoringServer.start();
- Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop));
+ var monitoringThread = new Thread(monitoringServer::start, "MonitoringServer");
+ monitoringThread.setDaemon(true);
+ monitoringThread.start();
+ Runtime.getRuntime().addShutdownHook(new Thread(monitoringServer::stop, "MonitoringServer.ShutdownHook"));
}
postInit();
@@ -146,9 +149,10 @@
try {
Thread.currentThread().join();
} catch (InterruptedException ex) {
- logger.info("Main thread is interrupted! reason: " + ex);
+ logger.info("Main thread is interrupted. Shutting down Airavata Services.");
ServerSettings.setStopAllThreads(true);
}
+ logger.info("Airavata Services stopped.");
}
public static void postInit() {
diff --git a/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java b/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java
index 9e6af03..2a26026 100644
--- a/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java
+++ b/airavata-api/src/main/java/org/apache/airavata/api/AiravataAPIServer.java
@@ -197,6 +197,7 @@
if (server != null && server.isServing()) {
setStatus(ServerStatus.STOPING);
server.stop();
+ logger.info("Airavata Thrift API Stopped.");
}
}
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
index 954b11f..e375952 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
@@ -212,11 +212,10 @@
try {
Thread.currentThread().join();
} catch (InterruptedException ex) {
- logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+ logger.info("Participant: {}, is interrupted. Shutting down.", participantName);
}
-
} catch (InterruptedException ex) {
- logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex);
+ logger.info("Participant: {}, is interrupted. Shutting down.", participantName);
} catch (Exception ex) {
logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex);
} finally {
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index d18cc37..83db90b 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -332,9 +332,17 @@
return;
}
+ logger.info("PostWorkflowManager started.");
try {
- while (true) {
- final ConsumerRecords<String, JobStatusResult> consumerRecords =
- consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+ while (!Thread.currentThread().isInterrupted()) {
+ final ConsumerRecords<String, JobStatusResult> consumerRecords;
+ try {
+ consumerRecords = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+ } catch (Exception e) {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("PostWorkflowManager is interrupted. Shutting down.");
+ }
+ throw e;
+ }
var executorCompletionService = new ExecutorCompletionService<>(processingPool);
var processingFutures = new ArrayList<>();
@@ -379,15 +387,27 @@
logger.info("All messages processed. Moving to next round");
if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException("PostWorkflowManager is interrupted!");
+ throw new InterruptedException("PostWorkflowManager is interrupted. Shutting down.");
}
}
} catch (InterruptedException ex) {
- logger.error("PostWorkflowManager is interrupted! reason: " + ex, ex);
+ logger.info("PostWorkflowManager is interrupted. Shutting down.");
} finally {
- consumer.close();
- processingPool.shutdown();
+ // Best-effort cleanup: a failure in one step must not skip the remaining steps.
+ try {
+ consumer.unsubscribe();
+ } catch (Exception ignored) {
+ }
+ try {
+ consumer.close();
+ } catch (Exception ignored) {
+ }
+ try {
+ processingPool.shutdown();
+ } catch (Exception ignored) {
+ }
}
+ logger.info("PostWorkflowManager stopped.");
}
private void saveAndPublishJobStatus(
diff --git a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 5d13153..851ffd4 100644
--- a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -76,15 +76,17 @@
}
public void startServer() {
try {
super.initComponents();
initLaunchSubscriber();
+ logger.info("PreWorkflowManager started.");
Thread.currentThread().join();
} catch (InterruptedException ex) {
- logger.error("PreWorkflowManager is interrupted! reason: " + ex, ex);
+ logger.info("PreWorkflowManager is interrupted. Shutting down.");
} catch (Exception e) {
- logger.error("Error starting PreWorkflowManager", e);
+ logger.error("Error running PreWorkflowManager", e);
}
+ logger.info("PreWorkflowManager stopped.");
}
public void stopServer() {}
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
index e2137be..e0d3203 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/email/EmailBasedMonitor.java
@@ -173,7 +173,13 @@
private void runEmailMonitor() {
Session session = null;
SearchTerm unseenBefore = new FlagTerm(new Flags(Flags.Flag.SEEN), false);
- while (!ServerSettings.isStopAllThreads()) {
+ long sleepTime = Duration.ofMinutes(1).toMillis();
+ try {
+ sleepTime = ServerSettings.getEmailMonitorPeriod();
+ } catch (Exception e) {
+ log.warn("Email monitor period was not found. Using default (1 minute)", e);
+ }
+ while (!ServerSettings.isStopAllThreads() && !Thread.currentThread().isInterrupted()) {
try {
if (session == null) {
session = Session.getDefaultInstance(properties);
@@ -214,11 +220,6 @@
}
emailFolder.close(false);
}
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException("EmailBasedMonitor is interrupted!");
- }
- } catch (InterruptedException ex) {
- log.error("EmailBasedMonitor is interrupted! reason: " + ex, ex);
} catch (MessagingException e) {
log.error("Couldn't connect to the store ", e);
} catch (Throwable e) {
@@ -234,16 +235,17 @@
} catch (MessagingException e) {
log.error("Store close operation failed, couldn't close store", e);
}
- try {
- Thread.sleep(ServerSettings.getEmailMonitorPeriod());
- } catch (InterruptedException e) {
- log.error("interrupted while sleeping ", e);
- } catch (Exception e) {
- log.error("exception thrown when attempting to sleep ", e);
- }
+ }
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ log.info("interrupt received. shutting down email monitor.");
+ // Thread.sleep() clears the interrupt status when it throws; restore it so the
+ // loop condition sees the interrupt and the monitor actually stops.
+ Thread.currentThread().interrupt();
}
}
- log.info("Email monitoring daemon stopped");
+ log.info("Email monitor stopped");
}
@Override
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java b/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
index 1cde8f5..8b1e6f0 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/platform/MonitoringServer.java
@@ -40,6 +40,7 @@
try {
logger.info("Starting the monitoring server");
httpServer = new HTTPServer(host, port, true);
+ logger.info("Monitoring server started on host {} and port {}", host, port);
} catch (Exception e) {
- logger.error("Failed to start the monitoring server on host {} na port {}", host, port, e);
+ logger.error("Failed to start the monitoring server on host {} and port {}", host, port, e);
}
diff --git a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
index 72ab558..7325421 100644
--- a/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
+++ b/airavata-api/src/main/java/org/apache/airavata/monitor/realtime/RealtimeMonitor.java
@@ -31,7 +31,6 @@
import org.apache.airavata.registry.api.RegistryService;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
@@ -66,36 +65,62 @@
return consumer;
}
+ /** Best-effort shutdown: close() is still attempted when unsubscribe() fails. */
+ private void closeConsumer(Consumer<String, String> consumer) {
+ if (consumer == null) {
+ return;
+ }
+ try {
+ consumer.unsubscribe();
+ } catch (Exception e) {
+ logger.debug("Failed to unsubscribe RealtimeMonitor consumer", e);
+ }
+ try {
+ consumer.close();
+ } catch (Exception e) {
+ logger.warn("Failed to close RealtimeMonitor consumer", e);
+ }
+ }
+
private void runConsumer() {
final Consumer<String, String> consumer;
try {
consumer = createConsumer();
} catch (ApplicationSettingsException e) {
logger.error("Error while creating consumer", e);
return;
}
-
+ logger.info("RealtimeMonitor started.");
try {
- while (true) {
- final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
- RegistryService.Iface registry = getRegistry();
- consumerRecords.forEach(record -> {
- try {
- process(record.key(), record.value(), registry);
- } catch (Exception e) {
- logger.error("Error while processing message {}", record.value(), e);
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ // Acquire the registry before polling so a registry failure cannot drop
+ // records that were already fetched (poll() advances the consumer position).
+ var registry = getRegistry();
+ var consumerRecords = consumer.poll(Duration.ofSeconds(1));
+ consumerRecords.forEach(record -> {
+ try {
+ process(record.key(), record.value(), registry);
+ } catch (Exception e) {
+ logger.error("Error while processing message {}", record.value(), e);
+ }
+ });
+ consumer.commitAsync();
+ } catch (Exception e) {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("RealtimeMonitor is interrupted.");
}
- });
- consumer.commitAsync();
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException("RealtimeMonitor is interrupted!");
+ logger.error("Error while polling consumer", e);
+ // Back off so a persistent failure does not turn this into a busy loop.
+ Thread.sleep(Duration.ofSeconds(1).toMillis());
}
}
- } catch (InterruptedException ex) {
- logger.error("RealtimeMonitor is interrupted! reason: " + ex, ex);
+ } catch (InterruptedException e) {
+ logger.info("RealtimeMonitor is interrupted. Shutting down.");
} finally {
- consumer.close();
+ closeConsumer(consumer);
}
+ logger.info("RealtimeMonitor stopped.");
}
private void process(String key, String value, RegistryService.Iface registry) throws MonitoringException {