/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.scheduler.resource;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents a single node in the cluster.
 */
public class RAS_Node {
    private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
    private final String nodeId;
    private final Cluster cluster;
    private final Set<WorkerSlot> originallyFreeSlots;
    //A map consisting of all workers on the node.
    //The key of the map is the worker id and the value is the corresponding workerslot object
    private Map<String, WorkerSlot> slots = new HashMap<>();
    // A map describing which topologies are using which slots on this node.  The format of the map is the following:
    // {TopologyId -> {WorkerId -> {Executors}}}
    private Map<String, Map<String, Collection<ExecutorDetails>>> topIdToUsedSlots = new HashMap<>();
    private String hostname;
    private boolean isAlive;
    private SupervisorDetails sup;
    private boolean loggedUnderageUsage = false;

    /**
     * Create a new node.
     * @param nodeId the id of the node.
     * @param sup the supervisor this is for.
     * @param cluster the cluster this is a part of.
     * @param workerIdToWorker the mapping of slots already assigned to this node.
     * @param assignmentMap the mapping of executors already assigned to this node.
     */
    public RAS_Node(
        String nodeId,
        SupervisorDetails sup,
        Cluster cluster,
        Map<String, WorkerSlot> workerIdToWorker,
        Map<String, Map<String, Collection<ExecutorDetails>>> assignmentMap) {
        //Node ID and supervisor ID are the same.
        this.nodeId = nodeId;
        if (sup == null) {
            isAlive = false;
        } else {
            isAlive = !cluster.isBlackListed(this.nodeId);
        }

        this.cluster = cluster;

        // initialize slots for this node
        if (workerIdToWorker != null) {
            slots = workerIdToWorker;
        }

        //initialize assignment map
        if (assignmentMap != null) {
            topIdToUsedSlots = assignmentMap;
        }

        //check if node is alive
        if (isAlive && sup != null) {
            hostname = sup.getHost();
            this.sup = sup;
        }

        HashSet<String> freeById = new HashSet<>(slots.keySet());
        if (assignmentMap != null) {
            for (Map<String, Collection<ExecutorDetails>> assignment : assignmentMap.values()) {
                freeById.removeAll(assignment.keySet());
            }
        }
        originallyFreeSlots = new HashSet<>();
        for (WorkerSlot slot : slots.values()) {
            if (freeById.contains(slot.getId())) {
                originallyFreeSlots.add(slot);
            }
        }
    }

    public String getId() {
        return nodeId;
    }

    public String getHostname() {
        return hostname;
    }

    private Collection<WorkerSlot> workerIdsToWorkers(Collection<String> workerIds) {
        Collection<WorkerSlot> ret = new LinkedList<>();
        for (String workerId : workerIds) {
            ret.add(slots.get(workerId));
        }
        return ret;
    }

    /**
     * Get the IDs of all free slots on this node.
     * @return the ids of the free slots.
     */
    public Collection<String> getFreeSlotsId() {
        if (!isAlive) {
            return new HashSet<>();
        }
        Set<String> ret = new HashSet<>(slots.keySet());
        ret.removeAll(getUsedSlotsId());
        return ret;
    }

    public Collection<WorkerSlot> getSlotsAvailableToScheduleOn() {
        return originallyFreeSlots;
    }

    public Collection<WorkerSlot> getFreeSlots() {
        return workerIdsToWorkers(getFreeSlotsId());
    }

    private Collection<String> getUsedSlotsId() {
        Collection<String> ret = new LinkedList<>();
        for (Map<String, Collection<ExecutorDetails>> entry : topIdToUsedSlots.values()) {
            ret.addAll(entry.keySet());
        }
        return ret;
    }

    public Collection<WorkerSlot> getUsedSlots() {
        return workerIdsToWorkers(getUsedSlotsId());
    }

    /**
     * Get slots used by the given topology.
     * @param topId the id of the topology to get.
     * @return the slots currently assigned to that topology on this node.
     */
    public Collection<WorkerSlot> getUsedSlots(String topId) {
        Collection<WorkerSlot> ret = null;
        if (topIdToUsedSlots.get(topId) != null) {
            ret = workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
        }
        return ret;
    }

    public boolean isAlive() {
        return isAlive;
    }

    /**
     * Get a collection of the topology ids currently running on this node.
     */
    public Collection<String> getRunningTopologies() {
        return topIdToUsedSlots.keySet();
    }

    public boolean isTotallyFree() {
        return getUsedSlots().isEmpty();
    }

    public int totalSlotsFree() {
        return getFreeSlots().size();
    }

