/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.util.List;

import junit.framework.Assert;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.Before;
import org.junit.Test;

//
// Test case for SimulatorTaskTracker.
// We create a table of the expected list of new events generated and
// another table of the expected heartbeat() input parameters and the task
// actions to perform for each timestamp t. We then run the task tracker with
// our own little event queue and check that exactly those things happen that
// are listed in the two tables.
//
public class TestSimulatorTaskTracker {
  // Mock job tracker; it is told which heartbeats to expect and which task
  // actions to hand out, and verifies the heartbeats it actually receives.
  MockSimulatorJobTracker jobTracker;
  // The task tracker under test.
  SimulatorTaskTracker taskTracker;

  static final Log LOG = LogFactory.getLog(TestSimulatorTaskTracker.class);
  
  // Our own little event queue, checks the events against the expected before
  // enqueueing them
  CheckedEventQueue eventQueue;
  
  // Global test parameters
  // Simulation time between two consecutive heartbeats.
  final int heartbeatInterval = 10;
  // Simulation time at which the task tracker is initialized.
  final long simulationStartTime = 100;
  
  // specify number of heartbeats since simulation start for mapStarts
  final long[] mapStarts = {2, 3, 4};
  // specify number of heartbeats since mapStarts[i] for mapKills[i], 
  // use -1 for no kill
  final long[] mapKills = {1, -1, 2};
  // runtime handed to the mock job tracker and the event queue for map i
  final long[] mapRuntimes = {53, 17, 42};
  
  // specify number of heartbeats since simulation start for reduceStarts
  final long[] reduceStarts = {3, 4, 6};
  // specify number of heartbeats since reduceStarts[i] for reduceKills[i], 
  // use -1 for no kill
  final long[] reduceKills = {1, -1, 6};  
  // simulation time added to the start of reduce i to obtain the time that
  // is rounded to the "map done" heartbeat (see setUpReduceTask)
  final long[] mapDoneDelays = {11, 0, 33};
  // runtime handed to the mock job tracker and the event queue for reduce i
  final long[] reduceRuntimes = {49, 25, 64};
  
  static final String taskAttemptIdPrefix = "attempt_200907150128_0007_";
  final String taskTrackerName = "test_task_tracker";
  final int maxMapSlots = 3;
  final int maxReduceSlots = 3;
  
  /**
   * Creates a fresh mock job tracker, task tracker and checked event queue
   * before every test. The test fails immediately if the mock job tracker
   * cannot be constructed.
   */
  @Before
  public void setUp() {
    try {
      jobTracker = new MockSimulatorJobTracker(simulationStartTime,
                                               heartbeatInterval, true);
    } catch (Exception e) {
      // Assert.fail() only carries e.toString(); log the exception as well so
      // that the stack trace of the root cause is not lost.
      LOG.error("Couldn't set up the mock job tracker", e);
      Assert.fail("Couldn't set up the mock job tracker: " + e);
    }
    taskTracker = new SimulatorTaskTracker(jobTracker, taskTrackerName,
                                           "test_host", 
                                           maxMapSlots, maxReduceSlots);
    eventQueue = new CheckedEventQueue(simulationStartTime); 
  }
  
  @Test
  public void testInitAndHeartbeat() {
    LOG.debug("Testing init and hearbeat mechanism");
    genericTest(5, 0, 0, false);
  }

  // All further tests assume that testInitAndHeartbeat passed
  @Test
  public void testSingleMapTask() {
    LOG.debug("Testing with a single map task");
    genericTest(20, 1, 0, false);
  }

  @Test
  public void testSingleReduceTask() {
    LOG.debug("Testing with a single reduce task");
    genericTest(20, 0, 1, false);
  }
  
  @Test
  public void testMultipleMapTasks() {
    LOG.debug("Testing with multiple map tasks");
    genericTest(20, mapStarts.length, 0, false);
  }
  
  @Test
  public void testMultipleReduceTasks() {
    LOG.debug("Testing with multiple reduce tasks");
    genericTest(20, 0, reduceStarts.length, false);
  }

  @Test
  public void testMultipleMapAndReduceTasks() {
    LOG.debug("Testing with multiple map and reduce tasks");
    genericTest(20, mapStarts.length, reduceStarts.length, false);
  }

  @Test
  public void testKillSingleMapTask() {
    LOG.debug("Testing killing a single map task");
    genericTest(20, 1, 0, true);
  }

  @Test
  public void testKillSingleReduceTask() {
    LOG.debug("Testing killing a single reduce task");
    genericTest(20, 0, 1, true);
  }
  
