/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sling.discovery.oak;

import static org.osgi.util.converter.Converters.standardConverter;

import java.util.Calendar;
import java.util.Date;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.TopologyView;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A background task that cleans up garbage slingIds after topology changes.
 * <p>
 * A slingId is considered garbage when:
 * <ul>
 * <li>it is not in the current topology</li>
 * <li>was not ever seen in previous topologies by the now leader instance</li>
 * <li>it is not in the current idmap (where clusterNodeIds are reused hence
 * that list stays small and does not need cleanup)</li>
 * <li>its leaderElectionId was created more than 7 days ago (the
 * leaderElectionId is created at activate time of the discovery.oak bundle -
 * hence this more or less corresponds to the startup time of that
 * instance)</li>
 * </ul>
 * The garbage accumulates at the following places, where it will thus be
 * cleaned up from:
 * <ul>
 * <li>as child node under /var/discovery/oak/clusterInstances : this is the
 * most performance critical garbage</li>
 * <li>as a property key in /var/discovery/oak/syncTokens</li>
 * </ul>
 * The task by default is executed:
 * <ul>
 * <li>only on the leader</li>
 * <li>10min after a TOPOLOGY_INIT or TOPOLOGY_CHANGED event</li>
 * <li>with a maximum number of delete operations to avoid repository overload -
 * that maximum is called batchSize and is 50 by default</li>
 * <li>in subsequent intervals of 10min after the initial run, if that had to
 * stop at the batchSize of 50 deletions</li>
 * </ul>
 * All parameters mentioned above can be configured.
 * <p>
 * Additionally, the cleanup is skipped for 13 hours after a successful cleanup.
 * This is to avoid unnecessary load on the repository. The number of 13
 * incorporates some heuristics such as : about 2 cleanup rounds per day maximum
 * makes sense, if a leader is very long living, then the 1 additional hour
 * makes it spread somewhat throughout the day. This is to further minimize any
 * load side-effects.
 */
@Component
@Designate(ocd = SlingIdCleanupTask.Conf.class)
public class SlingIdCleanupTask implements TopologyEventListener, Runnable {

    final static String SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME = "org.apache.sling.discovery.oak.slingidcleanup.enabled";

    /** default minimal cleanup delay at 13h, to intraday load balance */
    final static long MIN_CLEANUP_DELAY_MILLIS = 46800000;

    /**
     * default age is 1 week : an instance that is not in the current topology,
     * started 1 week ago is very unlikely to still be active
     */
    private static final long DEFAULT_MIN_CREATION_AGE_MILLIS = 604800000; // 1 week

    /**
     * initial delay is 10min : after a TOPOLOGY_INIT or TOPOLOGY_CHANGED on the
     * leader, there should be a 10min delay before starting a round of cleanup.
     * This is to not add unnecessary load after a startup/change.
     */
    private static final int DEFAULT_CLEANUP_INITIAL_DELAY = 600000; // 10min

    /**
     * default cleanup interval is 10min - this is together with the batchSize to
     * lower repository load
     */
    private static final int DEFAULT_CLEANUP_INTERVAL = 600000; // 10min

    /**
     * default batch size is 50 deletions : normally there should not be much
     * garbage around anyway, so normally it's just a few, 1-5 perhaps. If there's
     * more than 50, that is probably a one-time cleanup after this feature is first
     * rolled out. That one-time cleanup can actually take a considerable amount of
     * time. So, to not overload the write load on the repository, the deletion is
     * batched into 50 at any time - with 10min delays in between. That results in
     * an average of 1 cleanup every 12 seconds, or 5 per minute, or 8640 per day,
     * for a legacy cleanup.
     */
    private static final int DEFAULT_CLEANUP_BATCH_SIZE = 50;

    /**
     * The sling.commons.scheduler name, so that it can be cancelled upon topology
     * changes.
     */
    private static final String SCHEDULE_NAME = "org.apache.sling.discovery.oak.SlingIdCleanupTask";

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Reference
    protected Scheduler scheduler;

    @Reference
    protected ResourceResolverFactory resourceResolverFactory;

    @Reference
    private Config config;

    /**
     * volatile flag to fast stop any ongoing deletion upon a change in the topology
     */
    private volatile boolean hasTopology = false;

