package brooklyn.location.basic.jclouds.pool;

import static brooklyn.location.basic.jclouds.pool.MachinePoolPredicates.compose;
import static brooklyn.location.basic.jclouds.pool.MachinePoolPredicates.matching;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.jclouds.compute.ComputeService;
import org.jclouds.compute.RunNodesException;
import org.jclouds.compute.domain.ComputeMetadata;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.Template;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

/**
 * Contains details of machines detected at a given cloud (ComputeService),
 * and records claims made against those machines via this pool.
 * <p>
 * Machine instances themselves are persisted and rescanned as new instances of this class are created.
 * Claims however are specific to this instance of the class, i.e. <b>not</b> persisted.
 * <p>
 * This class is believed to be thread-safe.
 * Refreshes to the remote detected machines are synchronized on the pool instance.
 * Details of detected and claimed machines are also synchronized on the pool instance.
 * (If it is necessary to claim machines whilst the pool is being rescanned,
 * we can investigate a more sophisticated threading model.
 * Access to some fields is clearly independent and uses a tighter synchronization
 * strategy, e.g. templates.
 * Synchronization of fields within a synch block on the class instance
 * is permitted, but not the other way round,
 * and synching on multiple fields is also not permitted.)
 * <p>
 * Callers wishing to guarantee results of e.g. ensureUnclaimed remaining available
 * can synchronize on this pool instance for the duration that they wish to have that guarantee
 * (at the cost, of course, of any other threads being able to access this pool).
 * <p>
 * If underlying provisioning/destroying operations fail, the pool
 * may be left in an unknown state.
 * (The one exception is a failed initial listing of remote machines,
 * after which the pool stays marked as needing a refresh, so the next access retries.)
 * If more robustness is needed this can be added.
 */
public class MachinePool {
    
    private static final Logger log = LoggerFactory.getLogger(MachinePool.class);
    
    /** the cloud endpoint used to list, create and destroy machines */
    protected final ComputeService computeService;
    /** true until a refresh has been started (and not failed while listing remote machines) */
    final AtomicBoolean refreshNeeded = new AtomicBoolean(true);
    /** registered templates; all access is guarded by synchronizing on this list */
    final List<ReusableMachineTemplate> templates = new ArrayList<ReusableMachineTemplate>();
    /** group name passed to jclouds when creating machines; lazily defaulted by {@link #getPoolName()} */
    String poolName = null;
    
    /** all machines detected, less those in the black list */
    volatile MachineSet detectedMachines = new MachineSet();
    /** not read or updated by any method in this class */
    volatile MachineSet matchedMachines = new MachineSet();
    /** machines claimed through this pool instance (not persisted) */
    volatile MachineSet claimedMachines = new MachineSet();
    /** machines which this pool must ignore */
    volatile MachineSet blacklistedMachines = new MachineSet();
    
    public MachinePool(ComputeService computeService) {
        this.computeService = computeService;
    }
    
    /** performs the initial {@link #refresh()} if one has not yet been done; otherwise does nothing */
    protected synchronized void init() {
        if (!refreshNeeded.get()) return;
        refresh();
    }
    
    /** sets the pool name; a warning is logged if a name was already in place
     * (whether set explicitly or defaulted by an earlier {@link #getPoolName()}),
     * since changing the name is discouraged.
     * synchronized to be consistent with {@link #getPoolName()}. */
    public synchronized void setPoolName(String poolName) {
        // the check must be on the field (the existing name), not the parameter which shadows it
        if (this.poolName!=null)
            log.warn("Changing pool name of "+this+" (from "+this.poolName+" to "+poolName+") is discouraged.");
        this.poolName = poolName;
    }
    /** pool name is used as a group/label by jclouds, for convenience only; 
     * it has no special properties for detecting matching instances
     * (use explicit tags on the templates, for that). 
     * defaults to name of pool class and user name.
     * callers should set pool name before getting, if using a custom name. */
    public synchronized String getPoolName() {
        if (poolName==null)
            poolName = getClass().getSimpleName()+"-"+System.getProperty("user.name");
        return poolName;
    }
    
