/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.server.appmaster.model.appstate

import groovy.util.logging.Slf4j
import org.apache.hadoop.yarn.api.records.Container
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest
import org.apache.slider.server.appmaster.model.mock.MockFactory
import org.apache.slider.server.appmaster.model.mock.MockRMOperationHandler
import org.apache.slider.server.appmaster.model.mock.MockRoles
import org.apache.slider.server.appmaster.model.mock.MockYarnEngine
import org.apache.slider.server.appmaster.operations.AbstractRMOperation
import org.apache.slider.server.appmaster.operations.CancelRequestOperation
import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation
import org.apache.slider.server.appmaster.operations.RMOperationHandler
import org.apache.slider.server.appmaster.state.*
import org.junit.Test

import static org.apache.slider.server.appmaster.state.ContainerPriority.buildPriority
import static org.apache.slider.server.appmaster.state.ContainerPriority.extractRole

@Slf4j
class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockRoles {

  @Override
  String getTestName() {
    return "TestMockAppStateRMOperations"
  }

  @Test
  public void testPriorityOnly() throws Throwable {
    assert 5 == extractRole(buildPriority(5, false))
  }

  @Test
  public void testPriorityRoundTrip() throws Throwable {
    assert 5 == extractRole(buildPriority(5, false))
  }

  @Test
  public void testPriorityRoundTripWithRequest() throws Throwable {
    int priority = buildPriority(5, false)
    assert 5 == extractRole(priority)
  }

  @Test
  public void testMockAddOp() throws Throwable {
    role0Status.desired = 1
    List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 1)
    ContainerRequestOperation operation = (ContainerRequestOperation) ops[0]
    int priority = operation.request.priority.priority
    assert extractRole(priority) == MockFactory.PROVIDER_ROLE0.id
    RMOperationHandler handler = new MockRMOperationHandler()
    handler.execute(ops)

