/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.app.rm;

import java.util.HashMap;
import java.util.concurrent.BlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.rm.TestLocalTaskSchedulerService.MockLocalTaskSchedulerSerivce.MockAsyncDelegateRequestHandler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.junit.Assert;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class TestLocalTaskSchedulerService {

  LocalTaskSchedulerService ltss ;
  int core =10;

  @Test(timeout = 5000)
  public void testCreateResource() {
    Resource resource;
    //value in integer
    long value = 4*1024*1024;
    resource = ltss.createResource(value,core);
    Assert.assertEquals((int)(value/(1024*1024)),resource.getMemory());
  }

  @Test(timeout = 5000)
  public void testCreateResourceLargerThanIntMax() {
    //value beyond integer but within Long.MAX_VALUE
    try {
      ltss.createResource(Long.MAX_VALUE, core);
      fail("No exception thrown.");
    } catch (Exception ex) {
      assertTrue(ex instanceof IllegalArgumentException);
      assertTrue(ex.getMessage().contains("Out of range:"));
    }
  }

  @Test(timeout = 5000)
  public void testCreateResourceWithNegativeValue() {
    //value is Long.MAX_VALUE*1024*1024,
    // it will be negative after it is passed to createResource

    try {
      ltss.createResource((Long.MAX_VALUE*1024*1024), core);
      fail("No exception thrown.");
    } catch (Exception ex) {
      assertTrue(ex instanceof IllegalArgumentException);
      assertTrue(ex.getMessage().contains("Negative Memory or Core provided!"));
    }
  }

  /**
   * Normal flow of TaskAttempt
   */
  @Test(timeout = 5000)
  public void testDeallocationBeforeAllocation() throws InterruptedException {
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);

    TaskSchedulerContext mockContext = TestTaskSchedulerHelpers
        .setupMockTaskSchedulerContext("", 0, "", false, appAttemptId, 10000l, null, new Configuration());

    MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce(mockContext);
    taskSchedulerService.initialize();
    taskSchedulerService.start();

    Task task = mock(Task.class);
    taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
    taskSchedulerService.deallocateTask(task, false, null, null);
    // start the RequestHandler, DeallocateTaskRequest has higher priority, so will be processed first
    taskSchedulerService.startRequestHandlerThread();

    MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
    requestHandler.drainRequest(1);
    assertEquals(1, requestHandler.deallocateCount);
    // The corresponding AllocateTaskRequest will be removed, so won't been processed.
    assertEquals(0, requestHandler.allocateCount);
    taskSchedulerService.shutdown();
  }

  /**
   * TaskAttempt Killed from START_WAIT
   */
  @Test(timeout = 5000)
  public void testDeallocationAfterAllocation() throws InterruptedException {
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);

    TaskSchedulerContext mockContext = TestTaskSchedulerHelpers
        .setupMockTaskSchedulerContext("", 0, "", false, appAttemptId, 10000l, null, new Configuration());

    MockLocalTaskSchedulerSerivce taskSchedulerService =
        new MockLocalTaskSchedulerSerivce(mockContext);

    taskSchedulerService.initialize();
    taskSchedulerService.start();

    Task task = mock(Task.class);
    taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
    taskSchedulerService.startRequestHandlerThread();

    MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
    requestHandler.drainRequest(1);
    taskSchedulerService.deallocateTask(task, false, null, null);
    requestHandler.drainRequest(2);
    assertEquals(1, requestHandler.deallocateCount);
    assertEquals(1, requestHandler.allocateCount);
    taskSchedulerService.shutdown();
  }

  static class MockLocalTaskSchedulerSerivce extends LocalTaskSchedulerService {

    private MockAsyncDelegateRequestHandler requestHandler;

    public MockLocalTaskSchedulerSerivce(TaskSchedulerContext appClient) {
      super(appClient);
    }

    @Override
    public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
      requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue,
          new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId),
          taskAllocations,
          getContext(),
          conf);
      return requestHandler;
    }

    @Override
    public void start() {
      // don't start RequestHandler thread, control it in unit test
    }

    public void startRequestHandlerThread() {
      asyncDelegateRequestThread.start();
    }

    public MockAsyncDelegateRequestHandler getRequestHandler() {
      return requestHandler;
    }

    static class MockAsyncDelegateRequestHandler extends AsyncDelegateRequestHandler {

      public int allocateCount = 0;
      public int deallocateCount = 0;
      public int processedCount =0;

      MockAsyncDelegateRequestHandler(
          BlockingQueue<TaskRequest> taskRequestQueue,
          LocalContainerFactory localContainerFactory,
          HashMap<Object, Container> taskAllocations,
          TaskSchedulerContext appClientDelegate, Configuration conf) {
        super(taskRequestQueue, localContainerFactory, taskAllocations,
            appClientDelegate, conf);
      }

      @Override
      void processRequest() {
        super.processRequest();
        processedCount ++;
      }

      public void drainRequest(int count) {
        while(processedCount != count || !taskRequestQueue.isEmpty()) {
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }

      @Override
      void allocateTask(AllocateTaskRequest request) {
        super.allocateTask(request);
        allocateCount ++;
      }

      @Override
      void deallocateTask(DeallocateTaskRequest request) {
        super.deallocateTask(request);
        deallocateCount ++;
      }
    }
  }
}
