/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package backtype.storm.scheduler.multitenant;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.SchedulerAssignment;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;

/**
 * A pool of nodes that can be used to run topologies.
 */
public abstract class NodePool {
    /** The cluster this pool operates on; assigned by {@link #init(Cluster, Map)}. */
    protected Cluster _cluster;
    /** Mapping of node id to node; assigned by {@link #init(Cluster, Map)}. */
    protected Map<String, Node> _nodeIdToNode;

    /**
     * Immutable pair of counts: a number of nodes together with a number of slots.
     */
    public static class NodeAndSlotCounts {
        /** Number of nodes. */
        public final int _nodes;
        /** Number of slots. */
        public final int _slots;

        /**
         * @param nodes the node count to record
         * @param slots the slot count to record
         */
        public NodeAndSlotCounts(int nodes, int slots) {
            this._slots = slots;
            this._nodes = nodes;
        }
    }

    /**
     * Place executors into slots in a round robin way, taking into account component spreading among different hosts.
     * <p>
     * The constructor deals the executors of non-spread components that still need scheduling across the requested
     * number of slots. {@link #assignSlotTo(Node)} then hands those slots out one at a time, adding executors of spread
     * components as it goes.
     */
    public static class RoundRobinSlotScheduler {
        /**
         * Node id to the ids of components known to have an executor on that node. Seeded from the topology's current
         * assignment and updated whenever a spread executor is placed on a node.
         */
        private final Map<String, Set<String>> _nodeToComps;
        /** Spread component id to the executors of that component that still have to be placed. */
        private final Map<String, List<ExecutorDetails>> _spreadToSchedule;
        /** Slots (each a set of executors) that have not been handed to a node yet. */
        private final LinkedList<Set<ExecutorDetails>> _slots;
        /** The final slot of {@link #_slots}, or null when no slots were requested. */
        private final Set<ExecutorDetails> _lastSlot;
        private final Cluster _cluster;
        private final String _topId;

        /**
         * Create a new scheduler for a given topology
         * 
         * @param td the topology to schedule
         * @param slotsToUse the number of slots to use for the executors left to schedule. A value of zero or less
         *            yields a scheduler with nothing to assign: {@link #assignSlotTo(Node)} immediately returns false.
         * @param cluster the cluster to schedule this on.
         */
        public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, Cluster cluster) {
            _topId = td.getId();
            _cluster = cluster;

            // Record which components already run on which node so spread components can avoid those nodes.
            Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent();
            SchedulerAssignment assignment = _cluster.getAssignmentById(_topId);
            _nodeToComps = new HashMap<String, Set<String>>();

            if (assignment != null) {
                Map<ExecutorDetails, WorkerSlot> execToSlot = assignment.getExecutorToSlot();

                for (Entry<ExecutorDetails, WorkerSlot> entry : execToSlot.entrySet()) {
                    String nodeId = entry.getValue().getNodeId();
                    Set<String> comps = _nodeToComps.get(nodeId);
                    if (comps == null) {
                        comps = new HashSet<String>();
                        _nodeToComps.put(nodeId, comps);
                    }
                    comps.add(execToComp.get(entry.getKey()));
                }
            }

            _spreadToSchedule = new HashMap<String, List<ExecutorDetails>>();
            // The topology conf is untyped; this entry is read as a list of component ids.
            @SuppressWarnings("unchecked")
            List<String> spreadComps = (List<String>) td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
            if (spreadComps != null) {
                for (String comp : spreadComps) {
                    _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>());
                }
            }

            // Build the slots in a random-access list: dealing executors round robin indexes into it once per
            // executor, which would be a linear walk each time on a LinkedList.
            List<Set<ExecutorDetails>> slots = new ArrayList<Set<ExecutorDetails>>(Math.max(slotsToUse, 0));
            for (int i = 0; i < slotsToUse; i++) {
                slots.add(new HashSet<ExecutorDetails>());
            }

