/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.scheduler.resource;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RasNodes {

    private static final Logger LOG = LoggerFactory.getLogger(RasNodes.class);
    private Map<String, RasNode> nodeMap;

    public RasNodes(Cluster cluster) {
        this.nodeMap = getAllNodesFrom(cluster);
    }

    public static Map<String, RasNode> getAllNodesFrom(Cluster cluster) {

        //A map of node ids to node objects
        Map<String, RasNode> nodeIdToNode = new HashMap<>();
        //A map of assignments organized by node with the following format:
        //{nodeId -> {topologyId -> {workerId -> {execs}}}}
        Map<String, Map<String, Map<String, Collection<ExecutorDetails>>>> assignmentRelationshipMap = new HashMap<>();

        Map<String, Map<String, WorkerSlot>> workerIdToWorker = new HashMap<>();
        for (SchedulerAssignment assignment : cluster.getAssignments().values()) {
            String topId = assignment.getTopologyId();

            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
                assignment.getSlotToExecutors().entrySet()) {
                WorkerSlot slot = entry.getKey();
                String nodeId = slot.getNodeId();
                if (!assignmentRelationshipMap.containsKey(nodeId)) {
                    assignmentRelationshipMap.put(
                        nodeId, new HashMap<String, Map<String, Collection<ExecutorDetails>>>());
                    workerIdToWorker.put(nodeId, new HashMap<String, WorkerSlot>());
                }
                workerIdToWorker.get(nodeId).put(slot.getId(), slot);
                if (!assignmentRelationshipMap.get(nodeId).containsKey(topId)) {
                    assignmentRelationshipMap
                        .get(nodeId)
                        .put(topId, new HashMap<String, Collection<ExecutorDetails>>());
                }
                if (!assignmentRelationshipMap.get(nodeId).get(topId).containsKey(slot.getId())) {
                    assignmentRelationshipMap
                        .get(nodeId)
                        .get(topId)
                        .put(slot.getId(), new LinkedList<ExecutorDetails>());
                }
                Collection<ExecutorDetails> execs = entry.getValue();
                assignmentRelationshipMap.get(nodeId).get(topId).get(slot.getId()).addAll(execs);
            }
        }

        for (SupervisorDetails sup : cluster.getSupervisors().values()) {
            //Initialize a worker slot for every port even if there is no assignment to it
            for (int port : sup.getAllPorts()) {
                WorkerSlot worker = new WorkerSlot(sup.getId(), port);
                if (!workerIdToWorker.containsKey(sup.getId())) {
                    workerIdToWorker.put(sup.getId(), new HashMap<String, WorkerSlot>());
                }
                if (!workerIdToWorker.get(sup.getId()).containsKey(worker.getId())) {
                    workerIdToWorker.get(sup.getId()).put(worker.getId(), worker);
                }
            }
            nodeIdToNode.put(
                sup.getId(),
                new RasNode(
                    sup.getId(),
                    sup,
                    cluster,
                    workerIdToWorker.get(sup.getId()),
                    assignmentRelationshipMap.get(sup.getId())));
        }

        //Add in supervisors that might have crashed but workers are still alive
        for (Map.Entry<String, Map<String, Map<String, Collection<ExecutorDetails>>>> entry :
            assignmentRelationshipMap.entrySet()) {
            String nodeId = entry.getKey();
            Map<String, Map<String, Collection<ExecutorDetails>>> assignments = entry.getValue();
            if (!nodeIdToNode.containsKey(nodeId)) {
                LOG.info(
                    "Found an assigned slot(s) on a dead supervisor {} with assignments {}",
                    nodeId,
                    assignments);
                nodeIdToNode.put(
                    nodeId, new RasNode(nodeId, null, cluster, workerIdToWorker.get(nodeId), assignments));
            }
        }
        return nodeIdToNode;
    }

    /**
     * get node object from nodeId.
     */
    public RasNode getNodeById(String nodeId) {
        return this.nodeMap.get(nodeId);
    }

    /**
     * Free everything on the given slots.
     *
     * @param workerSlots the slots to free
     */
    public void freeSlots(Collection<WorkerSlot> workerSlots) {
        for (RasNode node : nodeMap.values()) {
            for (WorkerSlot ws : node.getUsedSlots()) {
                if (workerSlots.contains(ws)) {
                    LOG.debug("freeing ws {} on node {}", ws, node);
                    node.free(ws);
                }
            }
        }
    }

    public Collection<RasNode> getNodes() {
        return this.nodeMap.values();
    }

    @Override
    public String toString() {
        StringBuilder ret = new StringBuilder();
        for (RasNode node : nodeMap.values()) {
            ret.append(node).append("\n");
        }
        return ret.toString();
    }
}