    /**
     * volatile field to keep track of the current topology, shared between topology
     * listener and deletion
     */
    @SuppressWarnings("all")
    private volatile TopologyView currentView;

    private int initialDelayMillis = DEFAULT_CLEANUP_INITIAL_DELAY;

    private int intervalMillis = DEFAULT_CLEANUP_INTERVAL;

    private int batchSize = DEFAULT_CLEANUP_BATCH_SIZE;

    private long minCreationAgeMillis = DEFAULT_MIN_CREATION_AGE_MILLIS;

    /** test counter that increments upon every scheduler invocation */
    private AtomicInteger runCount = new AtomicInteger(0);

    /** test counter that increments upon every batch deletion */
    private AtomicInteger completionCount = new AtomicInteger(0);

    /** test counter that keeps track of actually deleted slingIds */
    private AtomicInteger deleteCount = new AtomicInteger(0);

    /**
     * flag to distinguish first from subsequent runs, as they might have different
     * scheduler delays
     */
    private volatile boolean firstRun = true;

    private long lastSuccessfulRun = -1;

    /**
     * Minimal delay after a successful cleanup round, in millis
     */
    private long minCleanupDelayMillis = MIN_CLEANUP_DELAY_MILLIS;

    /**
     * contains all slingIds ever seen by this instance - should not be a long list
     * so not a memory issue
     */
    private Set<String> seenInstances = new CopyOnWriteArraySet<>();

    @ObjectClassDefinition(name = "Apache Sling Discovery Oak SlingId Cleanup Task", description = "This task is in charge of cleaning up old SlingIds from the repository.")
    public @interface Conf {

        @AttributeDefinition(name = "Cleanup initial delay milliseconds", description = "Number of milliseconds to initially wait for the first cleanup")
        int slingid_cleanup_initial_delay() default DEFAULT_CLEANUP_INITIAL_DELAY;

        @AttributeDefinition(name = "Cleanup interval milliseconds", description = "Number of milliseconds after which to do another batch of cleaning up (if necessary)")
        int slingid_cleanup_interval() default DEFAULT_CLEANUP_INTERVAL;

        @AttributeDefinition(name = "Cleanup batch size", description = "Maximum number of slingIds to cleanup in one batch.")
        int slingid_cleanup_batchsize() default DEFAULT_CLEANUP_BATCH_SIZE;

        @AttributeDefinition(name = "Cleanup minimum creation age", description = "Minimum number of milliseconds since the slingId was created.")
        long slingid_cleanup_min_creation_age() default DEFAULT_MIN_CREATION_AGE_MILLIS;
    }

    /**
     * Test constructor
     */
    static SlingIdCleanupTask create(Scheduler scheduler, ResourceResolverFactory factory,
            Config config, int initialDelayMillis, int intervalMillis, int batchSize,
            long minCreationAgeMillis, long minCleanupDelayMillis) {
        final SlingIdCleanupTask s = new SlingIdCleanupTask();
        s.scheduler = scheduler;
        s.resourceResolverFactory = factory;
        s.config = config;
        s.minCleanupDelayMillis = minCleanupDelayMillis;
        s.config(initialDelayMillis, intervalMillis, batchSize, minCreationAgeMillis);
        return s;
    }

    @Activate
    protected void activate(final BundleContext bc, final Conf config) {
        this.modified(bc, config);
    }

    @Modified
    protected void modified(final BundleContext bc, final Conf config) {
        if (config == null) {
            return;
        }
        config(config.slingid_cleanup_initial_delay(), config.slingid_cleanup_interval(),
                config.slingid_cleanup_batchsize(),
                config.slingid_cleanup_min_creation_age());
    }

    @Deactivate
    protected void deactivate() {
        logger.info("deactivate : deactivated.");
        hasTopology = false;
    }

    private void config(int initialDelayMillis, int intervalMillis, int batchSize,
            long minCreationAgeMillis) {
        this.initialDelayMillis = initialDelayMillis;
        this.intervalMillis = intervalMillis;
        this.batchSize = batchSize;
        this.minCreationAgeMillis = minCreationAgeMillis;
        logger.info(
                "config: enabled = {}, initial delay milliseconds = {}, interval milliseconds = {}, batch size = {}, min creation age milliseconds = {}",
                isEnabled(), initialDelayMillis, intervalMillis, batchSize,
                minCreationAgeMillis);
    }