  @Test
  public void testKillMultipleMapTasks() {
    LOG.debug("Testing killing multiple map tasks");
    genericTest(20, mapStarts.length, 0, true);
  }

  @Test
  public void testKillMultipleReduceTasks() {
    LOG.debug("Testing killing multiple reduce tasks");
    genericTest(20, 0, reduceStarts.length, true);
  }
  
  @Test
  public void testKillMultipleMapAndReduceTasks() {
    LOG.debug("Testing killing multiple map and reduce tasks");
    genericTest(20, mapStarts.length, reduceStarts.length, true);
  }
  
  /**
   * Common driver of all tests: registers the expected heartbeats and the
   * first numMaps map and numReduces reduce tasks from the parameter tables,
   * then runs the task tracker and verifies the outcome.
   *
   * @param numAccepts number of heartbeats to expect, must be at least 1
   * @param numMaps number of map tasks to run, at most mapStarts.length
   * @param numReduces number of reduce tasks to run, at most
   *                   reduceStarts.length
   * @param testKill whether the kill times from mapKills / reduceKills are
   *                 applied to the tasks
   */
  protected void genericTest(int numAccepts, int numMaps, int numReduces, 
                             boolean testKill) {
    LOG.debug("Generic test with numAccepts=" + numAccepts +
              ", numMaps=" + numMaps + ", numReduces=" + numReduces +
              ", testKill=" + testKill);

    // Fail with a readable message instead of an
    // ArrayIndexOutOfBoundsException when a test asks for more tasks than
    // the parameter tables describe.
    Assert.assertTrue("numMaps=" + numMaps + " exceeds the " +
                      mapStarts.length + " map tasks in the test tables",
                      numMaps <= mapStarts.length);
    Assert.assertTrue("numReduces=" + numReduces + " exceeds the " +
                      reduceStarts.length + 
                      " reduce tasks in the test tables",
                      numReduces <= reduceStarts.length);
    
    setUpHeartbeats(numAccepts);
    for (int i = 0; i < numMaps; i++) {
      setUpMapTask(i, testKill);
    }
    for (int i = 0; i < numReduces; i++) {
      setUpReduceTask(i, testKill);
    }    
    runTaskTracker();
  }
  
  /**
   * Tells the event queue and the mock job tracker to expect numAccepts
   * heartbeats from the task tracker under test.
   *
   * @param numAccepts number of heartbeats, must be at least 1
   */
  private void setUpHeartbeats(int numAccepts) {
    Assert.assertTrue("numAccepts must be at least 1, got " + numAccepts,
                      numAccepts >= 1);
    eventQueue.expectHeartbeats(taskTracker, numAccepts, heartbeatInterval);
    jobTracker.expectEmptyHeartbeats(taskTrackerName, numAccepts);
  }
  
  /**
   * Registers one map task with the mock job tracker and the matching
   * expectation with the event queue.
   *
   * @param mapTaskId attempt id of the map task
   * @param mapStart simulation time at which the task is started
   * @param mapRuntime runtime of the task
   * @param mapKill simulation time of the kill; callers in this class pass
   *                -1 for "no kill"
   */
  private void setUpMapTask(TaskAttemptID mapTaskId, long mapStart, 
                            long mapRuntime, long mapKill) {
    jobTracker.runMapTask(taskTrackerName, mapTaskId, mapStart, mapRuntime, 
                          mapKill);
    eventQueue.expectMapTask(taskTracker, mapTaskId, mapStart, mapRuntime);
  }

  /**
   * Registers the idx-th map task of the parameter tables.
   *
   * @param idx index into mapStarts, mapKills and mapRuntimes
   * @param testKill whether mapKills[idx] is honored; if false, or if the
   *                 table entry is negative, the task is not killed
   */
  private void setUpMapTask(int idx, boolean testKill) {
    TaskAttemptID mapTaskId = createTaskAttemptID(true, idx);
    // the tables count heartbeats, convert them to simulation time
    long mapStart = simulationStartTime + heartbeatInterval * mapStarts[idx];
    long mapKill = -1;
    if (testKill && 0 <= mapKills[idx]) {
      mapKill = mapStart + heartbeatInterval * mapKills[idx];
    }
    setUpMapTask(mapTaskId, mapStart, mapRuntimes[idx], mapKill);
  }

