/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.v2.app.job.impl;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

@SuppressWarnings("rawtypes")
public class TestTaskImpl {

  private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);    
  
  private JobConf conf;
  private TaskAttemptListener taskAttemptListener;
  private OutputCommitter committer;
  private Token<JobTokenIdentifier> jobToken;
  private JobId jobId;
  private Path remoteJobConfFile;
  private Collection<Token<? extends TokenIdentifier>> fsTokens;
  private Clock clock;
  private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
  private MRAppMetrics metrics;
  private TaskImpl mockTask;
  private ApplicationId appId;
  private TaskSplitMetaInfo taskSplitMetaInfo;  
  private String[] dataLocations = new String[0]; 
  private final TaskType taskType = TaskType.MAP;
  
  private int startCount = 0;
  private int taskCounter = 0;
  private final int partition = 1;
  
  private InlineDispatcher dispatcher;   
  private List<MockTaskAttemptImpl> taskAttempts;
  
  private class MockTaskImpl extends TaskImpl {
        
    private int taskAttemptCounter = 0;

    public MockTaskImpl(JobId jobId, int partition,
        EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
        TaskAttemptListener taskAttemptListener, OutputCommitter committer,
        Token<JobTokenIdentifier> jobToken,
        Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
        Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
        MRAppMetrics metrics) {
      super(jobId, taskType , partition, eventHandler,
          remoteJobConfFile, conf, taskAttemptListener, committer, 
          jobToken, fsTokens, clock, 
          completedTasksFromPreviousRun, startCount, metrics);
    }

    @Override
    public TaskType getType() {
      return taskType;
    }

    @Override
    protected TaskAttemptImpl createAttempt() {
      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, 
          eventHandler, taskAttemptListener, remoteJobConfFile, partition,
          conf, committer, jobToken, fsTokens, clock);
      taskAttempts.add(attempt);
      return attempt;
    }

    @Override
    protected int getMaxAttempts() {
      return 100;
    }    
    
  }
  
  private class MockTaskAttemptImpl extends TaskAttemptImpl {

    private float progress = 0;
    private TaskAttemptState state = TaskAttemptState.NEW;
    private TaskAttemptId attemptId;

    public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
        TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
        JobConf conf, OutputCommitter committer,
        Token<JobTokenIdentifier> jobToken,
        Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
      super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
          dataLocations, committer, jobToken, fsTokens, clock);
      attemptId = Records.newRecord(TaskAttemptId.class);
      attemptId.setId(id);
      attemptId.setTaskId(taskId);
    }

    public TaskAttemptId getAttemptId() {
      return attemptId;
    }
    
    @Override
    protected Task createRemoteTask() {
      return new MockTask();
    }    
    
    public float getProgress() {
      return progress ;
    }
    
    public void setProgress(float progress) {
      this.progress = progress;
    }
    
    public void setState(TaskAttemptState state) {
      this.state = state;
    }
    
    public TaskAttemptState getState() {
      return state;
    }
    
  }
  
  private class MockTask extends Task {

    @Override
    public void run(JobConf job, TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
      return;
    }

    @Override
    public boolean isMapTask() {
      return true;
    }    
    
  }
  
  @Before 
  @SuppressWarnings("unchecked")
  public void setup() {
     dispatcher = new InlineDispatcher();
    
    ++startCount;
    
    conf = new JobConf();
    taskAttemptListener = mock(TaskAttemptListener.class);
    committer = mock(OutputCommitter.class);
    jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
    remoteJobConfFile = mock(Path.class);
    fsTokens = null;
    clock = new SystemClock();
    metrics = mock(MRAppMetrics.class);  
    dataLocations = new String[1];
    
    appId = Records.newRecord(ApplicationId.class);
    appId.setClusterTimestamp(System.currentTimeMillis());
    appId.setId(1);

    jobId = Records.newRecord(JobId.class);
    jobId.setId(1);
    jobId.setAppId(appId);

    taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
    when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); 
    
    taskAttempts = new ArrayList<MockTaskAttemptImpl>();
    
    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
        fsTokens, clock, 
        completedTasksFromPreviousRun, startCount,
        metrics);        
    
  }

  @After 
  public void teardown() {
    taskAttempts.clear();
  }
  
  private TaskId getNewTaskID() {
    TaskId taskId = Records.newRecord(TaskId.class);
    taskId.setId(++taskCounter);
    taskId.setJobId(jobId);
    taskId.setTaskType(mockTask.getType());    
    return taskId;
  }
  
  private void scheduleTaskAttempt(TaskId taskId) {
    mockTask.handle(new TaskEvent(taskId, 
        TaskEventType.T_SCHEDULE));
    assertTaskScheduledState();
  }
  
  private void killTask(TaskId taskId) {
    mockTask.handle(new TaskEvent(taskId, 
        TaskEventType.T_KILL));
    assertTaskKillWaitState();
  }
  
  private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId, 
        TaskEventType.T_ATTEMPT_KILLED));
    assertTaskScheduledState();
  }

  private void launchTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId, 
        TaskEventType.T_ATTEMPT_LAUNCHED));
    assertTaskRunningState();    
  }
  
  private void commitTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId, 
        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
    assertTaskRunningState();    
  }
  
  private MockTaskAttemptImpl getLastAttempt() {
    return taskAttempts.get(taskAttempts.size()-1);
  }
  
  private void updateLastAttemptProgress(float p) {    
    getLastAttempt().setProgress(p);
  }

  private void updateLastAttemptState(TaskAttemptState s) {
    getLastAttempt().setState(s);
  }
  
  private void killRunningTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId, 
        TaskEventType.T_ATTEMPT_KILLED));
    assertTaskRunningState();  
  }
  
  private void failRunningTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId, 
        TaskEventType.T_ATTEMPT_FAILED));
    assertTaskRunningState();
  }
  
  /**
   * {@link TaskState#NEW}
   */
  private void assertTaskNewState() {
    assertEquals(TaskState.NEW, mockTask.getState());
  }
  
  /**
   * {@link TaskState#SCHEDULED}
   */
  private void assertTaskScheduledState() {
    assertEquals(TaskState.SCHEDULED, mockTask.getState());
  }

  /**
   * {@link TaskState#RUNNING}
   */
  private void assertTaskRunningState() {
    assertEquals(TaskState.RUNNING, mockTask.getState());
  }
    
  /**
   * {@link TaskState#KILL_WAIT}
   */
  private void assertTaskKillWaitState() {
    assertEquals(TaskState.KILL_WAIT, mockTask.getState());
  }
  
  /**
   * {@link TaskState#SUCCEEDED}
   */
  private void assertTaskSucceededState() {
    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
  }
  
  @Test
  public void testInit() {
    LOG.info("--- START: testInit ---");
    assertTaskNewState();
    assert(taskAttempts.size() == 0);
  }

  @Test
  /**
   * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}
   */
  public void testScheduleTask() {
    LOG.info("--- START: testScheduleTask ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
  }
  
  @Test 
  /**
   * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT}
   */
  public void testKillScheduledTask() {
    LOG.info("--- START: testKillScheduledTask ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    killTask(taskId);
  }
  
  @Test 
  /**
   * Kill attempt
   * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}
   */
  public void testKillScheduledTaskAttempt() {
    LOG.info("--- START: testKillScheduledTaskAttempt ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    killScheduledTaskAttempt(getLastAttempt().getAttemptId());
  }
  
  @Test 
  /**
   * Launch attempt
   * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}
   */
  public void testLaunchTaskAttempt() {
    LOG.info("--- START: testLaunchTaskAttempt ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
  }

  @Test
  /**
   * Kill running attempt
   * {@link TaskState#RUNNING}->{@link TaskState#RUNNING} 
   */
  public void testKillRunningTaskAttempt() {
    LOG.info("--- START: testKillRunningTaskAttempt ---");
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    killRunningTaskAttempt(getLastAttempt().getAttemptId());    
  }

  @Test 
  public void testTaskProgress() {
    LOG.info("--- START: testTaskProgress ---");
        
    // launch task
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    float progress = 0f;
    assert(mockTask.getProgress() == progress);
    launchTaskAttempt(getLastAttempt().getAttemptId());    
    
    // update attempt1 
    progress = 50f;
    updateLastAttemptProgress(progress);
    assert(mockTask.getProgress() == progress);
    progress = 100f;
    updateLastAttemptProgress(progress);
    assert(mockTask.getProgress() == progress);
    
    progress = 0f;
    // mark first attempt as killed
    updateLastAttemptState(TaskAttemptState.KILLED);
    assert(mockTask.getProgress() == progress);

    // kill first attempt 
    // should trigger a new attempt
    // as no successful attempts 
    killRunningTaskAttempt(getLastAttempt().getAttemptId());
    assert(taskAttempts.size() == 2);
    
    assert(mockTask.getProgress() == 0f);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    progress = 50f;
    updateLastAttemptProgress(progress);
    assert(mockTask.getProgress() == progress);
        
  }
  
  @Test
  public void testFailureDuringTaskAttemptCommit() {
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
    commitTaskAttempt(getLastAttempt().getAttemptId());

    // During the task attempt commit there is an exception which causes
    // the attempt to fail
    updateLastAttemptState(TaskAttemptState.FAILED);
    failRunningTaskAttempt(getLastAttempt().getAttemptId());

    assertEquals(2, taskAttempts.size());
    updateLastAttemptState(TaskAttemptState.SUCCEEDED);
    commitTaskAttempt(getLastAttempt().getAttemptId());
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
        TaskEventType.T_ATTEMPT_SUCCEEDED));
    
    assertFalse("First attempt should not commit",
        mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
    assertTrue("Second attempt should commit",
        mockTask.canCommit(getLastAttempt().getAttemptId()));

    assertTaskSucceededState();
  }

}
