/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.felix.hc.core.impl.executor;

import static org.apache.felix.hc.api.FormattingResultLog.msHumanReadable;
import static org.apache.felix.hc.core.impl.executor.HealthCheckExecutorImplConfiguration.LONGRUNNING_FUTURE_THRESHOLD_CRITICAL_DEFAULT_MS;
import static org.apache.felix.hc.core.impl.executor.HealthCheckExecutorImplConfiguration.RESULT_CACHE_TTL_DEFAULT_MS;
import static org.apache.felix.hc.core.impl.executor.HealthCheckExecutorImplConfiguration.TIMEOUT_DEFAULT_MS;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.felix.hc.api.FormattingResultLog;
import org.apache.felix.hc.api.HealthCheck;
import org.apache.felix.hc.api.Result;
import org.apache.felix.hc.api.ResultLog;
import org.apache.felix.hc.api.execution.HealthCheckExecutionOptions;
import org.apache.felix.hc.api.execution.HealthCheckExecutionResult;
import org.apache.felix.hc.api.execution.HealthCheckExecutor;
import org.apache.felix.hc.api.execution.HealthCheckMetadata;
import org.apache.felix.hc.api.execution.HealthCheckSelector;
import org.apache.felix.hc.core.impl.executor.async.AsyncHealthCheckExecutor;
import org.apache.felix.hc.core.impl.util.HealthCheckFilter;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.startlevel.FrameworkStartLevel;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Runs health checks for a given list of tags in parallel. */
@Component(service = { HealthCheckExecutor.class, ExtendedHealthCheckExecutor.class }, immediate = true // immediate = true to keep the
                                                                                                        // cache!
)
@Designate(ocd = HealthCheckExecutorImplConfiguration.class)
public class HealthCheckExecutorImpl implements ExtendedHealthCheckExecutor, ServiceListener {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String HC_LOGGING_SYS_PROP = "org.apache.felix.hc.autoLogging";
    
    private long timeoutInMs;

    private long longRunningFutureThresholdForRedMs;

    private long resultCacheTtlInMs;

    private String[] defaultTags;

    private HealthCheckResultCache healthCheckResultCache = new HealthCheckResultCache();

    private TempUnavailableGracePeriodEvaluator tempUnavailableGracePeriodEvaluator;
    
    private final Map<HealthCheckMetadata, HealthCheckFuture> stillRunningFutures = new HashMap<HealthCheckMetadata, HealthCheckFuture>();

    @Reference
    private AsyncHealthCheckExecutor asyncHealthCheckExecutor;

    @Reference
    HealthCheckExecutorThreadPool healthCheckExecutorThreadPool;

    private BundleContext bundleContext;

    @Activate
    protected final void activate(final HealthCheckExecutorImplConfiguration configuration, final BundleContext bundleContext) {
        this.bundleContext = bundleContext;

        configure(configuration);

        try {
            this.bundleContext.addServiceListener(this, "("
                    + Constants.OBJECTCLASS + "=" + HealthCheck.class.getName() + ")");
        } catch (final InvalidSyntaxException ise) {
            // this should really never happen as the expression above is constant
            throw new RuntimeException("Unexpected problem with filter syntax", ise);
        }

        logger.info("HealthCheckExecutor active at start level {}", getCurrentStartLevel());
    }

    @Modified
    protected final void modified(final HealthCheckExecutorImplConfiguration configuration) {
        configure(configuration);
    }

    @Deactivate
    protected final void deactivate() {
        this.bundleContext.removeServiceListener(this);
        this.healthCheckResultCache.clear();
        logger.info("HealthCheckExecutor shutdown at start level {}", getCurrentStartLevel());
    }
    
    private int getCurrentStartLevel() {
        return bundleContext.getBundle(Constants.SYSTEM_BUNDLE_ID).adapt(FrameworkStartLevel.class).getStartLevel();
    }

    protected final void configure(final HealthCheckExecutorImplConfiguration configuration) {
        this.timeoutInMs = configuration.timeoutInMs();
        if (this.timeoutInMs <= 0L) {
            this.timeoutInMs = TIMEOUT_DEFAULT_MS;
        }

        this.longRunningFutureThresholdForRedMs = configuration.longRunningFutureThresholdForCriticalMs();
        if (this.longRunningFutureThresholdForRedMs <= 0L) {
            this.longRunningFutureThresholdForRedMs = LONGRUNNING_FUTURE_THRESHOLD_CRITICAL_DEFAULT_MS;
        }

        this.resultCacheTtlInMs = configuration.resultCacheTtlInMs();
        if (this.resultCacheTtlInMs <= 0L) {
            this.resultCacheTtlInMs = RESULT_CACHE_TTL_DEFAULT_MS;
        }

        this.defaultTags = configuration.defaultTags();

        tempUnavailableGracePeriodEvaluator = new TempUnavailableGracePeriodEvaluator(configuration.temporarilyAvailableGracePeriodInMs());
        
        System.setProperty(HC_LOGGING_SYS_PROP, String.valueOf(configuration.autoLogging()));

    }