    @Override
    public void handleTopologyEvent(TopologyEvent event) {
        if (!isEnabled()) {
            hasTopology = false; // stops potentially ongoing deletion
            currentView = null;
            // cancel cleanup schedule
            stop();
            logger.debug("handleTopologyEvent: slingId cleanup is disabled");
            return;
        }
        final TopologyView newView = event.getNewView();
        if (event.getType() == Type.PROPERTIES_CHANGED) {
            // ignore those
        } else if (newView == null) {
            // that's a TOPOLOGY_CHANGING
            hasTopology = false; // stops potentially ongoing deletion
            currentView = null;
            // cancel cleanup schedule
            stop();
        } else {
            // that's TOPOLOGY_INIT or TOPOLOGY_CHANGED
            hasTopology = true;
            currentView = newView;
            seenInstances.addAll(getActiveSlingIds(newView));
            if (newView.getLocalInstance().isLeader()) {
                // only execute on leader
                recreateSchedule();
            } else {
                // should not be necessary, but lets stop anyway on non-leaders:
                stop();
            }
        }
    }

    /**
     * Cancels a potentially previously registered cleanup schedule.
     */
    private void stop() {
        final Scheduler localScheduler = scheduler;
        if (localScheduler == null) {
            // should not happen
            logger.warn("stop: no scheduler set, giving up.");
            return;
        }
        final boolean unscheduled = localScheduler.unschedule(SCHEDULE_NAME);
        if (unscheduled) {
            logger.info("stop: unscheduled");
        } else {
            logger.debug("stop: unschedule was not necessary");
        }
    }

    /**
     * Reads the system property that enables or disabled this tasks
     */
    private static boolean isEnabled() {
        final String systemPropertyValue = System
                .getProperty(SLINGID_CLEANUP_ENABLED_SYSTEM_PROPERTY_NAME);
        return standardConverter().convert(systemPropertyValue).defaultValue(false)
                .to(Boolean.class);
    }

    /**
     * This method can be invoked at any time to reset the schedule to do a fresh
     * round of cleanup.
     * <p>
     * This method is thread-safe : if called concurrently, the fact that
     * scheduler.schedul is synchronized works out that ultimately there will be
     * just 1 schedule active (which is what is the desired outcome).
     */
    private void recreateSchedule() {
        final Scheduler localScheduler = scheduler;
        if (localScheduler == null) {
            // should not happen
            logger.warn("recreateSchedule: no scheduler set, giving up.");
            return;
        }
        final Calendar cal = Calendar.getInstance();
        int delayMillis;
        if (firstRun) {
            delayMillis = initialDelayMillis;
        } else {
            delayMillis = intervalMillis;
        }
        cal.add(Calendar.MILLISECOND, delayMillis);
        final Date scheduledDate = cal.getTime();
        logger.info(
                "recreateSchedule: scheduling a cleanup in {} milliseconds from now, which is: {}",
                delayMillis, scheduledDate);
        ScheduleOptions options = localScheduler.AT(scheduledDate);
        options.name(SCHEDULE_NAME);
        options.canRunConcurrently(false); // should not concurrently execute
        localScheduler.schedule(this, options);
    }

    /**
     * Invoked via sling.commons.scheduler triggered from resetCleanupSchedule(). By
     * default should get called at max every 5 minutes until cleanup is done or
     * 10min after a topology change.
     */
    @Override
    public void run() {
        if (lastSuccessfulRun > 0 && System.currentTimeMillis()
                - lastSuccessfulRun < minCleanupDelayMillis) {
            logger.debug(
                    "run: last cleanup was {} millis ago, which is less than {} millis, therefore not cleaning up yet.",
                    System.currentTimeMillis() - lastSuccessfulRun,
                    minCleanupDelayMillis);
            recreateSchedule();
            return;
        }
        runCount.incrementAndGet();
        if (!hasTopology) {
            return;
        }
        boolean mightHaveMore = true;
        try {
            mightHaveMore = cleanup();
        } catch (Exception e) {
            // upon exception just log and retry in 10min
            logger.error("run: got Exception while cleaning up slnigIds : " + e, e);
        }
        if (mightHaveMore) {
            // then continue in 10min
            recreateSchedule();
            return;
        }
        // log successful cleanup done, yes, on info
        logger.info(
                "run: slingId cleanup done, run counter = {}, delete counter = {}, completion counter = {}",
                getRunCount(), getDeleteCount(), getCompletionCount());
        lastSuccessfulRun = System.currentTimeMillis();
    }