  /**
   * Registers one reduce task with the mock job tracker and, if the task is
   * not killed before its "map done" heartbeat, the matching expectation
   * with the event queue.
   *
   * @param reduceTaskId attempt id of the reduce task
   * @param reduceStart simulation time at which the task is started
   * @param mapDoneDelay simulation time added to reduceStart before it is
   *                     passed to the mock job tracker's nextHeartbeat()
   * @param reduceRuntime runtime of the task
   * @param reduceKill simulation time of the kill, negative for "no kill"
   */
  private void setUpReduceTask(TaskAttemptID reduceTaskId, long reduceStart, 
                               long mapDoneDelay, long reduceRuntime,
                               long reduceKill) {
    jobTracker.runReduceTask(taskTrackerName, reduceTaskId, reduceStart,
                             mapDoneDelay, reduceRuntime, reduceKill);
    long mapDone = jobTracker.nextHeartbeat(reduceStart + mapDoneDelay);
    if (reduceKill < 0 || mapDone < reduceKill) {
      // it generates completion events iff it survives mapDone 
      eventQueue.expectReduceTask(taskTracker, reduceTaskId, 
                                  mapDone, reduceRuntime);
    }
  }

  /**
   * Registers the idx-th reduce task of the parameter tables.
   *
   * @param idx index into reduceStarts, reduceKills, mapDoneDelays and
   *            reduceRuntimes
   * @param testKill whether reduceKills[idx] is honored; if false, or if the
   *                 table entry is negative, the task is not killed
   */
  private void setUpReduceTask(int idx, boolean testKill) {
    TaskAttemptID reduceTaskId = createTaskAttemptID(false, idx);
    // the tables count heartbeats, convert them to simulation time
    long reduceStart = simulationStartTime + 
                       heartbeatInterval * reduceStarts[idx];
    long reduceKill = -1;
    if (testKill && 0 <= reduceKills[idx]) {
      reduceKill = reduceStart + heartbeatInterval * reduceKills[idx];
    }
    setUpReduceTask(reduceTaskId, reduceStart, mapDoneDelays[idx],
                    reduceRuntimes[idx], reduceKill);
  }
   
  //
  // runs a single task tracker
  // checks that generated events conform to expectedEvents
  // and the mock jobtracker checks that the heartbeats() sent to it are right
  //
  private void runTaskTracker() {
    long runUntil = eventQueue.getLastCheckTime();
    LOG.debug("Running task tracker until simulation time=" + runUntil);

    List<SimulatorEvent> events = taskTracker.init(simulationStartTime);
    eventQueue.addAll(events);
    while (true) {
      // can't be empty as it must go past runUntil for verifiability
      // besides it is never empty because of HeartbeatEvent            
      SimulatorEvent currentEvent = eventQueue.get();
      // copy time, make sure TT does not modify it
      long now = currentEvent.getTimeStamp();
      // +1 accounts for currentEvent, which was already taken off the queue
      LOG.debug("Number of events to deliver=" + (eventQueue.getSize()+1) +
                ", now=" + now);
      if (now > runUntil) {
        // first event past the last checked time: nothing left to verify
        break;
      }                             
      LOG.debug("Calling accept(), event=" + currentEvent + ", now=" + now);
      events = taskTracker.accept(currentEvent);
      LOG.debug("Accept() returned " + events.size() + " new event(s)");
      for (SimulatorEvent newEvent: events) {
        LOG.debug("New event " + newEvent);
      }
      eventQueue.addAll(events);
      LOG.debug("Done checking and enqueuing new events");
    }
    
    // make sure we have seen all expected events, even for the last 
    // time checked
    eventQueue.checkMissingExpected();
    // Mock JT should have consumed all entries from its heartbeat table
    jobTracker.checkMissingHeartbeats();
  }
  
  /**
   * Builds the attempt id of the taskNumber-th map or reduce task of the
   * test job by parsing taskAttemptIdPrefix + "m"/"r" + "_00000" +
   * taskNumber + "_0". The test fails if the string is rejected by
   * TaskAttemptID.forName().
   *
   * @param isMap true for a map task id, false for a reduce task id
   * @param taskNumber index of the task; should be &lt; 10 so that the
   *                   task field of the id string stays six digits wide
   * @return the parsed attempt id
   */
  private static TaskAttemptID createTaskAttemptID(boolean isMap, 
                                                   int taskNumber) {
    String attempt = taskAttemptIdPrefix + (isMap ? "m" : "r") + 
                     "_00000" + taskNumber + "_0";
    try {
      return TaskAttemptID.forName(attempt);
    } catch (IllegalArgumentException iae) {
      Assert.fail("Invalid task attempt id string " + iae);
      // not reached, Assert.fail() always throws; keeps the compiler happy
      return null;
    }
  }
}
