/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.jcr.Value;
import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.osgi.framework.Version;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The DocumentDiscoveryLiteService takes care of providing a repository
* descriptor that contains the current cluster-view details.
* <p>
* The clusterView is provided via a repository descriptor (see
* OAK_DISCOVERYLITE_CLUSTERVIEW)
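* <p>
* For illustration only, a client could read this view via the standard JCR
* descriptor API (the 'repository' reference here is assumed to be any JCR
* Repository backed by a DocumentNodeStore):
* <pre>
* String json = repository.getDescriptor("oak.discoverylite.clusterview");
* </pre>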
* <p>
* The cluster-view lists all instances (ever) known in the cluster in one of
* the following states:
* <ul>
* <li>active: the instance is currently running and has an up-to-date lease
* </li>
* <li>deactivating: the instance failed to update the lease recently, thus a
* recovery is happening - or the recovery has just finished and the local
* instance has yet to do a backgroundRead before it has seen all of the
* crashed/shutdown instance's last changes</li>
* <li>inactive: the instance is currently not running and all its changes have
* been seen by the local instance</li>
* </ul>
* <p>
* Additionally, the cluster-view is assigned a monotonically increasing
* sequence number. This sequence number is persisted, thus all instances in
* the cluster will in time show the same sequence number for a particular
* cluster-view.
* <p>
* Note that the 'deactivating' state might be hiding some complexity that is
* deliberately not shown: for the documentNS the state 'deactivating' consists
* of two substates: 'recovering' as in _lastRevs are updated, and 'backlog
* processing' for a pending backgroundRead to get the latest head state of a
* crashed/shutdown instance. So when an instance is in 'deactivating' state, it
* is not indicated via the cluster-view whether it is recovering or has backlog
* to process. However, the fact that an instance has yet to do a backgroundRead
* to get changes is a per-instance story: other instances might already have
* done the backgroundRead and thus no longer have a backlog for the instance(s)
* that left. Even though 'deactivating' therefore depends on the instance
* you get the information from, the cluster-view must have a sequence number
* that uniquely identifies it in the cluster. These two constraints conflict.
* As a simple solution to handle this case nevertheless, the 'final' flag has
* been introduced: the cluster-view has this flag 'final' set to true when the
* view is final and nothing will be changed in this sequence number anymore. If
* the 'final' flag is false, however, it indicates that the cluster-view with
* this particular sequence number might still experience a change (more
* concretely: the deactivating instances might change). Note that alternatives
* to this 'final' flag have been discussed, such as using vector-counters,
* but there was no obvious gain to be achieved using an alternative approach.
* <p>
* In other words: whenever the 'final' flag is false, the view must be
* interpreted as 'in flux' wrt the deactivating/inactive instances, and any
* action that depends on stable deactivating/inactive instances must be
* deferred until the 'final' flag becomes true.
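* <p>
* For illustration only, a published cluster-view might render as the
* following json (all ids and values here are made up - the actual format is
* produced by ClusterView.asDescriptorValue()):
* <pre>
* {"seq":8,"final":true,"id":"0146e45e-...","me":2,"active":[2,3],"deactivating":[],"inactive":[1]}
* </pre>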
* <p>
* Underneath, the DocumentDiscoveryLiteService uses the clusterNodes collection
* to derive the clusterView, which it stores in the settings collection.
* Whenever it updates the clusterView it increments the sequence number by 1.
* <p>
* While this new 'clusterView' document in the settings collection sounds like
* redundant data (since it is just derived from the clusterNodes), it actually
* is not. By persisting the clusterView it becomes the new source of truth wrt
* what the clusterView looks like, and no two instances in the same cluster
* can draw different conclusions based eg on different clocks they have or on
* reading the clusterNodes at slightly different moments etc. Also, the
* clusterView allows storing the sequence number
* (which allows the instances to make reference to the same clusterView, and
* to simply detect whether anything has changed).
* <p>
* Prerequisites for the clusterView mechanism to be stable:
* <ul>
* <li>the machine clocks are reasonably in sync - that is, they should be off
* by orders of magnitude less than the lease updateFrequency/timeout</li>
* <li>the write-delays from any instance to the mongo server where the
* clusterNodes and settings collections are stored should be very fast - again
* orders of magnitude lower than the lease timeout</li>
* <li>when this instance notices that others have kicked it out of the
* clusterView (which it can find out when either its clusterNodes document is
* set to recovering or it is not in the clusterView anymore, although it just
* was - ie not just because of a fresh start), then this instance must step
* back gracefully. The exact definition is to be applied elsewhere - but it
* should include: no longer updating its own lease, waiting for the view to
* have stabilized - waiting for recovery of its own instance by the remaining
* instances in the cluster to have finished - and then probably waiting for
* another gracePeriod until it might rejoin the cluster. In between, any commit
* should fail with BannedFromClusterException</li>
* </ul>
*
* @see #OAK_DISCOVERYLITE_CLUSTERVIEW
*/
@Component(
name = DocumentDiscoveryLiteService.COMPONENT_NAME,
immediate = true,
service = { DocumentDiscoveryLiteService.class, Observer.class })
public class DocumentDiscoveryLiteService implements ClusterStateChangeListener, Observer {
static final String COMPONENT_NAME = "org.apache.jackrabbit.oak.plugins.document.DocumentDiscoveryLiteService";
/**
* Name of the repository descriptor via which the clusterView is published
* - which is the raison d'etre of the DocumentDiscoveryLiteService
**/
public static final String OAK_DISCOVERYLITE_CLUSTERVIEW = "oak.discoverylite.clusterview";
private static final Logger logger = LoggerFactory.getLogger(DocumentDiscoveryLiteService.class);
/** describes the reason why the BackgroundWorker should be woken up **/
private enum WakeupReason {
CLUSTER_STATE_CHANGED, BACKGROUND_READ_FINISHED
}
/**
* The BackgroundWorker takes care of regularly invoking checkView -
* which in turn will detect if anything changed
**/
private class BackgroundWorker implements Runnable {
final Random random = new Random();
boolean stopped = false;
private void stop() {
logger.trace("stop: start");
synchronized (BackgroundWorker.this) {
stopped = true;
}
logger.trace("stop: end");
}
@Override
public void run() {
logger.info("BackgroundWorker.run: start");
try {
doRun();
} finally {
logger.info("BackgroundWorker.run: end {finally}");
}
}
private void doRun() {
while (!stopped) {
try {
logger.trace("BackgroundWorker.doRun: going to call checkView");
boolean shortSleep = checkView();
logger.trace("BackgroundWorker.doRun: checkView terminated with {} (=shortSleep)", shortSleep);
long sleepMillis = shortSleep ? (50 + random.nextInt(450)) : 5000;
logger.trace("BackgroundWorker.doRun: sleeping {}ms", sleepMillis);
synchronized (BackgroundWorker.this) {
if (stopped)
return;
BackgroundWorker.this.wait(sleepMillis);
if (stopped)
return;
}
logger.trace("BackgorundWorker.doRun: done sleeping, looping");
} catch (Exception e) {
logger.error("doRun: got an exception: " + e, e);
try {
Thread.sleep(5000);
} catch (Exception e2) {
logger.error("doRun: got an exception while sleeping due to another exception: " + e2, e2);
}
}
}
}
}
/** This provides the 'clusterView' repository descriptor **/
private class DiscoveryLiteDescriptor implements Descriptors {
final SimpleValueFactory factory = new SimpleValueFactory();
@Override
public String[] getKeys() {
return new String[] { OAK_DISCOVERYLITE_CLUSTERVIEW };
}
@Override
public boolean isStandardDescriptor(String key) {
return OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key);
}
@Override
public boolean isSingleValueDescriptor(String key) {
return OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key);
}
@Override
public Value getValue(String key) {
if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
return null;
}
return factory.createValue(getClusterViewAsDescriptorValue());
}
@Override
public Value[] getValues(String key) {
if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
return null;
}
return new Value[] { getValue(key) };
}
}
/** DocumentNodeStore's (hence local) clusterId **/
private int clusterNodeId = -1;
/**
* the DocumentNodeStore - used to get the active/inactive cluster ids from
**/
private DocumentNodeStore documentNodeStore;
/**
* background job that periodically verifies and updates the clusterView
**/
private BackgroundWorker backgroundWorker;
/** the ClusterViewDocument which was used in the last checkView run **/
private ClusterViewDocument previousClusterViewDocument;
/**
* the ClusterView that was valid as a result of the previous checkView run
**/
private ClusterView previousClusterView;
/**
* kept volatile as this is frequently read in contentChanged which is
* better kept unsynchronized as long as possible
**/
private volatile boolean hasInstancesWithBacklog;
/**
* Require a static reference to the NodeStore. Note that this implies the
* service is only active for documentNS
**/
@Reference
private DocumentNodeStore nodeStore;
/**
* inactive nodes that have been so for a while, ie they have no backlog
* anymore, so no need to check for backlog every time
**/
private Set<Integer> longTimeInactives = new HashSet<Integer>();
/**
* returns the clusterView as a json value for it to be provided via the
* repository descriptor
**/
private String getClusterViewAsDescriptorValue() {
if (previousClusterView == null) {
return null;
} else {
return previousClusterView.asDescriptorValue();
}
}
/**
* On activate the DocumentDiscoveryLiteService tries to start the
* background job
*/
@Activate
public void activate(ComponentContext context) {
logger.trace("activate: start");
// set the ClusterStateChangeListener with the DocumentNodeStore
this.documentNodeStore = (DocumentNodeStore) nodeStore;
documentNodeStore.setClusterStateChangeListener(this);
// retrieve the clusterId
clusterNodeId = documentNodeStore.getClusterId();
// start the background worker
backgroundWorker = new BackgroundWorker();
Thread th = new Thread(backgroundWorker, "DocumentDiscoveryLiteService-BackgroundWorker-[" + clusterNodeId + "]");
th.setDaemon(true);
th.start();
// register the Descriptors - for Oak to pass it upwards
if (context != null) {
OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
whiteboard.register(Descriptors.class, new DiscoveryLiteDescriptor(), Collections.emptyMap());
}
logger.trace("activate: end");
}
/**
* On deactivate the background job is stopped - if it was running at all
**/
@Deactivate
protected void deactivate() {
logger.trace("deactivate: deactivated");
if (backgroundWorker != null) {
backgroundWorker.stop();
backgroundWorker = null;
}
logger.trace("deactivate: end");
}
/**
* Checks if anything changed in the current view and updates the service
* fields accordingly.
*
* @return true if anything changed or is about to be changed (eg
* recovery/backlog), false if the view is stable
*/
private boolean checkView() {
logger.trace("checkView: start");
List<ClusterNodeInfoDocument> allClusterNodes = ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore());
Map<Integer, ClusterNodeInfoDocument> allNodeIds = new HashMap<Integer, ClusterNodeInfoDocument>();
Map<Integer, ClusterNodeInfoDocument> activeNotTimedOutNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
Map<Integer, ClusterNodeInfoDocument> activeButTimedOutNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
Map<Integer, ClusterNodeInfoDocument> recoveringNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
Map<Integer, ClusterNodeInfoDocument> backlogNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
Map<Integer, ClusterNodeInfoDocument> inactiveNoBacklogNodes = new HashMap<Integer, ClusterNodeInfoDocument>();
for (ClusterNodeInfoDocument clusterNode : allClusterNodes) {
if (!clusterNode.isInvisible()) {
allNodeIds.put(clusterNode.getClusterId(), clusterNode);
if (clusterNode.isBeingRecovered()) {
recoveringNodes.put(clusterNode.getClusterId(), clusterNode);
} else if (!clusterNode.isActive()) {
if (hasBacklog(clusterNode)) {
backlogNodes.put(clusterNode.getClusterId(), clusterNode);
} else {
inactiveNoBacklogNodes.put(clusterNode.getClusterId(), clusterNode);
}
} else if (clusterNode.getLeaseEndTime() < System.currentTimeMillis()) {
activeButTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
} else {
activeNotTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
}
}
}
// the current view should now consist of:
// activeNotTimedOutNodes and activeButTimedOutNodes!
// (reason for including the timed-out ones: they will yet have to
// switch to recovering or inactive - but we DON'T KNOW that yet,
// as that would be predicting the future - so for now we have to
// stick with including them in the view)
Map<Integer, ClusterNodeInfoDocument> allActives = new HashMap<Integer, ClusterNodeInfoDocument>(activeNotTimedOutNodes);
allActives.putAll(activeButTimedOutNodes);
// terminology:
// 'inactivating' are nodes that are either 'recovering' or 'backlog' ones
// 'recovering' are nodes for which one node is doing the recover() of lastRevs
// 'backlog' ones are nodes that are no longer active, that have finished
// the recover() but for which a backgroundRead is still pending to read
// the latest root changes.
logger.debug(
"checkView: active nodes: {}, timed out nodes: {}, recovering nodes: {}, backlog nodes: {}, inactive nodes: {}, total: {}, hence view nodes: {}",
activeNotTimedOutNodes.size(), activeButTimedOutNodes.size(), recoveringNodes.size(), backlogNodes.size(),
inactiveNoBacklogNodes.size(), allNodeIds.size(), allActives.size());
ClusterViewDocument originalView = previousClusterViewDocument;
ClusterViewDocument newView = doCheckView(allActives.keySet(), recoveringNodes.keySet(), backlogNodes.keySet(),
inactiveNoBacklogNodes.keySet());
if (newView == null) {
logger.trace("checkView: end. newView: null");
return true;
}
boolean newHasInstancesWithBacklog = recoveringNodes.size() > 0 || backlogNodes.size() > 0;
boolean changed = originalView == null || (newView.getViewSeqNum() != originalView.getViewSeqNum())
|| (newHasInstancesWithBacklog != hasInstancesWithBacklog);
logger.debug("checkView: viewFine: {}, changed: {}, originalView: {}, newView: {}", newView != null, changed, originalView,
newView);
if (longTimeInactives.addAll(inactiveNoBacklogNodes.keySet())) {
logger.debug("checkView: updated longTimeInactives to {} (inactiveNoBacklogNodes: {})", longTimeInactives,
inactiveNoBacklogNodes);
}
if (changed) {
String clusterId = ClusterRepositoryInfo.getOrCreateId(documentNodeStore);
ClusterView v = ClusterView.fromDocument(clusterNodeId, clusterId, newView, backlogNodes.keySet());
ClusterView previousView = previousClusterView;
previousClusterView = v;
hasInstancesWithBacklog = newHasInstancesWithBacklog;
logger.info("checkView: view changed from: " + previousView + ", to: " + v + ", hasInstancesWithBacklog: "
+ hasInstancesWithBacklog);
return true;
} else {
logger.debug("checkView: no changes whatsoever, still at view: " + previousClusterView);
return hasInstancesWithBacklog;
}
}
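/**
* Determines the revision that the local instance has last seen from the
* given cluster node, by scanning the MBean's lastKnownRevisions - which
* are formatted as 'clusterId=revision' entries
**/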
private Revision getLastKnownRevision(int clusterNodeId) {
String[] lastKnownRevisions = documentNodeStore.getMBean().getLastKnownRevisions();
for (int i = 0; i < lastKnownRevisions.length; i++) {
String aLastKnownRevisionStr = lastKnownRevisions[i];
String[] split = aLastKnownRevisionStr.split("=");
if (split.length == 2) {
try {
Integer id = Integer.parseInt(split[0]);
if (id == clusterNodeId) {
final Revision lastKnownRev = Revision.fromString(split[1]);
logger.trace("getLastKnownRevision: end. clusterNode: {}, lastKnownRevision: {}", clusterNodeId,
lastKnownRev);
return lastKnownRev;
}
} catch (NumberFormatException nfe) {
logger.warn("getLastKnownRevision: could not parse integer '" + split[0] + "': " + nfe, nfe);
}
} else {
logger.warn("getLastKnownRevision: cannot parse lastKnownRevision: " + aLastKnownRevisionStr);
}
}
logger.warn("getLastKnownRevision: no lastKnownRevision found for " + clusterNodeId);
return null;
}
/** package access only for testing **/
boolean hasBacklog(ClusterNodeInfoDocument clusterNode) {
if (logger.isTraceEnabled()) {
logger.trace("hasBacklog: start. clusterNodeId: {}", clusterNode.getClusterId());
}
Revision lastKnownRevision = getLastKnownRevision(clusterNode.getClusterId());
if (lastKnownRevision == null) {
logger.warn("hasBacklog: no lastKnownRevision found, hence cannot determine backlog for node "
+ clusterNode.getClusterId());
return false;
}
// The lastKnownRevision is what the local instance has last read/seen
// from another instance. This must be compared to what the other
// instance *actually* has written as the very last thing.
// Now the knowledge of what the other instance has last written (after
// recovery) would sit in the root document - so that could in theory
// be used. But reading the root document would have to be done
// *uncached*. And that's quite a change to what the original idea was:
// that the root document would only be read every second, to avoid
// contention. So this 'what the other instance has last written'
// information is retrieved via a new, dedicated property in the
// clusterNodes collection: the 'lastWrittenRootRev'.
// The 'lastWrittenRootRev' is written by 'UnsavedModifications' during
// backgroundUpdate and retrieved here quite regularly (but it should
// not be a big deal, as the discovery-lite is the only one reading
// this field so frequently and it does not interfere with normal
// (jcr) nodes at all).
String lastWrittenRootRevStr = clusterNode.getLastWrittenRootRev();
if (lastWrittenRootRevStr == null) {
boolean warn = false;
Object oakVersion = clusterNode.get(ClusterNodeInfo.OAK_VERSION_KEY);
if (oakVersion instanceof String) {
try {
warn = versionPredates((String) oakVersion, "1.3.5");
} catch (Exception e) {
logger.debug("hasBacklog: couldn't parse version " + oakVersion + " : " + e);
warn = true;
}
}
if (warn) {
logger.warn("hasBacklog: node " + clusterNode.getClusterId() + " has lastWrittenRootRev=null");
} else {
logger.debug("hasBacklog: node " + clusterNode.getClusterId() + " has lastWrittenRootRev=null");
}
return false;
}
Revision lastWrittenRootRev = Revision.fromString(lastWrittenRootRevStr);
if (lastWrittenRootRev == null) {
logger.warn("hasBacklog: node has no lastWrittenRootRev: " + clusterNode.getClusterId());
return false;
}
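// there is a backlog iff the revision we last read from that node is
// older (by timestamp) than the revision it last wrote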
boolean hasBacklog = Revision.getTimestampDifference(lastKnownRevision, lastWrittenRootRev) < 0;
if (logger.isDebugEnabled()) {
logger.debug("hasBacklog: clusterNodeId: {}, lastKnownRevision: {}, lastWrittenRootRev: {}, hasBacklog: {}",
clusterNode.getClusterId(), lastKnownRevision, lastWrittenRootRev, hasBacklog);
}
return hasBacklog;
}
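/**
* Returns true if 'compare' predates 'base' - ie if 'base' is the newer
* version. Eg versionPredates("1.3.6", "1.3.5") is true, while
* versionPredates("1.3.5", "1.3.5") and versionPredates("1.3.4", "1.3.5")
* are false.
**/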
static boolean versionPredates(String base, String compare) {
Version one = Version.parseVersion(substSnapshotPrefix(compare));
Version two = Version.parseVersion(substSnapshotPrefix(base));
return two.compareTo(one) > 0;
}
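/**
* Replaces a maven-style '-SNAPSHOT' suffix with a numeric OSGi qualifier
* so that Version.parseVersion can handle it - eg '1.3.5-SNAPSHOT' becomes
* '1.3.5.999999'
**/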
private static String substSnapshotPrefix(String version) {
String snapshot = "-SNAPSHOT";
return version.endsWith(snapshot) ? version.substring(0, version.length() - snapshot.length()) + ".999999" : version;
}
private ClusterViewDocument doCheckView(final Set<Integer> activeNodes, final Set<Integer> recoveringNodes,
final Set<Integer> backlogNodes, final Set<Integer> inactiveNodes) {
logger.trace("doCheckView: start: activeNodes: {}, recoveringNodes: {}, backlogNodes: {}, inactiveNodes: {}", activeNodes,
recoveringNodes, backlogNodes, inactiveNodes);
Set<Integer> allInactives = new HashSet<Integer>();
allInactives.addAll(inactiveNodes);
allInactives.addAll(backlogNodes);
if (activeNodes.isEmpty()) {
// then we have zero active nodes - that is unexpected, since it
// would imply that even our own node is not active -
// hence handle with care - ie wait until we get an active node
logger.warn("doCheckView: empty active ids. activeNodes:{}, recoveringNodes:{}, inactiveNodes:{}", activeNodes,
recoveringNodes, inactiveNodes);
return null;
}
ClusterViewDocument newViewOrNull;
try {
newViewOrNull = ClusterViewDocument.readOrUpdate(documentNodeStore, activeNodes, recoveringNodes, allInactives);
} catch (RuntimeException re) {
logger.error("doCheckView: RuntimeException: re: " + re, re);
return null;
} catch (Error er) {
logger.error("doCheckView: Error: er: " + er, er);
return null;
}
logger.trace("doChckView: readOrUpdate result: {}", newViewOrNull);
// and now for some verbose documentation and logging:
if (newViewOrNull == null) {
// then there was a concurrent update of the clusterView
// and we should do some quick backoff sleeping
logger.debug("doCheckView: newViewOrNull is null: " + newViewOrNull);
return null;
} else {
// otherwise we now hold the newly valid view
// it could be the same as or different from the previous one, let's check
if (previousClusterViewDocument == null) {
// oh ok, this is the very first one
previousClusterViewDocument = newViewOrNull;
logger.debug("doCheckView: end. first ever view: {}", newViewOrNull);
return newViewOrNull;
} else if (previousClusterViewDocument.getViewSeqNum() == newViewOrNull.getViewSeqNum()) {
// that's the normal case: the viewSeqNum matches, nothing has
// changed, we've already processed the previousClusterView, so:
logger.debug("doCheckView: end. seqNum did not change. view: {}", newViewOrNull);
return newViewOrNull;
} else {
// otherwise the view has changed
logger.info("doCheckView: view has changed from: {} to: {} - sending event...", previousClusterViewDocument,
newViewOrNull);
previousClusterViewDocument = newViewOrNull;
logger.debug("doCheckView: end. changed view: {}", newViewOrNull);
return newViewOrNull;
}
}
}
@Override
public void handleClusterStateChange() {
// handleClusterStateChange is needed to learn about any state change in
// the clusternodes collection asap and to be able to react to it - so
// this will wake up the backgroundWorker, which in turn will - in a
// separate thread - check the view and send out events accordingly
wakeupBackgroundWorker(WakeupReason.CLUSTER_STATE_CHANGED);
}
private void wakeupBackgroundWorker(WakeupReason wakeupReason) {
final BackgroundWorker bw = backgroundWorker;
if (bw != null) {
// read the volatile hasInstancesWithBacklog field once, so the
// checks below all see the same value
boolean hasInstancesWithBacklog = this.hasInstancesWithBacklog;
if (wakeupReason == WakeupReason.BACKGROUND_READ_FINISHED) {
// then only forward the notify if 'hasInstancesWithBacklog' -
// ie if we have anything we could be waiting for - otherwise
// we dont need to wake up the background thread
if (!hasInstancesWithBacklog) {
logger.trace(
"wakeupBackgroundWorker: not waking up backgroundWorker, as we do not have any instances with backlog");
return;
}
}
logger.trace("wakeupBackgroundWorker: waking up backgroundWorker, reason: {} (hasInstancesWithBacklog: {})",
wakeupReason, hasInstancesWithBacklog);
synchronized (bw) {
bw.notifyAll();
}
}
}
/**
* <p>
* Additionally, the DocumentDiscoveryLiteService must be notified when the
* background-read has finished - as it could be waiting for a crashed
* node's recovery to finish - which it can only do by checking the
* lastKnownRevision of the crashed instance - and that check is best done
* right after a background read has finished. (It could optionally do that
* purely time-based as well, but going via a listener is more timely,
* which is why this approach has been chosen.)
*/
@Override
public void contentChanged(@NotNull NodeState root, @NotNull CommitInfo info) {
// contentChanged is only used to react as quickly as possible
// when we have instances that have a 'backlog' - ie when instances
// crashed and are being recovered - then we must wait until the
// recovery is finished AND until the subsequent background read
// actually reads that instance's last changes. To catch that moment
// as quickly as possible, this contentChanged is used.
// From the above it also follows that this only wakes up the
// backgroundWorker if we have any pending 'backlog' instances -
// otherwise this is a no-op
if (info.isExternal()) {
// then ignore this, as it is an external change
// note: it could be a compacted change, in which case we should
// probably still process it - but we have a 5sec fallback
// in the BackgroundWorker to handle that case too, so:
logger.trace("contentChanged: ignoring content change due to commit info belonging to external change");
return;
}
logger.trace("contentChanged: handling content changed by waking up worker if necessary");
wakeupBackgroundWorker(WakeupReason.BACKGROUND_READ_FINISHED);
}
}