    /** refreshes the pool of machines from the server:
     * replaces the set of detected machines with everything currently listed by the compute service
     * (less blacklisted machines), and logs which newly appeared machines match registered templates.
     * <p>
     * if listing the remote machines fails, the exception propagates and a pool which had never
     * been refreshed remains marked as needing a refresh, so that the next access retries. */
    public synchronized void refresh() {
        // clear the flag up front (so nested init() calls do not recurse), but restore it if listing fails
        final boolean wasNeeded = refreshNeeded.getAndSet(false);
        Set<NodeMetadata> nodes;
        boolean listed = false;
        try {
            nodes = listNodeMetadata();
            listed = true;
        } finally {
            if (!listed && wasNeeded) refreshNeeded.set(true);
        }

        MachineSet allNewDetectedMachines = new MachineSet(nodes);
        MachineSet newDetectedMachines = filterForAllowedMachines(allNewDetectedMachines);
        MachineSet oldDetectedMachines = detectedMachines;
        detectedMachines = newDetectedMachines;

        MachineSet appearedMachinesIncludingBlacklist = allNewDetectedMachines.removed(oldDetectedMachines);
        MachineSet appearedMachines = filterForAllowedMachines(appearedMachinesIncludingBlacklist);
        int disallowed = appearedMachinesIncludingBlacklist.size() - appearedMachines.size();
        if (disallowed > 0 && log.isDebugEnabled())
            log.debug("Pool "+this+", ignoring "+disallowed+" disallowed");

        int matchedAppeared = 0;
        for (NodeMetadata m: appearedMachines) {
            Set<ReusableMachineTemplate> ts = getTemplatesMatchingInstance(m);
            if (!ts.isEmpty()) {
                matchedAppeared++;
                if (log.isDebugEnabled()) 
                    log.debug("Pool "+this+", newly detected machine "+m+", matches pool templates "+ts);
            } else {
                if (log.isDebugEnabled()) 
                    log.debug("Pool "+this+", newly detected machine "+m+", does not match any pool templates");
            }
        }

        // same summary either way; promoted to info only when something matching was discovered
        if (matchedAppeared>0 || log.isDebugEnabled()) {
            String summary = "Pool "+this+" discovered "+matchedAppeared+" matching machines (of "+appearedMachines.size()+" total new; "+newDetectedMachines.size()+" total including claimed and unmatched)";
            if (matchedAppeared>0) log.info(summary);
            else log.debug(summary);
        }
    }

    /** lists the remote machines, keeping only records which are full NodeMetadata (others are logged and skipped) */
    private Set<NodeMetadata> listNodeMetadata() {
        Set<? extends ComputeMetadata> computes = computeService.listNodes();
        Set<NodeMetadata> nodes = new LinkedHashSet<NodeMetadata>();
        for (ComputeMetadata c: computes) {
            if (c instanceof NodeMetadata) {
                nodes.add((NodeMetadata)c);
            } else {
                // TODO should we try to fetch more info?
                log.warn("MachinePool "+this+" ignoring non-Node record for remote machine: "+c);
            }
        }
        return nodes;
    }

    /** returns the input less any blacklisted machines */
    protected MachineSet filterForAllowedMachines(MachineSet input) {
        return input.removed(blacklistedMachines);
    }

    // TODO template registry and claiming from a template could be a separate responsibility
    
    /** registers the given template with this pool, returning it for convenience */
    protected ReusableMachineTemplate registerTemplate(ReusableMachineTemplate template) {
        registerTemplates(template);
        return template;
    }
    /** registers all the given templates with this pool, in order */
    protected void registerTemplates(ReusableMachineTemplate ...templatesToReg) {
        synchronized (templates) { 
            for (ReusableMachineTemplate template: templatesToReg)
                templates.add(template); 
        }
    }
    
    /** creates a new template with the given name and registers it with this pool */
    protected ReusableMachineTemplate newTemplate(String name) {
        return registerTemplate(new ReusableMachineTemplate(name));
    }

    
    /** returns an immutable snapshot of the templates registered at the time of the call */
    public List<ReusableMachineTemplate> getTemplates() {
        synchronized (templates) {
            return ImmutableList.copyOf(templates);
        }
    }
    
    /** all machines detected by this pool, less those blacklisted
     * (claimed machines are included, as are machines matching none of the templates) */
    public MachineSet all() {
        init();
        return detectedMachines;
    }

