SLING-5195 : HeartbeatHandler now has a dedicated, separately scheduled Runnable that periodically calls checkForTopologyChange - to avoid the possibility of being blocked by long-running session saves. Additionally, it checks whether its own heartbeat is older than the heartbeat timeout - in which case it treats itself as TOPOLOGY_CHANGING.
git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1711257 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java b/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
index 3822c6d..cd9473a 100644
--- a/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
+++ b/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
@@ -121,7 +121,7 @@
private ConnectorRegistry connectorRegistry;
@Reference
- private ClusterViewService clusterViewService;
+ private ClusterViewServiceImpl clusterViewService;
@Reference
private Config config;
@@ -139,7 +139,7 @@
public static BaseDiscoveryService testConstructor(ResourceResolverFactory resourceResolverFactory,
AnnouncementRegistry announcementRegistry,
ConnectorRegistry connectorRegistry,
- ClusterViewService clusterViewService,
+ ClusterViewServiceImpl clusterViewService,
HeartbeatHandler heartbeatHandler,
SlingSettingsService settingsService,
Scheduler scheduler,
@@ -651,6 +651,10 @@
return clusterViewService;
}
+ public ClusterViewServiceImpl getClusterViewServiceImpl() { // SLING-5195: returns the injected service as its concrete implementation type
+ return clusterViewService;
+ }
+
protected AnnouncementRegistry getAnnouncementRegistry() {
return announcementRegistry;
}
diff --git a/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java b/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java
index 1e1b9a4..cb3fb4a 100644
--- a/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java
+++ b/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java
@@ -44,7 +44,7 @@
* currently established view
*/
@Component
-@Service(value = ClusterViewService.class)
+@Service(value = {ClusterViewService.class, ClusterViewServiceImpl.class})
public class ClusterViewServiceImpl implements ClusterViewService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -58,6 +58,8 @@
@Reference
private Config config;
+ private String failedEstablishedViewId;
+
public static ClusterViewService testConstructor(SlingSettingsService settingsService,
ResourceResolverFactory factory, Config config) {
ClusterViewServiceImpl service = new ClusterViewServiceImpl();
@@ -73,6 +75,15 @@
}
return settingsService.getSlingId();
}
+
+    /** Marks {@code establishedViewId} as invalid ({@code null} clears the marker); logs only when the id actually changes. */
+    public void invalidateEstablishedViewId(String establishedViewId) {
+        // null-safe: equals(null) is false, so a not-yet-set marker also counts as "changed"
+        if (establishedViewId != null && !establishedViewId.equals(failedEstablishedViewId)) {
+            logger.info("invalidateEstablishedViewId: marking established view as invalid: "+establishedViewId);
+        }
+        failedEstablishedViewId = establishedViewId;
+    }
public LocalClusterView getLocalClusterView() throws UndefinedClusterViewException {
if (resourceResolverFactory==null) {
@@ -91,6 +102,16 @@
throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
"no established view at the moment");
}
+
+ if (failedEstablishedViewId != null
+ && failedEstablishedViewId.equals(view.getResource().getName())) {
+ // SLING-5195 : the heartbeat-handler-self-check has declared the currently
+ // established view as invalid - hence we should now treat this as
+ // undefined clusterview
+ logger.debug("getClusterView: current establishedView is marked as invalid: "+failedEstablishedViewId);
+ throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
+ "current established view was marked as invalid");
+ }
EstablishedClusterView clusterViewImpl = new EstablishedClusterView(
config, view, getSlingId());
@@ -106,11 +127,19 @@
throw new UndefinedClusterViewException(Reason.ISOLATED_FROM_TOPOLOGY,
"established view does not include local instance - isolated");
}
+ } catch (UndefinedClusterViewException e) {
+ // pass through
+ throw e;
} catch (LoginException e) {
logger.error(
"handleEvent: could not log in administratively: " + e, e);
throw new UndefinedClusterViewException(Reason.REPOSITORY_EXCEPTION,
"could not log in administratively: "+e);
+ } catch (Exception e) {
+ logger.error(
+ "handleEvent: got an exception: " + e, e);
+ throw new UndefinedClusterViewException(Reason.REPOSITORY_EXCEPTION,
+ "could not log in administratively: "+e);
} finally {
if (resourceResolver != null) {
resourceResolver.close();
diff --git a/src/main/java/org/apache/sling/discovery/impl/common/ViewHelper.java b/src/main/java/org/apache/sling/discovery/impl/common/ViewHelper.java
index 7586160..d415318 100644
--- a/src/main/java/org/apache/sling/discovery/impl/common/ViewHelper.java
+++ b/src/main/java/org/apache/sling/discovery/impl/common/ViewHelper.java
@@ -117,23 +117,4 @@
}
}
- /**
- * Check if the established view matches the given set of slingIds
- */
- public static boolean establishedViewMatches(
- final ResourceResolver resourceResolver, final Config config, final Set<String> view) {
- final View establishedView = ViewHelper.getEstablishedView(resourceResolver, config);
- if (establishedView == null) {
- return false;
- } else {
- String mismatchDetails = establishedView.matches(view);
- if (mismatchDetails != null) {
- logger.info("establishedViewMatches: established view does not match. (details: " + mismatchDetails + ")");
- } else {
- logger.debug("establishedViewMatches: established view matches with expected.");
- }
- return mismatchDetails == null;
- }
- }
-
}
diff --git a/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java b/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
index bea64a2..44efedd 100644
--- a/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
+++ b/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
@@ -21,6 +21,7 @@
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -48,6 +49,7 @@
import org.apache.sling.discovery.impl.cluster.voting.VotingHandler;
import org.apache.sling.discovery.impl.cluster.voting.VotingHelper;
import org.apache.sling.discovery.impl.cluster.voting.VotingView;
+import org.apache.sling.discovery.impl.common.View;
import org.apache.sling.discovery.impl.common.ViewHelper;
import org.apache.sling.launchpad.api.StartupListener;
import org.apache.sling.settings.SlingSettingsService;
@@ -108,10 +110,14 @@
private long firstHeartbeatWritten = -1;
/** SLING-2892: remember the value of the heartbeat this instance has written the last time **/
- private Calendar lastHeartbeatWritten = null;
+ private volatile Calendar lastHeartbeatWritten = null;
private DiscoveryServiceImpl discoveryServiceImpl;
+ private String lastEstablishedViewId;
+
+ protected String failedEstablishedViewId;
+
/** for testing only **/
public static HeartbeatHandler testConstructor(
SlingSettingsService slingSettingsService,
@@ -119,7 +125,8 @@
AnnouncementRegistry announcementRegistry,
ConnectorRegistry connectorRegistry,
Config config,
- Scheduler scheduler) {
+ Scheduler scheduler,
+ VotingHandler votingHandler) {
HeartbeatHandler handler = new HeartbeatHandler();
handler.slingSettingsService = slingSettingsService;
handler.resourceResolverFactory = factory;
@@ -127,6 +134,7 @@
handler.connectorRegistry = connectorRegistry;
handler.config = config;
handler.scheduler = scheduler;
+ handler.votingHandler = votingHandler;
return handler;
}
@@ -177,6 +185,12 @@
logger.info("doActivate: activated with runtimeId: {}, slingId: {}", runtimeId, slingId);
}
+ @Override
+ protected void deactivate() { // NOTE(review): confirm removeJob tolerates the job never having been registered (initialize() only logs scheduling errors)
+ super.deactivate();
+ scheduler.removeJob(NAME+".checkForTopologyChange"); // also stop the topology-check job that initialize() registers under this name
+ }
+
/**
* The initialize method is called by the DiscoveryServiceImpl.activate
* as we require the discoveryService (and the discoveryService has
@@ -201,16 +215,64 @@
}
try {
- final long interval = config.getHeartbeatInterval();
+ long interval = config.getHeartbeatInterval();
logger.info("initialize: starting periodic heartbeat job for "+slingId+" with interval "+interval+" sec.");
if (interval==0) {
- logger.warn("initialize: Repeat interval cannot be zero.");
+ logger.warn("initialize: Repeat interval cannot be zero. Defaulting to 10sec");
+ interval = 10;
}
scheduler.addPeriodicJob(NAME, this,
null, interval, false);
} catch (Exception e) {
logger.error("activate: Could not start heartbeat runner: " + e, e);
}
+
+ // SLING-5195 - to account for repository delays, the writing of heartbeats and voting
+ // should be done independently of getting the current clusterView and
+ // potentially sending a topology event.
+ // so this second part is now done (additionally) in a 2nd runner here:
+        try {
+            long interval = config.getHeartbeatInterval();
+            logger.info("initialize: starting periodic checkForTopologyChange job for "+slingId+" with interval "+interval+" sec.");
+            if (interval==0) {
+                logger.warn("initialize: Repeat interval cannot be zero. Defaulting to 10sec.");
+                interval = 10;
+            }
+            scheduler.addPeriodicJob(NAME+".checkForTopologyChange", new Runnable() {
+                // registered as its own periodic job, separate from the heartbeat job (NAME), so topology checks keep running
+                @Override
+                public void run() {
+                    Calendar lastHb = lastHeartbeatWritten;
+                    if (lastHb!=null) {
+                        // check to see when we last wrote a heartbeat
+                        // if it is older than the configured timeout,
+                        // then mark ourselves as in topologyChanging automatically
+                        long timeSinceHb = System.currentTimeMillis() - lastHb.getTimeInMillis();
+                        if (timeSinceHb > config.getHeartbeatTimeoutMillis()) {
+                            logger.info("checkForTopologyChange/.run: time since local instance last wrote a heartbeat is "+timeSinceHb+"ms. Flagging us as (still) changing");
+                            // mark the current establishedView as faulty
+                            invalidateCurrentEstablishedView();
+
+                            // then tell the listeners immediately
+                            // note that just calling handleTopologyChanging alone - without the above invalidate -
+                            // won't be sufficient, because that would only affect the listeners, not the
+                            // getTopology() call.
+                            discoveryService.handleTopologyChanging();
+                            return;
+                        }
+                    }
+                    // SLING-5195: guarantee frequent calls to checkForTopologyChange,
+                    // independently of blocked write/save operations
+                    logger.debug("checkForTopologyChange/.run: going to check for topology change...");
+                    discoveryService.checkForTopologyChange();
+                    logger.debug("checkForTopologyChange/.run: check for topology change done.");
+                }
+
+            },
+            null, interval, false);
+        } catch (Exception e) {
+            logger.error("initialize: Could not start checkForTopologyChange runner: " + e, e);
+        }
}
/** Get or create a ResourceResolver **/
@@ -324,6 +386,7 @@
// sling instance accessing the same repository (ie in the same cluster)
// using the same sling.id - hence writing to the same
// resource
+ invalidateCurrentEstablishedView();
discoveryServiceImpl.handleTopologyChanging();
logger.error("issueClusterLocalHeartbeat: SLING-2892: Detected unexpected, concurrent update of: "+
myClusterNodePath+" 'lastHeartbeat'. If not done manually, " +
@@ -344,6 +407,7 @@
// someone deleted the resource property
firstHeartbeatWritten = -1;
} else if (!runtimeId.equals(readRuntimeId)) {
+ invalidateCurrentEstablishedView();
discoveryServiceImpl.handleTopologyChanging();
final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
final String endpointsAsString = getEndpointsAsString();
@@ -390,7 +454,7 @@
this.newLeaderElectionId = null;
resourceMap.put("leaderElectionId", newLeaderElectionId);
resourceMap.put("leaderElectionIdCreatedAt", new Date());
- logger.info("issueClusterLocalHeartbeat: set leaderElectionId to "+newLeaderElectionId);
+ logger.info("issueClusterLocalHeartbeat: set leaderElectionId to "+newLeaderElectionId+" (resetLeaderElectionId: "+resetLeaderElectionId+")");
if (votingHandler!=null) {
votingHandler.setLeaderElectionId(newLeaderElectionId);
}
@@ -493,6 +557,9 @@
}
}
+ final View establishedView = ViewHelper.getEstablishedView(resourceResolver, config);
+ lastEstablishedViewId = establishedView == null ? null : establishedView.getResource().getName();
+
final VotingView winningVoting = VotingHelper.getWinningVoting(
resourceResolver, config);
int numOpenNonWinningVotes = VotingHelper.listOpenNonWinningVotings(
@@ -503,6 +570,7 @@
// but first: make sure we sent the TOPOLOGY_CHANGING
logger.info("doCheckViewWith: there are pending votings, marking topology as changing...");
+ invalidateCurrentEstablishedView();
discoveryServiceImpl.handleTopologyChanging();
if (logger.isDebugEnabled()) {
@@ -518,7 +586,28 @@
final Set<String> liveInstances = ViewHelper.determineLiveInstances(
clusterNodesRes, config);
- if (ViewHelper.establishedViewMatches(resourceResolver, config, liveInstances)) {
+ boolean establishedViewMatches;
+ if (lastEstablishedViewId != null && failedEstablishedViewId != null
+ && lastEstablishedViewId.equals(failedEstablishedViewId)) {
+ // SLING-5195 : heartbeat-self-check caused this establishedViewId
+ // to be declared as failed - so we must now cause a new voting
+ logger.info("doCheckView: current establishedViewId ({}) was declared as failed earlier already.", lastEstablishedViewId);
+ establishedViewMatches = false;
+ } else {
+ if (establishedView == null) {
+ establishedViewMatches = false;
+ } else {
+ String mismatchDetails = establishedView.matches(liveInstances);
+ if (mismatchDetails != null) {
+ logger.info("doCheckView: established view does not match. (details: " + mismatchDetails + ")");
+ } else {
+ logger.debug("doCheckView: established view matches with expected.");
+ }
+ establishedViewMatches = mismatchDetails == null;
+ }
+ }
+
+ if (establishedViewMatches) {
// that's the normal case. the established view matches what we're
// seeing.
// all happy and fine
@@ -528,8 +617,15 @@
// immediately send a TOPOLOGY_CHANGING - could already be sent, but just to be sure
logger.info("doCheckViewWith: no matching established view, marking topology as changing");
+ invalidateCurrentEstablishedView();
discoveryServiceImpl.handleTopologyChanging();
+ List<VotingView> myYesVotes = VotingHelper.getYesVotingsOf(resourceResolver, config, slingId);
+ if (myYesVotes != null && myYesVotes.size() > 0) {
+ logger.info("doCheckViewWith: I have voted yes (" + myYesVotes.size() + "x)- the vote was not yet promoted but expecting it to be soon. Not voting again in the meantime. My yes vote was for: "+myYesVotes);
+ return;
+ }
+
if (logger.isDebugEnabled()) {
logger.debug("doCheckViewWith: no pending nor winning votes. But: view does not match established or no established yet. Initiating a new voting");
Iterator<String> it = liveInstances.iterator();
@@ -554,7 +650,23 @@
VotingView.newVoting(resourceResolver, config, votingId, slingId, liveInstances);
}
-
+
+    /**
+     * Mark the current establishedView as invalid - requiring it to be
+     * replaced with a new one, be it by another instance or this one,
+     * via a new vote. Only logs if no established view id has been recorded yet.
+     */
+    public void invalidateCurrentEstablishedView() {
+        final String viewId = lastEstablishedViewId; // single read: reassigned during the view check, and this method is also called from the checkForTopologyChange job
+        if (viewId == null) {
+            logger.info("invalidateCurrentEstablishedView: cannot invalidate, lastEstablishedViewId==null");
+            return;
+        }
+        logger.info("invalidateCurrentEstablishedView: invalidating slingId=" + slingId + ", lastEstablishedViewId=" + viewId);
+        failedEstablishedViewId = viewId;
+        discoveryServiceImpl.getClusterViewServiceImpl().invalidateEstablishedViewId(viewId);
+    }
+
/**
* Management function to trigger the otherwise algorithm-dependent
* start of a new voting.