/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sling.event.impl.jobs.scheduling;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.observation.ExternalResourceChangeListener;
import org.apache.sling.api.resource.observation.ResourceChange;
import org.apache.sling.api.resource.observation.ResourceChangeListener;
import org.apache.sling.commons.scheduler.JobContext;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.event.impl.jobs.JobManagerImpl;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.impl.support.ScheduleInfoImpl;
import org.apache.sling.event.jobs.JobBuilder;
import org.apache.sling.event.jobs.ScheduleInfo;
import org.apache.sling.event.jobs.ScheduleInfo.ScheduleType;
import org.apache.sling.event.jobs.ScheduledJobInfo;
import org.osgi.service.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * The scheduler for managing scheduled jobs.
 *
 * This is not a component by itself, it's directly created from the job manager.
 * The job manager is also registering itself as an event handler and forwards
 * the events to this service.
 *
 * Scheduled jobs are kept in an in-memory map keyed by their name. They are only
 * handed to the {@link Scheduler} while this instance is active; the active flag
 * is switched in {@link #configurationChanged(boolean)} and cleared in
 * {@link #deactivate()}. Each schedule of a job is registered with the scheduler
 * under the name {@code <schedulerJobId>-<index>}, where index is the position of
 * the schedule within the job's schedule list.
 */
public class JobSchedulerImpl
    implements ConfigurationChangeListener,
               ResourceChangeListener, ExternalResourceChangeListener,
               org.apache.sling.commons.scheduler.Job {

    /** Key in the scheduler job configuration holding the {@link ScheduledJobInfoImpl}. */
    private static final String PROPERTY_READ_JOB = "properties";

    /** Key in the scheduler job configuration holding the index of the schedule. */
    private static final String PROPERTY_SCHEDULE_INDEX = "index";

    /** Default logger */
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Is this active? */
    private final AtomicBoolean active = new AtomicBoolean(false);

    /** Central job handling configuration. */
    private final JobManagerConfiguration configuration;

    /** Scheduler service. */
    private final Scheduler scheduler;

    /** Job manager. */
    private final JobManagerImpl jobManager;

    /** Scheduled job handler. */
    private final ScheduledJobHandler scheduledJobHandler;

    /** All scheduled jobs, by scheduler name. Guarded by synchronizing on the map itself. */
    private final Map<String, ScheduledJobInfoImpl> scheduledJobs = new HashMap<String, ScheduledJobInfoImpl>();

    /**
     * Create the scheduler and register it as a listener with the configuration.
     * @param configuration Central job manager configuration
     * @param scheduler The scheduler service
     * @param jobManager The job manager
     */
    public JobSchedulerImpl(final JobManagerConfiguration configuration,
            final Scheduler scheduler,
            final JobManagerImpl jobManager) {
        this.configuration = configuration;
        this.scheduler = scheduler;
        this.jobManager = jobManager;

        this.configuration.addListener(this);

        this.scheduledJobHandler = new ScheduledJobHandler(configuration, this);
    }

    /**
     * Deactivate this component: unregister from the configuration, deactivate
     * the job handler, stop all scheduling if it was active and forget all jobs.
     */
    public void deactivate() {
        this.configuration.removeListener(this);

        this.scheduledJobHandler.deactivate();

        if ( this.active.compareAndSet(true, false) ) {
            this.stopScheduling();
        }
        synchronized ( this.scheduledJobs ) {
            this.scheduledJobs.clear();
        }
    }

    /**
     * Scheduling is only active if
     * - processing is active and
     * - the topology capabilities are available and active and
     * - the current instance is the leader.
     * Scheduling is started or stopped only when that state actually flips.
     *
     * @see org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener#configurationChanged(boolean)
     */
    @Override
    public void configurationChanged(final boolean processingActive) {
        final TopologyCapabilities caps = processingActive ? this.configuration.getTopologyCapabilities() : null;
        final boolean schedulingActive = caps != null && caps.isActive() && caps.isLeader();

        // compareAndSet guarantees start/stop runs once per state transition
        if ( schedulingActive ) {
            if ( this.active.compareAndSet(false, true) ) {
                this.startScheduling();
            }
        } else if ( this.active.compareAndSet(true, false) ) {
            this.stopScheduling();
        }
    }

    /**
     * Start all scheduled jobs
     */
    private void startScheduling() {
        synchronized ( this.scheduledJobs ) {
            for(final ScheduledJobInfoImpl info : this.scheduledJobs.values()) {
                this.startScheduledJob(info);
            }
        }
    }

    /**
     * Stop all scheduled jobs.
     */
    private void stopScheduling() {
        synchronized ( this.scheduledJobs ) {
            for(final ScheduledJobInfoImpl info : this.scheduledJobs.values()) {
                this.stopScheduledJob(info);
            }
        }
    }

    /**
     * Add a scheduled job. The job is always registered in the internal map;
     * it is handed to the scheduler only if this instance is active and the
     * job is not suspended.
     * @param info The scheduling info
     */
    public void scheduleJob(final ScheduledJobInfoImpl info) {
        synchronized ( this.scheduledJobs ) {
            this.scheduledJobs.put(info.getName(), info);
            this.startScheduledJob(info);
        }
    }

    /**
     * Unschedule a scheduled job. Nothing happens if no job with that name is registered.
     * @param info The scheduling info
     */
    public void unscheduleJob(final ScheduledJobInfoImpl info) {
        synchronized ( this.scheduledJobs ) {
            if ( this.scheduledJobs.remove(info.getName()) != null ) {
                this.stopScheduledJob(info);
            }
        }
    }

    /**
     * Remove a scheduled job: unschedule it and remove it through the job handler.
     * @param info The scheduling info
     */
    public void removeJob(final ScheduledJobInfoImpl info) {
        this.unscheduleJob(info);
        this.scheduledJobHandler.remove(info);
    }

    /**
     * Write an audit log entry for a scheduled job.
     * @param format Message format with four placeholders: name, topic, properties, schedules
     * @param info The scheduling info
     */
    private void audit(final String format, final ScheduledJobInfoImpl info) {
        // all four arguments go into one array so that every placeholder is filled
        this.configuration.getAuditLogger().debug(format,
                new Object[] {info.getName(),
                              info.getJobTopic(),
                              info.getJobProperties(),
                              info.getSchedules()});
    }

    /**
     * Create the scheduler options for a single schedule.
     * @param schedule The schedule
     * @return The options or {@code null} if the schedule type is not handled
     */
    private ScheduleOptions createScheduleOptions(final ScheduleInfoImpl schedule) {
        switch ( schedule.getType() ) {
            case DATE:
                return this.scheduler.AT(schedule.getNextScheduledExecution());
            case HOURLY:
            case DAILY:
            case WEEKLY:
            case MONTHLY:
            case YEARLY:
            case CRON:
                return this.scheduler.EXPR(schedule.getCronExpression());
            default:
                return null;
        }
    }

    /**
     * Start a scheduled job. Does nothing if this instance is not active; a
     * suspended job is only written to the audit log.
     * @param info The scheduling info
     */
    private void startScheduledJob(final ScheduledJobInfoImpl info) {
        if ( !this.active.get() ) {
            return;
        }
        if ( info.isSuspended() ) {
            this.audit("SCHEDULED SUSPENDED name={}, topic={}, properties={} : {}", info);
            return;
        }
        this.audit("SCHEDULED OK name={}, topic={}, properties={} : {}", info);

        int index = 0;
        for(final ScheduleInfo si : info.getSchedules()) {
            final ScheduleOptions options = this.createScheduleOptions((ScheduleInfoImpl)si);
            if ( options == null ) {
                this.logger.warn("Unsupported schedule type {} for scheduled job {}", si.getType(), info.getName());
            } else {
                final String name = info.getSchedulerJobId() + "-" + index;
                // Create configuration for scheduled job, it is read again in execute()
                final Map<String, Serializable> config = new HashMap<String, Serializable>();
                config.put(PROPERTY_READ_JOB, info);
                config.put(PROPERTY_SCHEDULE_INDEX, index);
                this.scheduler.schedule(this, options.name(name).config(config).canRunConcurrently(false));
            }
            // always advance, the index is part of the name used by stopScheduledJob
            index++;
        }
    }

    /**
     * Stop a scheduled job by unscheduling one scheduler entry per schedule.
     * @param info The scheduling info
     */
    private void stopScheduledJob(final ScheduledJobInfoImpl info) {
        final Scheduler localScheduler = this.scheduler;
        if ( localScheduler == null ) {
            return;
        }
        this.audit("SCHEDULED STOP name={}, topic={}, properties={} : {}", info);
        final int count = info.getSchedules().size();
        for(int index = 0; index < count; index++) {
            localScheduler.unschedule(info.getSchedulerJobId() + "-" + index);
        }
    }

    /**
     * Find the schedule at the given position of the job's schedule list.
     * @param info The scheduling info
     * @param index Zero based position
     * @return The schedule or {@code null} if the list has no such position
     */
    private ScheduleInfo findSchedule(final ScheduledJobInfoImpl info, final int index) {
        int position = 0;
        for(final ScheduleInfo candidate : info.getSchedules()) {
            if ( position == index ) {
                return candidate;
            }
            position++;
        }
        return null;
    }

    /**
     * Called by the scheduler when a schedule fires. Adds the job through the
     * job manager and, if the fired schedule is of type DATE, removes that
     * schedule (or the whole scheduled job if it was the only schedule).
     *
     * @see org.apache.sling.commons.scheduler.Job#execute(org.apache.sling.commons.scheduler.JobContext)
     */
    @Override
    public void execute(final JobContext context) {
        if ( !active.get() ) {
            // not active anymore, simply return
            return;
        }
        final ScheduledJobInfoImpl info = (ScheduledJobInfoImpl) context.getConfiguration().get(PROPERTY_READ_JOB);

        if ( info.isSuspended() ) {
            return;
        }

        this.jobManager.addJob(info.getJobTopic(), info.getJobProperties());

        final int index = (Integer)context.getConfiguration().get(PROPERTY_SCHEDULE_INDEX);
        final ScheduleInfo fired = this.findSchedule(info, index);
        if ( fired == null ) {
            // the schedule list has been changed since this entry was scheduled
            this.logger.debug("No schedule at index {} for scheduled job {}", index, info.getName());
            return;
        }
        // only schedules which run once (DATE) are removed after execution
        if ( fired.getType() != ScheduleType.DATE ) {
            return;
        }
        if ( index == 0 && info.getSchedules().size() == 1 ) {
            // last schedule, remove the whole scheduled job
            this.scheduledJobHandler.remove(info);
        } else {
            // keep all schedules except the one that just fired
            final List<ScheduleInfo> remaining = new ArrayList<ScheduleInfo>();
            for(final ScheduleInfo candidate : info.getSchedules() ) {
                if ( candidate != fired ) { // no need to use equals
                    remaining.add(candidate);
                }
            }
            info.update(remaining);
            this.scheduledJobHandler.updateSchedule(info.getName(), remaining);
        }
    }

    /**
     * Handle an event forwarded to this scheduler. Bundle started and bundle
     * updated events are passed on to the scheduled job handler, all other
     * events are ignored.
     * @param event The event
     */
    public void handleEvent(final Event event) {
        final String topic = event.getTopic();
        if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(topic)
             || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(topic) ) {
            this.scheduledJobHandler.bundleEvent();
        }
    }

    /**
     * Helper method which just logs the exception in debug mode.
     * @param e The exception to log
     */
    private void ignoreException(final Exception e) {
        if ( this.logger.isDebugEnabled() ) {
            this.logger.debug("Ignored exception " + e.getMessage(), e);
        }
    }

    /**
     * Create a schedule builder for a currently scheduled job
     * @param info The scheduling info
     * @return A builder using the topic, properties and name of the job; it is
     *         suspended if the job is suspended.
     */
    public JobBuilder.ScheduleBuilder createJobBuilder(final ScheduledJobInfoImpl info) {
        final JobBuilder.ScheduleBuilder sb = new JobScheduleBuilderImpl(info.getJobTopic(),
                info.getJobProperties(), info.getName(), this);
        if ( info.isSuspended() ) {
            return sb.suspend();
        }
        return sb;
    }

    /** Comparison operations which can be selected by a prefix of a template key. */
    private enum Operation {
        LESS,
        LESS_OR_EQUALS,
        EQUALS,
        GREATER_OR_EQUALS,
        GREATER
    }

    /**
     * Evaluate a non-equals operation between a template value and a job value.
     * The comparison is {@code templateValue.compareTo(jobValue)}, so for example
     * LESS holds if the template value is smaller than the job value.
     * NOTE(review): confirm that this direction is the intended one.
     *
     * @param op The operation
     * @param templateValue The value from the template
     * @param jobValue The value from the job properties
     * @return {@code true} if the operation holds; {@code false} if it does not or
     *         if the values can't be compared
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static boolean compares(final Operation op, final Object templateValue, final Object jobValue) {
        // if the value is not comparable or the job has no such property we simply don't match
        if ( !(templateValue instanceof Comparable) || jobValue == null ) {
            return false;
        }
        final int result;
        try {
            result = ((Comparable)templateValue).compareTo(jobValue);
        } catch ( final ClassCastException cce ) {
            // values of incompatible types, there is no reliable check up front
            return false;
        }
        switch ( op ) {
            case LESS:
                return result < 0;
            case LESS_OR_EQUALS:
                return result <= 0;
            case GREATER_OR_EQUALS:
                return result >= 0;
            case GREATER:
                return result > 0;
            default:
                return true;
        }
    }

    /**
     * Check if the job matches the template. Every entry of the template must
     * match (AND). A key may be prefixed with {@code =}, {@code <}, {@code <=},
     * {@code >} or {@code >=} to select the operation; without a prefix the
     * values are tested for equality.
     * @param job The scheduled job
     * @param template The template, {@code null} matches every job
     * @return {@code true} if the job matches
     */
    private boolean match(final ScheduledJobInfoImpl job, final Map<String, Object> template) {
        if ( template == null ) {
            return true;
        }
        for(final Map.Entry<String, Object> current : template.entrySet()) {
            final String key = current.getKey();
            final Operation op;
            final String propName;
            // two character prefixes need to be tested before the one character ones
            if ( key.startsWith("<=") ) {
                op = Operation.LESS_OR_EQUALS;
                propName = key.substring(2);
            } else if ( key.startsWith(">=") ) {
                op = Operation.GREATER_OR_EQUALS;
                propName = key.substring(2);
            } else if ( key.startsWith("<") ) {
                op = Operation.LESS;
                propName = key.substring(1);
            } else if ( key.startsWith(">") ) {
                op = Operation.GREATER;
                propName = key.substring(1);
            } else if ( key.startsWith("=") ) {
                op = Operation.EQUALS;
                propName = key.substring(1);
            } else {
                op = Operation.EQUALS;
                propName = key;
            }
            final Object templateValue = current.getValue();
            final Object jobValue = job.getJobProperties().get(propName);

            final boolean matches;
            if ( op == Operation.EQUALS ) {
                // null safe: a null template value only matches a missing property
                matches = templateValue == null ? jobValue == null : templateValue.equals(jobValue);
            } else {
                matches = compares(op, templateValue, jobValue);
            }
            if ( !matches ) {
                return false;
            }
        }
        return true;
    }

    /**
     * Check if the job matches at least one of the templates (OR).
     * @param job The scheduled job
     * @param templates The templates, {@code null} or empty matches every job
     * @return {@code true} if the job matches
     */
    private boolean matchesAny(final ScheduledJobInfoImpl job, final Map<String, Object>[] templates) {
        if ( templates == null || templates.length == 0 ) {
            return true;
        }
        for(final Map<String, Object> template : templates) {
            if ( this.match(job, template) ) {
                return true;
            }
        }
        return false;
    }

    /**
     * Get all scheduled jobs
     * @param topic Optional topic, if not {@code null} only jobs with this topic are returned
     * @param limit Maximum number of jobs to return, a value of zero or less means no limit
     * @param templates Optional templates, a job must match at least one of them
     * @return A new collection with the matching jobs, might be empty
     */
    public Collection<ScheduledJobInfo> getScheduledJobs(final String topic,
            final long limit,
            final Map<String, Object>... templates) {
        final List<ScheduledJobInfo> jobs = new ArrayList<ScheduledJobInfo>();
        synchronized ( this.scheduledJobs ) {
            for(final ScheduledJobInfoImpl job : this.scheduledJobs.values() ) {
                if ( topic != null && !topic.equals(job.getJobTopic()) ) {
                    continue;
                }
                if ( !this.matchesAny(job, templates) ) {
                    continue;
                }
                jobs.add(job);
                if ( limit > 0 && jobs.size() == limit ) {
                    break;
                }
            }
        }
        return jobs;
    }

    /**
     * Change the suspended flag for a scheduled job. The flag is written to the
     * resource of the job if that resource exists; afterwards the job is stopped
     * (flag set) or started (flag cleared). If the flag can't be written, the
     * scheduling is left unchanged.
     * @param info The schedule info
     * @param flag The corresponding flag
     */
    public void setSuspended(final ScheduledJobInfoImpl info, final boolean flag) {
        final ResourceResolver resolver = configuration.createResourceResolver();
        try {
            final String path = this.configuration.getScheduledJobsPath(true)
                    + ResourceHelper.filterName(info.getName());

            final Resource eventResource = resolver.getResource(path);
            if ( eventResource != null ) {
                final ModifiableValueMap mvm = eventResource.adaptTo(ModifiableValueMap.class);
                if ( mvm == null ) {
                    // adaptTo may return null, e.g. if the resource is not modifiable
                    logger.warn("Unable to change suspended flag, resource is not modifiable: {}", path);
                    return;
                }
                if ( flag ) {
                    mvm.put(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED, Boolean.TRUE);
                } else {
                    mvm.remove(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED);
                }
                resolver.commit();
            }
            if ( flag ) {
                this.stopScheduledJob(info);
            } else {
                this.startScheduledJob(info);
            }
        } catch (final PersistenceException pe) {
            // persisting the flag failed: only log in debug mode, scheduling stays as it was
            ignoreException(pe);
        } finally {
            resolver.close();
        }
    }

    /**
     * Add a scheduled job. The arguments are validated first; only if no
     * problem is found, the job is persisted through the scheduled job handler.
     * @param topic The job topic
     * @param properties The job properties
     * @param scheduleName The schedule name
     * @param isSuspended Whether it is suspended
     * @param scheduleInfos The scheduling information
     * @param errors Optional list to contain potential errors
     * @return A new job info or {@code null}
     */
    public ScheduledJobInfo addScheduledJob(final String topic,
            final Map<String, Object> properties,
            final String scheduleName,
            final boolean isSuspended,
            final List<ScheduleInfoImpl> scheduleInfos,
            final List<String> errors) {
        final List<String> msgs = new ArrayList<String>();
        if ( scheduleName == null || scheduleName.length() == 0 ) {
            msgs.add("Schedule name not specified");
        }
        final String errorMessage = Utility.checkJob(topic, properties);
        if ( errorMessage != null ) {
            msgs.add(errorMessage);
        }
        if ( scheduleInfos == null || scheduleInfos.isEmpty() ) {
            msgs.add("No schedule defined for " + scheduleName);
        } else {
            for(final ScheduleInfoImpl info : scheduleInfos) {
                info.check(msgs);
            }
        }

        if ( msgs.isEmpty() ) {
            try {
                final ScheduledJobInfo info = this.scheduledJobHandler.addOrUpdateJob(topic, properties, scheduleName, isSuspended, scheduleInfos);
                if ( info != null ) {
                    return info;
                }
                msgs.add("Unable to persist scheduled job.");
            } catch ( final PersistenceException pe) {
                msgs.add("Unable to persist scheduled job: " + scheduleName);
                logger.warn("Unable to persist scheduled job", pe);
            }
        } else {
            // validation problems are logged in addition to being reported
            for(final String msg : msgs) {
                logger.warn(msg);
            }
        }
        if ( errors != null ) {
            errors.addAll(msgs);
        }
        return null;
    }

    /**
     * Run the maintenance of the scheduled job handler.
     */
    public void maintenance() {
        this.scheduledJobHandler.maintenance();
    }

    /**
     * Forward changes below the scheduled jobs path to the scheduled job handler;
     * changes at other paths are ignored.
     *
     * @see org.apache.sling.api.resource.observation.ResourceChangeListener#onChange(java.util.List)
     */
    @Override
    public void onChange(List<ResourceChange> changes) {
        final String scheduledJobsPath = this.configuration.getScheduledJobsPath(true);
        for(final ResourceChange change : changes ) {
            final String path = change.getPath();
            if ( path == null || !path.startsWith(scheduledJobsPath) ) {
                continue;
            }
            if ( change.getType() == ResourceChange.ChangeType.REMOVED ) {
                // removal
                logger.debug("Remove scheduled job {}", path);
                this.scheduledJobHandler.handleRemove(path);
            } else {
                // add or update
                logger.debug("Add or update scheduled job {}, event {}", path, change.getType());
                this.scheduledJobHandler.handleAddUpdate(path);
            }
        }
    }
}
