SLING-5195 : HeartbeatHandler now has a dedicated separately scheduled Runnable that periodically checksForTopologyChanges - to avoid possibility of being blocked by long-during session-saves. Additionally, it checks if its own heartbeat is older than a heartbeat timeout - in which case it treats itself as TOPOLOGY_CHANGING.

git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1711257 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java b/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
index 3822c6d..cd9473a 100644
--- a/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
+++ b/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
@@ -121,7 +121,7 @@
     private ConnectorRegistry connectorRegistry;
 
     @Reference
-    private ClusterViewService clusterViewService;
+    private ClusterViewServiceImpl clusterViewService;
 
     @Reference
     private Config config;
@@ -139,7 +139,7 @@
     public static BaseDiscoveryService testConstructor(ResourceResolverFactory resourceResolverFactory, 
             AnnouncementRegistry announcementRegistry, 
             ConnectorRegistry connectorRegistry,
-            ClusterViewService clusterViewService,
+            ClusterViewServiceImpl clusterViewService,
             HeartbeatHandler heartbeatHandler,
             SlingSettingsService settingsService,
             Scheduler scheduler,
@@ -651,6 +651,10 @@
         return clusterViewService;
     }
     
+    public ClusterViewServiceImpl getClusterViewServiceImpl() {
+        return clusterViewService;
+    }
+
     protected AnnouncementRegistry getAnnouncementRegistry() {
         return announcementRegistry;
     }
diff --git a/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java b/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java
index 1e1b9a4..cb3fb4a 100644
--- a/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java
+++ b/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java
@@ -44,7 +44,7 @@
  * currently established view
  */
 @Component
-@Service(value = ClusterViewService.class)
+@Service(value = {ClusterViewService.class, ClusterViewServiceImpl.class})
 public class ClusterViewServiceImpl implements ClusterViewService {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -58,6 +58,8 @@
     @Reference
     private Config config;
 
+    private String failedEstablishedViewId;
+
     public static ClusterViewService testConstructor(SlingSettingsService settingsService,
             ResourceResolverFactory factory, Config config) {
         ClusterViewServiceImpl service = new ClusterViewServiceImpl();
@@ -73,6 +75,15 @@
     	}
         return settingsService.getSlingId();
     }
+    
+    public void invalidateEstablishedViewId(String establishedViewId) {
+        if (establishedViewId != null &&
+                (failedEstablishedViewId == null ||
+                !failedEstablishedViewId.equals(establishedViewId))) {
+            logger.info("invalidateEstablishedViewId: marking established view as invalid: "+establishedViewId);;
+        }
+        failedEstablishedViewId = establishedViewId;
+    }
 
     public LocalClusterView getLocalClusterView() throws UndefinedClusterViewException {
     	if (resourceResolverFactory==null) {
@@ -91,6 +102,16 @@
                 throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
                         "no established view at the moment");
             }
+            
+            if (failedEstablishedViewId != null
+                    && failedEstablishedViewId.equals(view.getResource().getName())) {
+                // SLING-5195 : the heartbeat-handler-self-check has declared the currently
+                // established view as invalid - hence we should now treat this as 
+                // undefined clusterview
+                logger.debug("getClusterView: current establishedView is marked as invalid: "+failedEstablishedViewId);
+                throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
+                        "current established view was marked as invalid");
+            }
 
             EstablishedClusterView clusterViewImpl = new EstablishedClusterView(
                     config, view, getSlingId());
@@ -106,11 +127,19 @@
                 throw new UndefinedClusterViewException(Reason.ISOLATED_FROM_TOPOLOGY, 
                         "established view does not include local instance - isolated");
             }