    public int totalSlotsUsed() {
        return getUsedSlots().size();
    }

    public int totalSlotsUsed(String topId) {
        return getUsedSlots(topId).size();
    }

    public int totalSlots() {
        return slots.size();
    }

    /**
     * Free all slots on this node. This will update the Cluster too.
     */
    public void freeAllSlots() {
        if (!isAlive) {
            LOG.warn("Freeing all slots on a dead node {} ", nodeId);
        }
        cluster.freeSlots(slots.values());
        //clearing assignments
        topIdToUsedSlots.clear();
    }

    /**
     * frees a single executor.
     *
     * @param exec is the executor to free
     * @param topo the topology the executor is a part of
     */
    public void freeSingleExecutor(ExecutorDetails exec, TopologyDetails topo) {
        Map<String, Collection<ExecutorDetails>> usedSlots = topIdToUsedSlots.get(topo.getId());
        if (usedSlots == null) {
            throw new IllegalArgumentException("Topology " + topo + " is not assigned");
        }
        WorkerSlot ws = null;
        Set<ExecutorDetails> updatedAssignment = new HashSet<>();
        for (Entry<String, Collection<ExecutorDetails>> entry : usedSlots.entrySet()) {
            if (entry.getValue().contains(exec)) {
                ws = slots.get(entry.getKey());
                updatedAssignment.addAll(entry.getValue());
                updatedAssignment.remove(exec);
                break;
            }
        }

        if (ws == null) {
            throw new IllegalArgumentException(
                "Executor " + exec + " is not assinged on this node to " + topo);
        }
        free(ws);
        if (!updatedAssignment.isEmpty()) {
            assign(ws, topo, updatedAssignment);
        }
    }

    /**
     * Frees a single slot in this node.
     *
     * @param ws the slot to free
     */
    public void free(WorkerSlot ws) {
        LOG.debug("freeing WorkerSlot {} on node {}", ws, hostname);
        if (!slots.containsKey(ws.getId())) {
            throw new IllegalArgumentException(
                "Tried to free a slot " + ws + " that was not" + " part of this node " + nodeId);
        }

        TopologyDetails topo = findTopologyUsingWorker(ws);
        if (topo == null) {
            throw new IllegalArgumentException("Tried to free a slot " + ws + " that was already free!");
        }

        //free slot
        cluster.freeSlot(ws);
        //cleanup internal assignments
        topIdToUsedSlots.get(topo.getId()).remove(ws.getId());
    }