    @Override
    public void serviceChanged(final ServiceEvent event) {
        if (event.getType() == ServiceEvent.UNREGISTERING) {
            final Long serviceId = (Long) event.getServiceReference().getProperty(Constants.SERVICE_ID);
            this.healthCheckResultCache.removeCachedResult(serviceId);
        }
    }

    @Override
    public List<HealthCheckExecutionResult> execute(HealthCheckSelector selector) {
        return execute(selector, new HealthCheckExecutionOptions());
    }

    @Override
    public List<HealthCheckExecutionResult> execute(HealthCheckSelector selector, HealthCheckExecutionOptions options) {
        logger.debug("Starting executing checks for filter selector {} and execution options {}", selector, options);

        if (ArrayUtils.isEmpty(selector.tags())) {
            logger.debug("Using default tags");
            selector.withTags(defaultTags);
        }

        final ServiceReference<HealthCheck>[] healthCheckReferences = selectHealthCheckReferences(selector, options);
        List<HealthCheckExecutionResult> results = this.execute(healthCheckReferences, options);
        return results;

    }

    /** @see org.apache.felix.hc.core.impl.executor.ExtendedHealthCheckExecutor#selectHealthCheckReferences(HealthCheckSelector,
     *      HealthCheckExecutionOptions) */
    @Override
    public ServiceReference<HealthCheck>[] selectHealthCheckReferences(HealthCheckSelector selector, HealthCheckExecutionOptions options) {
        final HealthCheckFilter filter = new HealthCheckFilter(this.bundleContext);
        final ServiceReference<HealthCheck>[] healthCheckReferences = filter.getHealthCheckServiceReferences(selector, options.isCombineTagsWithOr());
        return healthCheckReferences;
    }

    /** @see org.apache.felix.hc.core.impl.executor.ExtendedHealthCheckExecutor#execute(org.osgi.framework.ServiceReference) */
    @Override
    public HealthCheckExecutionResult execute(final ServiceReference<HealthCheck> ref) {
        final HealthCheckMetadata metadata = this.getHealthCheckMetadata(ref);
        return createResultsForDescriptor(metadata);
    }

    /** @see org.apache.felix.hc.core.impl.executor.ExtendedHealthCheckExecutor#execute(ServiceReference[], HealthCheckExecutionOptions) */
    @Override
    public List<HealthCheckExecutionResult> execute(final ServiceReference<HealthCheck>[] healthCheckReferences,
            HealthCheckExecutionOptions options) {
        final long startTime = System.currentTimeMillis();

        final List<HealthCheckExecutionResult> results = new ArrayList<HealthCheckExecutionResult>();
        final List<HealthCheckMetadata> healthCheckDescriptors = getHealthCheckMetadata(healthCheckReferences);

        createResultsForDescriptors(healthCheckDescriptors, results, options);

        if (logger.isDebugEnabled()) {
            logger.debug("Time consumed for all checks: {}", msHumanReadable(System.currentTimeMillis() - startTime));
        }

        // sort result
        Collections.sort(results, new Comparator<HealthCheckExecutionResult>() {

            @Override
            public int compare(final HealthCheckExecutionResult arg0,
                    final HealthCheckExecutionResult arg1) {
                return ((ExecutionResult) arg0).compareTo((ExecutionResult) arg1);
            }

        });
        return results;
    }

    // method to get the result for one HC (using the generic method to get multiple under the hood
    private HealthCheckExecutionResult createResultsForDescriptor(final HealthCheckMetadata metadata) {

        final List<HealthCheckExecutionResult> results = new ArrayList<HealthCheckExecutionResult>();
        final List<HealthCheckMetadata> healthCheckDescriptors = new ArrayList<HealthCheckMetadata>();
        healthCheckDescriptors.add(metadata);

        createResultsForDescriptors(healthCheckDescriptors, results, new HealthCheckExecutionOptions());
        
        if (results.size() != 1) {
            throw new IllegalStateException("Execute method for a single service reference unexpectedly resulted in "+results.size()+ " results: "+results);
        }
        return results.get(0);
        
    }
    