    /** all detected machines (see {@link #all()}) which have not been claimed via this pool */
    public MachineSet unclaimed() {
        init();
        synchronized (this) {
            return detectedMachines.removed(claimedMachines);
        }
    }
    
    /** returns all machines matching the given criteria (may be claimed) */
    @SuppressWarnings("unchecked")
    public MachineSet all(Predicate<NodeMetadata> criterion) {
        // To avoid generics complaints in callers caused by varargs, overload here
        return all(new Predicate[] {criterion});
    }
    
    /** returns all machines matching the given criteria (may be claimed) */
    public MachineSet all(Predicate<NodeMetadata> ...ops) {
        return new MachineSet(Iterables.filter(all(), compose(ops)));
    }

    /** returns unclaimed machines matching the given criteria */
    @SuppressWarnings("unchecked")
    public MachineSet unclaimed(Predicate<NodeMetadata> criterion) {
        // To avoid generics complaints in callers caused by varargs, overload here
        return unclaimed(new Predicate[] {criterion});
    }
    
    /** returns unclaimed machines matching the given criteria */
    public MachineSet unclaimed(Predicate<NodeMetadata> ...criteria) {
        return new MachineSet(Iterables.filter(unclaimed(), compose(criteria)));
    }

    /** creates machines if necessary so that this spec exists (may already be claimed however) 
     * returns a set of all matching machines, guaranteed non-empty 
     * (but possibly some are already claimed) */
    public MachineSet ensureExists(ReusableMachineTemplate template) {
        return ensureExists(1, template);
    }

    /** adds the given machines to the existing blacklist; see {@link #setBlacklist(MachineSet)} */
    public synchronized void addToBlacklist(MachineSet newToBlacklist) {
        setBlacklist(blacklistedMachines.added(newToBlacklist));
    }
    
    /** replaces the blacklist set, and immediately drops the newly blacklisted machines 
     * from the detected set; machines which are no longer blacklisted are <b>not</b> restored here,
     * so callers should generally perform a refresh() afterwards to trigger their re-detection
     */
    public synchronized void setBlacklist(MachineSet newBlacklist) {
        blacklistedMachines = newBlacklist;
        detectedMachines = detectedMachines.removed(blacklistedMachines);
    }
    
    /** creates machines if necessary so that this spec exists (may already be claimed however);
     * returns a set of all matching machines, of size at least count (but possibly some are already claimed).
     * (the pool can change at any point, so this set is a best-effort but may be out of date.
     * see javadoc comments on this class.) */
    public MachineSet ensureExists(int count, ReusableMachineTemplate template) {
        return topUp(all(matching(template)), count, template);
    }
    
    /** creates machines if necessary so that this spec can subsequently be claimed;
     * returns all such unclaimed machines, guaranteed to be non-empty.
     * (the pool can change at any point, so this set is a best-effort but may be out of date.
     * see javadoc comments on this class.) */
    public MachineSet ensureUnclaimed(ReusableMachineTemplate template) {
        return ensureUnclaimed(1, template);
    }

    /** creates machines if necessary so that this spec can subsequently be claimed;
     * returns a set of at least count unclaimed machines
     * (best-effort; the pool can change at any point, so the set may be out of date) */
    public MachineSet ensureUnclaimed(int count, ReusableMachineTemplate template) {
        return topUp(unclaimed(matching(template)), count, template);
    }

    /** returns current unchanged if it already has at least count machines,
     * otherwise creates the shortfall from the template and returns current plus the new machines */
    private MachineSet topUp(MachineSet current, int count, ReusableMachineTemplate template) {
        int shortfall = count - current.size();
        if (shortfall <= 0)
            return current;
        //have to create more
        return current.added(create(shortfall, template));
    }

    /** returns the registered templates which the given machine matches, in registration order; 
     * empty if it matches none */
    public Set<ReusableMachineTemplate> getTemplatesMatchingInstance(NodeMetadata nm) {
        Set<ReusableMachineTemplate> result = new LinkedHashSet<ReusableMachineTemplate>(); 
        for (ReusableMachineTemplate t: getTemplates()) {
            if (matching(t).apply(nm)) {
               result.add(t); 
            }
        }        
        return result;
    }
    