+        } catch (UndefinedClusterViewException e) {
+            // pass through
+            throw e;
         } catch (LoginException e) {
             logger.error(
                     "handleEvent: could not log in administratively: " + e, e);
             throw new UndefinedClusterViewException(Reason.REPOSITORY_EXCEPTION,
                     "could not log in administratively: "+e);
+        } catch (Exception e) {
+            logger.error(
+                    "handleEvent: got an exception: " + e, e);
+            throw new UndefinedClusterViewException(Reason.REPOSITORY_EXCEPTION,
+                    "could not log in administratively: "+e);
         } finally {
             if (resourceResolver != null) {
                 resourceResolver.close();
diff --git a/src/main/java/org/apache/sling/discovery/impl/common/ViewHelper.java b/src/main/java/org/apache/sling/discovery/impl/common/ViewHelper.java
index 7586160..d415318 100644
--- a/src/main/java/org/apache/sling/discovery/impl/common/ViewHelper.java
+++ b/src/main/java/org/apache/sling/discovery/impl/common/ViewHelper.java
@@ -117,23 +117,4 @@
         }
     }
 
-    /**
-     * Check if the established view matches the given set of slingIds
-     */
-    public static boolean establishedViewMatches(
-            final ResourceResolver resourceResolver, final Config config, final Set<String> view) {
-        final View establishedView = ViewHelper.getEstablishedView(resourceResolver, config);
-        if (establishedView == null) {
-            return false;
-        } else {
-            String mismatchDetails = establishedView.matches(view);
-            if (mismatchDetails != null) {
-                logger.info("establishedViewMatches: established view does not match. (details: " + mismatchDetails + ")");
-            } else {
-                logger.debug("establishedViewMatches: established view matches with expected.");
-            }
-            return mismatchDetails == null;
-        }
-    }
-
 }
diff --git a/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java b/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
index bea64a2..44efedd 100644
--- a/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
+++ b/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
@@ -21,6 +21,7 @@
 import java.util.Calendar;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
@@ -48,6 +49,7 @@
 import org.apache.sling.discovery.impl.cluster.voting.VotingHandler;
 import org.apache.sling.discovery.impl.cluster.voting.VotingHelper;
 import org.apache.sling.discovery.impl.cluster.voting.VotingView;
+import org.apache.sling.discovery.impl.common.View;
 import org.apache.sling.discovery.impl.common.ViewHelper;
 import org.apache.sling.launchpad.api.StartupListener;
 import org.apache.sling.settings.SlingSettingsService;
@@ -108,10 +110,14 @@
     private long firstHeartbeatWritten = -1;
 
     /** SLING-2892: remember the value of the heartbeat this instance has written the last time **/
-    private Calendar lastHeartbeatWritten = null;
+    private volatile Calendar lastHeartbeatWritten = null;
 
     private DiscoveryServiceImpl discoveryServiceImpl;
 
+    private String lastEstablishedViewId;
+
+    protected String failedEstablishedViewId;
+
     /** for testing only **/
     public static HeartbeatHandler testConstructor(
             SlingSettingsService slingSettingsService,
@@ -119,7 +125,8 @@
             AnnouncementRegistry announcementRegistry, 
             ConnectorRegistry connectorRegistry,
             Config config, 
-            Scheduler scheduler) {
+            Scheduler scheduler,
+            VotingHandler votingHandler) {
         HeartbeatHandler handler = new HeartbeatHandler();
         handler.slingSettingsService = slingSettingsService;
         handler.resourceResolverFactory = factory;
@@ -127,6 +134,7 @@
         handler.connectorRegistry = connectorRegistry;
         handler.config = config;
         handler.scheduler = scheduler;
+        handler.votingHandler = votingHandler;
         return handler;
     }
     
@@ -177,6 +185,12 @@
         logger.info("doActivate: activated with runtimeId: {}, slingId: {}", runtimeId, slingId);
     }
     
