/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdds.scm.server;

import com.google.common.annotations.VisibleForTesting;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
    .NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * StorageContainerManager enters chill mode on startup to allow system to
 * reach a stable state before becoming fully functional. SCM will wait
 * for certain resources to be reported before coming out of chill mode.
 *
 * ChillModeExitRule defines format to define new rules which must be satisfied
 * to exit Chill mode.
 * ContainerChillModeRule defines the only exit criteria right now.
 * On every new datanode registration event this class adds replicas
 * for reported containers and validates if cutoff threshold for
 * containers is meet.
 */
public class SCMChillModeManager implements
    EventHandler<NodeRegistrationContainerReport> {

  private static final Logger LOG =
      LoggerFactory.getLogger(SCMChillModeManager.class);
  private AtomicBoolean inChillMode = new AtomicBoolean(true);
  private AtomicLong containerWithMinReplicas = new AtomicLong(0);
  private Map<String, ChillModeExitRule> exitRules = new HashMap(1);
  private Configuration config;
  private static final String CONT_EXIT_RULE = "ContainerChillModeRule";
  private static final String DN_EXIT_RULE = "DataNodeChillModeRule";
  private final EventQueue eventPublisher;

  SCMChillModeManager(Configuration conf, List<ContainerInfo> allContainers,
      EventQueue eventQueue) {
    this.config = conf;
    this.eventPublisher = eventQueue;
    exitRules.put(CONT_EXIT_RULE,
        new ContainerChillModeRule(config, allContainers));
    exitRules.put(DN_EXIT_RULE, new DataNodeChillModeRule(config));
    if (!conf.getBoolean(HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
        HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT)) {
      exitChillMode(eventQueue);
    }
    emitChillModeStatus();
  }

  /**
   * Emit Chill mode status.
   */
  @VisibleForTesting
  public void emitChillModeStatus() {
    eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS, inChillMode.get());
  }

  private void validateChillModeExitRules(EventPublisher eventQueue) {
    for (ChillModeExitRule exitRule : exitRules.values()) {
      if (!exitRule.validate()) {
        return;
      }
    }
    exitChillMode(eventQueue);
  }

  /**
   * Exit chill mode. It does following actions:
   * 1. Set chill mode status to fale.
   * 2. Emits START_REPLICATION for ReplicationManager.
   * 3. Cleanup resources.
   * 4. Emit chill mode status.
   * @param eventQueue
   */
  @VisibleForTesting
  public void exitChillMode(EventPublisher eventQueue) {
    LOG.info("SCM exiting chill mode.");
    setInChillMode(false);

    // TODO: Remove handler registration as there is no need to listen to
    // register events anymore.

    for (ChillModeExitRule e : exitRules.values()) {
      e.cleanup();
    }
    emitChillModeStatus();
  }

  @Override
  public void onMessage(
      NodeRegistrationContainerReport nodeRegistrationContainerReport,
      EventPublisher publisher) {
    if (getInChillMode()) {
      exitRules.get(CONT_EXIT_RULE).process(nodeRegistrationContainerReport);
      exitRules.get(DN_EXIT_RULE).process(nodeRegistrationContainerReport);
      validateChillModeExitRules(publisher);
    }
  }

  public boolean getInChillMode() {
    return inChillMode.get();
  }

  /**
   * Set chill mode status.
   */
  public void setInChillMode(boolean inChillMode) {
    this.inChillMode.set(inChillMode);
  }

  /**
   * Interface for defining chill mode exit rules.
   *
   * @param <T>
   */
  public interface ChillModeExitRule<T> {

    boolean validate();

    void process(T report);

    void cleanup();
  }

  /**
   * Class defining Chill mode exit criteria for Containers.
   */
  public class ContainerChillModeRule implements
      ChillModeExitRule<NodeRegistrationContainerReport> {

    // Required cutoff % for containers with at least 1 reported replica.
    private double chillModeCutoff;
    // Containers read from scm db.
    private Map<Long, ContainerInfo> containerMap;
    private double maxContainer;

    public ContainerChillModeRule(Configuration conf,
        List<ContainerInfo> containers) {
      chillModeCutoff = conf
          .getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
              HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
      containerMap = new ConcurrentHashMap<>();
      if(containers != null) {
        containers.forEach(c -> {
          if (c != null) {
            containerMap.put(c.getContainerID(), c);
          }
        });
        maxContainer = containers.size();
      }
    }

    @Override
    public boolean validate() {
      if (maxContainer == 0) {
        return true;
      }
      return getCurrentContainerThreshold() >= chillModeCutoff;
    }

    @VisibleForTesting
    public double getCurrentContainerThreshold() {
      if (maxContainer == 0) {
        return 1;
      }
      return (containerWithMinReplicas.doubleValue() / maxContainer);
    }

    @Override
    public void process(NodeRegistrationContainerReport reportsProto) {
      if (maxContainer == 0) {
        // No container to check.
        return;
      }

      reportsProto.getReport().getReportsList().forEach(c -> {
        if (containerMap.containsKey(c.getContainerID())) {
          if(containerMap.remove(c.getContainerID()) != null) {
            containerWithMinReplicas.getAndAdd(1);
          }
        }
      });
      if(inChillMode.get()) {
        LOG.info("SCM in chill mode. {} % containers have at least one"
                + " reported replica.",
            (containerWithMinReplicas.get() / maxContainer) * 100);
      }
    }

    @Override
    public void cleanup() {
      containerMap.clear();
    }
  }

  /**
   * Class defining Chill mode exit criteria according to number of DataNodes
   * registered with SCM.
   */
  public class DataNodeChillModeRule implements
      ChillModeExitRule<NodeRegistrationContainerReport> {

    // Min DataNodes required to exit chill mode.
    private int requiredDns;
    private int registeredDns = 0;
    // Set to track registered DataNodes.
    private HashSet<UUID> registeredDnSet;

    public DataNodeChillModeRule(Configuration conf) {
      requiredDns = conf
          .getInt(HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE,
              HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE_DEFAULT);
      registeredDnSet = new HashSet<>(requiredDns * 2);
    }

    @Override
    public boolean validate() {
      return registeredDns >= requiredDns;
    }

    @VisibleForTesting
    public double getRegisteredDataNodes() {
      return registeredDns;
    }

    @Override
    public void process(NodeRegistrationContainerReport reportsProto) {
      if (requiredDns == 0) {
        // No dn check required.
        return;
      }

      if(inChillMode.get()) {
        registeredDnSet.add(reportsProto.getDatanodeDetails().getUuid());
        registeredDns = registeredDnSet.size();
        LOG.info("SCM in chill mode. {} DataNodes registered, {} required.",
            registeredDns, requiredDns);
      }
    }

    @Override
    public void cleanup() {
      registeredDnSet.clear();
    }
  }

  @VisibleForTesting
  public static Logger getLogger() {
    return LOG;
  }

  @VisibleForTesting
  public double getCurrentContainerThreshold() {
    return ((ContainerChillModeRule) exitRules.get(CONT_EXIT_RULE))
        .getCurrentContainerThreshold();
  }

  /**
   * Operations restricted in SCM chill mode.
   */
  public static class ChillModeRestrictedOps {
    private static EnumSet restrictedOps =  EnumSet.noneOf(ScmOps.class);

    static {
      restrictedOps.add(ScmOps.allocateBlock);
      restrictedOps.add(ScmOps.allocateContainer);
    }

    public static boolean isRestrictedInChillMode(ScmOps opName) {
      return restrictedOps.contains(opName);
    }
  }

}
