/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
    .AllocationExpirationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestRMNodeTransitions {

  RMNodeImpl node;

  private RMContext rmContext;
  private YarnScheduler scheduler;

  private SchedulerEventType eventType;
  private List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  
  private final class TestSchedulerEventDispatcher implements
  EventHandler<SchedulerEvent> {
    @Override
    public void handle(SchedulerEvent event) {
      scheduler.handle(event);
    }
  }

  private NodesListManagerEvent nodesListManagerEvent = null;
  private List<NodeState> nodesListManagerEventsNodeStateSequence =
      new LinkedList<>();

  private class TestNodeListManagerEventDispatcher implements
      EventHandler<NodesListManagerEvent> {
    
    @Override
    public void handle(NodesListManagerEvent event) {
      nodesListManagerEvent = event;
      nodesListManagerEventsNodeStateSequence.add(event.getNode().getState());
    }

  }

  @Before
  public void setUp() throws Exception {
    InlineDispatcher rmDispatcher = new InlineDispatcher();
    
    rmContext =
        new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class),
          null, null, mock(DelegationTokenRenewer.class), null, null, null,
          null, null);
    NodesListManager nodesListManager = mock(NodesListManager.class);
    HostsFileReader reader = mock(HostsFileReader.class);
    when(nodesListManager.getHostsReader()).thenReturn(reader);
    ((RMContextImpl) rmContext).setNodesListManager(nodesListManager);
    scheduler = mock(YarnScheduler.class);
    doAnswer(
        new Answer<Void>() {

          @Override
          public Void answer(InvocationOnMock invocation) throws Throwable {
            final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
            eventType = event.getType();
            if (eventType == SchedulerEventType.NODE_UPDATE) {
              List<UpdatedContainerInfo> lastestContainersInfoList = 
                  ((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
              for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
            	  completedContainers.addAll(lastestContainersInfo.getCompletedContainers()); 
              }
            }
            return null;
          }
        }
        ).when(scheduler).handle(any(SchedulerEvent.class));
    
    rmDispatcher.register(SchedulerEventType.class, 
        new TestSchedulerEventDispatcher());
    
    rmDispatcher.register(NodesListManagerEventType.class,
        new TestNodeListManagerEventDispatcher());

    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
    node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
    nodesListManagerEvent =  null;
    nodesListManagerEventsNodeStateSequence.clear();
  }
  
  @After
  public void tearDown() throws Exception {
  }
  
  private RMNodeStatusEvent getMockRMNodeStatusEvent(
      List<ContainerStatus> containerStatus) {
    NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
    Boolean yes = new Boolean(true);
    doReturn(yes).when(healthStatus).getIsNodeHealthy();
    
    RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
    doReturn(healthStatus).when(event).getNodeHealthStatus();
    doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
    if (containerStatus != null) {
      doReturn(containerStatus).when(event).getContainers();
    }
    return event;
  }
  
  private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() {
    NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
    Boolean yes = new Boolean(true);
    doReturn(yes).when(healthStatus).getIsNodeHealthy();

    RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
    doReturn(healthStatus).when(event).getNodeHealthStatus();
    doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
    doReturn(getAppIdList()).when(event).getKeepAliveAppIds();
    return event;
  }

  private List<ApplicationId> getAppIdList() {
    List<ApplicationId> appIdList = new ArrayList<ApplicationId>();
    appIdList.add(BuilderUtils.newApplicationId(0, 0));
    return appIdList;
  }

  private List<ContainerId> getContainerIdList() {
    List<ContainerId> containerIdList = new ArrayList<ContainerId>();
    containerIdList.add(BuilderUtils.newContainerId(BuilderUtils
        .newApplicationAttemptId(BuilderUtils.newApplicationId(0, 0), 0), 0));
    return containerIdList;
  }

  private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() {
    NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
    Boolean yes = new Boolean(true);
    doReturn(yes).when(healthStatus).getIsNodeHealthy();

    RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
    doReturn(healthStatus).when(event).getNodeHealthStatus();
    doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
    doReturn(null).when(event).getKeepAliveAppIds();
    return event;
  }

  @Test (timeout = 5000)
  public void testExpiredContainer() {
    NodeStatus mockNodeStatus = createMockNodeStatus();
    // Start the node
    node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
    verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
    
    // Expire a container
    ContainerId completedContainerId = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
            BuilderUtils.newApplicationId(0, 0), 0), 0);
    node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
    Assert.assertEquals(1, node.getContainersToCleanUp().size());
    
    // Now verify that scheduler isn't notified of an expired container
    // by checking number of 'completedContainers' it got in the previous event
    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
    ContainerStatus containerStatus = mock(ContainerStatus.class);
    doReturn(completedContainerId).when(containerStatus).getContainerId();
    doReturn(Collections.singletonList(containerStatus)).
        when(statusEvent).getContainers();
    node.handle(statusEvent);
    /* Expect the scheduler call handle function 2 times
     * 1. RMNode status from new to Running, handle the add_node event
     * 2. handle the node update event
     */
    verify(scheduler, times(1)).handle(any(NodeAddedSchedulerEvent.class));
    verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class));
  }

  @Test
  public void testStatusUpdateOnDecommissioningNode() {
    RMNodeImpl node = getDecommissioningNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialDecommissioning = cm.getNumDecommissioningNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
    // Verify node in DECOMMISSIONING won't be changed by status update
    // with running apps
    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEventWithRunningApps();
    node.handle(statusEvent);
    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning,
        cm.getNumDecommissioningNMs());
    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
        cm.getNumDecommisionedNMs());
  }

  @Test
  public void testRecommissionNode() {
    RMNodeImpl node = getDecommissioningNode();
    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialDecommissioning = cm.getNumDecommissioningNMs();
    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.RECOMMISSION));
    Assert.assertEquals(NodeState.RUNNING, node.getState());
    Assert
        .assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
        cm.getNumDecommissioningNMs());
  }

  @Test (timeout = 5000)
  public void testContainerUpdate() throws InterruptedException{
    NodeStatus mockNodeStatus = createMockNodeStatus();
    //Start the node
    node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
    
    NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
    RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
    node2.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));

    ApplicationId app0 = BuilderUtils.newApplicationId(0, 0);
    ApplicationId app1 = BuilderUtils.newApplicationId(1, 1);
    ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(app0, 0), 0);
    ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(app1, 1), 1);
    ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(app1, 1), 2);
    rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class));
    rmContext.getRMApps().put(app1, Mockito.mock(RMApp.class));

    RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
    RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
    RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null);

    ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
    ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
    ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);

    doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1)
        .getContainerId();
    doReturn(Collections.singletonList(containerStatusFromNode1))
        .when(statusEventFromNode1).getContainers();
    node.handle(statusEventFromNode1);
    Assert.assertEquals(1, completedContainers.size());
    Assert.assertEquals(completedContainerIdFromNode1,
        completedContainers.get(0).getContainerId());

    completedContainers.clear();

    doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1)
        .getContainerId();
    doReturn(Collections.singletonList(containerStatusFromNode2_1))
        .when(statusEventFromNode2_1).getContainers();

    doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2)
        .getContainerId();
    doReturn(Collections.singletonList(containerStatusFromNode2_2))
        .when(statusEventFromNode2_2).getContainers();

    node2.setNextHeartBeat(false);
    node2.handle(statusEventFromNode2_1);
    node2.setNextHeartBeat(true);
    node2.handle(statusEventFromNode2_2);

    Assert.assertEquals(2, completedContainers.size());
    Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0)
        .getContainerId()); 
    Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1)
        .getContainerId());
  }

  @Test (timeout = 5000)
  public void testStatusChange(){
    NodeStatus mockNodeStatus = createMockNodeStatus();
    //Start the node
    node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
    //Add info to the queue first
    node.setNextHeartBeat(false);

    ContainerId completedContainerId1 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
            BuilderUtils.newApplicationId(0, 0), 0), 0);
    ContainerId completedContainerId2 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
            BuilderUtils.newApplicationId(1, 1), 1), 1);
        
    RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);
    RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null);

    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
    ContainerStatus containerStatus2 = mock(ContainerStatus.class);

    doReturn(completedContainerId1).when(containerStatus1).getContainerId();
    doReturn(Collections.singletonList(containerStatus1))
        .when(statusEvent1).getContainers();
     
    doReturn(completedContainerId2).when(containerStatus2).getContainerId();
    doReturn(Collections.singletonList(containerStatus2))
        .when(statusEvent2).getContainers();

    verify(scheduler, times(1)).handle(any(NodeAddedSchedulerEvent.class));
    node.handle(statusEvent1);
    node.handle(statusEvent2);
    verify(scheduler, times(1)).handle(any(NodeAddedSchedulerEvent.class));
    Assert.assertEquals(2, node.getQueueSize());
    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
    Assert.assertEquals(0, node.getQueueSize());
  }

  @Test
  public void testRunningExpire() {
    RMNodeImpl node = getRunningNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
    Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost + 1, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy, cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes",
        initialDecommissioned, cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.LOST, node.getState());
  }

  @Test
  public void testRunningExpireMultiple() {
    RMNodeImpl node1 = getRunningNode(null, 10001);
    RMNodeImpl node2 = getRunningNode(null, 10002);
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    node1.handle(new RMNodeEvent(node1.getNodeID(), RMNodeEventType.EXPIRE));
    Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost + 1, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes", initialUnhealthy,
        cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
        cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes", initialRebooted,
        cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.LOST, node1.getState());
    Assert.assertTrue("Node " + node1.toString() + " should be inactive",
        rmContext.getInactiveRMNodes().containsKey(node1.getNodeID()));
    Assert.assertFalse("Node " + node2.toString() + " should not be inactive",
        rmContext.getInactiveRMNodes().containsKey(node2.getNodeID()));

    node2.handle(new RMNodeEvent(node1.getNodeID(), RMNodeEventType.EXPIRE));
    Assert.assertEquals("Active Nodes", initialActive - 2, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost + 2, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes", initialUnhealthy,
        cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
        cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes", initialRebooted,
        cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.LOST, node2.getState());
    Assert.assertTrue("Node " + node1.toString() + " should be inactive",
        rmContext.getInactiveRMNodes().containsKey(node1.getNodeID()));
    Assert.assertTrue("Node " + node2.toString() + " should be inactive",
        rmContext.getInactiveRMNodes().containsKey(node2.getNodeID()));
  }

  @Test
  public void testUnhealthyExpire() {
    RMNodeImpl node = getUnhealthyNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost + 1, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy - 1, cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes",
        initialDecommissioned, cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.LOST, node.getState());
  }

  @Test
  public void testUnhealthyExpireForSchedulerRemove() {
    RMNodeImpl node = getUnhealthyNode();
    verify(scheduler, times(1)).handle(any(NodeRemovedSchedulerEvent.class));
    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
    verify(scheduler, times(1)).handle(any(NodeRemovedSchedulerEvent.class));
    Assert.assertEquals(NodeState.LOST, node.getState());
  }

  @Test
  public void testRunningDecommission() {
    RMNodeImpl node = getRunningNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    node.handle(new RMNodeEvent(node.getNodeID(),
        RMNodeEventType.DECOMMISSION));
    Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy, cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes",
        initialDecommissioned + 1, cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
  }

  @Test
  public void testDecommissionOnDecommissioningNode() {
    RMNodeImpl node = getDecommissioningNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    int initialDecommissioning = cm.getNumDecommissioningNMs();
    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.DECOMMISSION));
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes", initialUnhealthy,
        cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
        cm.getNumDecommissioningNMs());
    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
        cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes", initialRebooted,
        cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
  }

  @Test
  public void testUnhealthyDecommission() {
    RMNodeImpl node = getUnhealthyNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    node.handle(new RMNodeEvent(node.getNodeID(),
        RMNodeEventType.DECOMMISSION));
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy - 1, cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes",
        initialDecommissioned + 1, cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
  }

  // Test Decommissioning on a unhealthy node will make it decommissioning.
  @Test
  public void testUnhealthyDecommissioning() {
    RMNodeImpl node = getUnhealthyNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialDecommissioning = cm.getNumDecommissioningNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    node.handle(new RMNodeEvent(node.getNodeID(),
        RMNodeEventType.GRACEFUL_DECOMMISSION));
    Assert.assertEquals("Active Nodes", initialActive,
        cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy - 1, cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
        cm.getNumDecommisionedNMs());
    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
        cm.getNumDecommissioningNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
  }

  @Test
  public void testRunningRebooting() {
    RMNodeImpl node = getRunningNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    node.handle(new RMNodeEvent(node.getNodeID(),
        RMNodeEventType.REBOOTING));
    Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy, cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes",
        initialDecommissioned, cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted + 1, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.REBOOTED, node.getState());
  }

  @Test
  public void testUnhealthyRebooting() {
    RMNodeImpl node = getUnhealthyNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    node.handle(new RMNodeEvent(node.getNodeID(),
        RMNodeEventType.REBOOTING));
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy - 1, cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes",
        initialDecommissioned, cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted + 1, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.REBOOTED, node.getState());
  }

  @Test
  public void testAddUnhealthyNode() {
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();

    NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
        System.currentTimeMillis());
    NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
        new ArrayList<>(), null, status, null, null, null);
    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
        nodeStatus));

    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy + 1, cm.getUnhealthyNMs());
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
    Assert.assertEquals("Decommissioned Nodes",
        initialDecommissioned, cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
  }

  @Test
  public void testNMShutdown() {
    RMNodeImpl node = getRunningNode();
    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.SHUTDOWN));
    Assert.assertEquals(NodeState.SHUTDOWN, node.getState());
  }

  @Test
  public void testUnhealthyNMShutdown() {
    RMNodeImpl node = getUnhealthyNode();
    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.SHUTDOWN));
    Assert.assertEquals(NodeState.SHUTDOWN, node.getState());
  }

  @Test(timeout=20000)
  public void testUpdateHeartbeatResponseForCleanup() {
    RMNodeImpl node = getRunningNode();
    NodeId nodeId = node.getNodeID();

    // Expire a container
    ContainerId completedContainerId = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
            BuilderUtils.newApplicationId(0, 0), 0), 0);
    node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
    Assert.assertEquals(1, node.getContainersToCleanUp().size());

    // Finish an application
    ApplicationId finishedAppId = BuilderUtils.newApplicationId(0, 1);
    node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
    Assert.assertEquals(1, node.getAppsToCleanup().size());

    // Verify status update does not clear containers/apps to cleanup
    // but updating heartbeat response for cleanup does
    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
    node.handle(statusEvent);
    Assert.assertEquals(1, node.getContainersToCleanUp().size());
    Assert.assertEquals(1, node.getAppsToCleanup().size());
    NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
    node.setAndUpdateNodeHeartbeatResponse(hbrsp);
    Assert.assertEquals(0, node.getContainersToCleanUp().size());
    Assert.assertEquals(0, node.getAppsToCleanup().size());
    Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());
    Assert.assertEquals(completedContainerId, hbrsp.getContainersToCleanup().get(0));
    Assert.assertEquals(1, hbrsp.getApplicationsToCleanup().size());
    Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
  }

  @Test(timeout=20000)
  public void testUpdateHeartbeatResponseForAppLifeCycle() {
    RMNodeImpl node = getRunningNode();
    NodeId nodeId = node.getNodeID();

    ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
    rmContext.getRMApps().put(runningAppId, Mockito.mock(RMApp.class));
    // Create a running container
    ContainerId runningContainerId = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
        runningAppId, 0), 0);

    ContainerStatus status = ContainerStatus.newInstance(runningContainerId,
        ContainerState.RUNNING, "", 0);
    List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
    statusList.add(status);
    NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
        "", System.currentTimeMillis());
    NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, 0, statusList, null,
        nodeHealth, null, null, null);
    node.handle(new RMNodeStatusEvent(nodeId, nodeStatus, null));

    Assert.assertEquals(1, node.getRunningApps().size());

    // Finish an application
    ApplicationId finishedAppId = runningAppId;
    node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
    Assert.assertEquals(1, node.getAppsToCleanup().size());
    Assert.assertEquals(0, node.getRunningApps().size());
  }

  @Test
  public void testUnknownNodeId() {
    NodeId nodeId =
        NodesListManager.createUnknownNodeId("host1");
    RMNodeImpl node =
        new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
    rmContext.getInactiveRMNodes().putIfAbsent(nodeId,node);
    node.handle(
        new RMNodeEvent(node.getNodeID(), RMNodeEventType.DECOMMISSION));
    Assert.assertNull(
        "Must be null as there is no NODE_UNUSABLE update",
        nodesListManagerEvent);
  }

  private RMNodeImpl getRunningNode() {
    return getRunningNode(null, 0);
  }

  private RMNodeImpl getRunningNode(String nmVersion) {
    return getRunningNode(nmVersion, 0);
  }

  private RMNodeImpl getRunningNode(String nmVersion, int port) {
    NodeId nodeId = BuilderUtils.newNodeId("localhost", port);
    Resource capability = Resource.newInstance(4096, 4);
    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
        capability, nmVersion);
    NodeStatus mockNodeStatus = createMockNodeStatus();
    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
        mockNodeStatus));
    Assert.assertEquals(NodeState.RUNNING, node.getState());
    return node;
  }

  private RMNodeImpl getDecommissioningNode() {
    RMNodeImpl node = getRunningNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialDecommissioning = cm.getNumDecommissioningNMs();
    node.handle(new RMNodeEvent(node.getNodeID(),
        RMNodeEventType.GRACEFUL_DECOMMISSION));
    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
    Assert.assertEquals(Arrays.asList(NodeState.NEW, NodeState.RUNNING),
        nodesListManagerEventsNodeStateSequence);
    Assert
        .assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
        cm.getNumDecommissioningNMs());
    return node;
  }

  private RMNodeImpl getUnhealthyNode() {
    RMNodeImpl node = getRunningNode();
    NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
        System.currentTimeMillis());
    NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
      new ArrayList<ContainerStatus>(), null, status, null, null, null);
    node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
    Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
    return node;
  }

  private RMNodeImpl getNewNode() {
    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
    return node;
  }

  private RMNodeImpl getNewNode(Resource capability) {
    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, 
        capability, null);
    return node;
  }

  private RMNodeImpl getRebootedNode() {
    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
    Resource capability = Resource.newInstance(4096, 4);
    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
        null, capability, null);
    NodeStatus mockNodeStatus = createMockNodeStatus();

    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
        mockNodeStatus));
    Assert.assertEquals(NodeState.RUNNING, node.getState());
    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING));
    Assert.assertEquals(NodeState.REBOOTED, node.getState());
    return node;
  }

  @Test
  public void testAdd() {
    RMNodeImpl node = getNewNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    NodeStatus mockNodeStatus = createMockNodeStatus();
    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null,
        mockNodeStatus));
    Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy, cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes",
        initialDecommissioned, cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.RUNNING, node.getState());
    Assert.assertNotNull(nodesListManagerEvent);
    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, 
        nodesListManagerEvent.getType());
  }

  @Test
  public void testReconnect() {
    RMNodeImpl node = getRunningNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialLost = cm.getNumLostNMs();
    int initialUnhealthy = cm.getUnhealthyNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();
    int initialRebooted = cm.getNumRebootedNMs();
    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
    Assert.assertEquals("Unhealthy Nodes",
        initialUnhealthy, cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioned Nodes",
        initialDecommissioned, cm.getNumDecommisionedNMs());
    Assert.assertEquals("Rebooted Nodes",
        initialRebooted, cm.getNumRebootedNMs());
    Assert.assertEquals(NodeState.RUNNING, node.getState());
    Assert.assertNotNull(nodesListManagerEvent);
    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
        nodesListManagerEvent.getType());
  }

  @Test
  public void testReconnectOnDecommissioningNode() {
    RMNodeImpl node = getDecommissioningNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialDecommissioning = cm.getNumDecommissioningNMs();
    int initialDecommissioned = cm.getNumDecommisionedNMs();

    // Reconnect event with running app
    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
        getAppIdList(), null));
    // still decommissioning
    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning,
        cm.getNumDecommissioningNMs());
    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
        cm.getNumDecommisionedNMs());

    // Reconnect event without any running app
    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
        cm.getNumDecommissioningNMs());
    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
        cm.getNumDecommisionedNMs());
  }

  @Test
  public void testReconnectWithNewPortOnDecommissioningNode() {
    RMNodeImpl node = getDecommissioningNode();
    Random r= new Random();
    node.setHttpPort(r.nextInt(10000));
    // Reconnect event with running app
    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node,
        getAppIdList(), null));
    // still decommissioning
    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());

    node.setHttpPort(r.nextInt(10000));
    // Reconnect event without any running app
    node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
  }

  @Test
  public void testResourceUpdateOnRunningNode() {
    RMNodeImpl node = getRunningNode();
    Resource oldCapacity = node.getTotalCapability();
    assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
    node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
        ResourceOption.newInstance(Resource.newInstance(2048, 2),
            ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
    Resource newCapacity = node.getTotalCapability();
    assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
    assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);

    Assert.assertEquals(NodeState.RUNNING, node.getState());
    Assert.assertNotNull(nodesListManagerEvent);
    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
        nodesListManagerEvent.getType());
  }

  @Test
  public void testDecommissioningOnRunningNode(){
    getDecommissioningNode();
  }

  @Test
  public void testResourceUpdateOnNewNode() {
    RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4));
    Resource oldCapacity = node.getTotalCapability();
    assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
    node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
        ResourceOption.newInstance(Resource.newInstance(2048, 2), 
            ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
    Resource newCapacity = node.getTotalCapability();
    assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
    assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);

    Assert.assertEquals(NodeState.NEW, node.getState());
  }

  @Test
  public void testResourceUpdateOnRebootedNode() {
    RMNodeImpl node = getRebootedNode();
    ClusterMetrics cm = ClusterMetrics.getMetrics();
    int initialActive = cm.getNumActiveNMs();
    int initialUnHealthy = cm.getUnhealthyNMs();
    int initialDecommissioning = cm.getNumDecommissioningNMs();
    Resource oldCapacity = node.getTotalCapability();
    assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
    node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), ResourceOption
        .newInstance(Resource.newInstance(2048, 2),
            ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
    Resource newCapacity = node.getTotalCapability();
    assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
    assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);

    Assert.assertEquals(NodeState.REBOOTED, node.getState());
    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
    Assert.assertEquals("Unhealthy Nodes", initialUnHealthy,
        cm.getUnhealthyNMs());
    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning,
        cm.getNumDecommissioningNMs());
  }

  // Test unhealthy report on a decommissioning node will make it
  // keep decommissioning as long as there's a running or keep alive app.
  // Otherwise, it will go to decommissioned
  @Test
  public void testDecommissioningUnhealthy() {
    RMNodeImpl node = getDecommissioningNode();
    NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
        System.currentTimeMillis());
    List<ApplicationId> keepAliveApps = new ArrayList<>();
    keepAliveApps.add(BuilderUtils.newApplicationId(1, 1));
    NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
        null, keepAliveApps, status, null, null, null);
    node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
    nodeStatus.setKeepAliveApplications(null);
    node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
  }

  @Test
  public void testReconnnectUpdate() {
    final String nmVersion1 = "nm version 1";
    final String nmVersion2 = "nm version 2";
    RMNodeImpl node = getRunningNode(nmVersion1);
    Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
    RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
    node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
        null, null));
    Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
  }

  @Test
  public void testContainerExpire() throws Exception {
    ContainerAllocationExpirer mockExpirer =
        mock(ContainerAllocationExpirer.class);
    ApplicationId appId =
        ApplicationId.newInstance(System.currentTimeMillis(), 1);
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(appId, 1);
    rmContext.getRMApps().put(appId, Mockito.mock(RMApp.class));
    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
    ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
    AllocationExpirationInfo expirationInfo1 =
        new AllocationExpirationInfo(containerId1);
    AllocationExpirationInfo expirationInfo2 =
        new AllocationExpirationInfo(containerId2);
    mockExpirer.register(expirationInfo1);
    mockExpirer.register(expirationInfo2);
    verify(mockExpirer).register(expirationInfo1);
    verify(mockExpirer).register(expirationInfo2);
    ((RMContextImpl) rmContext).setContainerAllocationExpirer(mockExpirer);
    RMNodeImpl rmNode = getRunningNode();
    ContainerStatus status1 =
        ContainerStatus
          .newInstance(containerId1, ContainerState.RUNNING, "", 0);
    ContainerStatus status2 =
        ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
          0);
    List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
    statusList.add(status1);
    statusList.add(status2);
    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(statusList);
    rmNode.handle(statusEvent);
    verify(mockExpirer).unregister(expirationInfo1);
    verify(mockExpirer).unregister(expirationInfo2);
  }

  @Test
  public void testResourceUpdateOnDecommissioningNode() {
    RMNodeImpl node = getDecommissioningNode();
    Resource oldCapacity = node.getTotalCapability();
    assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
    node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
        ResourceOption.newInstance(Resource.newInstance(2048, 2),
            ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
    Resource originalCapacity = node.getOriginalTotalCapability();
    assertEquals("Memory resource is not match.", originalCapacity.getMemorySize(), oldCapacity.getMemorySize());
    assertEquals("CPU resource is not match.", originalCapacity.getVirtualCores(), oldCapacity.getVirtualCores());
    Resource newCapacity = node.getTotalCapability();
    assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
    assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);

    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
    Assert.assertNotNull(nodesListManagerEvent);
    Assert.assertEquals(NodesListManagerEventType.NODE_DECOMMISSIONING,
        nodesListManagerEvent.getType());
  }

  @Test
  public void testResourceUpdateOnRecommissioningNode() {
    RMNodeImpl node = getDecommissioningNode();
    Resource oldCapacity = node.getTotalCapability();
    assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
    assertFalse("updatedCapability should be false.",
        node.isUpdatedCapability());
    node.handle(new RMNodeEvent(node.getNodeID(),
        RMNodeEventType.RECOMMISSION));
    Resource originalCapacity = node.getOriginalTotalCapability();
    assertEquals("Original total capability not null after recommission", null, originalCapacity);
    assertTrue("updatedCapability should be set.", node.isUpdatedCapability());
  }

  @Test
  public void testDisappearingContainer() {
    ContainerId cid1 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
            BuilderUtils.newApplicationId(1, 1), 1), 1);
    ContainerId cid2 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
            BuilderUtils.newApplicationId(2, 2), 2), 2);
    ArrayList<ContainerStatus> containerStats =
        new ArrayList<ContainerStatus>();
    containerStats.add(ContainerStatus.newInstance(cid1,
        ContainerState.RUNNING, "", -1));
    containerStats.add(ContainerStatus.newInstance(cid2,
        ContainerState.RUNNING, "", -1));
    node = getRunningNode();
    node.handle(getMockRMNodeStatusEvent(containerStats));
    assertEquals("unexpected number of running containers",
        2, node.getLaunchedContainers().size());
    Assert.assertTrue("first container not running",
        node.getLaunchedContainers().contains(cid1));
    Assert.assertTrue("second container not running",
        node.getLaunchedContainers().contains(cid2));
    assertEquals("unexpected number of running containers",
        2, node.getUpdatedExistContainers().size());
    Assert.assertTrue("first container not running",
        node.getUpdatedExistContainers().containsKey(cid1));
    Assert.assertTrue("second container not running",
        node.getUpdatedExistContainers().containsKey(cid2));
    assertEquals("already completed containers",
        0, completedContainers.size());
    containerStats.remove(0);
    node.handle(getMockRMNodeStatusEvent(containerStats));
    assertEquals("expected one container to be completed",
        1, completedContainers.size());
    ContainerStatus cs = completedContainers.get(0);
    assertEquals("first container not the one that completed",
        cid1, cs.getContainerId());
    assertEquals("completed container not marked complete",
        ContainerState.COMPLETE, cs.getState());
    assertEquals("completed container not marked aborted",
        ContainerExitStatus.ABORTED, cs.getExitStatus());
    Assert.assertTrue("completed container not marked missing",
        cs.getDiagnostics().contains("not reported"));
    assertEquals("unexpected number of running containers",
        1, node.getLaunchedContainers().size());
    Assert.assertTrue("second container not running",
        node.getLaunchedContainers().contains(cid2));
    assertEquals("unexpected number of running containers",
        1, node.getUpdatedExistContainers().size());
    Assert.assertTrue("second container not running",
        node.getUpdatedExistContainers().containsKey(cid2));
  }

  @Test
  public void testForHandlingDuplicatedCompltedContainers() {
    NodeStatus mockNodeStatus = createMockNodeStatus();
    // Start the node
    node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus));
    // Add info to the queue first
    node.setNextHeartBeat(false);

    ContainerId completedContainerId1 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(BuilderUtils.newApplicationId(0, 0), 0), 0);

    RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);

    ContainerStatus containerStatus1 = mock(ContainerStatus.class);

    doReturn(completedContainerId1).when(containerStatus1).getContainerId();
    doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1)
        .getContainers();

    verify(scheduler, times(1)).handle(any(NodeAddedSchedulerEvent.class));
    node.handle(statusEvent1);
    verify(scheduler, times(1)).handle(any(NodeAddedSchedulerEvent.class));
    Assert.assertEquals(1, node.getQueueSize());
    Assert.assertEquals(1, node.getCompletedContainers().size());

    // test for duplicate entries
    node.handle(statusEvent1);
    Assert.assertEquals(1, node.getQueueSize());

    // send clean up container event
    node.handle(new RMNodeFinishedContainersPulledByAMEvent(node.getNodeID(),
        Collections.singletonList(completedContainerId1)));

    NodeHeartbeatResponse hbrsp =
        Records.newRecord(NodeHeartbeatResponse.class);
    node.setAndUpdateNodeHeartbeatResponse(hbrsp);

    Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size());
    Assert.assertEquals(0, node.getCompletedContainers().size());
  }

  @Test
  public void testFinishedContainersPulledByAMOnNewNode() {
    RMNodeImpl rmNode = getNewNode();
    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);

    rmNode.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
        getContainerIdList()));
    Assert.assertEquals(1, rmNode.getContainersToBeRemovedFromNM().size());

  }

  private void calcIntervalTest(RMNodeImpl rmNode, ResourceUtilization nodeUtil,
      long hbDefault, long hbMin, long hbMax, float speedup, float slowdown,
                                float cpuUtil, long expectedHb) {
    nodeUtil.setCPU(cpuUtil);
    rmNode.setNodeUtilization(nodeUtil);
    long hbInterval = rmNode.calculateHeartBeatInterval(hbDefault, hbMin, hbMax,
        speedup, slowdown);
    assertEquals("heartbeat interval incorrect", expectedHb, hbInterval);
  }

  @Test
  public void testCalculateHeartBeatInterval() {
    RMNodeImpl rmNode = getRunningNode();
    Resource nodeCapability = rmNode.getTotalCapability();
    ClusterMetrics metrics = ClusterMetrics.getMetrics();
    // Set cluster capability to 10 * nodeCapability
    int vcoreUnit = nodeCapability.getVirtualCores();
    rmNode.setPhysicalResource(nodeCapability);
    int clusterVcores = vcoreUnit * 10;
    metrics.incrCapability(
        Resource.newInstance(10 * nodeCapability.getMemorySize(),
            clusterVcores));

    long hbDefault = 2000;
    long hbMin = 1500;
    long hbMax = 2500;
    float speedup = 1.0F;
    float slowdown = 1.0F;
    metrics.incrUtilizedVirtualCores(vcoreUnit * 5); // 50 % cluster util
    ResourceUtilization nodeUtil = ResourceUtilization.newInstance(
        1024, vcoreUnit, 0.0F * vcoreUnit); // 0% rmNode util
    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.10F, hbMin); // 10%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.20F, hbMin); // 20%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.30F, 1600); // 30%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.40F, 1800); // 40%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.60F, 2200); // 60%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.70F, 2400); // 70%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.80F, hbMax); // 80%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.90F, hbMax); // 90%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100%

    // Try with 50% speedup/slowdown factors
    speedup = 0.5F;
    slowdown = 0.5F;
    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.0F, hbMin); // 0%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.10F, 1600); // 10%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.20F, 1700); // 20%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.30F, 1800); // 30%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.40F, 1900); // 40%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.50F, hbDefault); // 50%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.60F, 2100); // 60%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.70F, 2200); // 70%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.80F, 2300); // 80%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.90F, 2400); // 90%

    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 1.0F, hbMax); // 100%

    // With Physical Resource null, it should always return default
    rmNode.setPhysicalResource(null);
    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 0.1F, hbDefault); // 10%
    calcIntervalTest(rmNode, nodeUtil, hbDefault, hbMin, hbMax,
        speedup, slowdown, vcoreUnit * 1.0F, hbDefault); // 100%
  }
}
