/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.v2.util;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class TestMRApps {
  private static File testWorkDir = null;
  
  @BeforeClass
  public static void setupTestDirs() throws IOException {
    testWorkDir = new File("target", TestMRApps.class.getCanonicalName());
    delete(testWorkDir);
    testWorkDir.mkdirs();
    testWorkDir = testWorkDir.getAbsoluteFile();
  }
  
  @AfterClass
  public static void cleanupTestDirs() throws IOException {
    if (testWorkDir != null) {
      delete(testWorkDir);
    }
  }
  
  private static void delete(File dir) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
    fs.delete(p, true);
  }

  @Test (timeout = 120000)
  public void testJobIDtoString() {
    JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
    jid.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
    assertEquals("job_0_0000", MRApps.toString(jid));
  }

  @Test (timeout = 120000)
  public void testToJobID() {
    JobId jid = MRApps.toJobID("job_1_1");
    assertEquals(1, jid.getAppId().getClusterTimestamp());
    assertEquals(1, jid.getAppId().getId());
    assertEquals(1, jid.getId()); // tests against some proto.id and not a job.id field
  }

  @Test (timeout = 120000, expected=IllegalArgumentException.class)
  public void testJobIDShort() {
    MRApps.toJobID("job_0_0_0");
  }

  //TODO_get.set
  @Test (timeout = 120000)
  public void testTaskIDtoString() {
    TaskId tid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class);
    tid.setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
    tid.getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
    tid.setTaskType(TaskType.MAP);
    TaskType type = tid.getTaskType();
    System.err.println(type);
    type = TaskType.REDUCE;
    System.err.println(type);
    System.err.println(tid.getTaskType());
    assertEquals("task_0_0000_m_000000", MRApps.toString(tid));
    tid.setTaskType(TaskType.REDUCE);
    assertEquals("task_0_0000_r_000000", MRApps.toString(tid));
  }

  @Test (timeout = 120000)
  public void testToTaskID() {
    TaskId tid = MRApps.toTaskID("task_1_2_r_3");
    assertEquals(1, tid.getJobId().getAppId().getClusterTimestamp());
    assertEquals(2, tid.getJobId().getAppId().getId());
    assertEquals(2, tid.getJobId().getId());
    assertEquals(TaskType.REDUCE, tid.getTaskType());
    assertEquals(3, tid.getId());

    tid = MRApps.toTaskID("task_1_2_m_3");
    assertEquals(TaskType.MAP, tid.getTaskType());
  }

  @Test(timeout = 120000, expected=IllegalArgumentException.class) 
  public void testTaskIDShort() {
    MRApps.toTaskID("task_0_0000_m");
  }

  @Test(timeout = 120000, expected=IllegalArgumentException.class) 
  public void testTaskIDBadType() {
    MRApps.toTaskID("task_0_0000_x_000000");
  }

  //TODO_get.set
  @Test (timeout = 120000)
  public void testTaskAttemptIDtoString() {
    TaskAttemptId taid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptId.class);
    taid.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
    taid.getTaskId().setTaskType(TaskType.MAP);
    taid.getTaskId().setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
    taid.getTaskId().getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
    assertEquals("attempt_0_0000_m_000000_0", MRApps.toString(taid));
  }

  @Test (timeout = 120000)
  public void testToTaskAttemptID() {
    TaskAttemptId taid = MRApps.toTaskAttemptID("attempt_0_1_m_2_3");
    assertEquals(0, taid.getTaskId().getJobId().getAppId().getClusterTimestamp());
    assertEquals(1, taid.getTaskId().getJobId().getAppId().getId());
    assertEquals(1, taid.getTaskId().getJobId().getId());
    assertEquals(2, taid.getTaskId().getId());
    assertEquals(3, taid.getId());
  }

  @Test(timeout = 120000, expected=IllegalArgumentException.class) 
  public void testTaskAttemptIDShort() {
    MRApps.toTaskAttemptID("attempt_0_0_0_m_0");
  }

  @Test (timeout = 120000)
  public void testGetJobFileWithUser() {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");
    String jobFile = MRApps.getJobFile(conf, "dummy-user", 
        new JobID("dummy-job", 12345));
    assertNotNull("getJobFile results in null.", jobFile);
    assertEquals("jobFile with specified user is not as expected.",
        "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
  }

  @Test (timeout = 120000)
  public void testSetClasspath() throws IOException {
    Job job = Job.getInstance();
    Map<String, String> environment = new HashMap<String, String>();
    MRApps.setClasspath(environment, job.getConfiguration());
    assertTrue(environment.get("CLASSPATH").startsWith(
      ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
    String yarnAppClasspath = 
        job.getConfiguration().get(
            YarnConfiguration.YARN_APPLICATION_CLASSPATH);
    if (yarnAppClasspath != null) {
      yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", File.pathSeparator)
        .trim();
    }
    assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath));
    String mrAppClasspath = 
        job.getConfiguration().get(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH);
    if (mrAppClasspath != null) {
      mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", File.pathSeparator)
        .trim();
    }
    assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
  }
  
  @Test (timeout = 120000)
  public void testSetClasspathWithArchives () throws IOException {
    File testTGZ = new File(testWorkDir, "test.tgz");
    FileOutputStream out = new FileOutputStream(testTGZ);
    out.write(0);
    out.close();
    Job job = Job.getInstance();
    Configuration conf = job.getConfiguration();
    String testTGZQualifiedPath = FileSystem.getLocal(conf).makeQualified(new Path(
      testTGZ.getAbsolutePath())).toString();
    conf.set(MRJobConfig.CLASSPATH_ARCHIVES, testTGZQualifiedPath);
    conf.set(MRJobConfig.CACHE_ARCHIVES, testTGZQualifiedPath + "#testTGZ");
    Map<String, String> environment = new HashMap<String, String>();
    MRApps.setClasspath(environment, conf);
    assertTrue(environment.get("CLASSPATH").startsWith(
      ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
    String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
    if (confClasspath != null) {
      confClasspath = confClasspath.replaceAll(",\\s*", File.pathSeparator)
        .trim();
    }
    assertTrue(environment.get("CLASSPATH").contains(confClasspath));
    assertTrue(environment.get("CLASSPATH").contains("testTGZ"));
  }

 @Test (timeout = 120000)
 public void testSetClasspathWithUserPrecendence() {
    Configuration conf = new Configuration();
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
    Map<String, String> env = new HashMap<String, String>();
    try {
      MRApps.setClasspath(env, conf);
    } catch (Exception e) {
      fail("Got exception while setting classpath");
    }
    String env_str = env.get("CLASSPATH");
    String expectedClasspath = StringUtils.join(File.pathSeparator,
      Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
        "job.jar/classes/", "job.jar/lib/*",
        ApplicationConstants.Environment.PWD.$() + "/*"));
    assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
      env_str.startsWith(expectedClasspath));
  }

  @Test (timeout = 120000)
  public void testSetClasspathWithNoUserPrecendence() {
    Configuration conf = new Configuration();
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
    Map<String, String> env = new HashMap<String, String>();
    try {
      MRApps.setClasspath(env, conf);
    } catch (Exception e) {
      fail("Got exception while setting classpath");
    }
    String env_str = env.get("CLASSPATH");
    String expectedClasspath = StringUtils.join(File.pathSeparator,
      Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
        ApplicationConstants.Environment.PWD.$() + "/*"));
    assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
      + " the classpath!", env_str.contains(expectedClasspath));
    assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
      env_str.startsWith(expectedClasspath));
  }
  
  @Test (timeout = 120000)
  public void testSetClasspathWithJobClassloader() throws IOException {
    Configuration conf = new Configuration();
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
    Map<String, String> env = new HashMap<String, String>();
    MRApps.setClasspath(env, conf);
    String cp = env.get("CLASSPATH");
    String appCp = env.get("APP_CLASSPATH");
    assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the"
      + " classpath!", cp.contains("jar" + File.pathSeparator + "job"));
    assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
      cp.contains("PWD"));
    String expectedAppClasspath = StringUtils.join(File.pathSeparator,
      Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
        "job.jar/classes/", "job.jar/lib/*",
        ApplicationConstants.Environment.PWD.$() + "/*"));
    assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
      + " classpath!", expectedAppClasspath, appCp);
  }
  
  @Test (timeout = 30000)
  public void testSetupDistributedCacheEmpty() throws IOException {
    Configuration conf = new Configuration();
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    MRApps.setupDistributedCache(conf, localResources);
    assertTrue("Empty Config did not produce an empty list of resources",
        localResources.isEmpty());
  }
  
  @SuppressWarnings("deprecation")
  @Test(timeout = 120000, expected = InvalidJobConfException.class)
  public void testSetupDistributedCacheConflicts() throws Exception {
    Configuration conf = new Configuration();
    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
    
    URI mockUri = URI.create("mockfs://mock/");
    FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
        .getRawFileSystem();
    
    URI archive = new URI("mockfs://mock/tmp/something.zip#something");
    Path archivePath = new Path(archive);
    URI file = new URI("mockfs://mock/tmp/something.txt#something");
    Path filePath = new Path(file);
    
    when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
    when(mockFs.resolvePath(filePath)).thenReturn(filePath);
    
    DistributedCache.addCacheArchive(archive, conf);
    conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
    conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
    conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
    DistributedCache.addCacheFile(file, conf);
    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
    conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
    Map<String, LocalResource> localResources = 
      new HashMap<String, LocalResource>();
    MRApps.setupDistributedCache(conf, localResources);
  }
  
  @SuppressWarnings("deprecation")
  @Test(timeout = 120000, expected = InvalidJobConfException.class)
  public void testSetupDistributedCacheConflictsFiles() throws Exception {
    Configuration conf = new Configuration();
    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
    
    URI mockUri = URI.create("mockfs://mock/");
    FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
        .getRawFileSystem();
    
    URI file = new URI("mockfs://mock/tmp/something.zip#something");
    Path filePath = new Path(file);
    URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
    Path file2Path = new Path(file);
    
    when(mockFs.resolvePath(filePath)).thenReturn(filePath);
    when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
    
    DistributedCache.addCacheFile(file, conf);
    DistributedCache.addCacheFile(file2, conf);
    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
    conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
    Map<String, LocalResource> localResources = 
      new HashMap<String, LocalResource>();
    MRApps.setupDistributedCache(conf, localResources);
  }
  
  @SuppressWarnings("deprecation")
  @Test (timeout = 30000)
  public void testSetupDistributedCache() throws Exception {
    Configuration conf = new Configuration();
    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
    
    URI mockUri = URI.create("mockfs://mock/");
    FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
        .getRawFileSystem();
    
    URI archive = new URI("mockfs://mock/tmp/something.zip");
    Path archivePath = new Path(archive);
    URI file = new URI("mockfs://mock/tmp/something.txt#something");
    Path filePath = new Path(file);
    
    when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
    when(mockFs.resolvePath(filePath)).thenReturn(filePath);
    
    DistributedCache.addCacheArchive(archive, conf);
    conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
    conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
    conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
    DistributedCache.addCacheFile(file, conf);
    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
    conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
    Map<String, LocalResource> localResources = 
      new HashMap<String, LocalResource>();
    MRApps.setupDistributedCache(conf, localResources);
    assertEquals(2, localResources.size());
    LocalResource lr = localResources.get("something.zip");
    assertNotNull(lr);
    assertEquals(10l, lr.getSize());
    assertEquals(10l, lr.getTimestamp());
    assertEquals(LocalResourceType.ARCHIVE, lr.getType());
    lr = localResources.get("something");
    assertNotNull(lr);
    assertEquals(11l, lr.getSize());
    assertEquals(11l, lr.getTimestamp());
    assertEquals(LocalResourceType.FILE, lr.getType());
  }
  
  static class MockFileSystem extends FilterFileSystem {
    MockFileSystem() {
      super(mock(FileSystem.class));
    }
    public void initialize(URI name, Configuration conf) throws IOException {}
  }
  
}