    private void createResultsForDescriptors(final List<HealthCheckMetadata> healthCheckDescriptors,
            final List<HealthCheckExecutionResult> results, HealthCheckExecutionOptions options) {
        // -- All methods below check if they can transform a healthCheckDescriptor into a result
        // -- if yes the descriptor is removed from the list and the result added

        // get async results
        if (!options.isForceInstantExecution()) {
            if (asyncHealthCheckExecutor != null) {
                asyncHealthCheckExecutor.collectAsyncResults(healthCheckDescriptors, results, healthCheckResultCache);
            }
        }

        // reuse cached results where possible
        if (!options.isForceInstantExecution()) {
            healthCheckResultCache.useValidCacheResults(healthCheckDescriptors, results, resultCacheTtlInMs);
        }

        // everything else is executed in parallel via futures
        List<HealthCheckFuture> futures = createOrReuseFutures(healthCheckDescriptors);

        // wait for futures at most until timeout (but will return earlier if all futures are finished)
        waitForFuturesRespectingTimeout(futures, options);
        collectResultsFromFutures(futures, results);

        // respect sticky results if configured via HealthCheck.KEEP_NON_OK_RESULTS_STICKY_FOR_SEC
        appendStickyResultLogIfConfigured(results);

        // ensure long standing TEMPORARILY_UNAVAILABLE results are marked as CRITICAL
        tempUnavailableGracePeriodEvaluator.evaluateGracePeriodForTemporarilyUnavailableResults(results);
    }

    private void appendStickyResultLogIfConfigured(List<HealthCheckExecutionResult> results) {
        ListIterator<HealthCheckExecutionResult> resultsIt = results.listIterator();
        while (resultsIt.hasNext()) {
            HealthCheckExecutionResult result = resultsIt.next();
            Long warningsStickForMinutes = result.getHealthCheckMetadata().getKeepNonOkResultsStickyForSec();
            if (warningsStickForMinutes != null && warningsStickForMinutes > 0) {
                result = healthCheckResultCache.createExecutionResultWithStickyResults(result);
                resultsIt.set(result);
            }
        }
    }


    /** Create the health check meta data */
    private List<HealthCheckMetadata> getHealthCheckMetadata(final ServiceReference... healthCheckReferences) {
        final List<HealthCheckMetadata> descriptors = new LinkedList<HealthCheckMetadata>();
        for (final ServiceReference serviceReference : healthCheckReferences) {
            final HealthCheckMetadata descriptor = getHealthCheckMetadata(serviceReference);

            descriptors.add(descriptor);
        }

        return descriptors;
    }

    /** Create the health check meta data */
    private HealthCheckMetadata getHealthCheckMetadata(final ServiceReference healthCheckReference) {
        final HealthCheckMetadata descriptor = new HealthCheckMetadata(healthCheckReference);
        return descriptor;
    }

    /** Create or reuse future for the list of health checks */
    private List<HealthCheckFuture> createOrReuseFutures(final List<HealthCheckMetadata> healthCheckDescriptors) {
        final List<HealthCheckFuture> futuresForResultOfThisCall = new LinkedList<HealthCheckFuture>();

        synchronized (this.stillRunningFutures) {
            for (final HealthCheckMetadata md : healthCheckDescriptors) {

                futuresForResultOfThisCall.add(createOrReuseFuture(md));

            }
        }
        return futuresForResultOfThisCall;
    }