+    @Override
+    protected void deactivate() {
+        super.deactivate();
+        scheduler.removeJob(NAME+".checkForTopologyChange");        
+    }
+    
     /**
      * The initialize method is called by the DiscoveryServiceImpl.activate
      * as we require the discoveryService (and the discoveryService has
@@ -201,16 +215,64 @@
         }
 
         try {
-            final long interval = config.getHeartbeatInterval();
+            long interval = config.getHeartbeatInterval();
             logger.info("initialize: starting periodic heartbeat job for "+slingId+" with interval "+interval+" sec.");
             if (interval==0) {
-                logger.warn("initialize: Repeat interval cannot be zero.");
+                logger.warn("initialize: Repeat interval cannot be zero. Defaulting to 10sec");
+                interval = 10;
             }
             scheduler.addPeriodicJob(NAME, this,
                     null, interval, false);
         } catch (Exception e) {
             logger.error("activate: Could not start heartbeat runner: " + e, e);
         }
+
+        // SLING-5195 - to account for repository delays, the writing of heartbeats and voting
+        // should be done independently of getting the current clusterView and 
+        // potentially sending a topology event.
+        // so this second part is now done (additionally) in a 2nd runner here:
+        try {
+            long interval = config.getHeartbeatInterval();
+            logger.info("initialize: starting periodic heartbeat job for "+slingId+" with interval "+interval+" sec.");
+            if (interval==0) {
+                logger.warn("initialize: Repeat interval cannot be zero. Defaulting to 10sec.");
+                interval = 10;
+            }
+            scheduler.addPeriodicJob(NAME+".checkForTopologyChange", new Runnable() {
+
+                @Override
+                public void run() {
+                    Calendar lastHb = lastHeartbeatWritten;
+                    if (lastHb!=null) {
+                        // check to see when we last wrote a heartbeat
+                        // if it is older than the configured timeout,
+                        // then mark ourselves as in topologyChanging automatically
+                        long timeSinceHb = System.currentTimeMillis() - lastHb.getTimeInMillis();
+                        if (timeSinceHb > config.getHeartbeatTimeoutMillis()) {
+                            logger.info("checkForTopologyChange/.run: time since local instance last wrote a heartbeat is "+timeSinceHb+"ms. Flagging us as (still) changing");
+                            // mark the current establishedView as faulty
+                            invalidateCurrentEstablishedView();
+                            
+                            // then tell the listeners immediately
+                            // note that just calling handleTopologyChanging alone - without the above invalidate -
+                            // won't be sufficient, because that would only affect the listeners, not the
+                            // getTopology() call.
+                            discoveryService.handleTopologyChanging();
+                            return;
+                        }
+                    }
+                    // SLING-5195: guarantee frequent calls to checkForTopologyChange,
+                    // independently of blocked write/save operations
+                    logger.debug("checkForTopologyChange/.run: going to check for topology change...");
+                    discoveryService.checkForTopologyChange();
+                    logger.debug("checkForTopologyChange/.run: check for topology change done.");
+                }
+                
+            },
+                    null, interval, false);
+        } catch (Exception e) {
+            logger.error("activate: Could not start heartbeat runner: " + e, e);
+        }
     }
 
     /** Get or create a ResourceResolver **/
@@ -324,6 +386,7 @@
             				// sling instance accessing the same repository (ie in the same cluster)
             				// using the same sling.id - hence writing to the same
             				// resource
+            			    invalidateCurrentEstablishedView();
             			    discoveryServiceImpl.handleTopologyChanging();
             				logger.error("issueClusterLocalHeartbeat: SLING-2892: Detected unexpected, concurrent update of: "+
             						myClusterNodePath+" 'lastHeartbeat'. If not done manually, " +
@@ -344,6 +407,7 @@
             	    // someone deleted the resource property
             	    firstHeartbeatWritten = -1;
             	} else if (!runtimeId.equals(readRuntimeId)) {
+            	    invalidateCurrentEstablishedView();
             	    discoveryServiceImpl.handleTopologyChanging();
                     final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
                     final String endpointsAsString = getEndpointsAsString();
@@ -390,7 +454,7 @@
                 this.newLeaderElectionId = null;
                 resourceMap.put("leaderElectionId", newLeaderElectionId);
                 resourceMap.put("leaderElectionIdCreatedAt", new Date());
-                logger.info("issueClusterLocalHeartbeat: set leaderElectionId to "+newLeaderElectionId);
+                logger.info("issueClusterLocalHeartbeat: set leaderElectionId to "+newLeaderElectionId+" (resetLeaderElectionId: "+resetLeaderElectionId+")");
                 if (votingHandler!=null) {
                     votingHandler.setLeaderElectionId(newLeaderElectionId);
                 }
@@ -493,6 +557,9 @@
             }
         }
 
