ATLAS-4463: Fixed Infinite loop at Index Health Monitor
Signed-off-by: Sidharth Mishra <sidmishra@apache.org>
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
index 2f11610..b316354 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -37,6 +37,7 @@
import java.time.Instant;
import java.util.Iterator;
import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME;
@@ -58,6 +59,7 @@
private final RecoveryInfoManagement recoveryInfoManagement;
private Configuration configuration;
private boolean isIndexRecoveryEnabled;
+ private RecoveryThread recoveryThread;
@Inject
public IndexRecoveryService(Configuration config, AtlasGraph graph) {
@@ -67,7 +69,7 @@
long healthCheckFrequencyMillis = config.getLong(SOLR_STATUS_CHECK_RETRY_INTERVAL, SOLR_STATUS_RETRY_DEFAULT_MS);
this.recoveryInfoManagement = new RecoveryInfoManagement(graph);
- RecoveryThread recoveryThread = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis);
+ this.recoveryThread = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis);
this.indexHealthMonitor = new Thread(recoveryThread, INDEX_HEALTH_MONITOR_THREAD_NAME);
}
@@ -102,6 +104,8 @@
@Override
public void stop() throws AtlasException {
try {
+ recoveryThread.shutdown();
+
indexHealthMonitor.join();
} catch (InterruptedException e) {
LOG.error("indexHealthMonitor: Interrupted", e);
@@ -143,6 +147,8 @@
private long indexStatusCheckRetryMillis;
private Object txRecoveryObject;
+ private final AtomicBoolean shouldRun = new AtomicBoolean(false);
+
private RecoveryThread(RecoveryInfoManagement recoveryInfoManagement, AtlasGraph graph, long startTimeFromConfig, long healthCheckFrequencyMillis) {
this.graph = graph;
this.recoveryInfoManagement = recoveryInfoManagement;
@@ -154,9 +160,11 @@
}
public void run() {
+ shouldRun.set(true);
+
LOG.info("Index Health Monitor: Starting...");
- while (true) {
+ while (shouldRun.get()) {
try {
boolean solrHealthy = isSolrHealthy();
@@ -173,6 +181,22 @@
}
}
+ public void shutdown() {
+ try {
+ LOG.info("Index Health Monitor: Shutdown: Starting...");
+
+ // handle the case where thread was not started at all
+ // and shutdown called
+ if (shouldRun.get() == false) {
+ return;
+ }
+
+ shouldRun.set(false);
+ } finally {
+ LOG.info("Index Health Monitor: Shutdown: Done!");
+ }
+ }
+
private boolean isSolrHealthy() throws AtlasException, InterruptedException {
Thread.sleep(indexStatusCheckRetryMillis);