    /**
     * Do the actual cleanup of garbage slingIds and report back with true if there
     * might be more or false if we're at the end.
     * 
     * @return true if there might be more garbage or false if we're at the end
     */
    private boolean cleanup() {
        logger.debug("cleanup: start");
        if (!isEnabled()) {
            // bit of overkill probably, as this shouldn't happen.
            // but adds to a good night's sleep.
            logger.debug("cleanup: not enabled, stopping.");
            return false;
        }

        final ResourceResolverFactory localFactory = resourceResolverFactory;
        final Config localConfig = config;
        if (localFactory == null || localConfig == null) {
            logger.warn("cleanup: cannot cleanup due to rrf={} or c={}", localFactory,
                    localConfig);
            return true;
        }
        final TopologyView localCurrentView = currentView;
        if (localCurrentView == null || !localCurrentView.isCurrent()) {
            logger.debug("cleanup : cannot cleanup as topology recently changed : {}",
                    localCurrentView);
            return true;
        }
        final Set<String> activeSlingIds = getActiveSlingIds(localCurrentView);
        try (ResourceResolver resolver = localFactory.getServiceResourceResolver(null)) {
            final Resource clusterInstances = resolver
                    .getResource(localConfig.getClusterInstancesPath());
            final Resource idMap = resolver.getResource(localConfig.getIdMapPath());
            final Resource syncTokens = resolver
                    .getResource(localConfig.getSyncTokenPath());
            if (clusterInstances == null || idMap == null || syncTokens == null) {
                logger.warn("cleanup: no resource found at {}, {} or {}, stopping.",
                        localConfig.getClusterInstancesPath(), localConfig.getIdMapPath(),
                        localConfig.getSyncTokenPath());
                return false;
            }
            resolver.refresh();
            final ValueMap idMapMap = idMap.adaptTo(ValueMap.class);
            final ModifiableValueMap syncTokenMap = syncTokens
                    .adaptTo(ModifiableValueMap.class);
            final Calendar now = Calendar.getInstance();
            int removed = 0;
            boolean mightHaveMore = false;
            final int localBatchSize = batchSize;
            final long localMinCreationAgeMillis = minCreationAgeMillis;
            for (Resource resource : clusterInstances.getChildren()) {
                if (topologyChanged(localCurrentView)) {
                    return true;
                }
                final String slingId = resource.getName();
                if (deleteIfOldSlingId(resource, slingId, syncTokenMap, idMapMap,
                        activeSlingIds, now, localMinCreationAgeMillis)) {
                    if (++removed >= localBatchSize) {
                        // we need to stop
                        mightHaveMore = true;
                        break;
                    }
                }
            }
            // if we're not already at the batch limit, check syncTokens too
            if (!mightHaveMore) {
                for (String slingId : syncTokenMap.keySet()) {
                    try {
                        UUID.fromString(slingId);
                    } catch (Exception e) {
                        // not a uuid
                        continue;
                    }
                    if (topologyChanged(localCurrentView)) {
                        return true;
                    }
                    Resource resourceOrNull = clusterInstances.getChild(slingId);
                    if (deleteIfOldSlingId(resourceOrNull, slingId, syncTokenMap,
                            idMapMap, activeSlingIds, now, localMinCreationAgeMillis)) {
                        if (++removed >= localBatchSize) {
                            // we need to stop
                            mightHaveMore = true;
                            break;
                        }
                    }
                }
            }
            if (topologyChanged(localCurrentView)) {
                return true;
            }
            if (removed > 0) {
                // only if we removed something we commit
                resolver.commit();
                logger.info(
                        "cleanup : removed {} old slingIds (batch size : {}), potentially has more: {}",
                        removed, localBatchSize, mightHaveMore);
                deleteCount.addAndGet(removed);
            }
            firstRun = false;
            completionCount.incrementAndGet();
            return mightHaveMore;
        } catch (LoginException e) {
            logger.error("cleanup: could not log in administratively: " + e, e);
            throw new RuntimeException("Could not log in to repository (" + e + ")", e);
        } catch (PersistenceException e) {
            logger.error("cleanup: got a PersistenceException: " + e, e);
            throw new RuntimeException(
                    "Exception while talking to repository (" + e + ")", e);
        } finally {
            logger.debug("cleanup: done.");
        }
    }