            int at = 0;
            for (Entry<String, List<ExecutorDetails>> entry : _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) {
                String comp = entry.getKey();
                LOG.debug("Scheduling for {}", comp);
                List<ExecutorDetails> spreadPending = _spreadToSchedule.get(comp);
                if (spreadPending != null) {
                    LOG.debug("Saving {} for spread...", comp);
                    spreadPending.addAll(entry.getValue());
                } else if (slots.isEmpty()) {
                    // Nothing to deal into; indexing an empty slot list would throw.
                    LOG.debug("No slots to use, not assigning {}", comp);
                } else {
                    for (ExecutorDetails ed : entry.getValue()) {
                        LOG.debug("Assigning {} {} to slot {}", new Object[] { comp, ed, at });
                        slots.get(at).add(ed);
                        at++;
                        if (at >= slots.size()) {
                            at = 0;
                        }
                    }
                }
            }

            _slots = new LinkedList<Set<ExecutorDetails>>(slots);
            // With no slots there is no last slot; assignSlotTo never reaches the identity check in that case.
            _lastSlot = slots.isEmpty() ? null : slots.get(slots.size() - 1);
        }

        /**
         * Assign a slot to the given node.
         * <p>
         * For every slot but the last, one pending executor of each spread component is added to the slot unless the
         * node is already recorded as hosting that component. The last slot receives all spread executors that are
         * still pending.
         * 
         * @param n the node to assign a slot to.
         * @return true if there are more slots to assign else false.
         */
        public boolean assignSlotTo(Node n) {
            if (_slots.isEmpty()) {
                return false;
            }
            Set<ExecutorDetails> slot = _slots.pop();
            if (slot == _lastSlot) {
                // The last slot fill it up
                for (List<ExecutorDetails> pending : _spreadToSchedule.values()) {
                    slot.addAll(pending);
                }
            } else {
                String nodeId = n.getId();
                Set<String> nodeComps = _nodeToComps.get(nodeId);
                if (nodeComps == null) {
                    nodeComps = new HashSet<String>();
                    _nodeToComps.put(nodeId, nodeComps);
                }
                for (Entry<String, List<ExecutorDetails>> entry : _spreadToSchedule.entrySet()) {
                    List<ExecutorDetails> pending = entry.getValue();
                    // Only mark the component as present on this node when an executor is actually placed.
                    if (!pending.isEmpty() && nodeComps.add(entry.getKey())) {
                        slot.add(pending.remove(0));
                    }
                }
            }
            n.assign(_topId, slot, _cluster);
            return !_slots.isEmpty();
        }
    }

    /** Logger for this class; also used by the nested {@link RoundRobinSlotScheduler}. */
    private static final Logger LOG = LoggerFactory.getLogger(NodePool.class);

    /**
     * Set up this pool with the cluster and the node mapping it should work with. Both references are stored as
     * given; no copy is made.
     *
     * @param cluster the cluster
     * @param nodeIdToNode the mapping of node id to nodes
     */
    public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
        this._nodeIdToNode = nodeIdToNode;
        this._cluster = cluster;
    }

    /**
     * Add a topology to the pool.
     * <p>
     * NOTE(review): presumably callers are expected to check {@link #canAdd(TopologyDetails)} first - confirm
     * against the implementations.
     *
     * @param td the topology to add.
     */
    public abstract void addTopology(TopologyDetails td);

    /**
     * Check whether this topology can be added to this pool.
     *
     * @param td the topology
     * @return true if it can be added, else false
     */
    public abstract boolean canAdd(TopologyDetails td);

    /**
     * @return the number of slots that are available to be taken
     */
    public abstract int slotsAvailable();

    /**
     * Take nodes from this pool to try to satisfy a request for {@code slotsNeeded} slots. The pool may not be able to
     * cover the whole request.
     *
     * @param slotsNeeded the number of slots that are needed.
     * @return a Collection of nodes with the removed nodes in it. This may be empty, but should not be null.
     */
    public abstract Collection<Node> takeNodesBySlots(int slotsNeeded);

    /**
     * Compute how many nodes and slots this pool would provide if {@code slots} slots were requested from it.
     *
     * @param slots the number of slots needed
     * @return the number of nodes and slots that would be returned.
     */
    public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slots);

    /**
     * @return the number of nodes that are available to be taken from this pool
     */
    public abstract int nodesAvailable();

    /**
     * Take up to {@code nodesNeeded} nodes from this pool.
     *
     * @param nodesNeeded the number of nodes that are needed.
     * @return a Collection of nodes with the removed nodes in it. This may be empty, but should not be null.
     */
    public abstract Collection<Node> takeNodes(int nodesNeeded);

    /**
     * Reschedule any topologies in this pool as needed.
     *
     * @param lesserPools pools that may be used to steal nodes from.
     */
    public abstract void scheduleAsNeeded(NodePool... lesserPools);

    /**
     * Total the available slots over several pools.
     *
     * @param pools the pools to total up
     * @return the sum of {@link #slotsAvailable()} over every pool in {@code pools}
     */
    public static int slotsAvailable(NodePool[] pools) {
        int total = 0;
        for (int i = 0; i < pools.length; i++) {
            total += pools[i].slotsAvailable();
        }
        return total;
    }

    /**
     * Total the available nodes over several pools.
     *
     * @param pools the pools to total up
     * @return the sum of {@link #nodesAvailable()} over every pool in {@code pools}
     */
    public static int nodesAvailable(NodePool[] pools) {
        int total = 0;
        for (int i = 0; i < pools.length; i++) {
            total += pools[i].nodesAvailable();
        }
        return total;
    }

    /**
     * Take nodes from the given pools, in order, until the requested number of slots is covered or the pools run
     * out. Each pool is asked only for the slots still outstanding. The amount a pool contributed is measured with
     * {@code Node.countFreeSlotsAlive} on the nodes it returned.
     *
     * @param slotsNeeded the number of slots that are needed
     * @param pools the pools to take nodes from, visited in array order
     * @return the nodes taken from all visited pools
     */
    public static Collection<Node> takeNodesBySlot(int slotsNeeded, NodePool[] pools) {
        LOG.debug("Trying to grab {} free slots from {}", slotsNeeded, pools);
        HashSet<Node> taken = new HashSet<Node>();
        int remaining = slotsNeeded;
        for (int i = 0; i < pools.length; i++) {
            Collection<Node> fromPool = pools[i].takeNodesBySlots(remaining);
            taken.addAll(fromPool);
            remaining -= Node.countFreeSlotsAlive(fromPool);
            LOG.debug("Got {} nodes so far need {} more slots", taken.size(), remaining);
            if (remaining <= 0) {
                // Request satisfied; leave the remaining pools untouched.
                return taken;
            }
        }
        return taken;
    }

    /**
     * Take nodes from the given pools, in order, until the requested number of nodes has been collected or the
     * pools run out. Each pool is asked only for the nodes still outstanding.
     *
     * @param nodesNeeded the number of nodes that are needed
     * @param pools the pools to take nodes from, visited in array order
     * @return the nodes taken from all visited pools
     */
    public static Collection<Node> takeNodes(int nodesNeeded, NodePool[] pools) {
        LOG.debug("Trying to grab {} free nodes from {}", nodesNeeded, pools);
        HashSet<Node> taken = new HashSet<Node>();
        int remaining = nodesNeeded;
        for (int i = 0; i < pools.length; i++) {
            Collection<Node> fromPool = pools[i].takeNodes(remaining);
            taken.addAll(fromPool);
            remaining -= fromPool.size();
            LOG.debug("Got {} nodes so far need {} more nodes", taken.size(), remaining);
            if (remaining <= 0) {
                // Request satisfied; leave the remaining pools untouched.
                return taken;
            }
        }
        return taken;
    }

    /**
     * Work out how many nodes the given pools would provide, visited in order, for a request of {@code slots} slots.
     * Each pool is asked via {@link #getNodeAndSlotCountIfSlotsWereTaken(int)} for the slots still outstanding, and
     * the walk stops once the request is covered.
     *
     * @param slots the number of slots needed
     * @param pools the pools to query, visited in array order
     * @return the total number of nodes reported by the visited pools
     */
    public static int getNodeCountIfSlotsWereTaken(int slots, NodePool[] pools) {
        LOG.debug("How many nodes to get {} slots from {}", slots, pools);
        int nodeCount = 0;
        int stillNeeded = slots;
        for (int i = 0; i < pools.length; i++) {
            NodeAndSlotCounts counts = pools[i].getNodeAndSlotCountIfSlotsWereTaken(stillNeeded);
            nodeCount += counts._nodes;
            stillNeeded -= counts._slots;
            LOG.debug("Found {} nodes so far {} more slots needed", nodeCount, stillNeeded);
            if (stillNeeded <= 0) {
                return nodeCount;
            }
        }
        return nodeCount;
    }
}