    /** creates the given number of machines of the indicated template, in the group given by
     * {@link #getPoolName()}, and records them as detected by this pool; 
     * a RunNodesException from the compute service is propagated unchecked */
    public MachineSet create(int count, ReusableMachineTemplate template) {
        Set<? extends NodeMetadata> nodes;
        try {
            Template t = template.newJcloudsTemplate(computeService);
            if (log.isDebugEnabled()) log.debug("Creating "+count+" new instances of "+t);
            nodes = computeService.createNodesInGroup(getPoolName(), count, t);
        } catch (RunNodesException e) {
            throw Throwables.propagate(e);
        }
        MachineSet result = new MachineSet(nodes);
        registerNewNodes(result, template);
        return result;
    }
    /** adds freshly created machines to the detected set, 
     * logging an error for any which match none of the pool's registered templates */
    protected void registerNewNodes(MachineSet result, ReusableMachineTemplate template) {
        for (NodeMetadata m: result) {
            Set<ReusableMachineTemplate> ts = getTemplatesMatchingInstance(m);
            if (ts.isEmpty()) {
                log.error("Pool "+this+", created machine "+m+" from template "+template+", but no pool templates match!");
            } else {
                if (log.isDebugEnabled())
                    log.debug("Pool "+this+", created machine "+m+" from template "+template+", matching templates "+ts);
            }
        }
        synchronized (this) {
            detectedMachines = detectedMachines.added(result);
        }
    }

    /** claims the indicated number of machines with the indicated spec, creating if necessary;
     * returns the machines claimed by this call */
    public MachineSet claim(int count, ReusableMachineTemplate t) {
        init();
        Set<NodeMetadata> claiming = new LinkedHashSet<NodeMetadata>();
        // candidates may be claimed by another thread before we get to them, so loop until we have enough
        while (claiming.size() < count) {
            MachineSet candidates = ensureUnclaimed(count - claiming.size(), t);
            for (NodeMetadata m : candidates) {
                synchronized (this) {
                    if (claiming.size() < count && !claimedMachines.contains(m)) {
                        claiming.add(m);
                        claimedMachines = claimedMachines.added(new MachineSet(m));
                    }
                }
            }
        }
        return new MachineSet(claiming);
    }


    /** claims the indicated set of machines;
     * throws IllegalArgumentException if they cannot all be claimed 
     * (in which case none of them are claimed by this call);
     * if successful, returns the newly claimed machines, as many as were in the set passed in */
    public MachineSet claim(MachineSet set) {
        init();
        synchronized (this) {
            MachineSet originalClaimed = claimedMachines;
            // work on a local value so a failed attempt is never visible through the volatile field
            MachineSet proposedClaimed = originalClaimed.added(set);
            MachineSet newlyClaimed = proposedClaimed.removed(originalClaimed);
            if (newlyClaimed.size() != set.size()) {
                //did not claim all; leave claims untouched and fail
                MachineSet unavailable = set.removed(newlyClaimed); 
                throw new IllegalArgumentException("Could not claim all requested machines; failed to claim "+unavailable);
            }
            claimedMachines = proposedClaimed;
            return newlyClaimed;
        }
    }
    
    /** releases any claims on the indicated machines; 
     * returns the number of claims actually released */
    public int unclaim(MachineSet set) {
        init();
        synchronized (this) {
            MachineSet originalClaimed = claimedMachines;
            claimedMachines = originalClaimed.removed(set);
            return originalClaimed.size() - claimedMachines.size();
        }
    }

    
    /** forgets the indicated machines (both detection and claims) and asks the compute service 
     * to destroy them; returns the number of machines the compute service reports as destroyed */
    public int destroy(final MachineSet set) {
        init();
        synchronized (this) {
            detectedMachines = detectedMachines.removed(set);
            claimedMachines = claimedMachines.removed(set);
        }
        // the remote call is deliberately made outside the lock, so the pool stays usable meanwhile
        Set<? extends NodeMetadata> destroyed = computeService.destroyNodesMatching(new Predicate<NodeMetadata>() {
            @Override
            public boolean apply(NodeMetadata input) {
                return set.contains(input);
            }
        });
        synchronized (this) {
            //in case a rescan happened while we were destroying
            detectedMachines = detectedMachines.removed(set);
            claimedMachines = claimedMachines.removed(set);
        }
        return destroyed.size();        
    }
        
    
}