    private Set<String> getActiveSlingIds(final TopologyView localCurrentView) {
        return localCurrentView.getLocalInstance().getClusterView().getInstances()
                .stream().map(InstanceDescription::getSlingId)
                .collect(Collectors.toSet());
    }

    private boolean topologyChanged(TopologyView localCurrentView) {
        if (!hasTopology || currentView != localCurrentView
                || !localCurrentView.isCurrent()) {
            // we got interrupted during cleanup
            // let's not commit at all then
            logger.debug(
                    "topologyChanged : topology changing during cleanup - not committing this time - stopping for now.");
            return true;
        } else {
            return false;
        }
    }

    static long millisOf(Object leaderElectionIdCreatedAt) {
        if (leaderElectionIdCreatedAt == null) {
            return -1;
        }
        if (leaderElectionIdCreatedAt instanceof Date) {
            final Date d = (Date) leaderElectionIdCreatedAt;
            return d.getTime();
        }
        if (leaderElectionIdCreatedAt instanceof Calendar) {
            final Calendar c = (Calendar) leaderElectionIdCreatedAt;
            return c.getTimeInMillis();
        }
        return -1;
    }

    private boolean deleteIfOldSlingId(Resource resourceOrNull, String slingId,
            ModifiableValueMap syncTokenMap, ValueMap idMapMap,
            Set<String> activeSlingIds, Calendar now, long localMinCreationAgeMillis)
            throws PersistenceException {
        logger.trace("deleteIfOldSlingId : handling slingId = {}", slingId);
        if (activeSlingIds.contains(slingId)) {
            logger.trace("deleteIfOldSlingId : slingId is currently active : {}",
                    slingId);
            return false;
        } else if (seenInstances.contains(slingId)) {
            logger.trace("deleteIfOldSlingId : slingId seen active previously : {}",
                    slingId);
            return false;
        }
        // only check in idmap and for leaderElectionId details if the clusterInstance
        // resource is there
        if (resourceOrNull != null) {
            Object clusterNodeId = idMapMap.get(slingId);
            if (clusterNodeId == null) {
                logger.trace("deleteIfOldSlingId : slingId {} not recently in use",
                        slingId);
            } else {
                logger.trace("deleteIfOldSlingId : slingId {} WAS recently in use : {}",
                        slingId, clusterNodeId);
                return false;
            }
            Object o = resourceOrNull.getValueMap().get("leaderElectionIdCreatedAt");
            final long leaderElectionIdCreatedAt = millisOf(o);
            if (leaderElectionIdCreatedAt <= 0) {
                // skip
                logger.trace(
                        "deleteIfOldSlingId: resource ({}) has no or wrongly typed leaderElectionIdCreatedAt : {}",
                        resourceOrNull, o);
                return false;
            }
            final long diffMillis = now.getTimeInMillis() - leaderElectionIdCreatedAt;
            if (diffMillis <= localMinCreationAgeMillis) {
                logger.trace("deleteIfOldSlingId: not old slingId : {}", resourceOrNull);
                return false;
            }
        }
        logger.trace("deleteIfOldSlingId: deleting old slingId : {}", resourceOrNull);
        syncTokenMap.remove(slingId);
        if (resourceOrNull != null) {
            resourceOrNull.getResourceResolver().delete(resourceOrNull);
        }
        return true;
    }

    int getRunCount() {
        return runCount.get();
    }

    int getDeleteCount() {
        return deleteCount.get();
    }

    int getCompletionCount() {
        return completionCount.get();
    }
}