    /** Create or reuse future for the health check This method must be synchronized by the caller(!) on stillRunningFutures */
    private HealthCheckFuture createOrReuseFuture(final HealthCheckMetadata metadata) {
        HealthCheckFuture future = this.stillRunningFutures.get(metadata);
        if (future != null) {
            logger.debug("Found a future that is still running for {}", metadata);
        } else {
            logger.debug("Creating future for {}", metadata);
            future = new HealthCheckFuture(metadata, bundleContext, new HealthCheckFuture.Callback() {

                @Override
                public void finished(final HealthCheckExecutionResult result) {
                    healthCheckResultCache.updateWith(result);
                    asyncHealthCheckExecutor.updateWith(result);
                    tempUnavailableGracePeriodEvaluator.updateTemporarilyUnavailableTimestampWith(result);
                    synchronized (stillRunningFutures) {
                        stillRunningFutures.remove(metadata);
                    }
                }
            });
            this.stillRunningFutures.put(metadata, future);

            final HealthCheckFuture newFuture = future;

            healthCheckExecutorThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    newFuture.run();
                    synchronized (stillRunningFutures) {
                        // notify executor threads that newFuture is finished. Wrapping it in another runnable
                        // ensures that newFuture.isDone() will return true (if e.g. done in callback above, there are
                        // still a few lines of code until the future is really done and hence then the executor thread
                        // is sometime notified a bit too early, still receives the result isDone()=false and then waits
                        // for another 50ms, even though the future was about to be done one ms later)
                        stillRunningFutures.notifyAll();
                    }
                }
            });
        }

        return future;
    }

    /** Wait for the futures until the timeout is reached */
    private void waitForFuturesRespectingTimeout(final List<HealthCheckFuture> futuresForResultOfThisCall,
            HealthCheckExecutionOptions options) {
        final StopWatch callExcutionTimeStopWatch = new StopWatch();
        callExcutionTimeStopWatch.start();
        boolean allFuturesDone;

        long effectiveTimeout = this.timeoutInMs;
        if (options != null && options.getOverrideGlobalTimeout() > 0) {
            effectiveTimeout = options.getOverrideGlobalTimeout();
        }

        if (futuresForResultOfThisCall.isEmpty()) {
            return; // nothing to wait for (usually because of cached results)
        }

        do {
            try {
                synchronized (stillRunningFutures) {
                    stillRunningFutures.wait(50); // wait for notifications of callbacks of HealthCheckFutures
                }
            } catch (final InterruptedException ie) {
                logger.warn("Unexpected InterruptedException while waiting for healthCheckContributors", ie);
            }

            allFuturesDone = true;
            for (final HealthCheckFuture healthCheckFuture : futuresForResultOfThisCall) {
                allFuturesDone &= healthCheckFuture.isDone();
            }
        } while (!allFuturesDone && callExcutionTimeStopWatch.getTime() < effectiveTimeout);
    }

    /** Collect the results from all futures
     * 
     * @param futuresForResultOfThisCall The list of futures
     * @param results The result collection */
    void collectResultsFromFutures(final List<HealthCheckFuture> futuresForResultOfThisCall,
            final Collection<HealthCheckExecutionResult> results) {

        final Set<HealthCheckExecutionResult> resultsFromFutures = new HashSet<HealthCheckExecutionResult>();

        final Iterator<HealthCheckFuture> futuresIt = futuresForResultOfThisCall.iterator();
        while (futuresIt.hasNext()) {
            final HealthCheckFuture future = futuresIt.next();
            final HealthCheckExecutionResult result = this.collectResultFromFuture(future);

            resultsFromFutures.add(result);
            futuresIt.remove();
        }

        logger.debug("Adding {} results from futures", resultsFromFutures.size());
        results.addAll(resultsFromFutures);
    }

    /** Collect the result from a single future
     * 
     * @param future The future
     * @return The execution result or a result for a reached timeout */
    HealthCheckExecutionResult collectResultFromFuture(final HealthCheckFuture future) {

        HealthCheckExecutionResult result;
        HealthCheckMetadata hcMetadata = future.getHealthCheckMetadata();
        if (future.isDone()) {
            logger.debug("Health Check is done: {}", hcMetadata);

            try {
                result = future.get();
            } catch (final Exception e) {
                logger.warn("Unexpected Exception during future.get(): " + e, e);
                long futureElapsedTimeMs = new Date().getTime() - future.getCreatedTime().getTime();
                result = new ExecutionResult(hcMetadata, Result.Status.HEALTH_CHECK_ERROR,
                        "Unexpected Exception during future.get(): " + e, futureElapsedTimeMs, false);
            }

        } else {
            logger.debug("Health Check timed out: {}", hcMetadata);
            // Futures must not be cancelled as interrupting a health check might leave the system in invalid state
            // (worst case could be a corrupted repository index if using write operations)

            // normally we turn the check into WARN (normal timeout), but if the threshold time for CRITICAL is reached for a certain
            // future we turn the result CRITICAL
            long futureElapsedTimeMs = new Date().getTime() - future.getCreatedTime().getTime();
            FormattingResultLog resultLog = new FormattingResultLog();
            if (futureElapsedTimeMs < this.longRunningFutureThresholdForRedMs) {
                resultLog.warn("Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs));
            } else {
                resultLog.critical("Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs)
                        + " (exceeding the configured threshold for CRITICAL: "
                        + msHumanReadable(this.longRunningFutureThresholdForRedMs) + ")");
            }

            // add logs from previous, cached result if exists (using a 1 year TTL)
            HealthCheckExecutionResult lastCachedResult = healthCheckResultCache.getValidCacheResult(hcMetadata, 1000 * 60 * 60 * 24 * 365);
            if (lastCachedResult != null) {
                DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
                resultLog.info("*** Result log of last execution finished at {} after {} ***",
                        df.format(lastCachedResult.getFinishedAt()),
                        msHumanReadable(lastCachedResult.getElapsedTimeInMs()));
                for (ResultLog.Entry entry : lastCachedResult.getHealthCheckResult()) {
                    resultLog.add(entry);
                }
            }

            result = new ExecutionResult(hcMetadata, new Result(resultLog), futureElapsedTimeMs, true);

        }

        return result;
    }

    public void setTimeoutInMs(final long timeoutInMs) {
        this.timeoutInMs = timeoutInMs;
    }

    public void setLongRunningFutureThresholdForRedMs(
            final long longRunningFutureThresholdForRedMs) {
        this.longRunningFutureThresholdForRedMs = longRunningFutureThresholdForRedMs;
    }

}