    AbstractRMOperation op = handler.operations[0]
    assert op instanceof ContainerRequestOperation
  }

  /**
   * Test of a flex up and down op which verifies that outstanding
   * requests are cancelled first.
   * <ol>
   *   <li>request 5 nodes, assert 5 request made</li>
   *   <li>allocate 1 of them</li>
   *   <li>flex cluster size to 3</li>
   *   <li>assert this generates 2 cancel requests</li>
   * </ol>
   */
  @Test
  public void testRequestThenCancelOps() throws Throwable {
    def role0 = role0Status
    role0.desired = 5
    List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 5)
    // now 5 outstanding requests.
    assert role0.requested == 5
    
    // allocate one
    role0.incActual()
    role0.decRequested()
    assert role0.requested == 4


    // flex cluster to 3
    role0.desired = 3
    ops = appState.reviewRequestAndReleaseNodes()

    // expect a cancel operation from review
    assertListLength(ops, 1)
    assert ops[0] instanceof CancelRequestOperation
    RMOperationHandler handler = new MockRMOperationHandler()
    handler.availableToCancel = 4;
    handler.execute(ops)
    assert handler.availableToCancel == 2
    assert role0.requested == 2
    
    // flex down one more
    role0.desired = 2
    ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 1)
    assert ops[0] instanceof CancelRequestOperation
    handler.execute(ops)
    assert handler.availableToCancel == 1
    assert role0.requested == 1

  }

  @Test
  public void testCancelNoActualContainers() throws Throwable {
    def role0 = role0Status
    role0.desired = 5
    List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 5)
    // now 5 outstanding requests.
    assert role0.requested == 5
    role0.desired = 0
    ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 1)
    CancelRequestOperation cancel = ops[0] as CancelRequestOperation
    assert cancel.count == 5
  }


  @Test
  public void testFlexDownOutstandingRequests() throws Throwable {
    // engine only has two nodes, so > 2 will be outstanding
    engine = new MockYarnEngine(1, 2)
    List<AbstractRMOperation> ops
    // role: desired = 2, requested = 1, actual=1 
    def role0 = role0Status
    role0.desired = 4
    createAndSubmitNodes()
    
    assert role0.requested == 2
    assert role0.actual == 2
    // there are now two outstanding, two actual 
    // Release 3 and verify that the two
    // cancellations were combined with a release
    role0.desired = 1;
    assert role0.delta == -3
    ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 2)
    assert role0.requested == 0
    assert role0.releasing == 1
  }

  @Test
  public void testCancelAllOutstandingRequests() throws Throwable {

    // role: desired = 2, requested = 1, actual=1 
    def role0 = role0Status
    role0.desired = 2
    role0.incRequested()
    role0.incRequested()
    List<AbstractRMOperation> ops
    
    // there are now two outstanding, two actual 
    // Release 3 and verify that the two
    // cancellations were combined with a release
    role0.desired = 0;
    ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 1)
    CancelRequestOperation cancel = ops[0] as CancelRequestOperation
    assert cancel.getCount() == 2
  }

  
  @Test
  public void testFlexUpOutstandingRequests() throws Throwable {
    
    // role: desired = 2, requested = 1, actual=1 
    def role0 = role0Status
    role0.desired = 2
    role0.incActual();
    role0.incRequested()
    
    List<AbstractRMOperation> ops

    // flex up 2 nodes, yet expect only one node to be requested,
    // as the  outstanding request is taken into account
    role0.desired = 4;
    role0.incRequested()

    assert role0.actual == 1;
    assert role0.requested == 2;
    assert role0.actualAndRequested == 3;
    assert role0.delta == 1
    ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 1)
    assert ops[0] instanceof ContainerRequestOperation
    assert role0.requested == 3
  }

  @Test
  public void testFlexUpNoSpace() throws Throwable {
    // engine only has two nodes, so > 2 will be outstanding
    engine = new MockYarnEngine(1, 2)
    List<AbstractRMOperation> ops
    // role: desired = 2, requested = 1, actual=1 
    def role0 = role0Status
    role0.desired = 4
    createAndSubmitNodes()

    assert role0.requested == 2
    assert role0.actual == 2
    role0.desired = 8;
    assert role0.delta == 4
    createAndSubmitNodes()
    assert role0.requested == 6
  }


  @Test
  public void testAllocateReleaseOp() throws Throwable {
    role0Status.desired = 1

    List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
    ContainerRequestOperation operation = (ContainerRequestOperation) ops[0]
    def (ArrayList<ContainerAssignment> assignments, Container cont, AMRMClient.ContainerRequest request) = satisfyContainerRequest(
        operation)
    assertListLength(ops, 1)
    assertListLength(assignments, 1)
    ContainerAssignment assigned = assignments[0]
    Container target = assigned.container
    assert target.id == cont.id
    int roleId = assigned.role.priority
    assert roleId == extractRole(request.priority)
    assert assigned.role.name == ROLE0
    RoleInstance ri = roleInstance(assigned)
    //tell the app it arrived
    appState.containerStartSubmitted(target, ri);
    appState.innerOnNodeManagerContainerStarted(target.id)
    assert role0Status.started == 1

    //now release it by changing the role status
    role0Status.desired = 0
    ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 1)

    assert ops[0] instanceof ContainerReleaseOperation
    ContainerReleaseOperation release = (ContainerReleaseOperation) ops[0]
    assert release.containerId == cont.id
  }

  public List satisfyContainerRequest(ContainerRequestOperation operation) {
    AMRMClient.ContainerRequest request = operation.request
    Container cont = engine.allocateContainer(request)
    List<Container> allocated = [cont]
    List<ContainerAssignment> assignments = [];
    List<AbstractRMOperation> operations = []
    appState.onContainersAllocated(allocated, assignments, operations)
    return [assignments, cont, request]
  }

  @Test
  public void testComplexAllocation() throws Throwable {
    role0Status.desired = 1
    role1Status.desired = 3

    List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
    List<Container> allocations = engine.execute(ops)
    List<ContainerAssignment> assignments = [];
    List<AbstractRMOperation> releases = []
    appState.onContainersAllocated(allocations, assignments, releases)
    assertListLength(releases, 0)
    assertListLength(assignments, 4)
    assignments.each { ContainerAssignment assigned ->
      Container target = assigned.container
      RoleInstance ri = roleInstance(assigned)
      appState.containerStartSubmitted(target, ri);
    }
    //insert some async operation here
    assignments.each { ContainerAssignment assigned ->
      Container target = assigned.container
      appState.innerOnNodeManagerContainerStarted(target.id)
    }
    assert engine.containerCount() == 4;
    role1Status.desired = 0
    ops = appState.reviewRequestAndReleaseNodes()
    assertListLength(ops, 3)
    allocations = engine.execute(ops)
    assert engine.containerCount() == 1;

    appState.onContainersAllocated(allocations, assignments, releases)
    assert assignments.empty
    assert releases.empty
  }

  @Test
  public void testDoubleNodeManagerStartEvent() throws Throwable {
    role0Status.desired = 1

    List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes()
    List<Container> allocations = engine.execute(ops)
    List<ContainerAssignment> assignments = [];
    List<AbstractRMOperation> releases = []
    appState.onContainersAllocated(allocations, assignments, releases)
    assertListLength(assignments, 1)
    ContainerAssignment assigned = assignments[0]
    Container target = assigned.container
    RoleInstance ri = roleInstance(assigned)
    appState.containerStartSubmitted(target, ri);
    RoleInstance ri2 = appState.innerOnNodeManagerContainerStarted(target.id)
    assert ri2 == ri
    //try a second time, expect an error
    try {
      appState.innerOnNodeManagerContainerStarted(target.id)
      fail("Expected an exception")
    } catch (RuntimeException expected) {
      // expected
    }
    //and non-faulter should not downgrade to a null
    log.warn("Ignore any exception/stack trace that appears below")
    log.warn("===============================================================")
    RoleInstance ri3 = appState.onNodeManagerContainerStarted(target.id)
    log.warn("===============================================================")
    log.warn("Ignore any exception/stack trace that appeared above")
    assert ri3 == null
  }

}