    /**
     * Find a which topology is running on a worker slot.
     *
     * @return the topology using the worker slot. If worker slot is free then return null
     */
    private TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
        for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry :
            topIdToUsedSlots.entrySet()) {
            String topoId = entry.getKey();
            Set<String> workerIds = entry.getValue().keySet();
            for (String workerId : workerIds) {
                if (ws.getId().equals(workerId)) {
                    return cluster.getTopologies().getById(topoId);
                }
            }
        }
        return null;
    }

    /**
     * Assigns a worker to a node.
     *
     * @param target    the worker slot to assign the executors
     * @param td        the topology the executors are from
     * @param executors executors to assign to the specified worker slot
     */
    public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) {
        if (!isAlive) {
            throw new IllegalStateException("Trying to adding to a dead node " + nodeId);
        }
        Collection<WorkerSlot> freeSlots = getFreeSlots();
        if (freeSlots.isEmpty()) {
            throw new IllegalStateException("Trying to assign to a full node " + nodeId);
        }
        if (executors.size() == 0) {
            LOG.warn("Trying to assign nothing from " + td.getId() + " to " + nodeId + " (Ignored)");
        }
        if (target == null) {
            target = getFreeSlots().iterator().next();
        }
        if (!freeSlots.contains(target)) {
            throw new IllegalStateException(
                "Trying to assign already used slot " + target.getPort() + " on node " + nodeId);
        }
        LOG.debug("target slot: {}", target);

        cluster.assign(target, td.getId(), executors);

        //assigning internally
        topIdToUsedSlots.computeIfAbsent(td.getId(), (tid) -> new HashMap<>())
            .computeIfAbsent(target.getId(), (tid) -> new LinkedList<>())
            .addAll(executors);
    }

    /**
     * Assign a single executor to a slot, even if other things are in the slot.
     * @param ws the slot to assign it to.
     * @param exec the executor to assign.
     * @param td the topology for the executor.
     */
    public void assignSingleExecutor(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
        if (!isAlive) {
            throw new IllegalStateException("Trying to adding to a dead node " + nodeId);
        }
        Collection<WorkerSlot> freeSlots = getFreeSlots();
        Set<ExecutorDetails> toAssign = new HashSet<>();
        toAssign.add(exec);
        if (!freeSlots.contains(ws)) {
            Map<String, Collection<ExecutorDetails>> usedSlots = topIdToUsedSlots.get(td.getId());
            if (usedSlots == null) {
                throw new IllegalArgumentException(
                    "Slot " + ws + " is not availble to schedue " + exec + " on");
            }
            Collection<ExecutorDetails> alreadyHere = usedSlots.get(ws.getId());
            if (alreadyHere == null) {
                throw new IllegalArgumentException(
                    "Slot " + ws + " is not availble to schedue " + exec + " on");
            }
            toAssign.addAll(alreadyHere);
            free(ws);
        }
        assign(ws, td, toAssign);
    }

    /**
     * Would scheduling exec in ws fit with the current resource constraints.
     *
     * @param ws   the slot to possibly put exec in
     * @param exec the executor to possibly place in ws
     * @param td   the topology exec is a part of
     * @return true if it would fit else false
     */
    public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
        assert nodeId.equals(ws.getNodeId()) : "Slot " + ws + " is not a part of this node " + nodeId;
        return isAlive
               && cluster.wouldFit(
            ws,
            exec,
            td,
            getTotalAvailableResources(),
            td.getTopologyWorkerMaxHeapSize()
        );
    }

    /**
     * Is there any possibility that exec could ever fit on this node.
     * @param exec the executor to schedule
     * @param td the topology the executor is a part of
     * @return true if there is the possibility it might fit, no guarantee that it will, or false if there is no
     *     way it would ever fit.
     */
    public boolean couldEverFit(ExecutorDetails exec, TopologyDetails td) {
        if (!isAlive) {
            return false;
        }
        NormalizedResourceOffer avail = getTotalAvailableResources();
        NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
        return avail.couldFit(cluster.getMinWorkerCpu(), requestedResources);
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof RAS_Node) {
            return nodeId.equals(((RAS_Node) other).nodeId);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return nodeId.hashCode();
    }

    @Override
    public String toString() {
        return "{Node: "
               + ((sup == null) ? "null (possibly down)" : sup.getHost())
               + ", Avail [ Mem: "
               + getAvailableMemoryResources()
               + ", CPU: "
               + getAvailableCpuResources()
               + ", Slots: "
               + this.getFreeSlots()
               + "] Total [ Mem: "
               + ((sup == null) ? "N/A" : this.getTotalMemoryResources())
               + ", CPU: "
               + ((sup == null) ? "N/A" : this.getTotalCpuResources())
               + ", Slots: "
               + this.slots.values()
               + " ]}";
    }

    /**
     * Gets the available memory resources for this node.
     *
     * @return the available memory for this node
     */
    public double getAvailableMemoryResources() {
        return getTotalAvailableResources().getTotalMemoryMb();
    }

    /**
     * Gets total resources for this node.
     */
    public NormalizedResourceOffer getTotalResources() {
        if (sup != null) {
            return sup.getTotalResources();
        } else {
            return new NormalizedResourceOffer();
        }
    }

    /**
     * Gets all available resources for this node.
     *
     * @return All of the available resources.
     */
    public NormalizedResourceOffer getTotalAvailableResources() {
        if (sup != null) {
            NormalizedResourceOffer availableResources = new NormalizedResourceOffer(sup.getTotalResources());
            if (availableResources.remove(cluster.getAllScheduledResourcesForNode(sup.getId()), cluster.getResourceMetrics())) {
                if (!loggedUnderageUsage) {
                    LOG.error("Resources on {} became negative and was clamped to 0 {}.", hostname, availableResources);
                    loggedUnderageUsage = true;
                }
            }
            return availableResources;
        } else {
            return new NormalizedResourceOffer();
        }
    }

    /**
     * Gets the total memory resources for this node.
     *
     * @return the total memory for this node
     */
    public double getTotalMemoryResources() {
        if (sup != null) {
            return sup.getTotalMemory();
        } else {
            return 0.0;
        }
    }

    /**
     * Gets the available cpu resources for this node.
     *
     * @return the available cpu for this node
     */
    public double getAvailableCpuResources() {
        return getTotalAvailableResources().getTotalCpu();
    }

    /**
     * Gets the total cpu resources for this node.
     *
     * @return the total cpu for this node
     */
    public double getTotalCpuResources() {
        if (sup != null) {
            return sup.getTotalCpu();
        } else {
            return 0.0;
        }
    }
}
