SLING-10854 : introducing SlingIdCleanupTask to clean up old slingIds… (#13)
* SLING-10854 : introducing SlingIdCleanupTask to clean up old slingIds under /var/discovery/oak automatically
* SLING-10854 : more test coverage
* SLING-10854 : more test coverage
* SLING-10854 : logging cleanup
* SLING-10854 : modifier cleanup
* SLING-10854 : added run counter to log info
* SLING-10854 : fix variable name
Co-authored-by: Jörg Hoh <joerghoh@users.noreply.github.com>
* SLING-10854 : fix rename change
* SLING-10854 : log scheduledDate for convenience
* SLING-10854 : use resolver.getResource rather than ResourceHelper.getOrCreateResource to avoid unintended creation of resources
* SLING-10854 : remove unnecessary copy-paste-leftover revert
* SLING-10854 : beautify try catch clause
* SLING-10854 : more test coverage
* SLING-10854 : orphaned syncTokens support added
* SLING-10854 : disabled by default and introduced system property org.apache.sling.discovery.oak.slingidcleanup.enabled to enable
* SLING-10854 : added a minimal delay between a successful run and a follow-up run - to reduce unnecessary load on the repository for likely empty cleanup executions
* SLING-10854 : lower a logger to debug
* SLING-10854 : fix description
Co-authored-by: Jörg Hoh <joerghoh@users.noreply.github.com>
* SLING-10854 : test stability improvement
* SLING-10854 : avoid deleting slingIds ever seen by then leader
* SLING-10854 : minor comment fix
* SLING-10854 : shrink cleanup method size
* SLING-10854 : reduce log.infos
* SLING-10854 : getActiveSlingIdsFrom extracted
* SLING-10854 : some more code reductions
* SLING-10854 : added comment about not considering a slingId garbage if it was ever seen by the then leader
* SLING-10854 : minor javadoc rewording
* SLING-10854 : and some docu about the odd choice of 13h min delay added
* SLING-10854 : shorten config names
* SLING-10854 : rename getActiveSlignIdsFrom
* SLING-10854 : use streams rather than verbose for loops
Co-authored-by: Rishabh Kumar <rishabhdaim1991@gmail.com>
* SLING-10854 : fix stream code
* SLING-10854 : renamed method to topologyChanged
* SLING-10854 : fixed seenInstances concurrency
* SLING-10854 : log message improved
* SLING-10854 : reformating only
---------
Co-authored-by: Jörg Hoh <joerghoh@users.noreply.github.com>
Co-authored-by: Rishabh Kumar <rishabhdaim1991@gmail.com>
diff --git a/src/main/java/org/apache/sling/discovery/oak/SlingIdCleanupTask.java b/src/main/java/org/apache/sling/discovery/oak/SlingIdCleanupTask.java
new file mode 100644
index 0000000..3316cbc
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/oak/SlingIdCleanupTask.java
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak;
+
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.TopologyView;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background task that cleans up garbage slingIds after topology changes.
+ * <p>
+ * A slingId is considered garbage when:
+ * <ul>
+ * <li>it is not in the current topology</li>
+ * <li>was not ever seen in previous topologies by the now leader instance</li>
+ * <li>it is not in the current idmap (where clusterNodeIds are reused hence
+ * that list stays small and does not need cleanup)</li>
+ * <li>its leaderElectionId was created more than 7 days ago (the
+ * leaderElectionId is created at activate time of the discovery.oak bundle -
+ * hence this more or less corresponds to the startup time of that
+ * instance)</li>
+ * </ul>
+ * The garbage accumulates at the following places, where it will thus be
+ * cleaned up from:
+ * <ul>
+ * <li>as child node under /var/discovery/oak/clusterInstances : this is the
+ * most performance critical garbage</li>
+ * <li>as a property key in /var/discovery/oak/syncTokens</li>
+ * </ul>
+ * The task by default is executed:
+ * <ul>
+ * <li>only on the leader</li>
+ * <li>10min after a TOPOLOGY_INIT or TOPOLOGY_CHANGED event</li>
+ * <li>with a maximum number of delete operations to avoid repository overload -
+ * that maximum is called batchSize and is 50 by default</li>
+ * <li>in subsequent intervals of 10min after the initial run, if that had to
+ * stop at the batchSize of 50 deletions</li>
+ * </ul>
+ * All parameters mentioned above can be configured.
+ * <p>
+ * Additionally, the cleanup is skipped for 13 hours after a successful cleanup.
+ * This is to avoid unnecessary load on the repository. The number of 13
+ * incorporates some heuristics such as : about 2 cleanup rounds per day maximum
+ * makes sense, if a leader is very long living, then the 1 additional hour
+ * makes it spread somewhat throughout the day. This is to further minimize any
+ * load side-effects.
+ */
+@Component
+@Designate(ocd = SlingIdCleanupTask.Conf.class)
+public class SlingIdCleanupTask implements TopologyEventListener, Runnable {
+
+ final static String SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME = "org.apache.sling.discovery.oak.slingidcleanup.enabled";
+
+ /** default minimal cleanup delay at 13h, to intraday load balance */
+ final static long MIN_CLEANUP_DELAY_MILLIS = 46800000;
+
+ /**
+ * default age is 1 week : an instance that is not in the current topology,
+ * started 1 week ago is very unlikely to still be active
+ */
+ private static final long DEFAULT_MIN_CREATION_AGE_MILLIS = 604800000; // 1 week
+
+ /**
+ * initial delay is 10min : after a TOPOLOGY_INIT or TOPOLOGY_CHANGED on the
+ * leader, there should be a 10min delay before starting a round of cleanup.
+ * This is to not add unnecessary load after a startup/change.
+ */
+ private static final int DEFAULT_CLEANUP_INITIAL_DELAY = 600000; // 10min
+
+ /**
+ * default cleanup interval is 10min - this is together with the batchSize to
+ * lower repository load
+ */
+ private static final int DEFAULT_CLEANUP_INTERVAL = 600000; // 10min
+
+ /**
+ * default batch size is 50 deletions : normally there should not be much
+ * garbage around anyway, so normally it's just a few, 1-5 perhaps. If there's
+ * more than 50, that is probably a one-time cleanup after this feature is first
+ * rolled out. That one-time cleanup can actually take a considerable amount of
+ * time. So, to not overload the write load on the repository, the deletion is
+ * batched into 50 at any time - with 10min delays in between. That results in
+ * an average of 1 cleanup every 12 seconds, or 5 per minute, or 8640 per day,
+ * for a legacy cleanup.
+ */
+ private static final int DEFAULT_CLEANUP_BATCH_SIZE = 50;
+
+ /**
+ * The sling.commons.scheduler name, so that it can be cancelled upon topology
+ * changes.
+ */
+ private static final String SCHEDULE_NAME = "org.apache.sling.discovery.oak.SlingIdCleanupTask";
+
+ protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ @Reference
+ protected Scheduler scheduler;
+
+ @Reference
+ protected ResourceResolverFactory resourceResolverFactory;
+
+ @Reference
+ private Config config;
+
+ /**
+ * volatile flag to fast stop any ongoing deletion upon a change in the topology
+ */
+ private volatile boolean hasTopology = false;
+
+ /**
+ * volatile field to keep track of the current topology, shared between topology
+ * listener and deletion
+ */
+ @SuppressWarnings("all")
+ private volatile TopologyView currentView;
+
+ private int initialDelayMillis = DEFAULT_CLEANUP_INITIAL_DELAY;
+
+ private int intervalMillis = DEFAULT_CLEANUP_INTERVAL;
+
+ private int batchSize = DEFAULT_CLEANUP_BATCH_SIZE;
+
+ private long minCreationAgeMillis = DEFAULT_MIN_CREATION_AGE_MILLIS;
+
+ /** test counter that increments upon every scheduler invocation */
+ private AtomicInteger runCount = new AtomicInteger(0);
+
+ /** test counter that increments upon every batch deletion */
+ private AtomicInteger completionCount = new AtomicInteger(0);
+
+ /** test counter that keeps track of actually deleted slingIds */
+ private AtomicInteger deleteCount = new AtomicInteger(0);
+
+ /**
+ * flag to distinguish first from subsequent runs, as they might have different
+ * scheduler delays
+ */
+ private volatile boolean firstRun = true;
+
+ private long lastSuccessfulRun = -1;
+
+ /**
+ * Minimal delay after a successful cleanup round, in millis
+ */
+ private long minCleanupDelayMillis = MIN_CLEANUP_DELAY_MILLIS;
+
+ /**
+ * contains all slingIds ever seen by this instance - should not be a long list
+ * so not a memory issue
+ */
+ private Set<String> seenInstances = new CopyOnWriteArraySet<>();
+
+ @ObjectClassDefinition(name = "Apache Sling Discovery Oak SlingId Cleanup Task", description = "This task is in charge of cleaning up old SlingIds from the repository.")
+ public @interface Conf {
+
+ @AttributeDefinition(name = "Cleanup initial delay milliseconds", description = "Number of milliseconds to initially wait for the first cleanup")
+ int slingid_cleanup_initial_delay() default DEFAULT_CLEANUP_INITIAL_DELAY;
+
+ @AttributeDefinition(name = "Cleanup interval milliseconds", description = "Number of milliseconds after which to do another batch of cleaning up (if necessary)")
+ int slingid_cleanup_interval() default DEFAULT_CLEANUP_INTERVAL;
+
+ @AttributeDefinition(name = "Cleanup batch size", description = "Maximum number of slingIds to cleanup in one batch.")
+ int slingid_cleanup_batchsize() default DEFAULT_CLEANUP_BATCH_SIZE;
+
+ @AttributeDefinition(name = "Cleanup minimum creation age", description = "Minimum number of milliseconds since the slingId was created.")
+ long slingid_cleanup_min_creation_age() default DEFAULT_MIN_CREATION_AGE_MILLIS;
+ }
+
+ /**
+ * Test constructor
+ */
+ static SlingIdCleanupTask create(Scheduler scheduler, ResourceResolverFactory factory,
+ Config config, int initialDelayMillis, int intervalMillis, int batchSize,
+ long minCreationAgeMillis, long minCleanupDelayMillis) {
+ final SlingIdCleanupTask s = new SlingIdCleanupTask();
+ s.scheduler = scheduler;
+ s.resourceResolverFactory = factory;
+ s.config = config;
+ s.minCleanupDelayMillis = minCleanupDelayMillis;
+ s.config(initialDelayMillis, intervalMillis, batchSize, minCreationAgeMillis);
+ return s;
+ }
+
+ @Activate
+ protected void activate(final BundleContext bc, final Conf config) {
+ this.modified(bc, config);
+ }
+
+ @Modified
+ protected void modified(final BundleContext bc, final Conf config) {
+ if (config == null) {
+ return;
+ }
+ config(config.slingid_cleanup_initial_delay(), config.slingid_cleanup_interval(),
+ config.slingid_cleanup_batchsize(),
+ config.slingid_cleanup_min_creation_age());
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ logger.info("deactivate : deactivated.");
+ hasTopology = false;
+ }
+
+ private void config(int initialDelayMillis, int intervalMillis, int batchSize,
+ long minCreationAgeMillis) {
+ this.initialDelayMillis = initialDelayMillis;
+ this.intervalMillis = intervalMillis;
+ this.batchSize = batchSize;
+ this.minCreationAgeMillis = minCreationAgeMillis;
+ logger.info(
+ "config: enabled = {}, initial delay milliseconds = {}, interval milliseconds = {}, batch size = {}, min creation age milliseconds = {}",
+ isEnabled(), initialDelayMillis, intervalMillis, batchSize,
+ minCreationAgeMillis);
+ }
+
+ @Override
+ public void handleTopologyEvent(TopologyEvent event) {
+ if (!isEnabled()) {
+ hasTopology = false; // stops potentially ongoing deletion
+ currentView = null;
+ // cancel cleanup schedule
+ stop();
+ logger.debug("handleTopologyEvent: slingId cleanup is disabled");
+ return;
+ }
+ final TopologyView newView = event.getNewView();
+ if (newView == null || event.getType() == Type.PROPERTIES_CHANGED) {
+ hasTopology = false; // stops potentially ongoing deletion
+ currentView = null;
+ // cancel cleanup schedule
+ stop();
+ } else {
+ hasTopology = true;
+ currentView = newView;
+ seenInstances.addAll(getActiveSlingIds(newView));
+ if (newView.getLocalInstance().isLeader()) {
+ // only execute on leader
+ recreateSchedule();
+ } else {
+ // should not be necessary, but lets stop anyway on non-leaders:
+ stop();
+ }
+ }
+ }
+
+ /**
+ * Cancels a potentially previously registered cleanup schedule.
+ */
+ private void stop() {
+ final Scheduler localScheduler = scheduler;
+ if (localScheduler == null) {
+ // should not happen
+ logger.warn("stop: no scheduler set, giving up.");
+ return;
+ }
+ final boolean unscheduled = localScheduler.unschedule(SCHEDULE_NAME);
+ logger.debug("stop: unschedule result={}", unscheduled);
+ }
+
+ /**
+ * Reads the system property that enables or disabled this tasks
+ */
+ private static boolean isEnabled() {
+ final String systemPropertyValue = System
+ .getProperty(SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME);
+ return standardConverter().convert(systemPropertyValue).defaultValue(false)
+ .to(Boolean.class);
+ }
+
+ /**
+ * This method can be invoked at any time to reset the schedule to do a fresh
+ * round of cleanup.
+ * <p>
+ * This method is thread-safe : if called concurrently, the fact that
+ * scheduler.schedul is synchronized works out that ultimately there will be
+ * just 1 schedule active (which is what is the desired outcome).
+ */
+ private void recreateSchedule() {
+ final Scheduler localScheduler = scheduler;
+ if (localScheduler == null) {
+ // should not happen
+ logger.warn("recreateSchedule: no scheduler set, giving up.");
+ return;
+ }
+ final Calendar cal = Calendar.getInstance();
+ int delayMillis;
+ if (firstRun) {
+ delayMillis = initialDelayMillis;
+ } else {
+ delayMillis = intervalMillis;
+ }
+ cal.add(Calendar.MILLISECOND, delayMillis);
+ final Date scheduledDate = cal.getTime();
+ logger.debug(
+ "recreateSchedule: scheduling a cleanup in {} milliseconds from now, which is: {}",
+ delayMillis, scheduledDate);
+ ScheduleOptions options = localScheduler.AT(scheduledDate);
+ options.name(SCHEDULE_NAME);
+ options.canRunConcurrently(false); // should not concurrently execute
+ localScheduler.schedule(this, options);
+ }
+
+ /**
+ * Invoked via sling.commons.scheduler triggered from resetCleanupSchedule(). By
+ * default should get called at max every 5 minutes until cleanup is done or
+ * 10min after a topology change.
+ */
+ @Override
+ public void run() {
+ if (lastSuccessfulRun > 0 && System.currentTimeMillis()
+ - lastSuccessfulRun < minCleanupDelayMillis) {
+ logger.debug(
+ "run: last cleanup was {} millis ago, which is less than {} millis, therefore not cleaning up yet.",
+ System.currentTimeMillis() - lastSuccessfulRun,
+ minCleanupDelayMillis);
+ recreateSchedule();
+ return;
+ }
+ runCount.incrementAndGet();
+ if (!hasTopology) {
+ return;
+ }
+ boolean mightHaveMore = true;
+ try {
+ mightHaveMore = cleanup();
+ } catch (Exception e) {
+ // upon exception just log and retry in 10min
+ logger.error("run: got Exception while cleaning up slnigIds : " + e, e);
+ }
+ if (mightHaveMore) {
+ // then continue in 10min
+ recreateSchedule();
+ return;
+ }
+ // log successful cleanup done, yes, on info
+ logger.info(
+ "run: slingId cleanup done, run counter = {}, delete counter = {}, completion counter = {}",
+ getRunCount(), getDeleteCount(), getCompletionCount());
+ lastSuccessfulRun = System.currentTimeMillis();
+ }
+
+ /**
+ * Do the actual cleanup of garbage slingIds and report back with true if there
+ * might be more or false if we're at the end.
+ *
+ * @return true if there might be more garbage or false if we're at the end
+ */
+ private boolean cleanup() {
+ logger.debug("cleanup: start");
+ if (!isEnabled()) {
+ // bit of overkill probably, as this shouldn't happen.
+ // but adds to a good night's sleep.
+ logger.debug("cleanup: not enabled, stopping.");
+ return false;
+ }
+
+ final ResourceResolverFactory localFactory = resourceResolverFactory;
+ final Config localConfig = config;
+ if (localFactory == null || localConfig == null) {
+ logger.warn("cleanup: cannot cleanup due to rrf={} or c={}", localFactory,
+ localConfig);
+ return true;
+ }
+ final TopologyView localCurrentView = currentView;
+ if (localCurrentView == null || !localCurrentView.isCurrent()) {
+ logger.debug("cleanup : cannot cleanup as topology recently changed : {}",
+ localCurrentView);
+ return true;
+ }
+ final Set<String> activeSlingIds = getActiveSlingIds(localCurrentView);
+ try (ResourceResolver resolver = localFactory.getServiceResourceResolver(null)) {
+ final Resource clusterInstances = resolver
+ .getResource(localConfig.getClusterInstancesPath());
+ final Resource idMap = resolver.getResource(localConfig.getIdMapPath());
+ final Resource syncTokens = resolver
+ .getResource(localConfig.getSyncTokenPath());
+ if (clusterInstances == null || idMap == null || syncTokens == null) {
+ logger.warn("cleanup: no resource found at {}, {} or {}, stopping.",
+ localConfig.getClusterInstancesPath(), localConfig.getIdMapPath(),
+ localConfig.getSyncTokenPath());
+ return false;
+ }
+ resolver.refresh();
+ final ValueMap idMapMap = idMap.adaptTo(ValueMap.class);
+ final ModifiableValueMap syncTokenMap = syncTokens
+ .adaptTo(ModifiableValueMap.class);
+ final Calendar now = Calendar.getInstance();
+ int removed = 0;
+ boolean mightHaveMore = false;
+ final int localBatchSize = batchSize;
+ final long localMinCreationAgeMillis = minCreationAgeMillis;
+ for (Resource resource : clusterInstances.getChildren()) {
+ if (topologyChanged(localCurrentView)) {
+ return true;
+ }
+ final String slingId = resource.getName();
+ if (deleteIfOldSlingId(resource, slingId, syncTokenMap, idMapMap,
+ activeSlingIds, now, localMinCreationAgeMillis)) {
+ if (++removed >= localBatchSize) {
+ // we need to stop
+ mightHaveMore = true;
+ break;
+ }
+ }
+ }
+ // if we're not already at the batch limit, check syncTokens too
+ if (!mightHaveMore) {
+ for (String slingId : syncTokenMap.keySet()) {
+ try {
+ UUID.fromString(slingId);
+ } catch (Exception e) {
+ // not a uuid
+ continue;
+ }
+ if (topologyChanged(localCurrentView)) {
+ return true;
+ }
+ Resource resourceOrNull = clusterInstances.getChild(slingId);
+ if (deleteIfOldSlingId(resourceOrNull, slingId, syncTokenMap,
+ idMapMap, activeSlingIds, now, localMinCreationAgeMillis)) {
+ if (++removed >= localBatchSize) {
+ // we need to stop
+ mightHaveMore = true;
+ break;
+ }
+ }
+ }
+ }
+ if (topologyChanged(localCurrentView)) {
+ return true;
+ }
+ if (removed > 0) {
+ // only if we removed something we commit
+ resolver.commit();
+ logger.info(
+ "cleanup : removed {} old slingIds (batch size : {}), potentially has more: {}",
+ removed, localBatchSize, mightHaveMore);
+ deleteCount.addAndGet(removed);
+ }
+ firstRun = false;
+ completionCount.incrementAndGet();
+ return mightHaveMore;
+ } catch (LoginException e) {
+ logger.error("cleanup: could not log in administratively: " + e, e);
+ throw new RuntimeException("Could not log in to repository (" + e + ")", e);
+ } catch (PersistenceException e) {
+ logger.error("cleanup: got a PersistenceException: " + e, e);
+ throw new RuntimeException(
+ "Exception while talking to repository (" + e + ")", e);
+ } finally {
+ logger.debug("cleanup: done.");
+ }
+ }
+
+ private Set<String> getActiveSlingIds(final TopologyView localCurrentView) {
+ return localCurrentView.getLocalInstance().getClusterView().getInstances()
+ .stream().map(InstanceDescription::getSlingId)
+ .collect(Collectors.toSet());
+ }
+
+ private boolean topologyChanged(TopologyView localCurrentView) {
+ if (!hasTopology || currentView != localCurrentView
+ || !localCurrentView.isCurrent()) {
+ // we got interrupted during cleanup
+ // let's not commit at all then
+ logger.debug(
+ "topologyChanged : topology changing during cleanup - not committing this time - stopping for now.");
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ static long millisOf(Object leaderElectionIdCreatedAt) {
+ if (leaderElectionIdCreatedAt == null) {
+ return -1;
+ }
+ if (leaderElectionIdCreatedAt instanceof Date) {
+ final Date d = (Date) leaderElectionIdCreatedAt;
+ return d.getTime();
+ }
+ if (leaderElectionIdCreatedAt instanceof Calendar) {
+ final Calendar c = (Calendar) leaderElectionIdCreatedAt;
+ return c.getTimeInMillis();
+ }
+ return -1;
+ }
+
+ private boolean deleteIfOldSlingId(Resource resourceOrNull, String slingId,
+ ModifiableValueMap syncTokenMap, ValueMap idMapMap,
+ Set<String> activeSlingIds, Calendar now, long localMinCreationAgeMillis)
+ throws PersistenceException {
+ logger.trace("deleteIfOldSlingId : handling slingId = {}", slingId);
+ if (activeSlingIds.contains(slingId)) {
+ logger.trace("deleteIfOldSlingId : slingId is currently active : {}",
+ slingId);
+ return false;
+ } else if (seenInstances.contains(slingId)) {
+ logger.trace("deleteIfOldSlingId : slingId seen active previously : {}",
+ slingId);
+ return false;
+ }
+ // only check in idmap and for leaderElectionId details if the clusterInstance
+ // resource is there
+ if (resourceOrNull != null) {
+ Object clusterNodeId = idMapMap.get(slingId);
+ if (clusterNodeId == null) {
+ logger.trace("deleteIfOldSlingId : slingId {} not recently in use",
+ slingId);
+ } else {
+ logger.trace("deleteIfOldSlingId : slingId {} WAS recently in use : {}",
+ slingId, clusterNodeId);
+ return false;
+ }
+ Object o = resourceOrNull.getValueMap().get("leaderElectionIdCreatedAt");
+ final long leaderElectionIdCreatedAt = millisOf(o);
+ if (leaderElectionIdCreatedAt <= 0) {
+ // skip
+ logger.trace(
+ "deleteIfOldSlingId: resource ({}) has no or wrongly typed leaderElectionIdCreatedAt : {}",
+ resourceOrNull, o);
+ return false;
+ }
+ final long diffMillis = now.getTimeInMillis() - leaderElectionIdCreatedAt;
+ if (diffMillis <= localMinCreationAgeMillis) {
+ logger.trace("deleteIfOldSlingId: not old slingId : {}", resourceOrNull);
+ return false;
+ }
+ }
+ logger.trace("deleteIfOldSlingId: deleting old slingId : {}", resourceOrNull);
+ syncTokenMap.remove(slingId);
+ if (resourceOrNull != null) {
+ resourceOrNull.getResourceResolver().delete(resourceOrNull);
+ }
+ return true;
+ }
+
+ int getRunCount() {
+ return runCount.get();
+ }
+
+ int getDeleteCount() {
+ return deleteCount.get();
+ }
+
+ int getCompletionCount() {
+ return completionCount.get();
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/discovery/oak/TestSlingIdCleanupTask.java b/src/test/java/org/apache/sling/discovery/oak/TestSlingIdCleanupTask.java
new file mode 100644
index 0000000..01ed7b9
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/TestSlingIdCleanupTask.java
@@ -0,0 +1,982 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.scheduler.impl.QuartzScheduler;
+import org.apache.sling.commons.scheduler.impl.SchedulerServiceFactory;
+import org.apache.sling.commons.threads.impl.DefaultThreadPoolManager;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.base.its.setup.VirtualInstance;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DummyTopologyView;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
+import org.apache.sling.discovery.oak.its.setup.OakTestConfig;
+import org.apache.sling.discovery.oak.its.setup.OakVirtualInstanceBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Closer;
+
+import junitx.util.PrivateAccessor;
+
+public class TestSlingIdCleanupTask {
+
+ protected static final String PROPERTY_ID_ENDPOINTS = "endpoints";
+
+ protected static final String PROPERTY_ID_SLING_HOME_PATH = "slingHomePath";
+
+ protected static final String PROPERTY_ID_RUNTIME = "runtimeId";
+
+ @SuppressWarnings("all")
+ class DummyConf implements SlingIdCleanupTask.Conf {
+
+ int initialDelay;
+ int interval;
+ int batchSize;
+ long age;
+
+ DummyConf(int initialDelay, int interval, int batchSize, long age) {
+ this.initialDelay = initialDelay;
+ this.interval = interval;
+ this.batchSize = batchSize;
+ this.age = age;
+ }
+
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ return null;
+ }
+
+ @Override
+ public int slingid_cleanup_initial_delay() {
+ return initialDelay;
+ }
+
+ @Override
+ public int slingid_cleanup_interval() {
+ return interval;
+ }
+
+ @Override
+ public int slingid_cleanup_batchsize() {
+ return batchSize;
+ }
+
+ @Override
+ public long slingid_cleanup_min_creation_age() {
+ return age;
+ }
+
+ }
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private SlingIdCleanupTask cleanupTask;
+
+ private Scheduler scheduler;
+
+ private ResourceResolverFactory factory;
+
+ private Config config;
+
+ private VirtualInstance instance;
+
+ private Closer closer;
+
+ private Scheduler createScheduler() throws Exception {
+ try {
+ return createRealScheduler();
+ } catch (Throwable e) {
+ throw new Exception(e);
+ }
+ }
+
+ private Scheduler createRealScheduler() throws Throwable {
+ final BundleContext ctx = Mockito.mock(BundleContext.class);
+ final Map<String, Object> props = new HashMap<>();
+ final DefaultThreadPoolManager threadPoolManager = new DefaultThreadPoolManager(
+ ctx, new Hashtable<String, Object>());
+ final QuartzScheduler qScheduler = new QuartzScheduler();
+ Scheduler scheduler = new SchedulerServiceFactory();
+ PrivateAccessor.setField(qScheduler, "threadPoolManager", threadPoolManager);
+ PrivateAccessor.invoke(qScheduler, "activate",
+ new Class[] { BundleContext.class, Map.class },
+ new Object[] { ctx, props });
+ PrivateAccessor.setField(scheduler, "scheduler", qScheduler);
+
+ closer.register(new Closeable() {
+
+ @Override
+ public void close() throws IOException {
+ try {
+ PrivateAccessor.invoke(qScheduler, "deactivate",
+ new Class[] { BundleContext.class }, new Object[] { ctx });
+ } catch (Throwable e) {
+ throw new IOException(e);
+ }
+ }
+
+ });
+
+ return scheduler;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ closer = Closer.create();
+ }
+
+ private void createCleanupTask(int initialDelayMillis, int minCreationAgeMillis)
+ throws Exception {
+ createCleanupTask(initialDelayMillis, 1000, 50, minCreationAgeMillis);
+ }
+
+ private void createCleanupTask(int initialDelayMillis, int intervalMillis,
+ int batchSize, long minCreationAgeMillis) throws Exception {
+ createCleanupTask(initialDelayMillis, intervalMillis, batchSize,
+ minCreationAgeMillis, SlingIdCleanupTask.MIN_CLEANUP_DELAY_MILLIS);
+ }
+
+ private void createCleanupTask(int initialDelayMillis, int intervalMillis,
+ int batchSize, long minCreationAgeMillis, long minCleanupDelayMillis)
+ throws Exception {
+ OakVirtualInstanceBuilder builder = (OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
+ .setDebugName("instance").newRepository("/foo/bar/", true)
+ .setConnectorPingInterval(999).setConnectorPingTimeout(999);
+ builder.getConfig().setSuppressPartiallyStartedInstance(true);
+ instance = builder.build();
+ scheduler = createScheduler();
+ factory = instance.getResourceResolverFactory();
+ config = new OakTestConfig();
+ System.setProperty(
+ SlingIdCleanupTask.SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME, "true");
+ cleanupTask = SlingIdCleanupTask.create(scheduler, factory, config,
+ initialDelayMillis, intervalMillis, batchSize, minCreationAgeMillis,
+ minCleanupDelayMillis);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ closer.close();
+ if (instance != null) {
+ instance.stop();
+ instance = null;
+ }
+ System.clearProperty(
+ SlingIdCleanupTask.SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME);
+ if (cleanupTask != null) {
+ cleanupTask.deactivate();
+ cleanupTask = null;
+ }
+ }
+
+ private TopologyView newView() {
+ final DefaultClusterView cluster = new DefaultClusterView(
+ UUID.randomUUID().toString());
+ final DummyTopologyView view = new DummyTopologyView()
+ .addInstance(UUID.randomUUID().toString(), cluster, true, true);
+ return view;
+ }
+
+ private TopologyEvent newInitEvent(TopologyView view) {
+ return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_INIT, null, view);
+ }
+
+ private TopologyEvent newChangingEvent(TopologyView oldView) {
+ return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGING, oldView, null);
+ }
+
+ private TopologyEvent newPropertiesChangedEvent(TopologyView oldView,
+ TopologyView newView) {
+ return new TopologyEvent(TopologyEvent.Type.PROPERTIES_CHANGED, oldView, newView);
+ }
+
+ private TopologyEvent newChangedEvent(TopologyView oldView, TopologyView newView) {
+ return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, oldView, newView);
+ }
+
+ @Test
+ public void testActivatde() throws Exception {
+ createCleanupTask(0, 86400000);
+ cleanupTask.activate(null, null);
+ cleanupTask.activate(null, new DummyConf(2, 3, 4, 5));
+ assertConfigs(2, 3, 4, 5);
+ cleanupTask.deactivate();
+ }
+
+ @Test
+ public void testModified() throws Exception {
+ createCleanupTask(0, 86400000);
+ cleanupTask.modified(null, null);
+ cleanupTask.modified(null, new DummyConf(3, 4, 5, 6));
+ assertConfigs(3, 4, 5, 6);
+ cleanupTask.modified(null, new DummyConf(4, 5, 6, 7));
+ assertConfigs(4, 5, 6, 7);
+ }
+
+ @Test
+ public void testMillisOf() throws Exception {
+ assertEquals(-1, SlingIdCleanupTask.millisOf(null));
+ assertEquals(2, SlingIdCleanupTask.millisOf(new Date(2)));
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(new Date(3));
+ assertEquals(3, SlingIdCleanupTask.millisOf(cal));
+ }
+
+ @Test
+ public void testNoClusterInstancesResource() throws Exception {
+ createCleanupTask(0, 1, 100, 86400000);
+ cleanupTask.handleTopologyEvent(newInitEvent(newView()));
+ Thread.sleep(1000);
+ assertEquals(1, cleanupTask.getRunCount());
+ }
+
+ @Test
+ public void testNoIdMapResource() throws Exception {
+ createCleanupTask(0, 1, 100, 86400000);
+ createPath(config.getClusterInstancesPath());
+ cleanupTask.handleTopologyEvent(newInitEvent(newView()));
+ Thread.sleep(1000);
+ assertEquals(1, cleanupTask.getRunCount());
+ }
+
+ @Test
+ public void testNoSyncTokenResource() throws Exception {
+ createCleanupTask(0, 1, 100, 86400000);
+ createPath(config.getClusterInstancesPath());
+ createPath(config.getIdMapPath());
+ cleanupTask.handleTopologyEvent(newInitEvent(newView()));
+ Thread.sleep(1000);
+ assertEquals(1, cleanupTask.getRunCount());
+ }
+
+ private void assertConfigs(int expectedInitialDelay, int expectedInterval,
+ int expectedBatchSize, int expectedAge) throws NoSuchFieldException {
+ int initialDelayMillis = (Integer) PrivateAccessor.getField(cleanupTask,
+ "initialDelayMillis");
+ assertEquals(expectedInitialDelay, initialDelayMillis);
+ int intervalMillis = (Integer) PrivateAccessor.getField(cleanupTask,
+ "intervalMillis");
+ assertEquals(expectedInterval, intervalMillis);
+ int batchSize = (Integer) PrivateAccessor.getField(cleanupTask, "batchSize");
+ assertEquals(expectedBatchSize, batchSize);
+ long minCreationAgeMillis = (Long) PrivateAccessor.getField(cleanupTask,
+ "minCreationAgeMillis");
+ assertEquals(expectedAge, minCreationAgeMillis);
+ }
+
+ @Test
+ public void testPropertiesChanged() throws Exception {
+ createCleanupTask(0, 86400000);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ TopologyView view1 = newView();
+ TopologyView view2 = newView();
+ TopologyEvent event = newPropertiesChangedEvent(view1, view2);
+ cleanupTask.handleTopologyEvent(event);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ Thread.sleep(500);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ }
+
+ private void waitForRunCount(SlingIdCleanupTask task, int expectedRunCount,
+ int timeoutMillis) throws InterruptedException {
+ final long start = System.currentTimeMillis();
+ long diff;
+ do {
+ if (task.getCompletionCount() == expectedRunCount) {
+ return;
+ }
+ Thread.sleep(50);
+ diff = (start + timeoutMillis) - System.currentTimeMillis();
+ } while (diff > 0);
+ assertEquals("did not reach expected runcount within " + timeoutMillis + "ms",
+ expectedRunCount, task.getCompletionCount());
+ }
+
+ @Test
+ public void testInit() throws Exception {
+ createCleanupTask(0, 86400000);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ createSlingIds(5, 10, 0);
+
+ TopologyView view1 = newView();
+ TopologyEvent event = newInitEvent(view1);
+ cleanupTask.handleTopologyEvent(event);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ waitForRunCount(cleanupTask, 1, 5000);
+ assertEquals(10, cleanupTask.getDeleteCount());
+ }
+
+ @Test
+ public void testInit_smallBatch() throws Exception {
+ createCleanupTask(0, 500, 2, 86400000);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ createSlingIds(5, 10, 0);
+
+ TopologyView view1 = newView();
+ TopologyEvent event = newInitEvent(view1);
+ cleanupTask.handleTopologyEvent(event);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ waitForRunCount(cleanupTask, 1, 5000);
+ assertEquals(2, cleanupTask.getDeleteCount());
+ waitForRunCount(cleanupTask, 2, 5000);
+ assertEquals(4, cleanupTask.getDeleteCount());
+ waitForRunCount(cleanupTask, 3, 5000);
+ assertEquals(6, cleanupTask.getDeleteCount());
+ waitForRunCount(cleanupTask, 4, 5000);
+ assertEquals(8, cleanupTask.getDeleteCount());
+ waitForRunCount(cleanupTask, 5, 5000);
+ assertEquals(10, cleanupTask.getDeleteCount());
+ }
+
+ @Test
+ public void testChanging() throws Exception {
+ createCleanupTask(1000, 86400000);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ createSlingIds(5, 10, 0);
+
+ TopologyView view1 = newView();
+ cleanupTask.handleTopologyEvent(newInitEvent(view1));
+ cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+
+ assertEquals(0, cleanupTask.getDeleteCount());
+ Thread.sleep(500);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ }
+
+ @Test
+ public void testNoScheduler() throws Exception {
+ createCleanupTask(1000, 86400000);
+ PrivateAccessor.setField(cleanupTask, "scheduler", null);
+ cleanupTask.activate(null, new DummyConf(2, 3, 4, 5));
+ cleanupTask.handleTopologyEvent(newChangingEvent(newView()));
+ // no asserts, just tests execution without exception
+ cleanupTask.handleTopologyEvent(newChangedEvent(newView(), newView()));
+ // no asserts, just tests execution without exception
+ }
+
+ @Test
+ public void testChanged() throws Exception {
+ createCleanupTask(1000, 86400000);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ createSlingIds(5, 10, 0);
+
+ TopologyView view1 = newView();
+ cleanupTask.handleTopologyEvent(newInitEvent(view1));
+ cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+ assertEquals(0, cleanupTask.getDeleteCount());
+
+ TopologyView view2 = newView();
+ cleanupTask.handleTopologyEvent(newChangedEvent(view1, view2));
+ Thread.sleep(500);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ waitForRunCount(cleanupTask, 1, 5000);
+ assertEquals(10, cleanupTask.getDeleteCount());
+ }
+
+ @Test
+ public void testRepetitionDelay() throws Exception {
+ createCleanupTask(1000, 86400000);
+ createSlingIds(5, 10, 0);
+
+ TopologyView view1 = newView();
+ cleanupTask.handleTopologyEvent(newInitEvent(view1));
+ waitForRunCount(cleanupTask, 1, 5000);
+ assertEquals(10, cleanupTask.getDeleteCount());
+ cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+ Thread.sleep(1500);
+ assertEquals(1, cleanupTask.getRunCount());
+
+ TopologyView view2 = newView();
+ cleanupTask.handleTopologyEvent(newChangedEvent(view1, view2));
+ Thread.sleep(1500);
+ assertEquals(1, cleanupTask.getRunCount());
+ assertEquals(10, cleanupTask.getDeleteCount());
+ }
+
+ @Test
+ public void testDisabled() throws Exception {
+ createCleanupTask(1000, 86400000);
+ System.setProperty(
+ SlingIdCleanupTask.SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME, "false");
+ assertEquals(0, cleanupTask.getDeleteCount());
+ createSlingIds(5, 10, 0);
+
+ TopologyView view1 = newView();
+ cleanupTask.handleTopologyEvent(newInitEvent(view1));
+ cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+ assertEquals(0, cleanupTask.getDeleteCount());
+
+ TopologyView view2 = newView();
+ cleanupTask.handleTopologyEvent(newChangedEvent(view1, view2));
+ Thread.sleep(500);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ Thread.sleep(1000);
+ assertEquals(0, cleanupTask.getRunCount());
+ assertEquals(0, cleanupTask.getDeleteCount());
+ }
+
+ /**
+ * This tests the case where there are slingIds under /clusterInstances with no
+ * corresponding syncToken
+ */
+ @Test
+ public void testOrphanedClsuterInstances() throws Exception {
+ createCleanupTask(1000, 86400000);
+ createSlingIds(5, 10, 0, 3);
+
+ TopologyView view1 = newView();
+ cleanupTask.handleTopologyEvent(newInitEvent(view1));
+ waitForRunCount(cleanupTask, 1, 5000);
+ assertEquals(10, cleanupTask.getDeleteCount());
+ }
+
+ /**
+ * This test the case where there are syncTokens without a corresponding slingId
+ * under /clusterInstances
+ */
+ @Test
+ public void testOrphanedSyncTokens() throws Exception {
+ createCleanupTask(1000, 86400000);
+ createSlingIds(5, 10, 0, 20);
+
+ instance.dumpRepo();
+
+ TopologyView view1 = newView();
+ cleanupTask.handleTopologyEvent(newInitEvent(view1));
+ waitForRunCount(cleanupTask, 1, 5000);
+ assertEquals(15, cleanupTask.getDeleteCount());
+ }
+
+ @Test
+ public void testLeaderVsNonLeader() throws Exception {
+ createCleanupTask(250, 86400000);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ List<String> slingIds = createSlingIds(3, 7, 2);
+
+ final String clusterId = UUID.randomUUID().toString();
+ final DefaultClusterView remoteLeaderCluster = new DefaultClusterView(clusterId);
+ final DefaultClusterView localLeaderCluster = new DefaultClusterView(clusterId);
+ final DummyTopologyView remoteLeaderView = new DummyTopologyView();
+ final DummyTopologyView localLeaderView = new DummyTopologyView();
+
+ Iterator<String> it = slingIds.iterator();
+ String leaderSlingId = it.next(); // first is declared leader
+ String localSlignId = it.next(); // second is local
+ int idx = 0;
+ for (String aSlingId : slingIds) {
+ remoteLeaderView.addInstance(aSlingId, remoteLeaderCluster,
+ aSlingId.equals(leaderSlingId), aSlingId.equals(localSlignId));
+ localLeaderView.addInstance(aSlingId, localLeaderCluster,
+ aSlingId.equals(localSlignId), aSlingId.equals(localSlignId));
+ if (++idx >= 2) {
+ break;
+ }
+ }
+
+ assertEquals(0, cleanupTask.getDeleteCount());
+ cleanupTask.handleTopologyEvent(newInitEvent(remoteLeaderView));
+ assertEquals(0, cleanupTask.getDeleteCount());
+ Thread.sleep(1000);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ cleanupTask.handleTopologyEvent(newChangingEvent(remoteLeaderView));
+ assertEquals(0, cleanupTask.getDeleteCount());
+ Thread.sleep(1000);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ cleanupTask
+ .handleTopologyEvent(newChangedEvent(remoteLeaderView, localLeaderView));
+ assertEquals(0, cleanupTask.getDeleteCount());
+ assertEquals(0, cleanupTask.getCompletionCount());
+ waitForRunCount(cleanupTask, 1, 5000);
+ assertEquals(1, cleanupTask.getCompletionCount());
+ assertEquals(5, cleanupTask.getDeleteCount());
+ }
+
+ @Test
+ public void testOldSlingIdsButNowActive() throws InterruptedException, Exception {
+ doTestOldSlingIdsButActive(4, 9, 7);
+ }
+
+ @Test
+ public void testOldSlingIdButRecentlyActive() throws Exception, InterruptedException {
+ createCleanupTask(1000, 1000, 50, 86400000, -1);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ int currentIds = 4;
+ int oldIds = 9;
+ int activeIds = 7;
+ int activeOldIds = Math.max(0, activeIds - currentIds);
+ List<String> slingIds = createSlingIds(currentIds, oldIds, activeOldIds);
+
+ final String clusterViewId = UUID.randomUUID().toString();
+ DefaultClusterView cluster = new DefaultClusterView(clusterViewId);
+ DummyTopologyView view1 = new DummyTopologyView();
+
+ Iterator<String> it = slingIds.iterator();
+ String leaderSlingId = it.next(); // first is declared leader
+ String localSlignId = leaderSlingId; // and is local too
+ int idx = 0;
+ for (String aSlingId : slingIds) {
+ view1.addInstance(aSlingId, cluster, aSlingId.equals(leaderSlingId),
+ aSlingId.equals(localSlignId));
+ if (++idx >= activeIds) {
+ break;
+ }
+ }
+
+ cleanupTask.handleTopologyEvent(newInitEvent(view1));
+ assertEquals(0, cleanupTask.getDeleteCount());
+ assertEquals(0, cleanupTask.getCompletionCount());
+ waitForRunCount(cleanupTask, 1, 5000);
+ int expectedDeleteCount = oldIds - activeOldIds;
+ assertEquals(expectedDeleteCount, cleanupTask.getDeleteCount());
+ assertNoGarbageLeft(instance, config, view1, 86400000);
+ idx = 0;
+ for (String aSlingId : slingIds) {
+ logger.info("checking idx=" + idx + ", slingId=" + aSlingId);
+ if (idx < 2) {
+ // those are currently active and should of course not have been deleted
+ assertStatus(instance, config, aSlingId, false);
+ } else if (idx < activeIds) {
+ // same here, while they are not currently active, they were active during
+ // leader's lifetime
+ assertStatus(instance, config, aSlingId, false);
+ } else {
+ // for the rest: those should be deleted
+ assertStatus(instance, config, aSlingId, true);
+ }
+ idx++;
+ }
+
+ // in addition to the above, which is the same as doTestOldSlingIdsButActive,
+ // we now simulate some activeIds crashing.
+ // in particular some old and some current ones.
+ // the logic should be that, as long as the leader at some previous
+ // time was in a topology with those now crashed active ids, it would
+ // not delete them.
+ // this is to avoid a race condition that could other wise happen
+ // between a crash looping instance and this cleanup mechanism
+ cluster = new DefaultClusterView(clusterViewId);
+ DummyTopologyView view2 = new DummyTopologyView();
+
+ it = slingIds.iterator();
+ leaderSlingId = it.next(); // first is declared leader
+ localSlignId = leaderSlingId; // and is local too
+ idx = 0;
+ for (String aSlingId : slingIds) {
+ view2.addInstance(aSlingId, cluster, aSlingId.equals(leaderSlingId),
+ aSlingId.equals(localSlignId));
+ if (++idx >= 2) { // let's have only 2 active instances
+ break;
+ }
+ }
+ cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+ assertEquals(1, cleanupTask.getCompletionCount());
+ cleanupTask.handleTopologyEvent(newChangedEvent(view1, view2));
+ assertEquals(expectedDeleteCount, cleanupTask.getDeleteCount());
+ waitForRunCount(cleanupTask, 2, 5000);
+ assertEquals(2, cleanupTask.getCompletionCount());
+ // no further cleanup should have happened
+ assertEquals(expectedDeleteCount, cleanupTask.getDeleteCount());
+ // now check for correct non-/deletion
+ assertNoGarbageLeft(instance, config, view1, 86400000);
+ idx = 0;
+ for (String aSlingId : slingIds) {
+ if (idx < 2) {
+ // those are currently active and should of course not have been deleted
+ assertStatus(instance, config, aSlingId, false);
+ } else if (idx < activeIds) {
+ // same here, while they are not currently active, they were active during
+ // leader's lifetime
+ assertStatus(instance, config, aSlingId, false);
+ } else {
+ // for the rest: those should be deleted
+ assertStatus(instance, config, aSlingId, true);
+ }
+ idx++;
+ }
+ }
+
+ private void assertStatus(VirtualInstance i, Config c, String slingId,
+ boolean deleted) throws Exception {
+ ResourceResolverFactory f = i.getResourceResolverFactory();
+
+ ResourceResolver resolver = null;
+ resolver = f.getServiceResourceResolver(null);
+
+ final Resource clusterInstances = ResourceHelper.getOrCreateResource(resolver,
+ c.getClusterInstancesPath());
+ final Resource idMap = ResourceHelper.getOrCreateResource(resolver,
+ c.getIdMapPath());
+ final Resource syncTokens = ResourceHelper.getOrCreateResource(resolver,
+ c.getSyncTokenPath());
+ resolver.refresh();
+
+ if (deleted) {
+ assertNull(clusterInstances.getChild(slingId));
+ assertNull(idMap.getValueMap().get(slingId));
+ assertNull(syncTokens.getValueMap().get(slingId));
+ } else {
+ assertNotNull(clusterInstances.getChild(slingId));
+ assertNotNull(idMap.getValueMap().get(slingId));
+ assertNotNull(syncTokens.getValueMap().get(slingId));
+ }
+ }
+
+ @Test
+ public void testOldSlingIdButActive_all() throws Exception {
+ doTestOldSlingIdsButActive(5, 10, 15);
+ }
+
+ @Test
+ public void testOldSlingIdButActive_almostall() throws Exception {
+ doTestOldSlingIdsButActive(5, 10, 14);
+ }
+
+ @Test
+ public void testOldSlingIdButActive_some() throws Exception {
+ doTestOldSlingIdsButActive(5, 10, 8);
+ }
+
+ @Test
+ public void testOldSlingIdButActive_one() throws Exception {
+ doTestOldSlingIdsButActive(5, 10, 6);
+ }
+
+ @Test
+ public void testOldSlingIdButActive_none() throws Exception {
+ doTestOldSlingIdsButActive(5, 10, 5);
+ }
+
+ @Test
+ public void testOldSlingIdButActive_oneRecent() throws Exception {
+ doTestOldSlingIdsButActive(5, 10, 4);
+ }
+
+ @Test
+ public void testOldSlingIdButActive_twoRecent() throws Exception {
+ doTestOldSlingIdsButActive(5, 10, 3);
+ }
+
+ @Test
+ public void testOldSlingIdButActive_threeRecent() throws Exception {
+ doTestOldSlingIdsButActive(5, 10, 2);
+ }
+
+ @Test
+ public void testOldSlingIdButActive_allRecent() throws Exception {
+ doTestOldSlingIdsButActive(5, 10, 1);
+ }
+
+ private void doTestOldSlingIdsButActive(int recentIds, int oldIds, int activeIds)
+ throws Exception, InterruptedException {
+ logger.info(
+ "doTestOldSlingIdsButActive : recentIds={}, oldIds={}, activeIds={} : START",
+ recentIds, oldIds, activeIds);
+ createCleanupTask(1000, 86400000);
+ assertEquals(0, cleanupTask.getDeleteCount());
+ List<String> slingIds = createSlingIds(recentIds, oldIds,
+ Math.max(0, activeIds - recentIds));
+
+ final DefaultClusterView cluster = new DefaultClusterView(
+ UUID.randomUUID().toString());
+ final DummyTopologyView view = new DummyTopologyView();
+
+ Iterator<String> it = slingIds.iterator();
+ String leaderSlingId = it.next(); // first is declared leader
+ String localSlignId = leaderSlingId; // and is local too
+ int idx = 0;
+ for (String aSlingId : slingIds) {
+ view.addInstance(aSlingId, cluster, aSlingId.equals(leaderSlingId),
+ aSlingId.equals(localSlignId));
+ if (++idx >= activeIds) {
+ break;
+ }
+ }
+
+ cleanupTask.handleTopologyEvent(newInitEvent(view));
+ assertEquals(0, cleanupTask.getDeleteCount());
+ assertEquals(0, cleanupTask.getCompletionCount());
+ waitForRunCount(cleanupTask, 1, 5000);
+ assertEquals(Math.max(0, oldIds - Math.max(0, activeIds - recentIds)),
+ cleanupTask.getDeleteCount());
+ assertNoGarbageLeft(instance, config, view, 86400000);
+ }
+
+ private void assertNoGarbageLeft(VirtualInstance i, Config c,
+ TopologyView currentView, long maxAgeMillis) throws Exception {
+ List<InstanceDescription> instances = currentView.getLocalInstance()
+ .getClusterView().getInstances();
+ Set<String> activeIds = new HashSet<>();
+ for (InstanceDescription id : instances) {
+ activeIds.add(id.getSlingId());
+ }
+
+ ResourceResolverFactory f = i.getResourceResolverFactory();
+
+ ResourceResolver resolver = null;
+ resolver = f.getServiceResourceResolver(null);
+
+ final Resource clusterInstances = ResourceHelper.getOrCreateResource(resolver,
+ c.getClusterInstancesPath());
+ final Resource idMap = ResourceHelper.getOrCreateResource(resolver,
+ c.getIdMapPath());
+ final Resource syncTokens = ResourceHelper.getOrCreateResource(resolver,
+ c.getSyncTokenPath());
+ resolver.refresh();
+
+ final ValueMap idMapMap = idMap.adaptTo(ValueMap.class);
+ final ValueMap syncTokenMap = syncTokens.adaptTo(ValueMap.class);
+
+ for (Resource aChild : clusterInstances.getChildren()) {
+ String slingId = aChild.getName();
+ if (!isGarbage(aChild, maxAgeMillis)) {
+ activeIds.add(slingId);
+ }
+ }
+
+ Set<String> idMapSlingIds = new HashSet<>();
+ for (Entry<String, Object> e : idMapMap.entrySet()) {
+ String k = e.getKey();
+ if (!k.contains(":") && k.contains("-")) {
+ idMapSlingIds.add(k);
+ }
+ }
+ assertCurrentSlingIds(idMapSlingIds, activeIds, "idmap");
+ assertEquals("idmap size", activeIds.size(), idMapSlingIds.size());
+
+ Set<String> syncTokenSlingIds = new HashSet<>();
+ for (Entry<String, Object> e : syncTokenMap.entrySet()) {
+ String k = e.getKey();
+ if (!k.contains(":") && k.contains("-")) {
+ syncTokenSlingIds.add(k);
+ }
+ }
+ assertCurrentSlingIds(syncTokenSlingIds, activeIds, "syncToken");
+ assertEquals("syncToken size", activeIds.size(), syncTokenSlingIds.size());
+ resolver.close();
+ }
+
+ private boolean isGarbage(Resource aChild, long maxAgeMillis) {
+ Calendar now = Calendar.getInstance();
+ Object o = aChild.getValueMap().get("leaderElectionIdCreatedAt");
+ final long leaderElectionIdCreatedAt = SlingIdCleanupTask.millisOf(o);
+ if (leaderElectionIdCreatedAt <= 0) {
+ // skip
+ return false;
+ }
+ final long diffMillis = now.getTimeInMillis() - leaderElectionIdCreatedAt;
+ return diffMillis > maxAgeMillis;
+ }
+
+ private void assertCurrentSlingIds(Set<String> slingIds, Set<String> activeIds,
+ String context) {
+ for (String aSlingId : slingIds) {
+ assertTrue("[" + context + "] stored slingId " + aSlingId
+ + " not in current view", activeIds.contains(aSlingId));
+ }
+ }
+
+ /** Get or create a ResourceResolver **/
+ private ResourceResolver getResourceResolver() throws LoginException {
+ return factory.getServiceResourceResolver(null);
+ }
+
+ /**
+ * Calculate a new leaderElectionId based on the current config and system time
+ */
+ private String newLeaderElectionId(String slingId) {
+ int maxLongLength = String.valueOf(Long.MAX_VALUE).length();
+ String currentTimeMillisStr = String.format("%0" + maxLongLength + "d",
+ System.currentTimeMillis());
+
+ String prefix = String.valueOf(config.getLeaderElectionPrefix());
+
+ final String newLeaderElectionId = prefix + "_" + currentTimeMillisStr + "_"
+ + slingId;
+ return newLeaderElectionId;
+ }
+
+ private boolean createSyncToken(String slingId, long seqNum) throws Exception {
+ final String syncTokenPath = config.getSyncTokenPath();
+ ResourceResolver resourceResolver = getResourceResolver();
+ if (resourceResolver == null) {
+ fail("could not login");
+ return false;
+ }
+ final Resource resource = ResourceHelper.getOrCreateResource(resourceResolver,
+ syncTokenPath);
+ final ModifiableValueMap resourceMap = resource.adaptTo(ModifiableValueMap.class);
+ resourceMap.put(slingId, seqNum);
+ logger.info("createSyncToken: storing syncToken: {}, slingId: {}", seqNum,
+ slingId);
+ resourceResolver.commit();
+ return true;
+ }
+
+ private void createPath(String path) throws Exception {
+ ResourceResolver resourceResolver = getResourceResolver();
+ if (resourceResolver == null) {
+ fail("could not login");
+ return;
+ }
+
+ ResourceHelper.getOrCreateResource(resourceResolver, path);
+ resourceResolver.commit();
+ }
+
+ private boolean createClusterInstance(String uuid, String runtimeId,
+ String slingHomePath, String endpointsAsString, Calendar jcrCreated)
+ throws Exception {
+ final String myClusterNodePath = config.getClusterInstancesPath() + "/" + uuid;
+ ResourceResolver resourceResolver = getResourceResolver();
+ if (resourceResolver == null) {
+ fail("could not login");
+ return false;
+ }
+ String newLeaderElectionId = newLeaderElectionId(uuid);
+
+ final Resource resource = ResourceHelper.getOrCreateResource(resourceResolver,
+ myClusterNodePath);
+ final ModifiableValueMap resourceMap = resource.adaptTo(ModifiableValueMap.class);
+
+ resourceMap.put(PROPERTY_ID_RUNTIME, runtimeId);
+ resourceMap.put(PROPERTY_ID_SLING_HOME_PATH, slingHomePath);
+ resourceMap.put(PROPERTY_ID_ENDPOINTS, endpointsAsString);
+
+ resourceMap.put("leaderElectionId", newLeaderElectionId);
+ resourceMap.put("leaderElectionIdCreatedAt", jcrCreated);
+
+ resourceMap.put("jcr:created", jcrCreated);
+
+ logger.info(
+ "createClusterInstance: storing my runtimeId: {}, endpoints: {}, sling home path: {}, new leaderElectionId: {}, created at: {}",
+ new Object[] { runtimeId, endpointsAsString, slingHomePath,
+ newLeaderElectionId, jcrCreated });
+ resourceResolver.commit();
+ return true;
+ }
+
+ private void fillIdMap(Map<String, Long> ids) throws Exception {
+ ResourceResolver resourceResolver = getResourceResolver();
+ final Resource resource = ResourceHelper.getOrCreateResource(resourceResolver,
+ config.getIdMapPath());
+ ModifiableValueMap idmap = resource.adaptTo(ModifiableValueMap.class);
+ for (Entry<String, Long> e : ids.entrySet()) {
+ idmap.put(e.getKey(), e.getValue());
+ }
+ resourceResolver.commit();
+ }
+
+ private List<String> createSlingIds(int currentIds, int oldIds, int activeOldIds)
+ throws Exception {
+ return createSlingIds(currentIds, oldIds, activeOldIds, currentIds + oldIds);
+ }
+
+ private List<String> createSlingIds(int currentIds, int oldIds, int activeOldIds,
+ int numSyncTokens) throws Exception {
+ final List<String> orderedIds = new LinkedList<>();
+ final Map<String, Long> slingIdToClusterNodeIds = new HashMap<>();
+ final Map<String, Long> slingIdToSeqNums = new HashMap<>();
+ int currentSeqNum = currentIds * 2 + oldIds * 3 + 42;
+ for (long i = 0; i < currentIds; i++) {
+ final String uuid = UUID.randomUUID().toString();
+ orderedIds.add(uuid);
+ slingIdToClusterNodeIds.put(uuid, i);
+ slingIdToSeqNums.put(uuid, Long.valueOf(currentSeqNum));
+ createClusterInstance(uuid, UUID.randomUUID().toString(), "/a/b/c", "n/a",
+ Calendar.getInstance());
+ }
+ for (long i = 0; i < oldIds; i++) {
+ final String uuid = UUID.randomUUID().toString();
+ if (i < activeOldIds) {
+ slingIdToClusterNodeIds.put(uuid, i + currentIds);
+ }
+ orderedIds.add(uuid);
+ slingIdToSeqNums.put(uuid, Long.valueOf(i));
+ final Calendar cal = Calendar.getInstance();
+ cal.add(Calendar.DAY_OF_YEAR, -(7 + (int) i));
+ createClusterInstance(uuid, UUID.randomUUID().toString(), "/a/b/c", "n/a",
+ cal);
+ }
+ // idmap is created for all currentIds and active old ids
+ fillIdMap(slingIdToClusterNodeIds);
+ while (numSyncTokens > slingIdToSeqNums.size()) {
+ slingIdToSeqNums.put(UUID.randomUUID().toString(), Long.valueOf(1));
+ }
+ int c = 0;
+ // first make sure as many active slingIds have a syncToken as possible
+ for (String activeSlingId : slingIdToClusterNodeIds.keySet()) {
+ createSyncToken(activeSlingId, slingIdToSeqNums.get(activeSlingId));
+ if (++c >= numSyncTokens) {
+ break;
+ }
+ }
+ if (c < numSyncTokens) {
+ // only after they have been served, consider the rest, again up to
+ // numSynTtokens
+ for (Entry<String, Long> e : slingIdToSeqNums.entrySet()) {
+ if (slingIdToClusterNodeIds.containsKey(e.getKey())) {
+ // then it was probably already added above
+ continue;
+ }
+ createSyncToken(e.getKey(), e.getValue());
+ if (++c >= numSyncTokens) {
+ break;
+ }
+ }
+ }
+ return orderedIds;
+ }
+}
\ No newline at end of file