SLING-10854 : introducing SlingIdCleanupTask to clean up old slingIds… (#13)

* SLING-10854 : introducing SlingIdCleanupTask to clean up old slingIds under /var/discovery/oak automatically

* SLING-10854 : more test coverage

* SLING-10854 : more test coverage

* SLING-10854 : logging cleanup

* SLING-10854 : modifier cleanup

* SLING-10854 : added run counter to log info

* SLING-10854 : fix variable name

Co-authored-by: Jörg Hoh <joerghoh@users.noreply.github.com>

* SLING-10854 : fix rename change

* SLING-10854 : log scheduledDate for convenience

* SLING-10854 : use resolver.getResource rather than ResourceHelper.getOrCreateResource to avoid unintended creation of resources

* SLING-10854 : remove unnecessary copy-paste-leftover revert

* SLING-10854 : beautify try catch clause

* SLING-10854 : more test coverage

* SLING-10854 : orphaned syncTokens support added

* SLING-10854 : disabled by default and introduced system property org.apache.sling.discovery.oak.slingidcleanup.enabled to enable

* SLING-10854 : added a minimal delay between a successful run and a follow-up run - to reduce unnecessary load on the repository for likely empty cleanup executions

* SLING-10854 : lower a logger to debug

* SLING-10854 : fix description

Co-authored-by: Jörg Hoh <joerghoh@users.noreply.github.com>

* SLING-10854 : test stability improvement

* SLING-10854 : avoid deleting slingIds ever seen by then leader

* SLING-10854 : minor comment fix

* SLING-10854 : shrink cleanup method size

* SLING-10854 : reduce log.infos

* SLING-10854 : getActiveSlingIdsFrom extracted

* SLING-10854 : some more code reductions

* SLING-10854 : added comment about not considering a slingId garbage if it was ever seen by the then leader

* SLING-10854 : minor javadoc rewording

* SLING-10854 : and some docu about the odd choice of 13h min delay added

* SLING-10854 : shorten config names

* SLING-10854 : rename getActiveSlignIdsFrom

* SLING-10854 : use streams rather than verbose for loops

Co-authored-by: Rishabh Kumar <rishabhdaim1991@gmail.com>

* SLING-10854 : fix stream code

* SLING-10854 : renamed method to topologyChanged

* SLING-10854 : fixed seenInstances concurrency

* SLING-10854 : log message improved

* SLING-10854 : reformating only

---------

Co-authored-by: Jörg Hoh <joerghoh@users.noreply.github.com>
Co-authored-by: Rishabh Kumar <rishabhdaim1991@gmail.com>
diff --git a/src/main/java/org/apache/sling/discovery/oak/SlingIdCleanupTask.java b/src/main/java/org/apache/sling/discovery/oak/SlingIdCleanupTask.java
new file mode 100644
index 0000000..3316cbc
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/oak/SlingIdCleanupTask.java
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak;
+
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.TopologyView;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A background task that cleans up garbage slingIds after topology changes.
+ * <p>
+ * A slingId is considered garbage when:
+ * <ul>
+ * <li>it is not in the current topology</li>
+ * <li>was not ever seen in previous topologies by the now leader instance</li>
+ * <li>it is not in the current idmap (where clusterNodeIds are reused hence
+ * that list stays small and does not need cleanup)</li>
+ * <li>its leaderElectionId was created more than 7 days ago (the
+ * leaderElectionId is created at activate time of the discovery.oak bundle -
+ * hence this more or less corresponds to the startup time of that
+ * instance)</li>
+ * </ul>
+ * The garbage accumulates at the following places, where it will thus be
+ * cleaned up from:
+ * <ul>
+ * <li>as child node under /var/discovery/oak/clusterInstances : this is the
+ * most performance critical garbage</li>
+ * <li>as a property key in /var/discovery/oak/syncTokens</li>
+ * </ul>
+ * The task by default is executed:
+ * <ul>
+ * <li>only on the leader</li>
+ * <li>10min after a TOPOLOGY_INIT or TOPOLOGY_CHANGED event</li>
+ * <li>with a maximum number of delete operations to avoid repository overload -
+ * that maximum is called batchSize and is 50 by default</li>
+ * <li>in subsequent intervals of 10min after the initial run, if that had to
+ * stop at the batchSize of 50 deletions</li>
+ * </ul>
+ * All parameters mentioned above can be configured.
+ * <p>
+ * Additionally, the cleanup is skipped for 13 hours after a successful cleanup.
+ * This is to avoid unnecessary load on the repository. The number of 13
+ * incorporates some heuristics such as : about 2 cleanup rounds per day maximum
+ * makes sense, if a leader is very long living, then the 1 additional hour
+ * makes it spread somewhat throughout the day. This is to further minimize any
+ * load side-effects.
+ */
+@Component
+@Designate(ocd = SlingIdCleanupTask.Conf.class)
+public class SlingIdCleanupTask implements TopologyEventListener, Runnable {
+
+    final static String SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME = "org.apache.sling.discovery.oak.slingidcleanup.enabled";
+
+    /** default minimal cleanup delay at 13h, to intraday load balance */
+    final static long MIN_CLEANUP_DELAY_MILLIS = 46800000;
+
+    /**
+     * default age is 1 week : an instance that is not in the current topology,
+     * started 1 week ago is very unlikely to still be active
+     */
+    private static final long DEFAULT_MIN_CREATION_AGE_MILLIS = 604800000; // 1 week
+
+    /**
+     * initial delay is 10min : after a TOPOLOGY_INIT or TOPOLOGY_CHANGED on the
+     * leader, there should be a 10min delay before starting a round of cleanup.
+     * This is to not add unnecessary load after a startup/change.
+     */
+    private static final int DEFAULT_CLEANUP_INITIAL_DELAY = 600000; // 10min
+
+    /**
+     * default cleanup interval is 10min - this is together with the batchSize to
+     * lower repository load
+     */
+    private static final int DEFAULT_CLEANUP_INTERVAL = 600000; // 10min
+
+    /**
+     * default batch size is 50 deletions : normally there should not be much
+     * garbage around anyway, so normally it's just a few, 1-5 perhaps. If there's
+     * more than 50, that is probably a one-time cleanup after this feature is first
+     * rolled out. That one-time cleanup can actually take a considerable amount of
+     * time. So, to not overload the write load on the repository, the deletion is
+     * batched into 50 at any time - with 10min delays in between. That results in
+     * an average of 1 cleanup every 12 seconds, or 5 per minute, or 8640 per day,
+     * for a legacy cleanup.
+     */
+    private static final int DEFAULT_CLEANUP_BATCH_SIZE = 50;
+
+    /**
+     * The sling.commons.scheduler name, so that it can be cancelled upon topology
+     * changes.
+     */
+    private static final String SCHEDULE_NAME = "org.apache.sling.discovery.oak.SlingIdCleanupTask";
+
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    @Reference
+    protected Scheduler scheduler;
+
+    @Reference
+    protected ResourceResolverFactory resourceResolverFactory;
+
+    @Reference
+    private Config config;
+
+    /**
+     * volatile flag to fast stop any ongoing deletion upon a change in the topology
+     */
+    private volatile boolean hasTopology = false;
+
+    /**
+     * volatile field to keep track of the current topology, shared between topology
+     * listener and deletion
+     */
+    @SuppressWarnings("all")
+    private volatile TopologyView currentView;
+
+    private int initialDelayMillis = DEFAULT_CLEANUP_INITIAL_DELAY;
+
+    private int intervalMillis = DEFAULT_CLEANUP_INTERVAL;
+
+    private int batchSize = DEFAULT_CLEANUP_BATCH_SIZE;
+
+    private long minCreationAgeMillis = DEFAULT_MIN_CREATION_AGE_MILLIS;
+
+    /** test counter that increments upon every scheduler invocation */
+    private AtomicInteger runCount = new AtomicInteger(0);
+
+    /** test counter that increments upon every batch deletion */
+    private AtomicInteger completionCount = new AtomicInteger(0);
+
+    /** test counter that keeps track of actually deleted slingIds */
+    private AtomicInteger deleteCount = new AtomicInteger(0);
+
+    /**
+     * flag to distinguish first from subsequent runs, as they might have different
+     * scheduler delays
+     */
+    private volatile boolean firstRun = true;
+
+    private long lastSuccessfulRun = -1;
+
+    /**
+     * Minimal delay after a successful cleanup round, in millis
+     */
+    private long minCleanupDelayMillis = MIN_CLEANUP_DELAY_MILLIS;
+
+    /**
+     * contains all slingIds ever seen by this instance - should not be a long list
+     * so not a memory issue
+     */
+    private Set<String> seenInstances = new CopyOnWriteArraySet<>();
+
+    @ObjectClassDefinition(name = "Apache Sling Discovery Oak SlingId Cleanup Task", description = "This task is in charge of cleaning up old SlingIds from the repository.")
+    public @interface Conf {
+
+        @AttributeDefinition(name = "Cleanup initial delay milliseconds", description = "Number of milliseconds to initially wait for the first cleanup")
+        int slingid_cleanup_initial_delay() default DEFAULT_CLEANUP_INITIAL_DELAY;
+
+        @AttributeDefinition(name = "Cleanup interval milliseconds", description = "Number of milliseconds after which to do another batch of cleaning up (if necessary)")
+        int slingid_cleanup_interval() default DEFAULT_CLEANUP_INTERVAL;
+
+        @AttributeDefinition(name = "Cleanup batch size", description = "Maximum number of slingIds to cleanup in one batch.")
+        int slingid_cleanup_batchsize() default DEFAULT_CLEANUP_BATCH_SIZE;
+
+        @AttributeDefinition(name = "Cleanup minimum creation age", description = "Minimum number of milliseconds since the slingId was created.")
+        long slingid_cleanup_min_creation_age() default DEFAULT_MIN_CREATION_AGE_MILLIS;
+    }
+
+    /**
+     * Test constructor
+     */
+    static SlingIdCleanupTask create(Scheduler scheduler, ResourceResolverFactory factory,
+            Config config, int initialDelayMillis, int intervalMillis, int batchSize,
+            long minCreationAgeMillis, long minCleanupDelayMillis) {
+        final SlingIdCleanupTask s = new SlingIdCleanupTask();
+        s.scheduler = scheduler;
+        s.resourceResolverFactory = factory;
+        s.config = config;
+        s.minCleanupDelayMillis = minCleanupDelayMillis;
+        s.config(initialDelayMillis, intervalMillis, batchSize, minCreationAgeMillis);
+        return s;
+    }
+
+    @Activate
+    protected void activate(final BundleContext bc, final Conf config) {
+        this.modified(bc, config);
+    }
+
+    @Modified
+    protected void modified(final BundleContext bc, final Conf config) {
+        if (config == null) {
+            return;
+        }
+        config(config.slingid_cleanup_initial_delay(), config.slingid_cleanup_interval(),
+                config.slingid_cleanup_batchsize(),
+                config.slingid_cleanup_min_creation_age());
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        logger.info("deactivate : deactivated.");
+        hasTopology = false;
+    }
+
+    private void config(int initialDelayMillis, int intervalMillis, int batchSize,
+            long minCreationAgeMillis) {
+        this.initialDelayMillis = initialDelayMillis;
+        this.intervalMillis = intervalMillis;
+        this.batchSize = batchSize;
+        this.minCreationAgeMillis = minCreationAgeMillis;
+        logger.info(
+                "config: enabled = {}, initial delay milliseconds = {}, interval milliseconds = {}, batch size = {}, min creation age milliseconds = {}",
+                isEnabled(), initialDelayMillis, intervalMillis, batchSize,
+                minCreationAgeMillis);
+    }
+
+    @Override
+    public void handleTopologyEvent(TopologyEvent event) {
+        if (!isEnabled()) {
+            hasTopology = false; // stops potentially ongoing deletion
+            currentView = null;
+            // cancel cleanup schedule
+            stop();
+            logger.debug("handleTopologyEvent: slingId cleanup is disabled");
+            return;
+        }
+        final TopologyView newView = event.getNewView();
+        if (newView == null || event.getType() == Type.PROPERTIES_CHANGED) {
+            hasTopology = false; // stops potentially ongoing deletion
+            currentView = null;
+            // cancel cleanup schedule
+            stop();
+        } else {
+            hasTopology = true;
+            currentView = newView;
+            seenInstances.addAll(getActiveSlingIds(newView));
+            if (newView.getLocalInstance().isLeader()) {
+                // only execute on leader
+                recreateSchedule();
+            } else {
+                // should not be necessary, but lets stop anyway on non-leaders:
+                stop();
+            }
+        }
+    }
+
+    /**
+     * Cancels a potentially previously registered cleanup schedule.
+     */
+    private void stop() {
+        final Scheduler localScheduler = scheduler;
+        if (localScheduler == null) {
+            // should not happen
+            logger.warn("stop: no scheduler set, giving up.");
+            return;
+        }
+        final boolean unscheduled = localScheduler.unschedule(SCHEDULE_NAME);
+        logger.debug("stop: unschedule result={}", unscheduled);
+    }
+
+    /**
+     * Reads the system property that enables or disabled this tasks
+     */
+    private static boolean isEnabled() {
+        final String systemPropertyValue = System
+                .getProperty(SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME);
+        return standardConverter().convert(systemPropertyValue).defaultValue(false)
+                .to(Boolean.class);
+    }
+
+    /**
+     * This method can be invoked at any time to reset the schedule to do a fresh
+     * round of cleanup.
+     * <p>
+     * This method is thread-safe : if called concurrently, the fact that
+     * scheduler.schedul is synchronized works out that ultimately there will be
+     * just 1 schedule active (which is what is the desired outcome).
+     */
+    private void recreateSchedule() {
+        final Scheduler localScheduler = scheduler;
+        if (localScheduler == null) {
+            // should not happen
+            logger.warn("recreateSchedule: no scheduler set, giving up.");
+            return;
+        }
+        final Calendar cal = Calendar.getInstance();
+        int delayMillis;
+        if (firstRun) {
+            delayMillis = initialDelayMillis;
+        } else {
+            delayMillis = intervalMillis;
+        }
+        cal.add(Calendar.MILLISECOND, delayMillis);
+        final Date scheduledDate = cal.getTime();
+        logger.debug(
+                "recreateSchedule: scheduling a cleanup in {} milliseconds from now, which is: {}",
+                delayMillis, scheduledDate);
+        ScheduleOptions options = localScheduler.AT(scheduledDate);
+        options.name(SCHEDULE_NAME);
+        options.canRunConcurrently(false); // should not concurrently execute
+        localScheduler.schedule(this, options);
+    }
+
+    /**
+     * Invoked via sling.commons.scheduler triggered from resetCleanupSchedule(). By
+     * default should get called at max every 5 minutes until cleanup is done or
+     * 10min after a topology change.
+     */
+    @Override
+    public void run() {
+        if (lastSuccessfulRun > 0 && System.currentTimeMillis()
+                - lastSuccessfulRun < minCleanupDelayMillis) {
+            logger.debug(
+                    "run: last cleanup was {} millis ago, which is less than {} millis, therefore not cleaning up yet.",
+                    System.currentTimeMillis() - lastSuccessfulRun,
+                    minCleanupDelayMillis);
+            recreateSchedule();
+            return;
+        }
+        runCount.incrementAndGet();
+        if (!hasTopology) {
+            return;
+        }
+        boolean mightHaveMore = true;
+        try {
+            mightHaveMore = cleanup();
+        } catch (Exception e) {
+            // upon exception just log and retry in 10min
+            logger.error("run: got Exception while cleaning up slnigIds : " + e, e);
+        }
+        if (mightHaveMore) {
+            // then continue in 10min
+            recreateSchedule();
+            return;
+        }
+        // log successful cleanup done, yes, on info
+        logger.info(
+                "run: slingId cleanup done, run counter = {}, delete counter = {}, completion counter = {}",
+                getRunCount(), getDeleteCount(), getCompletionCount());
+        lastSuccessfulRun = System.currentTimeMillis();
+    }
+
+    /**
+     * Do the actual cleanup of garbage slingIds and report back with true if there
+     * might be more or false if we're at the end.
+     * 
+     * @return true if there might be more garbage or false if we're at the end
+     */
+    private boolean cleanup() {
+        logger.debug("cleanup: start");
+        if (!isEnabled()) {
+            // bit of overkill probably, as this shouldn't happen.
+            // but adds to a good night's sleep.
+            logger.debug("cleanup: not enabled, stopping.");
+            return false;
+        }
+
+        final ResourceResolverFactory localFactory = resourceResolverFactory;
+        final Config localConfig = config;
+        if (localFactory == null || localConfig == null) {
+            logger.warn("cleanup: cannot cleanup due to rrf={} or c={}", localFactory,
+                    localConfig);
+            return true;
+        }
+        final TopologyView localCurrentView = currentView;
+        if (localCurrentView == null || !localCurrentView.isCurrent()) {
+            logger.debug("cleanup : cannot cleanup as topology recently changed : {}",
+                    localCurrentView);
+            return true;
+        }
+        final Set<String> activeSlingIds = getActiveSlingIds(localCurrentView);
+        try (ResourceResolver resolver = localFactory.getServiceResourceResolver(null)) {
+            final Resource clusterInstances = resolver
+                    .getResource(localConfig.getClusterInstancesPath());
+            final Resource idMap = resolver.getResource(localConfig.getIdMapPath());
+            final Resource syncTokens = resolver
+                    .getResource(localConfig.getSyncTokenPath());
+            if (clusterInstances == null || idMap == null || syncTokens == null) {
+                logger.warn("cleanup: no resource found at {}, {} or {}, stopping.",
+                        localConfig.getClusterInstancesPath(), localConfig.getIdMapPath(),
+                        localConfig.getSyncTokenPath());
+                return false;
+            }
+            resolver.refresh();
+            final ValueMap idMapMap = idMap.adaptTo(ValueMap.class);
+            final ModifiableValueMap syncTokenMap = syncTokens
+                    .adaptTo(ModifiableValueMap.class);
+            final Calendar now = Calendar.getInstance();
+            int removed = 0;
+            boolean mightHaveMore = false;
+            final int localBatchSize = batchSize;
+            final long localMinCreationAgeMillis = minCreationAgeMillis;
+            for (Resource resource : clusterInstances.getChildren()) {
+                if (topologyChanged(localCurrentView)) {
+                    return true;
+                }
+                final String slingId = resource.getName();
+                if (deleteIfOldSlingId(resource, slingId, syncTokenMap, idMapMap,
+                        activeSlingIds, now, localMinCreationAgeMillis)) {
+                    if (++removed >= localBatchSize) {
+                        // we need to stop
+                        mightHaveMore = true;
+                        break;
+                    }
+                }
+            }
+            // if we're not already at the batch limit, check syncTokens too
+            if (!mightHaveMore) {
+                for (String slingId : syncTokenMap.keySet()) {
+                    try {
+                        UUID.fromString(slingId);
+                    } catch (Exception e) {
+                        // not a uuid
+                        continue;
+                    }
+                    if (topologyChanged(localCurrentView)) {
+                        return true;
+                    }
+                    Resource resourceOrNull = clusterInstances.getChild(slingId);
+                    if (deleteIfOldSlingId(resourceOrNull, slingId, syncTokenMap,
+                            idMapMap, activeSlingIds, now, localMinCreationAgeMillis)) {
+                        if (++removed >= localBatchSize) {
+                            // we need to stop
+                            mightHaveMore = true;
+                            break;
+                        }
+                    }
+                }
+            }
+            if (topologyChanged(localCurrentView)) {
+                return true;
+            }
+            if (removed > 0) {
+                // only if we removed something we commit
+                resolver.commit();
+                logger.info(
+                        "cleanup : removed {} old slingIds (batch size : {}), potentially has more: {}",
+                        removed, localBatchSize, mightHaveMore);
+                deleteCount.addAndGet(removed);
+            }
+            firstRun = false;
+            completionCount.incrementAndGet();
+            return mightHaveMore;
+        } catch (LoginException e) {
+            logger.error("cleanup: could not log in administratively: " + e, e);
+            throw new RuntimeException("Could not log in to repository (" + e + ")", e);
+        } catch (PersistenceException e) {
+            logger.error("cleanup: got a PersistenceException: " + e, e);
+            throw new RuntimeException(
+                    "Exception while talking to repository (" + e + ")", e);
+        } finally {
+            logger.debug("cleanup: done.");
+        }
+    }
+
+    private Set<String> getActiveSlingIds(final TopologyView localCurrentView) {
+        return localCurrentView.getLocalInstance().getClusterView().getInstances()
+                .stream().map(InstanceDescription::getSlingId)
+                .collect(Collectors.toSet());
+    }
+
+    private boolean topologyChanged(TopologyView localCurrentView) {
+        if (!hasTopology || currentView != localCurrentView
+                || !localCurrentView.isCurrent()) {
+            // we got interrupted during cleanup
+            // let's not commit at all then
+            logger.debug(
+                    "topologyChanged : topology changing during cleanup - not committing this time - stopping for now.");
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    static long millisOf(Object leaderElectionIdCreatedAt) {
+        if (leaderElectionIdCreatedAt == null) {
+            return -1;
+        }
+        if (leaderElectionIdCreatedAt instanceof Date) {
+            final Date d = (Date) leaderElectionIdCreatedAt;
+            return d.getTime();
+        }
+        if (leaderElectionIdCreatedAt instanceof Calendar) {
+            final Calendar c = (Calendar) leaderElectionIdCreatedAt;
+            return c.getTimeInMillis();
+        }
+        return -1;
+    }
+
+    private boolean deleteIfOldSlingId(Resource resourceOrNull, String slingId,
+            ModifiableValueMap syncTokenMap, ValueMap idMapMap,
+            Set<String> activeSlingIds, Calendar now, long localMinCreationAgeMillis)
+            throws PersistenceException {
+        logger.trace("deleteIfOldSlingId : handling slingId = {}", slingId);
+        if (activeSlingIds.contains(slingId)) {
+            logger.trace("deleteIfOldSlingId : slingId is currently active : {}",
+                    slingId);
+            return false;
+        } else if (seenInstances.contains(slingId)) {
+            logger.trace("deleteIfOldSlingId : slingId seen active previously : {}",
+                    slingId);
+            return false;
+        }
+        // only check in idmap and for leaderElectionId details if the clusterInstance
+        // resource is there
+        if (resourceOrNull != null) {
+            Object clusterNodeId = idMapMap.get(slingId);
+            if (clusterNodeId == null) {
+                logger.trace("deleteIfOldSlingId : slingId {} not recently in use",
+                        slingId);
+            } else {
+                logger.trace("deleteIfOldSlingId : slingId {} WAS recently in use : {}",
+                        slingId, clusterNodeId);
+                return false;
+            }
+            Object o = resourceOrNull.getValueMap().get("leaderElectionIdCreatedAt");
+            final long leaderElectionIdCreatedAt = millisOf(o);
+            if (leaderElectionIdCreatedAt <= 0) {
+                // skip
+                logger.trace(
+                        "deleteIfOldSlingId: resource ({}) has no or wrongly typed leaderElectionIdCreatedAt : {}",
+                        resourceOrNull, o);
+                return false;
+            }
+            final long diffMillis = now.getTimeInMillis() - leaderElectionIdCreatedAt;
+            if (diffMillis <= localMinCreationAgeMillis) {
+                logger.trace("deleteIfOldSlingId: not old slingId : {}", resourceOrNull);
+                return false;
+            }
+        }
+        logger.trace("deleteIfOldSlingId: deleting old slingId : {}", resourceOrNull);
+        syncTokenMap.remove(slingId);
+        if (resourceOrNull != null) {
+            resourceOrNull.getResourceResolver().delete(resourceOrNull);
+        }
+        return true;
+    }
+
+    int getRunCount() {
+        return runCount.get();
+    }
+
+    int getDeleteCount() {
+        return deleteCount.get();
+    }
+
+    int getCompletionCount() {
+        return completionCount.get();
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/discovery/oak/TestSlingIdCleanupTask.java b/src/test/java/org/apache/sling/discovery/oak/TestSlingIdCleanupTask.java
new file mode 100644
index 0000000..01ed7b9
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/TestSlingIdCleanupTask.java
@@ -0,0 +1,982 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.scheduler.impl.QuartzScheduler;
+import org.apache.sling.commons.scheduler.impl.SchedulerServiceFactory;
+import org.apache.sling.commons.threads.impl.DefaultThreadPoolManager;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.base.its.setup.VirtualInstance;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DummyTopologyView;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
+import org.apache.sling.discovery.oak.its.setup.OakTestConfig;
+import org.apache.sling.discovery.oak.its.setup.OakVirtualInstanceBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Closer;
+
+import junitx.util.PrivateAccessor;
+
+public class TestSlingIdCleanupTask {
+
+    protected static final String PROPERTY_ID_ENDPOINTS = "endpoints";
+
+    protected static final String PROPERTY_ID_SLING_HOME_PATH = "slingHomePath";
+
+    protected static final String PROPERTY_ID_RUNTIME = "runtimeId";
+
+    @SuppressWarnings("all")
+    class DummyConf implements SlingIdCleanupTask.Conf {
+
+        int initialDelay;
+        int interval;
+        int batchSize;
+        long age;
+
+        DummyConf(int initialDelay, int interval, int batchSize, long age) {
+            this.initialDelay = initialDelay;
+            this.interval = interval;
+            this.batchSize = batchSize;
+            this.age = age;
+        }
+
+        @Override
+        public Class<? extends Annotation> annotationType() {
+            return null;
+        }
+
+        @Override
+        public int slingid_cleanup_initial_delay() {
+            return initialDelay;
+        }
+
+        @Override
+        public int slingid_cleanup_interval() {
+            return interval;
+        }
+
+        @Override
+        public int slingid_cleanup_batchsize() {
+            return batchSize;
+        }
+
+        @Override
+        public long slingid_cleanup_min_creation_age() {
+            return age;
+        }
+
+    }
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private SlingIdCleanupTask cleanupTask;
+
+    private Scheduler scheduler;
+
+    private ResourceResolverFactory factory;
+
+    private Config config;
+
+    private VirtualInstance instance;
+
+    private Closer closer;
+
+    private Scheduler createScheduler() throws Exception {
+        try {
+            return createRealScheduler();
+        } catch (Throwable e) {
+            throw new Exception(e);
+        }
+    }
+
+    private Scheduler createRealScheduler() throws Throwable {
+        final BundleContext ctx = Mockito.mock(BundleContext.class);
+        final Map<String, Object> props = new HashMap<>();
+        final DefaultThreadPoolManager threadPoolManager = new DefaultThreadPoolManager(
+                ctx, new Hashtable<String, Object>());
+        final QuartzScheduler qScheduler = new QuartzScheduler();
+        Scheduler scheduler = new SchedulerServiceFactory();
+        PrivateAccessor.setField(qScheduler, "threadPoolManager", threadPoolManager);
+        PrivateAccessor.invoke(qScheduler, "activate",
+                new Class[] { BundleContext.class, Map.class },
+                new Object[] { ctx, props });
+        PrivateAccessor.setField(scheduler, "scheduler", qScheduler);
+
+        closer.register(new Closeable() {
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    PrivateAccessor.invoke(qScheduler, "deactivate",
+                            new Class[] { BundleContext.class }, new Object[] { ctx });
+                } catch (Throwable e) {
+                    throw new IOException(e);
+                }
+            }
+
+        });
+
+        return scheduler;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        closer = Closer.create();
+    }
+
+    private void createCleanupTask(int initialDelayMillis, int minCreationAgeMillis)
+            throws Exception {
+        createCleanupTask(initialDelayMillis, 1000, 50, minCreationAgeMillis);
+    }
+
+    private void createCleanupTask(int initialDelayMillis, int intervalMillis,
+            int batchSize, long minCreationAgeMillis) throws Exception {
+        createCleanupTask(initialDelayMillis, intervalMillis, batchSize,
+                minCreationAgeMillis, SlingIdCleanupTask.MIN_CLEANUP_DELAY_MILLIS);
+    }
+
+    private void createCleanupTask(int initialDelayMillis, int intervalMillis,
+            int batchSize, long minCreationAgeMillis, long minCleanupDelayMillis)
+            throws Exception {
+        OakVirtualInstanceBuilder builder = (OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
+                .setDebugName("instance").newRepository("/foo/bar/", true)
+                .setConnectorPingInterval(999).setConnectorPingTimeout(999);
+        builder.getConfig().setSuppressPartiallyStartedInstance(true);
+        instance = builder.build();
+        scheduler = createScheduler();
+        factory = instance.getResourceResolverFactory();
+        config = new OakTestConfig();
+        System.setProperty(
+                SlingIdCleanupTask.SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME, "true");
+        cleanupTask = SlingIdCleanupTask.create(scheduler, factory, config,
+                initialDelayMillis, intervalMillis, batchSize, minCreationAgeMillis,
+                minCleanupDelayMillis);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        closer.close();
+        if (instance != null) {
+            instance.stop();
+            instance = null;
+        }
+        System.clearProperty(
+                SlingIdCleanupTask.SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME);
+        if (cleanupTask != null) {
+            cleanupTask.deactivate();
+            cleanupTask = null;
+        }
+    }
+
+    private TopologyView newView() {
+        final DefaultClusterView cluster = new DefaultClusterView(
+                UUID.randomUUID().toString());
+        final DummyTopologyView view = new DummyTopologyView()
+                .addInstance(UUID.randomUUID().toString(), cluster, true, true);
+        return view;
+    }
+
+    private TopologyEvent newInitEvent(TopologyView view) {
+        return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_INIT, null, view);
+    }
+
+    private TopologyEvent newChangingEvent(TopologyView oldView) {
+        return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGING, oldView, null);
+    }
+
+    private TopologyEvent newPropertiesChangedEvent(TopologyView oldView,
+            TopologyView newView) {
+        return new TopologyEvent(TopologyEvent.Type.PROPERTIES_CHANGED, oldView, newView);
+    }
+
+    private TopologyEvent newChangedEvent(TopologyView oldView, TopologyView newView) {
+        return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, oldView, newView);
+    }
+
+    @Test
+    public void testActivatde() throws Exception {
+        createCleanupTask(0, 86400000);
+        cleanupTask.activate(null, null);
+        cleanupTask.activate(null, new DummyConf(2, 3, 4, 5));
+        assertConfigs(2, 3, 4, 5);
+        cleanupTask.deactivate();
+    }
+
+    @Test
+    public void testModified() throws Exception {
+        createCleanupTask(0, 86400000);
+        cleanupTask.modified(null, null);
+        cleanupTask.modified(null, new DummyConf(3, 4, 5, 6));
+        assertConfigs(3, 4, 5, 6);
+        cleanupTask.modified(null, new DummyConf(4, 5, 6, 7));
+        assertConfigs(4, 5, 6, 7);
+    }
+
+    @Test
+    public void testMillisOf() throws Exception {
+        assertEquals(-1, SlingIdCleanupTask.millisOf(null));
+        assertEquals(2, SlingIdCleanupTask.millisOf(new Date(2)));
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(new Date(3));
+        assertEquals(3, SlingIdCleanupTask.millisOf(cal));
+    }
+
+    @Test
+    public void testNoClusterInstancesResource() throws Exception {
+        createCleanupTask(0, 1, 100, 86400000);
+        cleanupTask.handleTopologyEvent(newInitEvent(newView()));
+        Thread.sleep(1000);
+        assertEquals(1, cleanupTask.getRunCount());
+    }
+
+    @Test
+    public void testNoIdMapResource() throws Exception {
+        createCleanupTask(0, 1, 100, 86400000);
+        createPath(config.getClusterInstancesPath());
+        cleanupTask.handleTopologyEvent(newInitEvent(newView()));
+        Thread.sleep(1000);
+        assertEquals(1, cleanupTask.getRunCount());
+    }
+
+    @Test
+    public void testNoSyncTokenResource() throws Exception {
+        createCleanupTask(0, 1, 100, 86400000);
+        createPath(config.getClusterInstancesPath());
+        createPath(config.getIdMapPath());
+        cleanupTask.handleTopologyEvent(newInitEvent(newView()));
+        Thread.sleep(1000);
+        assertEquals(1, cleanupTask.getRunCount());
+    }
+
+    private void assertConfigs(int expectedInitialDelay, int expectedInterval,
+            int expectedBatchSize, int expectedAge) throws NoSuchFieldException {
+        int initialDelayMillis = (Integer) PrivateAccessor.getField(cleanupTask,
+                "initialDelayMillis");
+        assertEquals(expectedInitialDelay, initialDelayMillis);
+        int intervalMillis = (Integer) PrivateAccessor.getField(cleanupTask,
+                "intervalMillis");
+        assertEquals(expectedInterval, intervalMillis);
+        int batchSize = (Integer) PrivateAccessor.getField(cleanupTask, "batchSize");
+        assertEquals(expectedBatchSize, batchSize);
+        long minCreationAgeMillis = (Long) PrivateAccessor.getField(cleanupTask,
+                "minCreationAgeMillis");
+        assertEquals(expectedAge, minCreationAgeMillis);
+    }
+
+    @Test
+    public void testPropertiesChanged() throws Exception {
+        createCleanupTask(0, 86400000);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        TopologyView view1 = newView();
+        TopologyView view2 = newView();
+        TopologyEvent event = newPropertiesChangedEvent(view1, view2);
+        cleanupTask.handleTopologyEvent(event);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        Thread.sleep(500);
+        assertEquals(0, cleanupTask.getDeleteCount());
+    }
+
+    private void waitForRunCount(SlingIdCleanupTask task, int expectedRunCount,
+            int timeoutMillis) throws InterruptedException {
+        final long start = System.currentTimeMillis();
+        long diff;
+        do {
+            if (task.getCompletionCount() == expectedRunCount) {
+                return;
+            }
+            Thread.sleep(50);
+            diff = (start + timeoutMillis) - System.currentTimeMillis();
+        } while (diff > 0);
+        assertEquals("did not reach expected runcount within " + timeoutMillis + "ms",
+                expectedRunCount, task.getCompletionCount());
+    }
+
+    @Test
+    public void testInit() throws Exception {
+        createCleanupTask(0, 86400000);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        createSlingIds(5, 10, 0);
+
+        TopologyView view1 = newView();
+        TopologyEvent event = newInitEvent(view1);
+        cleanupTask.handleTopologyEvent(event);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        waitForRunCount(cleanupTask, 1, 5000);
+        assertEquals(10, cleanupTask.getDeleteCount());
+    }
+
+    @Test
+    public void testInit_smallBatch() throws Exception {
+        createCleanupTask(0, 500, 2, 86400000);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        createSlingIds(5, 10, 0);
+
+        TopologyView view1 = newView();
+        TopologyEvent event = newInitEvent(view1);
+        cleanupTask.handleTopologyEvent(event);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        waitForRunCount(cleanupTask, 1, 5000);
+        assertEquals(2, cleanupTask.getDeleteCount());
+        waitForRunCount(cleanupTask, 2, 5000);
+        assertEquals(4, cleanupTask.getDeleteCount());
+        waitForRunCount(cleanupTask, 3, 5000);
+        assertEquals(6, cleanupTask.getDeleteCount());
+        waitForRunCount(cleanupTask, 4, 5000);
+        assertEquals(8, cleanupTask.getDeleteCount());
+        waitForRunCount(cleanupTask, 5, 5000);
+        assertEquals(10, cleanupTask.getDeleteCount());
+    }
+
+    @Test
+    public void testChanging() throws Exception {
+        createCleanupTask(1000, 86400000);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        createSlingIds(5, 10, 0);
+
+        TopologyView view1 = newView();
+        cleanupTask.handleTopologyEvent(newInitEvent(view1));
+        cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+
+        assertEquals(0, cleanupTask.getDeleteCount());
+        Thread.sleep(500);
+        assertEquals(0, cleanupTask.getDeleteCount());
+    }
+
+    @Test
+    public void testNoScheduler() throws Exception {
+        createCleanupTask(1000, 86400000);
+        PrivateAccessor.setField(cleanupTask, "scheduler", null);
+        cleanupTask.activate(null, new DummyConf(2, 3, 4, 5));
+        cleanupTask.handleTopologyEvent(newChangingEvent(newView()));
+        // no asserts, just tests execution without exception
+        cleanupTask.handleTopologyEvent(newChangedEvent(newView(), newView()));
+        // no asserts, just tests execution without exception
+    }
+
+    @Test
+    public void testChanged() throws Exception {
+        createCleanupTask(1000, 86400000);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        createSlingIds(5, 10, 0);
+
+        TopologyView view1 = newView();
+        cleanupTask.handleTopologyEvent(newInitEvent(view1));
+        cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+        assertEquals(0, cleanupTask.getDeleteCount());
+
+        TopologyView view2 = newView();
+        cleanupTask.handleTopologyEvent(newChangedEvent(view1, view2));
+        Thread.sleep(500);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        waitForRunCount(cleanupTask, 1, 5000);
+        assertEquals(10, cleanupTask.getDeleteCount());
+    }
+
+    @Test
+    public void testRepetitionDelay() throws Exception {
+        createCleanupTask(1000, 86400000);
+        createSlingIds(5, 10, 0);
+
+        TopologyView view1 = newView();
+        cleanupTask.handleTopologyEvent(newInitEvent(view1));
+        waitForRunCount(cleanupTask, 1, 5000);
+        assertEquals(10, cleanupTask.getDeleteCount());
+        cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+        Thread.sleep(1500);
+        assertEquals(1, cleanupTask.getRunCount());
+
+        TopologyView view2 = newView();
+        cleanupTask.handleTopologyEvent(newChangedEvent(view1, view2));
+        Thread.sleep(1500);
+        assertEquals(1, cleanupTask.getRunCount());
+        assertEquals(10, cleanupTask.getDeleteCount());
+    }
+
+    @Test
+    public void testDisabled() throws Exception {
+        createCleanupTask(1000, 86400000);
+        System.setProperty(
+                SlingIdCleanupTask.SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME, "false");
+        assertEquals(0, cleanupTask.getDeleteCount());
+        createSlingIds(5, 10, 0);
+
+        TopologyView view1 = newView();
+        cleanupTask.handleTopologyEvent(newInitEvent(view1));
+        cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+        assertEquals(0, cleanupTask.getDeleteCount());
+
+        TopologyView view2 = newView();
+        cleanupTask.handleTopologyEvent(newChangedEvent(view1, view2));
+        Thread.sleep(500);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        Thread.sleep(1000);
+        assertEquals(0, cleanupTask.getRunCount());
+        assertEquals(0, cleanupTask.getDeleteCount());
+    }
+
+    /**
+     * This tests the case where there are slingIds under /clusterInstances with no
+     * corresponding syncToken
+     */
+    @Test
+    public void testOrphanedClsuterInstances() throws Exception {
+        createCleanupTask(1000, 86400000);
+        createSlingIds(5, 10, 0, 3);
+
+        TopologyView view1 = newView();
+        cleanupTask.handleTopologyEvent(newInitEvent(view1));
+        waitForRunCount(cleanupTask, 1, 5000);
+        assertEquals(10, cleanupTask.getDeleteCount());
+    }
+
+    /**
+     * This test the case where there are syncTokens without a corresponding slingId
+     * under /clusterInstances
+     */
+    @Test
+    public void testOrphanedSyncTokens() throws Exception {
+        createCleanupTask(1000, 86400000);
+        createSlingIds(5, 10, 0, 20);
+
+        instance.dumpRepo();
+
+        TopologyView view1 = newView();
+        cleanupTask.handleTopologyEvent(newInitEvent(view1));
+        waitForRunCount(cleanupTask, 1, 5000);
+        assertEquals(15, cleanupTask.getDeleteCount());
+    }
+
+    @Test
+    public void testLeaderVsNonLeader() throws Exception {
+        createCleanupTask(250, 86400000);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        List<String> slingIds = createSlingIds(3, 7, 2);
+
+        final String clusterId = UUID.randomUUID().toString();
+        final DefaultClusterView remoteLeaderCluster = new DefaultClusterView(clusterId);
+        final DefaultClusterView localLeaderCluster = new DefaultClusterView(clusterId);
+        final DummyTopologyView remoteLeaderView = new DummyTopologyView();
+        final DummyTopologyView localLeaderView = new DummyTopologyView();
+
+        Iterator<String> it = slingIds.iterator();
+        String leaderSlingId = it.next(); // first is declared leader
+        String localSlignId = it.next(); // second is local
+        int idx = 0;
+        for (String aSlingId : slingIds) {
+            remoteLeaderView.addInstance(aSlingId, remoteLeaderCluster,
+                    aSlingId.equals(leaderSlingId), aSlingId.equals(localSlignId));
+            localLeaderView.addInstance(aSlingId, localLeaderCluster,
+                    aSlingId.equals(localSlignId), aSlingId.equals(localSlignId));
+            if (++idx >= 2) {
+                break;
+            }
+        }
+
+        assertEquals(0, cleanupTask.getDeleteCount());
+        cleanupTask.handleTopologyEvent(newInitEvent(remoteLeaderView));
+        assertEquals(0, cleanupTask.getDeleteCount());
+        Thread.sleep(1000);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        cleanupTask.handleTopologyEvent(newChangingEvent(remoteLeaderView));
+        assertEquals(0, cleanupTask.getDeleteCount());
+        Thread.sleep(1000);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        cleanupTask
+                .handleTopologyEvent(newChangedEvent(remoteLeaderView, localLeaderView));
+        assertEquals(0, cleanupTask.getDeleteCount());
+        assertEquals(0, cleanupTask.getCompletionCount());
+        waitForRunCount(cleanupTask, 1, 5000);
+        assertEquals(1, cleanupTask.getCompletionCount());
+        assertEquals(5, cleanupTask.getDeleteCount());
+    }
+
+    @Test
+    public void testOldSlingIdsButNowActive() throws InterruptedException, Exception {
+        doTestOldSlingIdsButActive(4, 9, 7);
+    }
+
+    @Test
+    public void testOldSlingIdButRecentlyActive() throws Exception, InterruptedException {
+        createCleanupTask(1000, 1000, 50, 86400000, -1);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        int currentIds = 4;
+        int oldIds = 9;
+        int activeIds = 7;
+        int activeOldIds = Math.max(0, activeIds - currentIds);
+        List<String> slingIds = createSlingIds(currentIds, oldIds, activeOldIds);
+
+        final String clusterViewId = UUID.randomUUID().toString();
+        DefaultClusterView cluster = new DefaultClusterView(clusterViewId);
+        DummyTopologyView view1 = new DummyTopologyView();
+
+        Iterator<String> it = slingIds.iterator();
+        String leaderSlingId = it.next(); // first is declared leader
+        String localSlignId = leaderSlingId; // and is local too
+        int idx = 0;
+        for (String aSlingId : slingIds) {
+            view1.addInstance(aSlingId, cluster, aSlingId.equals(leaderSlingId),
+                    aSlingId.equals(localSlignId));
+            if (++idx >= activeIds) {
+                break;
+            }
+        }
+
+        cleanupTask.handleTopologyEvent(newInitEvent(view1));
+        assertEquals(0, cleanupTask.getDeleteCount());
+        assertEquals(0, cleanupTask.getCompletionCount());
+        waitForRunCount(cleanupTask, 1, 5000);
+        int expectedDeleteCount = oldIds - activeOldIds;
+        assertEquals(expectedDeleteCount, cleanupTask.getDeleteCount());
+        assertNoGarbageLeft(instance, config, view1, 86400000);
+        idx = 0;
+        for (String aSlingId : slingIds) {
+            logger.info("checking idx=" + idx + ", slingId=" + aSlingId);
+            if (idx < 2) {
+                // those are currently active and should of course not have been deleted
+                assertStatus(instance, config, aSlingId, false);
+            } else if (idx < activeIds) {
+                // same here, while they are not currently active, they were active during
+                // leader's lifetime
+                assertStatus(instance, config, aSlingId, false);
+            } else {
+                // for the rest: those should be deleted
+                assertStatus(instance, config, aSlingId, true);
+            }
+            idx++;
+        }
+
+        // in addition to the above, which is the same as doTestOldSlingIdsButActive,
+        // we now simulate some activeIds crashing.
+        // in particular some old and some current ones.
+        // the logic should be that, as long as the leader at some previous
+        // time was in a topology with those now crashed active ids, it would
+        // not delete them.
+        // this is to avoid a race condition that could other wise happen
+        // between a crash looping instance and this cleanup mechanism
+        cluster = new DefaultClusterView(clusterViewId);
+        DummyTopologyView view2 = new DummyTopologyView();
+
+        it = slingIds.iterator();
+        leaderSlingId = it.next(); // first is declared leader
+        localSlignId = leaderSlingId; // and is local too
+        idx = 0;
+        for (String aSlingId : slingIds) {
+            view2.addInstance(aSlingId, cluster, aSlingId.equals(leaderSlingId),
+                    aSlingId.equals(localSlignId));
+            if (++idx >= 2) { // let's have only 2 active instances
+                break;
+            }
+        }
+        cleanupTask.handleTopologyEvent(newChangingEvent(view1));
+        assertEquals(1, cleanupTask.getCompletionCount());
+        cleanupTask.handleTopologyEvent(newChangedEvent(view1, view2));
+        assertEquals(expectedDeleteCount, cleanupTask.getDeleteCount());
+        waitForRunCount(cleanupTask, 2, 5000);
+        assertEquals(2, cleanupTask.getCompletionCount());
+        // no further cleanup should have happened
+        assertEquals(expectedDeleteCount, cleanupTask.getDeleteCount());
+        // now check for correct non-/deletion
+        assertNoGarbageLeft(instance, config, view1, 86400000);
+        idx = 0;
+        for (String aSlingId : slingIds) {
+            if (idx < 2) {
+                // those are currently active and should of course not have been deleted
+                assertStatus(instance, config, aSlingId, false);
+            } else if (idx < activeIds) {
+                // same here, while they are not currently active, they were active during
+                // leader's lifetime
+                assertStatus(instance, config, aSlingId, false);
+            } else {
+                // for the rest: those should be deleted
+                assertStatus(instance, config, aSlingId, true);
+            }
+            idx++;
+        }
+    }
+
+    private void assertStatus(VirtualInstance i, Config c, String slingId,
+            boolean deleted) throws Exception {
+        ResourceResolverFactory f = i.getResourceResolverFactory();
+
+        ResourceResolver resolver = null;
+        resolver = f.getServiceResourceResolver(null);
+
+        final Resource clusterInstances = ResourceHelper.getOrCreateResource(resolver,
+                c.getClusterInstancesPath());
+        final Resource idMap = ResourceHelper.getOrCreateResource(resolver,
+                c.getIdMapPath());
+        final Resource syncTokens = ResourceHelper.getOrCreateResource(resolver,
+                c.getSyncTokenPath());
+        resolver.refresh();
+
+        if (deleted) {
+            assertNull(clusterInstances.getChild(slingId));
+            assertNull(idMap.getValueMap().get(slingId));
+            assertNull(syncTokens.getValueMap().get(slingId));
+        } else {
+            assertNotNull(clusterInstances.getChild(slingId));
+            assertNotNull(idMap.getValueMap().get(slingId));
+            assertNotNull(syncTokens.getValueMap().get(slingId));
+        }
+    }
+
+    @Test
+    public void testOldSlingIdButActive_all() throws Exception {
+        doTestOldSlingIdsButActive(5, 10, 15);
+    }
+
+    @Test
+    public void testOldSlingIdButActive_almostall() throws Exception {
+        doTestOldSlingIdsButActive(5, 10, 14);
+    }
+
+    @Test
+    public void testOldSlingIdButActive_some() throws Exception {
+        doTestOldSlingIdsButActive(5, 10, 8);
+    }
+
+    @Test
+    public void testOldSlingIdButActive_one() throws Exception {
+        doTestOldSlingIdsButActive(5, 10, 6);
+    }
+
+    @Test
+    public void testOldSlingIdButActive_none() throws Exception {
+        doTestOldSlingIdsButActive(5, 10, 5);
+    }
+
+    @Test
+    public void testOldSlingIdButActive_oneRecent() throws Exception {
+        doTestOldSlingIdsButActive(5, 10, 4);
+    }
+
+    @Test
+    public void testOldSlingIdButActive_twoRecent() throws Exception {
+        doTestOldSlingIdsButActive(5, 10, 3);
+    }
+
+    @Test
+    public void testOldSlingIdButActive_threeRecent() throws Exception {
+        doTestOldSlingIdsButActive(5, 10, 2);
+    }
+
+    @Test
+    public void testOldSlingIdButActive_allRecent() throws Exception {
+        doTestOldSlingIdsButActive(5, 10, 1);
+    }
+
+    private void doTestOldSlingIdsButActive(int recentIds, int oldIds, int activeIds)
+            throws Exception, InterruptedException {
+        logger.info(
+                "doTestOldSlingIdsButActive : recentIds={}, oldIds={}, activeIds={} : START",
+                recentIds, oldIds, activeIds);
+        createCleanupTask(1000, 86400000);
+        assertEquals(0, cleanupTask.getDeleteCount());
+        List<String> slingIds = createSlingIds(recentIds, oldIds,
+                Math.max(0, activeIds - recentIds));
+
+        final DefaultClusterView cluster = new DefaultClusterView(
+                UUID.randomUUID().toString());
+        final DummyTopologyView view = new DummyTopologyView();
+
+        Iterator<String> it = slingIds.iterator();
+        String leaderSlingId = it.next(); // first is declared leader
+        String localSlignId = leaderSlingId; // and is local too
+        int idx = 0;
+        for (String aSlingId : slingIds) {
+            view.addInstance(aSlingId, cluster, aSlingId.equals(leaderSlingId),
+                    aSlingId.equals(localSlignId));
+            if (++idx >= activeIds) {
+                break;
+            }
+        }
+
+        cleanupTask.handleTopologyEvent(newInitEvent(view));
+        assertEquals(0, cleanupTask.getDeleteCount());
+        assertEquals(0, cleanupTask.getCompletionCount());
+        waitForRunCount(cleanupTask, 1, 5000);
+        assertEquals(Math.max(0, oldIds - Math.max(0, activeIds - recentIds)),
+                cleanupTask.getDeleteCount());
+        assertNoGarbageLeft(instance, config, view, 86400000);
+    }
+
+    private void assertNoGarbageLeft(VirtualInstance i, Config c,
+            TopologyView currentView, long maxAgeMillis) throws Exception {
+        List<InstanceDescription> instances = currentView.getLocalInstance()
+                .getClusterView().getInstances();
+        Set<String> activeIds = new HashSet<>();
+        for (InstanceDescription id : instances) {
+            activeIds.add(id.getSlingId());
+        }
+
+        ResourceResolverFactory f = i.getResourceResolverFactory();
+
+        ResourceResolver resolver = null;
+        resolver = f.getServiceResourceResolver(null);
+
+        final Resource clusterInstances = ResourceHelper.getOrCreateResource(resolver,
+                c.getClusterInstancesPath());
+        final Resource idMap = ResourceHelper.getOrCreateResource(resolver,
+                c.getIdMapPath());
+        final Resource syncTokens = ResourceHelper.getOrCreateResource(resolver,
+                c.getSyncTokenPath());
+        resolver.refresh();
+
+        final ValueMap idMapMap = idMap.adaptTo(ValueMap.class);
+        final ValueMap syncTokenMap = syncTokens.adaptTo(ValueMap.class);
+
+        for (Resource aChild : clusterInstances.getChildren()) {
+            String slingId = aChild.getName();
+            if (!isGarbage(aChild, maxAgeMillis)) {
+                activeIds.add(slingId);
+            }
+        }
+
+        Set<String> idMapSlingIds = new HashSet<>();
+        for (Entry<String, Object> e : idMapMap.entrySet()) {
+            String k = e.getKey();
+            if (!k.contains(":") && k.contains("-")) {
+                idMapSlingIds.add(k);
+            }
+        }
+        assertCurrentSlingIds(idMapSlingIds, activeIds, "idmap");
+        assertEquals("idmap size", activeIds.size(), idMapSlingIds.size());
+
+        Set<String> syncTokenSlingIds = new HashSet<>();
+        for (Entry<String, Object> e : syncTokenMap.entrySet()) {
+            String k = e.getKey();
+            if (!k.contains(":") && k.contains("-")) {
+                syncTokenSlingIds.add(k);
+            }
+        }
+        assertCurrentSlingIds(syncTokenSlingIds, activeIds, "syncToken");
+        assertEquals("syncToken size", activeIds.size(), syncTokenSlingIds.size());
+        resolver.close();
+    }
+
+    private boolean isGarbage(Resource aChild, long maxAgeMillis) {
+        Calendar now = Calendar.getInstance();
+        Object o = aChild.getValueMap().get("leaderElectionIdCreatedAt");
+        final long leaderElectionIdCreatedAt = SlingIdCleanupTask.millisOf(o);
+        if (leaderElectionIdCreatedAt <= 0) {
+            // skip
+            return false;
+        }
+        final long diffMillis = now.getTimeInMillis() - leaderElectionIdCreatedAt;
+        return diffMillis > maxAgeMillis;
+    }
+
+    private void assertCurrentSlingIds(Set<String> slingIds, Set<String> activeIds,
+            String context) {
+        for (String aSlingId : slingIds) {
+            assertTrue("[" + context + "] stored slingId " + aSlingId
+                    + " not in current view", activeIds.contains(aSlingId));
+        }
+    }
+
+    /** Get or create a ResourceResolver **/
+    private ResourceResolver getResourceResolver() throws LoginException {
+        return factory.getServiceResourceResolver(null);
+    }
+
+    /**
+     * Calculate a new leaderElectionId based on the current config and system time
+     */
+    private String newLeaderElectionId(String slingId) {
+        int maxLongLength = String.valueOf(Long.MAX_VALUE).length();
+        String currentTimeMillisStr = String.format("%0" + maxLongLength + "d",
+                System.currentTimeMillis());
+
+        String prefix = String.valueOf(config.getLeaderElectionPrefix());
+
+        final String newLeaderElectionId = prefix + "_" + currentTimeMillisStr + "_"
+                + slingId;
+        return newLeaderElectionId;
+    }
+
+    private boolean createSyncToken(String slingId, long seqNum) throws Exception {
+        final String syncTokenPath = config.getSyncTokenPath();
+        ResourceResolver resourceResolver = getResourceResolver();
+        if (resourceResolver == null) {
+            fail("could not login");
+            return false;
+        }
+        final Resource resource = ResourceHelper.getOrCreateResource(resourceResolver,
+                syncTokenPath);
+        final ModifiableValueMap resourceMap = resource.adaptTo(ModifiableValueMap.class);
+        resourceMap.put(slingId, seqNum);
+        logger.info("createSyncToken: storing syncToken: {}, slingId: {}", seqNum,
+                slingId);
+        resourceResolver.commit();
+        return true;
+    }
+
+    private void createPath(String path) throws Exception {
+        ResourceResolver resourceResolver = getResourceResolver();
+        if (resourceResolver == null) {
+            fail("could not login");
+            return;
+        }
+
+        ResourceHelper.getOrCreateResource(resourceResolver, path);
+        resourceResolver.commit();
+    }
+
+    private boolean createClusterInstance(String uuid, String runtimeId,
+            String slingHomePath, String endpointsAsString, Calendar jcrCreated)
+            throws Exception {
+        final String myClusterNodePath = config.getClusterInstancesPath() + "/" + uuid;
+        ResourceResolver resourceResolver = getResourceResolver();
+        if (resourceResolver == null) {
+            fail("could not login");
+            return false;
+        }
+        String newLeaderElectionId = newLeaderElectionId(uuid);
+
+        final Resource resource = ResourceHelper.getOrCreateResource(resourceResolver,
+                myClusterNodePath);
+        final ModifiableValueMap resourceMap = resource.adaptTo(ModifiableValueMap.class);
+
+        resourceMap.put(PROPERTY_ID_RUNTIME, runtimeId);
+        resourceMap.put(PROPERTY_ID_SLING_HOME_PATH, slingHomePath);
+        resourceMap.put(PROPERTY_ID_ENDPOINTS, endpointsAsString);
+
+        resourceMap.put("leaderElectionId", newLeaderElectionId);
+        resourceMap.put("leaderElectionIdCreatedAt", jcrCreated);
+
+        resourceMap.put("jcr:created", jcrCreated);
+
+        logger.info(
+                "createClusterInstance: storing my runtimeId: {}, endpoints: {}, sling home path: {}, new leaderElectionId: {}, created at: {}",
+                new Object[] { runtimeId, endpointsAsString, slingHomePath,
+                        newLeaderElectionId, jcrCreated });
+        resourceResolver.commit();
+        return true;
+    }
+
+    private void fillIdMap(Map<String, Long> ids) throws Exception {
+        ResourceResolver resourceResolver = getResourceResolver();
+        final Resource resource = ResourceHelper.getOrCreateResource(resourceResolver,
+                config.getIdMapPath());
+        ModifiableValueMap idmap = resource.adaptTo(ModifiableValueMap.class);
+        for (Entry<String, Long> e : ids.entrySet()) {
+            idmap.put(e.getKey(), e.getValue());
+        }
+        resourceResolver.commit();
+    }
+
+    private List<String> createSlingIds(int currentIds, int oldIds, int activeOldIds)
+            throws Exception {
+        return createSlingIds(currentIds, oldIds, activeOldIds, currentIds + oldIds);
+    }
+
+    private List<String> createSlingIds(int currentIds, int oldIds, int activeOldIds,
+            int numSyncTokens) throws Exception {
+        final List<String> orderedIds = new LinkedList<>();
+        final Map<String, Long> slingIdToClusterNodeIds = new HashMap<>();
+        final Map<String, Long> slingIdToSeqNums = new HashMap<>();
+        int currentSeqNum = currentIds * 2 + oldIds * 3 + 42;
+        for (long i = 0; i < currentIds; i++) {
+            final String uuid = UUID.randomUUID().toString();
+            orderedIds.add(uuid);
+            slingIdToClusterNodeIds.put(uuid, i);
+            slingIdToSeqNums.put(uuid, Long.valueOf(currentSeqNum));
+            createClusterInstance(uuid, UUID.randomUUID().toString(), "/a/b/c", "n/a",
+                    Calendar.getInstance());
+        }
+        for (long i = 0; i < oldIds; i++) {
+            final String uuid = UUID.randomUUID().toString();
+            if (i < activeOldIds) {
+                slingIdToClusterNodeIds.put(uuid, i + currentIds);
+            }
+            orderedIds.add(uuid);
+            slingIdToSeqNums.put(uuid, Long.valueOf(i));
+            final Calendar cal = Calendar.getInstance();
+            cal.add(Calendar.DAY_OF_YEAR, -(7 + (int) i));
+            createClusterInstance(uuid, UUID.randomUUID().toString(), "/a/b/c", "n/a",
+                    cal);
+        }
+        // idmap is created for all currentIds and active old ids
+        fillIdMap(slingIdToClusterNodeIds);
+        while (numSyncTokens > slingIdToSeqNums.size()) {
+            slingIdToSeqNums.put(UUID.randomUUID().toString(), Long.valueOf(1));
+        }
+        int c = 0;
+        // first make sure as many active slingIds have a syncToken as possible
+        for (String activeSlingId : slingIdToClusterNodeIds.keySet()) {
+            createSyncToken(activeSlingId, slingIdToSeqNums.get(activeSlingId));
+            if (++c >= numSyncTokens) {
+                break;
+            }
+        }
+        if (c < numSyncTokens) {
+            // only after they have been served, consider the rest, again up to
+            // numSynTtokens
+            for (Entry<String, Long> e : slingIdToSeqNums.entrySet()) {
+                if (slingIdToClusterNodeIds.containsKey(e.getKey())) {
+                    // then it was probably already added above
+                    continue;
+                }
+                createSyncToken(e.getKey(), e.getValue());
+                if (++c >= numSyncTokens) {
+                    break;
+                }
+            }
+        }
+        return orderedIds;
+    }
+}
\ No newline at end of file