/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.google.common.collect.Lists;

import org.apache.tez.http.HttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.apache.tez.runtime.library.testutils.RuntimeTestUtils;
import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;


public class TestFetcher {

  public static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
  public static final String HOST = "localhost";
  public static final int PORT = 65;
  public static final int DAG_ID = 1;
  public static final String APP_ID = "application_1234_1";

  private TezCounters tezCounters = new TezCounters();
  private TezCounter ioErrsCounter = tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
      ShuffleScheduler.ShuffleErrors.IO_ERROR.toString());
  private TezCounter wrongLengthErrsCounter =
      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
          ShuffleScheduler.ShuffleErrors.WRONG_LENGTH.toString());
  private TezCounter badIdErrsCounter =
      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
          ShuffleScheduler.ShuffleErrors.BAD_ID.toString());
  private TezCounter wrongMapErrsCounter =
      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
          ShuffleScheduler.ShuffleErrors.WRONG_MAP.toString());
  private TezCounter connectionErrsCounter =
      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
          ShuffleScheduler.ShuffleErrors.CONNECTION.toString());
  private TezCounter wrongReduceErrsCounter =
      tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME,
          ShuffleScheduler.ShuffleErrors.WRONG_REDUCE.toString());

  static final Logger LOG = LoggerFactory.getLogger(TestFetcher.class);

  @Test(timeout = 5000)
  public void testInputsReturnedOnConnectionException() throws Exception {
    Configuration conf = new TezConfiguration();
    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
    MergeManager merger = mock(MergeManager.class);

    Shuffle shuffle = mock(Shuffle.class);

    InputContext inputContext = mock(InputContext.class);
    doReturn(new TezCounters()).when(inputContext).getCounters();
    doReturn("src vertex").when(inputContext).getSourceVertexName();

    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
    InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt");
    mapHost.addKnownMap(inputAttemptIdentifier);
    List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier);
    doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);

    FetcherOrderedGrouped fetcher =
        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf,
            getRawFs(conf), false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter,
            wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);

    fetcher.call();
    verify(scheduler).getMapsForHost(mapHost);
    verify(scheduler).freeHost(mapHost);
    verify(scheduler).putBackKnownMapOutput(mapHost, inputAttemptIdentifier);
  }


  @Test(timeout = 5000)
  public void testLocalFetchModeSetting1() throws Exception {
    Configuration conf = new TezConfiguration();
    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
    MergeManager merger = mock(MergeManager.class);
    Shuffle shuffle = mock(Shuffle.class);

    InputContext inputContext = mock(InputContext.class);
    doReturn(new TezCounters()).when(inputContext).getCounters();
    doReturn("src vertex").when(inputContext).getSourceVertexName();

    final boolean ENABLE_LOCAL_FETCH = true;
    final boolean DISABLE_LOCAL_FETCH = false;
    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
    FetcherOrderedGrouped fetcher =
        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf,
            getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter,
            wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);

    // when local mode is enabled and host and port matches use local fetch
    FetcherOrderedGrouped spyFetcher = spy(fetcher);
    doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);

    spyFetcher.fetchNext();

    verify(spyFetcher, times(1)).setupLocalDiskFetch(mapHost);
    verify(spyFetcher, never()).copyFromHost(any(MapHost.class));

    // if hostname does not match use http
    mapHost = new MapHost(HOST + "_OTHER", PORT, 0, 1);
    fetcher =
        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf,
            getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter,
            wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
    spyFetcher = spy(fetcher);
    doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);

    spyFetcher.fetchNext();

    verify(spyFetcher, never()).setupLocalDiskFetch(any(MapHost.class));
    verify(spyFetcher, times(1)).copyFromHost(mapHost);

    // if port does not match use http
    mapHost = new MapHost(HOST, PORT + 1, 0, 1);
    fetcher =
        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf,
            getRawFs(conf), ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter,
            wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
    spyFetcher = spy(fetcher);
    doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);

    spyFetcher.fetchNext();

    verify(spyFetcher, never()).setupLocalDiskFetch(any(MapHost.class));
    verify(spyFetcher, times(1)).copyFromHost(mapHost);

    //if local fetch is not enabled
    mapHost = new MapHost(HOST, PORT, 0, 1);
    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null,
        conf, getRawFs(conf), DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
        wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter,
        wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
    spyFetcher = spy(fetcher);
    doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);

    spyFetcher.fetchNext();

    verify(spyFetcher, never()).setupLocalDiskFetch(any(MapHost.class));
    verify(spyFetcher, times(1)).copyFromHost(mapHost);
  }

  @Test(timeout = 5000)
  public void testSetupLocalDiskFetch() throws Exception {
    Configuration conf = new TezConfiguration();
    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
    MergeManager merger = mock(MergeManager.class);
    Shuffle shuffle = mock(Shuffle.class);
    InputContext inputContext = mock(InputContext.class);
    when(inputContext.getCounters()).thenReturn(new TezCounters());
    when(inputContext.getSourceVertexName()).thenReturn("");

    MapHost host = new MapHost(HOST, PORT, 1, 1);
    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle,
        null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src vertex", host,
        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
    FetcherOrderedGrouped spyFetcher = spy(fetcher);


    final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
        new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1),
        new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1),
        new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1),
        new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1),
        new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1)
    );
    final int FIRST_FAILED_ATTEMPT_IDX = 2;
    final int SECOND_FAILED_ATTEMPT_IDX = 4;
    final int[] successfulAttemptsIndexes = { 0, 1, 3 };

    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);

    final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>();
    for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) {
      for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) {
        ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(), host.getPartitionId() + i);
        pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i));
        }
      }
    doAnswer(new Answer<InputAttemptIdentifier>() {
      @Override
      public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        String path = (String) args[0];
        int reduceId = (int) args[1];
        return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId));
      }
    }).when(scheduler)
        .getIdentifierForFetchedOutput(any(String.class), any(int.class));

    doAnswer(new Answer<MapOutput>() {
      @Override
      public MapOutput answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        MapOutput mapOutput = mock(MapOutput.class);
        doReturn(MapOutput.Type.DISK_DIRECT).when(mapOutput).getType();
        doReturn(args[0]).when(mapOutput).getAttemptIdentifier();
        return mapOutput;
      }
    }).when(spyFetcher)
        .getMapOutputForDirectDiskFetch(any(InputAttemptIdentifier.class), any(Path.class),
            any(TezIndexRecord.class));

    doAnswer(new Answer<Path>() {
      @Override
      public Path answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
      }
    }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());

    for (int i = 0; i < host.getPartitionCount(); i++) {
      doAnswer(new Answer<TezIndexRecord>() {
        @Override
        public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
          Object[] args = invocation.getArguments();
          String pathComponent = (String) args[0];
          int len = pathComponent.length();
          long p = Long.valueOf(pathComponent.substring(len - 1, len));
          if (p == FIRST_FAILED_ATTEMPT_IDX || p == SECOND_FAILED_ATTEMPT_IDX) {
            throw new IOException("failing to simulate failure case");
          }
          // match with params for copySucceeded below.
          return new TezIndexRecord(p * 10, (p+1) * 1000, (p+2) * 100);
        }
      }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i));
    }

    doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
        anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
    doNothing().when(scheduler).putBackKnownMapOutput(host,
        srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
    doNothing().when(scheduler).putBackKnownMapOutput(host,
        srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));

    spyFetcher.setupLocalDiskFetch(host);

    // should have exactly 3 success and 1 failure.
    for (int i : successfulAttemptsIndexes) {
      for (int j = 0; j < host.getPartitionCount(); j++) {
        verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
      }
    }
    verify(scheduler).copyFailed(
        eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0))),
        eq(host), eq(true), eq(false));
    verify(scheduler).copyFailed(
        eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0))),
        eq(host), eq(true), eq(false));

    verify(spyFetcher).putBackRemainingMapOutputs(host);
    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
  }

  @Test (timeout = 5000)
  public void testSetupLocalDiskFetchEmptyPartitions() throws Exception {
    Configuration conf = new TezConfiguration();
    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
    MergeManager merger = mock(MergeManager.class);
    Shuffle shuffle = mock(Shuffle.class);
    InputContext inputContext = mock(InputContext.class);
    when(inputContext.getCounters()).thenReturn(new TezCounters());
    when(inputContext.getSourceVertexName()).thenReturn("");

    MapHost host = new MapHost(HOST, PORT, 1, 1);
    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle,
        null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src vertex", host,
        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
    FetcherOrderedGrouped spyFetcher = spy(fetcher);

    final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
        new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", 1),
        new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", 1),
        new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", 1),
        new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", 1),
        new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", 1)
    );

    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);

    final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>();
    for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) {
      for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) {
        ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(), host.getPartitionId() + i);
        pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i));
      }
    }
    doAnswer(new Answer<InputAttemptIdentifier>() {
      @Override
      public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        String path = (String) args[0];
        int reduceId = (int) args[1];
        return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId));
      }
    }).when(scheduler)
        .getIdentifierForFetchedOutput(any(String.class), any(int.class));

    doAnswer(new Answer<Path>() {
      @Override
      public Path answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
      }
    }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());

    for (int i = 0; i < host.getPartitionCount(); i++) {
      doAnswer(new Answer<TezIndexRecord>() {
        @Override
        public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
          Object[] args = invocation.getArguments();
          String pathComponent = (String) args[0];
          int len = pathComponent.length();
          long p = Long.valueOf(pathComponent.substring(len - 1, len));
          // match with params for copySucceeded below.
          return new TezIndexRecord(p * 10, 0, 0);
        }
      }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i));
    }

    doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
        anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
    spyFetcher.setupLocalDiskFetch(host);
    verify(scheduler, times(0)).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
        anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
    verify(spyFetcher).putBackRemainingMapOutputs(host);
  }

  @Test(timeout = 5000)
  public void testSetupLocalDiskFetchAutoReduce() throws Exception {
    Configuration conf = new TezConfiguration();
    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
    MergeManager merger = mock(MergeManager.class);
    Shuffle shuffle = mock(Shuffle.class);
    InputContext inputContext = mock(InputContext.class);
    when(inputContext.getCounters()).thenReturn(new TezCounters());
    when(inputContext.getSourceVertexName()).thenReturn("");

    MapHost host = new MapHost(HOST, PORT, 1, 2);
    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle,
        null, false, 0, null, conf, getRawFs(conf), true, HOST, PORT, "src vertex", host,
        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
    FetcherOrderedGrouped spyFetcher = spy(fetcher);


    final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
        new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", host.getPartitionCount()),
        new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", host.getPartitionCount()),
        new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", host.getPartitionCount()),
        new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", host.getPartitionCount()),
        new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", host.getPartitionCount())
    );
    final int FIRST_FAILED_ATTEMPT_IDX = 2;
    final int SECOND_FAILED_ATTEMPT_IDX = 4;
    final int[] successfulAttemptsIndexes = { 0, 1, 3 };

    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
    final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap
        = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>();
    for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) {
      for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) {
        ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(), host.getPartitionId() + i);
        pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i));
      }
    }

    doAnswer(new Answer<InputAttemptIdentifier>() {
        @Override
        public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable {
          Object[] args = invocation.getArguments();
          String path = (String) args[0];
          int reduceId = (int) args[1];
          return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId));
        }
      }).when(scheduler)
          .getIdentifierForFetchedOutput(any(String.class), any(int.class));

    doAnswer(new Answer<MapOutput>() {
      @Override
      public MapOutput answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        MapOutput mapOutput = mock(MapOutput.class);
        doReturn(MapOutput.Type.DISK_DIRECT).when(mapOutput).getType();
        doReturn(args[0]).when(mapOutput).getAttemptIdentifier();
        return mapOutput;
      }
    }).when(spyFetcher)
        .getMapOutputForDirectDiskFetch(any(InputAttemptIdentifier.class), any(Path.class),
            any(TezIndexRecord.class));

    doAnswer(new Answer<Path>() {
      @Override
      public Path answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
      }
    }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());

    for (int i = 0; i < host.getPartitionCount(); i++) {
      doAnswer(new Answer<TezIndexRecord>() {
        @Override
        public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
          Object[] args = invocation.getArguments();
          String pathComponent = (String) args[0];
          int len = pathComponent.length();
          long p = Long.valueOf(pathComponent.substring(len - 1, len));

          if (pathComponent.equals(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).getPathComponent()) ||
              pathComponent.equals(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).getPathComponent())) {
            throw new IOException("Thowing exception to simulate failure case");
          }
          // match with params for copySucceeded below.
          return new TezIndexRecord(p * 10, (p + 1) * 1000, (p + 2) * 100);
        }
      }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i));
    }

    doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
        anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
    doNothing().when(scheduler).putBackKnownMapOutput(host,
        srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0));
    doNothing().when(scheduler).putBackKnownMapOutput(host,
        srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1));
    doNothing().when(scheduler).putBackKnownMapOutput(host,
        srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0));
    doNothing().when(scheduler).putBackKnownMapOutput(host,
        srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1));

    spyFetcher.setupLocalDiskFetch(host);

    // should have exactly 3 success and 1 failure.
    for (int i : successfulAttemptsIndexes) {
      for (int j = 0; j < host.getPartitionCount(); j++) {
        verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
      }
    }
    verify(scheduler).copyFailed(
        eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0))),
        eq(host), eq(true), eq(false));
    verify(scheduler).copyFailed(
        eq(InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(1))),
        eq(host), eq(true), eq(false));
    verify(scheduler).copyFailed(eq(
        InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0))),
        eq(host), eq(true), eq(false));
    verify(scheduler).copyFailed(eq(
        InputAttemptFetchFailure.fromLocalFetchFailure(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1))),
        eq(host), eq(true), eq(false));

    verify(spyFetcher).putBackRemainingMapOutputs(host);
    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
  }

  private void verifyCopySucceeded(ShuffleScheduler scheduler, MapHost host,
      List<CompositeInputAttemptIdentifier> srcAttempts, long p, int j) throws
      IOException {
    // need to verify filename, offsets, sizes wherever they are used.
    InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p).expand(j);
    String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent();
    ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class);
    verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq((p+2) * 100),
        eq((p+1) * 1000), anyLong(), captureMapOutput.capture(), anyBoolean());

    // cannot use the equals of MapOutput as it compares id which is private. so doing it manually
    MapOutput m = captureMapOutput.getAllValues().get(0);
    Assert.assertTrue(m.getType().equals(MapOutput.Type.DISK_DIRECT) &&
        m.getAttemptIdentifier().equals(srcAttemptToMatch));
  }

  static class FakeHttpConnection extends HttpConnection {

    public FakeHttpConnection(URL url,
        HttpConnectionParams connParams, String logIdentifier, JobTokenSecretManager jobTokenSecretMgr)
        throws IOException {
      super(url, connParams, logIdentifier, jobTokenSecretMgr);
      this.connection = mock(HttpURLConnection.class);
      when(connection.getResponseCode()).thenReturn(200);
      when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
          .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
      when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
          .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
      when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn("");
    }

    public DataInputStream getInputStream() throws IOException {
      byte[] b = new byte[1024];
      ByteArrayInputStream bin = new ByteArrayInputStream(b);
      return new DataInputStream(bin);
    }

    public void validate() throws IOException {
      //empty
    }

    public void cleanup(boolean disconnect) throws IOException {
      LOG.info("HttpConnection cleanup called with disconnect=" + disconnect);
      //ignore
    }
  }

  @Test(timeout = 5000)
  @SuppressWarnings("unchecked")
  public void testWithRetry() throws Exception {
    Configuration conf = new TezConfiguration();
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3000);
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3000);
    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
    MergeManager merger = mock(MergeManager.class);

    Shuffle shuffle = mock(Shuffle.class);
    InputContext inputContext = mock(InputContext.class);
    when(inputContext.getCounters()).thenReturn(new TezCounters());
    when(inputContext.getSourceVertexName()).thenReturn("");
    when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1));

    HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
    final MapHost host = new MapHost(HOST, PORT, 1, 1);
    FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle,
        null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, "src vertex", host,
        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
    final FetcherOrderedGrouped fetcher = spy(mockFetcher);


    final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
        new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3")
    );
    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
    doReturn(true).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class));

    URL url = ShuffleUtils.constructInputURL("http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=", srcAttempts, false);
    fetcher.httpConnection = new FakeHttpConnection(url, null, "", null);

    doAnswer(new Answer<MapOutput>() {
      @Override
      public MapOutput answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        MapOutput mapOutput = mock(MapOutput.class);
        doReturn(MapOutput.Type.MEMORY).when(mapOutput).getType();
        doReturn(args[0]).when(mapOutput).getAttemptIdentifier();
        return mapOutput;
      }
    }).when(merger).reserve(any(InputAttemptIdentifier.class), anyInt(), anyInt(), anyInt());

    //Create read timeout when reading data
    doAnswer(new Answer<Void>() {
      @Override public Void answer(InvocationOnMock invocation) throws Throwable {
        doReturn(false).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class));
        // Simulate read timeout by throwing proper exception
        throw new FetcherReadTimeoutException("creating fetcher socket read timeout exception");
      }
    }).when(fetcher).copyMapOutput(any(MapHost.class), any(DataInputStream.class), any(InputAttemptIdentifier.class));

    try {
      fetcher.copyFromHost(host);
    } catch(IOException e) {
      //ignore
    }
    //setup connection should be called twice (1 for connect and another for retry)
    verify(fetcher, times(2)).setupConnection(any(MapHost.class), any(Collection.class));
    //since copyMapOutput consistently fails, it should call copyFailed once
    verify(scheduler, times(1)).copyFailed(any(InputAttemptFetchFailure.class), any(MapHost.class),
          anyBoolean(), anyBoolean());

    verify(fetcher, times(1)).putBackRemainingMapOutputs(any(MapHost.class));
    verify(scheduler, times(3)).putBackKnownMapOutput(any(MapHost.class),
        any(InputAttemptIdentifier.class));


    //Verify by stopping the fetcher abruptly
    try {
      fetcher.stopped = false; // flag to indicate fetcher stopped
      fetcher.copyFromHost(host);
      verify(fetcher, times(2)).putBackRemainingMapOutputs(any(MapHost.class));
    } catch(IOException e) {
      //ignore
    }

  }

  @Test
  @SuppressWarnings("unchecked")
  public void testAsyncWithException() throws Exception {
    Configuration conf = new TezConfiguration();
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3000);
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3000);

    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
    MergeManager merger = mock(MergeManager.class);
    Shuffle shuffle = mock(Shuffle.class);

    TezCounters counters = new TezCounters();
    InputContext inputContext = mock(InputContext.class);
    when(inputContext.getCounters()).thenReturn(counters);
    when(inputContext.getSourceVertexName()).thenReturn("");

    JobTokenSecretManager jobMgr = mock(JobTokenSecretManager.class);
    doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class));

    HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
    final MapHost host = new MapHost(HOST, PORT, 1, 1);
    FetcherOrderedGrouped mockFetcher =
        new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, shuffle, jobMgr, false,
            0, null, conf, getRawFs(conf), false, HOST, PORT, "src vertex", host, ioErrsCounter,
            wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter,
            wrongReduceErrsCounter, APP_ID, DAG_ID, true, false, true, false);
    final FetcherOrderedGrouped fetcher = spy(mockFetcher);
    fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
    final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
        new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
        new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
        new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3")
    );
    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);

    try {
      long currentIOErrors = ioErrsCounter.getValue();
      boolean connected = fetcher.setupConnection(host, srcAttempts);
      Assert.assertTrue(connected == false);
      //Ensure that counters are incremented (i.e it followed the exception codepath)
      Assert.assertTrue(ioErrsCounter.getValue() > currentIOErrors);
    } catch (IOException e) {
      fail();
    }
  }

  @Test(timeout = 5000)
  public void testInputAttemptIdentifierMap() {
    InputAttemptIdentifier[] srcAttempts = {
      new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
          //duplicate entry
      new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
      // pipeline shuffle based identifiers, with multiple attempts
      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
      new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
          false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
      new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
          false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
    };
    InputAttemptIdentifier[] expectedSrcAttempts = {
      new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
      // pipeline shuffle based identifiers
      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
      new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0),
      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
          false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1),
      new InputAttemptIdentifier(1, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
          false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 2),
      new InputAttemptIdentifier(2, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
          false, InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED, 0)
    };

    Configuration conf = new TezConfiguration();
    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
    MergeManager merger = mock(MergeManager.class);
    Shuffle shuffle = mock(Shuffle.class);
    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle,
        null, false, 0, null, conf, getRawFs(conf), false, HOST, PORT, "src vertex", mapHost,
        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
    fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
    Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
    Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator();
    int count = 0;
    while(iterator.hasNext()) {
      String key = iterator.next().getKey();
      Assert.assertTrue(expectedSrcAttempts[count++].toString().compareTo(key) == 0);
    }
  }

  @Test
  public void testShuffleHandlerDiskErrorOrdered()
      throws Exception {
    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
    InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt");

    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, null, null, null, null, false,
        0, null, new TezConfiguration(), null, false, HOST, PORT, "src vertex", mapHost,
        ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
        connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true, false);
    fetcher.remaining = new HashMap<String, InputAttemptIdentifier>();

    ShuffleHeader header =
        new ShuffleHeader(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString(), -1, -1, -1);
    DataInputStream input = RuntimeTestUtils.shuffleHeaderToDataInput(header);

    // copyMapOutput is used for remote fetch, this time it returns a fetch failure, which is fatal
    // and should be treated as a local fetch failure
    InputAttemptFetchFailure[] failures =
        fetcher.copyMapOutput(mapHost, input, inputAttemptIdentifier);

    Assert.assertEquals(1, failures.length);
    Assert.assertTrue(failures[0].isDiskErrorAtSource());
    Assert.assertFalse(failures[0].isLocalFetch());
  }

  private RawLocalFileSystem getRawFs(Configuration conf) {
    try {
      return (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
    } catch (IOException e) {
      // TODO Auto-generated catch block
      throw new RuntimeException(e);
    }
  }
}