+        final View establishedView = ViewHelper.getEstablishedView(resourceResolver, config);
+        lastEstablishedViewId = establishedView == null ? null : establishedView.getResource().getName();
+
         final VotingView winningVoting = VotingHelper.getWinningVoting(
                 resourceResolver, config);
         int numOpenNonWinningVotes = VotingHelper.listOpenNonWinningVotings(
@@ -503,6 +570,7 @@
             
             // but first: make sure we sent the TOPOLOGY_CHANGING
             logger.info("doCheckViewWith: there are pending votings, marking topology as changing...");
+            invalidateCurrentEstablishedView();
             discoveryServiceImpl.handleTopologyChanging();
             
         	if (logger.isDebugEnabled()) {
@@ -518,7 +586,28 @@
         final Set<String> liveInstances = ViewHelper.determineLiveInstances(
                 clusterNodesRes, config);
 
-        if (ViewHelper.establishedViewMatches(resourceResolver, config, liveInstances)) {
+        boolean establishedViewMatches;
+        if (lastEstablishedViewId != null && failedEstablishedViewId != null
+                && lastEstablishedViewId.equals(failedEstablishedViewId)) {
+            // SLING-5195 : heartbeat-self-check caused this establishedViewId
+            // to be declared as failed - so we must now cause a new voting
+            logger.info("doCheckView: current establishedViewId ({}) was declared as failed earlier already.", lastEstablishedViewId);
+            establishedViewMatches = false;
+        } else {
+            if (establishedView == null) {
+                establishedViewMatches = false;
+            } else {
+                String mismatchDetails = establishedView.matches(liveInstances);
+                if (mismatchDetails != null) {
+                    logger.info("doCheckView: established view does not match. (details: " + mismatchDetails + ")");
+                } else {
+                    logger.debug("doCheckView: established view matches with expected.");
+                }
+                establishedViewMatches = mismatchDetails == null;
+            }
+        }
+        
+        if (establishedViewMatches) {
             // that's the normal case. the established view matches what we're
             // seeing.
             // all happy and fine
@@ -528,8 +617,15 @@
         
         // immediately send a TOPOLOGY_CHANGING - could already be sent, but just to be sure
         logger.info("doCheckViewWith: no matching established view, marking topology as changing");
+        invalidateCurrentEstablishedView();
         discoveryServiceImpl.handleTopologyChanging();
         
+        List<VotingView> myYesVotes = VotingHelper.getYesVotingsOf(resourceResolver, config, slingId);
+        if (myYesVotes != null && myYesVotes.size() > 0) {
+            logger.info("doCheckViewWith: I have voted yes (" + myYesVotes.size() + "x)- the vote was not yet promoted but expecting it to be soon. Not voting again in the meantime. My yes vote was for: "+myYesVotes);
+            return;
+        }
+        
     	if (logger.isDebugEnabled()) {
 	        logger.debug("doCheckViewWith: no pending nor winning votes. But: view does not match established or no established yet. Initiating a new voting");
 	        Iterator<String> it = liveInstances.iterator();
@@ -554,7 +650,23 @@
 
         VotingView.newVoting(resourceResolver, config, votingId, slingId, liveInstances);
     }
-
+    
+    /**
+     * Mark the current establishedView as invalid - requiring it to be
+     * replaced with a new one, be it by another instance or this one,
+     * via a new vote
+     */
+    public void invalidateCurrentEstablishedView() {
+        if (lastEstablishedViewId == null) {
+            logger.info("invalidateCurrentEstablishedView: cannot invalidate, lastEstablishedViewId==null");
+            return;
+        }
+        logger.info("invalidateCurrentEstablishedView: invalidating slingId=" + slingId
+                + ", lastEstablishedViewId=" + lastEstablishedViewId);
+        failedEstablishedViewId = lastEstablishedViewId;
+        discoveryServiceImpl.getClusterViewServiceImpl().invalidateEstablishedViewId(lastEstablishedViewId);
+    }
+    
     /**
      * Management function to trigger the otherwise algorithm-dependent
      * start of a new voting.