/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.UUID;

import javax.security.auth.login.LoginException;

import junit.framework.TestCase;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UserGroupInformation;

/** 
 * Re-runs a map task using the IsolationRunner. 
 *
 * The task included here is an identity mapper that touches
 * a file in a side-effect directory.  This is used
 * to verify that the task in fact ran.
 */
public class TestIsolationRunner extends TestCase {

  private static final String SIDE_EFFECT_DIR_PROPERTY =
    "test.isolationrunner.sideeffectdir";
  private static String TEST_ROOT_DIR = new File(System.getProperty(
      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
  
  /** Identity mapper that also creates a side effect file. */
  static class SideEffectMapper<K, V> extends IdentityMapper<K, V> {
    private JobConf conf;
    @Override
    public void configure(JobConf conf) {
      this.conf = conf;
    }
    @Override
    public void close() throws IOException {
      writeSideEffectFile(conf, "map");
    }
  }

  static class SideEffectReducer<K, V> extends IdentityReducer<K, V> {
    private JobConf conf;
    @Override
    public void configure(JobConf conf) {
      this.conf = conf;
    }
    @Override
    public void close() throws IOException {
      writeSideEffectFile(conf, "reduce");
    }
  }

  private static void deleteSideEffectFiles(JobConf conf) throws IOException {
    FileSystem localFs = FileSystem.getLocal(conf);
    localFs.delete(new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), true);
    assertEquals(0, countSideEffectFiles(conf, ""));
  }
  
  private static void writeSideEffectFile(JobConf conf, String prefix)
      throws IOException {
    FileSystem localFs = FileSystem.getLocal(conf);
    Path sideEffectFile = new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY),
        prefix + "-" + UUID.randomUUID().toString());
    localFs.create(sideEffectFile).close();
  }
  
  private static int countSideEffectFiles(JobConf conf, final String prefix)
      throws IOException {
    FileSystem localFs = FileSystem.getLocal(conf);
    try {
      FileStatus[] files = localFs.listStatus(
          new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), new PathFilter() {
        @Override public boolean accept(Path path) {
          return path.getName().startsWith(prefix + "-");
        }
      });
      return files.length;
    } catch (FileNotFoundException fnfe) {
      return 0;
    }
  }

  private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
      throws IOException,
      LoginException {
    String taskid =
        new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString();
    return new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
        TaskTracker.getTaskConfFile(UserGroupInformation.login(conf)
            .getUserName(), jobId.toString(), taskid, false), conf);
  }

  public void testIsolationRunOfMapTask()
      throws IOException,
      InterruptedException,
      ClassNotFoundException,
      LoginException {
    MiniMRCluster mr = null;
    try {
      mr = new MiniMRCluster(1, "file:///", 4);

      // Run a job succesfully; keep task files.
      JobConf conf = mr.createJobConf();
      conf.setKeepTaskFilesPattern(".*");
      conf.set(SIDE_EFFECT_DIR_PROPERTY, TEST_ROOT_DIR +
          "/isolationrunnerjob/sideeffect");
      // Delete previous runs' data.
      deleteSideEffectFiles(conf);
      JobID jobId = runJobNormally(conf);
      assertEquals(1, countSideEffectFiles(conf, "map"));
      assertEquals(1, countSideEffectFiles(conf, "reduce"));
      
      deleteSideEffectFiles(conf);

      // Retrieve succesful job's configuration and 
      // run IsolationRunner against the map task.
      FileSystem localFs = FileSystem.getLocal(conf);
      Path mapJobXml =
          getAttemptJobXml(
              mr.getTaskTrackerRunner(0).getTaskTracker().getJobConf(), jobId,
              TaskType.MAP).makeQualified(localFs);
      assertTrue(localFs.exists(mapJobXml));
      
      new IsolationRunner().run(new String[] {
          new File(mapJobXml.toUri()).getCanonicalPath() });
      
      assertEquals(1, countSideEffectFiles(conf, "map"));
      assertEquals(0, countSideEffectFiles(conf, "reduce"));

      // Clean up
      deleteSideEffectFiles(conf);
    } finally {
      if (mr != null) {
        mr.shutdown();
      }
    }
  }

  static JobID runJobNormally(JobConf conf) throws IOException {
    final Path inDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/input");
    final Path outDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/output");

    FileSystem fs = FileSystem.get(conf);
    fs.delete(outDir, true);
    if (!fs.exists(inDir)) {
      fs.mkdirs(inDir);
    }
    String input = "The quick brown fox jumps over lazy dog\n";
    DataOutputStream file = fs.create(new Path(inDir, "file"));
    file.writeBytes(input);
    file.close();

    conf.setInputFormat(TextInputFormat.class);
    conf.setMapperClass(SideEffectMapper.class);
    conf.setReducerClass(SideEffectReducer.class);

    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setNumMapTasks(1);
    conf.setNumReduceTasks(1);

    JobClient jobClient = new JobClient(conf);
    RunningJob job = jobClient.submitJob(conf);
    job.waitForCompletion();
    return job.getID();
  }
}
