/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;

import org.junit.Test;
import static org.junit.Assert.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin.ProcResourceValues;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.mapred.DummyResourceCalculatorPlugin;
import org.apache.hadoop.mapred.gridmix.LoadJob.ResourceUsageMatcherRunner;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageEmulatorPlugin;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin.DefaultCpuUsageEmulator;

/**
 * Test Gridmix's resource emulator framework and supported plugins.
 */
public class TestResourceUsageEmulators {
  /**
   * A {@link ResourceUsageEmulatorPlugin} implementation for testing purpose.
   * It essentially creates a file named 'test' in the test directory.
   */
  static class TestResourceUsageEmulatorPlugin 
  implements ResourceUsageEmulatorPlugin {
    static final Path rootTempDir =
        new Path(System.getProperty("test.build.data", "/tmp"));
    static final Path tempDir = 
      new Path(rootTempDir, "TestResourceUsageEmulatorPlugin");
    static final String DEFAULT_IDENTIFIER = "test";
    
    private Path touchPath = null;
    private FileSystem fs = null;
    
    @Override
    public void emulate() throws IOException, InterruptedException {
      // add some time between 2 calls to emulate()
      try {
        Thread.sleep(1000); // sleep for 1s
      } catch (Exception e){}
      
      try {
        fs.delete(touchPath, false); // delete the touch file
        //TODO Search for a better touch utility
        fs.create(touchPath).close(); // recreate it
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    
    protected String getIdentifier() {
      return DEFAULT_IDENTIFIER;
    }
    
    private static Path getFilePath(String id) {
      return new Path(tempDir, id);
    }
    
    private static Path getInitFilePath(String id) {
      return new Path(tempDir, id + ".init");
    }
    
    @Override
    public void initialize(Configuration conf, ResourceUsageMetrics metrics,
        ResourceCalculatorPlugin monitor, Progressive progress) {
      // add some time between 2 calls to initialize()
      try {
        Thread.sleep(1000); // sleep for 1s
      } catch (Exception e){}
      
      try {
        fs = FileSystem.getLocal(conf);
        
        Path initPath = getInitFilePath(getIdentifier());
        fs.delete(initPath, false); // delete the old file
        fs.create(initPath).close(); // create a new one
        
        touchPath = getFilePath(getIdentifier());
        fs.delete(touchPath, false);
      } catch (Exception e) {
        
      } finally {
        if (fs != null) {
          try {
            fs.deleteOnExit(tempDir);
          } catch (IOException ioe){}
        }
      }
    }
    
    // test if the emulation framework successfully loaded this plugin
    static long testInitialization(String id, Configuration conf) 
    throws IOException {
      Path testPath = getInitFilePath(id);
      FileSystem fs = FileSystem.getLocal(conf);
      return fs.exists(testPath) 
             ? fs.getFileStatus(testPath).getModificationTime() 
             : 0;
    }
    
    // test if the emulation framework successfully loaded this plugin
    static long testEmulation(String id, Configuration conf) 
    throws IOException {
      Path testPath = getFilePath(id);
      FileSystem fs = FileSystem.getLocal(conf);
      return fs.exists(testPath) 
             ? fs.getFileStatus(testPath).getModificationTime() 
             : 0;
    }
  }
  
  /**
   * Test implementation of {@link ResourceUsageEmulatorPlugin} which creates
   * a file named 'others' in the test directory.
   */
  static class TestOthers extends TestResourceUsageEmulatorPlugin {
    static final String ID = "others";
    
    @Override
    protected String getIdentifier() {
      return ID;
    }
  }
  
  /**
   * Test implementation of {@link ResourceUsageEmulatorPlugin} which creates
   * a file named 'cpu' in the test directory.
   */
  static class TestCpu extends TestResourceUsageEmulatorPlugin {
    static final String ID = "cpu";
    
    @Override
    protected String getIdentifier() {
      return ID;
    }
  }
  
  /**
   * Test {@link ResourceUsageMatcher}.
   */
  @Test
  public void testResourceUsageMatcher() throws Exception {
    ResourceUsageMatcher matcher = new ResourceUsageMatcher();
    Configuration conf = new Configuration();
    conf.setClass(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS, 
                  TestResourceUsageEmulatorPlugin.class, 
                  ResourceUsageEmulatorPlugin.class);
    long currentTime = System.currentTimeMillis();
    
    matcher.configure(conf, null, null, null);
    
    matcher.matchResourceUsage();
    
    String id = TestResourceUsageEmulatorPlugin.DEFAULT_IDENTIFIER;
    long result = 
      TestResourceUsageEmulatorPlugin.testInitialization(id, conf);
    assertTrue("Resource usage matcher failed to initialize the configured"
               + " plugin", result > currentTime);
    result = TestResourceUsageEmulatorPlugin.testEmulation(id, conf);
    assertTrue("Resource usage matcher failed to load and emulate the"
               + " configured plugin", result > currentTime);
    
    // test plugin order to first emulate cpu and then others
    conf.setStrings(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS, 
                    TestCpu.class.getName() + "," + TestOthers.class.getName());
    
    matcher.configure(conf, null, null, null);

    // test the initialization order
    long time1 = 
           TestResourceUsageEmulatorPlugin.testInitialization(TestCpu.ID, conf);
    long time2 = 
           TestResourceUsageEmulatorPlugin.testInitialization(TestOthers.ID, 
                                                              conf);
    assertTrue("Resource usage matcher failed to initialize the configured"
               + " plugins in order", time1 < time2);
    
    matcher.matchResourceUsage();

    // Note that the cpu usage emulator plugin is configured 1st and then the
    // others plugin.
    time1 = 
      TestResourceUsageEmulatorPlugin.testInitialization(TestCpu.ID, conf);
    time2 = 
      TestResourceUsageEmulatorPlugin.testInitialization(TestOthers.ID, 
                                                         conf);
    assertTrue("Resource usage matcher failed to load the configured plugins", 
               time1 < time2);
  }
  
  /**
   * Fakes the cumulative usage using {@link FakeCpuUsageEmulatorCore}.
   */
  static class FakeResourceUsageMonitor extends DummyResourceCalculatorPlugin {
    private FakeCpuUsageEmulatorCore core;
    
    public FakeResourceUsageMonitor(FakeCpuUsageEmulatorCore core) {
      this.core = core;
    }
    
    /**
     * A dummy CPU usage monitor. Every call to 
     * {@link ResourceCalculatorPlugin#getCumulativeCpuTime()} will return the 
     * value of {@link FakeCpuUsageEmulatorCore#getNumCalls()}.
     */
    @Override
    public long getCumulativeCpuTime() {
      return core.getCpuUsage();
    }

    /**
     * Returns a {@link ProcResourceValues} with cumulative cpu usage  
     * computed using {@link #getCumulativeCpuTime()}.
     */
    @Override
    public ProcResourceValues getProcResourceValues() {
      long usageValue = getCumulativeCpuTime();
      return new ProcResourceValues(usageValue, -1, -1);
    }
  }
  
  /**
   * A dummy {@link Progressive} implementation that allows users to set the
   * progress for testing. The {@link Progressive#getProgress()} call will 
   * return the last progress value set using 
   * {@link FakeProgressive#setProgress(float)}.
   */
  static class FakeProgressive implements Progressive {
    private float progress = 0F;
    @Override
    public float getProgress() {
      return progress;
    }
    
    void setProgress(float progress) {
      this.progress = progress;
    }
  }
  
  /**
   * A dummy reporter for {@link LoadJob.ResourceUsageMatcherRunner}.
   */
  private static class DummyReporter extends StatusReporter {
    private Progressive progress;
    
    DummyReporter(Progressive progress) {
      this.progress = progress;
    }
    
    @Override
    public org.apache.hadoop.mapreduce.Counter getCounter(Enum<?> name) {
      return null;
    }
    
    @Override
    public org.apache.hadoop.mapreduce.Counter getCounter(String group,
                                                          String name) {
      return null;
    }
    
    @Override
    public void progress() {
    }
    
    @Override
    public float getProgress() {
      return progress.getProgress();
    }
    
    @Override
    public void setStatus(String status) {
    }
  }
  
  // Extends ResourceUsageMatcherRunner for testing.
  @SuppressWarnings("unchecked")
  private static class FakeResourceUsageMatcherRunner 
  extends ResourceUsageMatcherRunner {
    FakeResourceUsageMatcherRunner(TaskInputOutputContext context, 
                                   ResourceUsageMetrics metrics) {
      super(context, metrics);
    }
    
    // test ResourceUsageMatcherRunner
    void test() throws Exception {
      super.match();
    }
  }
  
  /**
   * Test {@link LoadJob.ResourceUsageMatcherRunner}.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testResourceUsageMatcherRunner() throws Exception {
    Configuration conf = new Configuration();
    FakeProgressive progress = new FakeProgressive();
    
    // set the resource calculator plugin
    conf.setClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
                  DummyResourceCalculatorPlugin.class, 
                  ResourceCalculatorPlugin.class);
    // set the resources
    // set the resource implementation class
    conf.setClass(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS, 
                  TestResourceUsageEmulatorPlugin.class, 
                  ResourceUsageEmulatorPlugin.class);
    
    long currentTime = System.currentTimeMillis();
    
    // initialize the matcher class
    TaskAttemptID id = new TaskAttemptID("test", 1, TaskType.MAP, 1, 1);
    StatusReporter reporter = new DummyReporter(progress);
    TaskInputOutputContext context = 
      new MapContextImpl(conf, id, null, null, null, reporter, null);
    FakeResourceUsageMatcherRunner matcher = 
      new FakeResourceUsageMatcherRunner(context, null);
    
    // check if the matcher initialized the plugin
    String identifier = TestResourceUsageEmulatorPlugin.DEFAULT_IDENTIFIER;
    long initTime = 
      TestResourceUsageEmulatorPlugin.testInitialization(identifier, conf);
    assertTrue("ResourceUsageMatcherRunner failed to initialize the"
               + " configured plugin", initTime > currentTime);
    
    // check the progress
    assertEquals("Progress mismatch in ResourceUsageMatcherRunner", 
                 0, progress.getProgress(), 0D);
    
    // call match() and check progress
    progress.setProgress(0.01f);
    currentTime = System.currentTimeMillis();
    matcher.test();
    long emulateTime = 
      TestResourceUsageEmulatorPlugin.testEmulation(identifier, conf);
    assertTrue("ProgressBasedResourceUsageMatcher failed to load and emulate"
               + " the configured plugin", emulateTime > currentTime);
  }
  
  /**
   * Test {@link CumulativeCpuUsageEmulatorPlugin}'s core CPU usage emulation 
   * engine.
   */
  @Test
  public void testCpuUsageEmulator() throws IOException {
    // test CpuUsageEmulator calibration with fake resource calculator plugin
    long target = 100000L; // 100 secs
    int unitUsage = 50;
    FakeCpuUsageEmulatorCore fakeCpuEmulator = new FakeCpuUsageEmulatorCore();
    fakeCpuEmulator.setUnitUsage(unitUsage);
    FakeResourceUsageMonitor fakeMonitor = 
      new FakeResourceUsageMonitor(fakeCpuEmulator);
    
    // calibrate for 100ms
    fakeCpuEmulator.calibrate(fakeMonitor, target);
    
    // by default, CpuUsageEmulator.calibrate() will consume 100ms of CPU usage
    assertEquals("Fake calibration failed", 
                 100, fakeMonitor.getCumulativeCpuTime());
    assertEquals("Fake calibration failed", 
                 100, fakeCpuEmulator.getCpuUsage());
    // by default, CpuUsageEmulator.performUnitComputation() will be called 
    // twice
    assertEquals("Fake calibration failed", 
                 2, fakeCpuEmulator.getNumCalls());
  }
  
  /**
   * This is a dummy class that fakes CPU usage.
   */
  private static class FakeCpuUsageEmulatorCore 
  extends DefaultCpuUsageEmulator {
    private int numCalls = 0;
    private int unitUsage = 1;
    private int cpuUsage = 0;
    
    @Override
    protected void performUnitComputation() {
      ++numCalls;
      cpuUsage += unitUsage;
    }
    
    int getNumCalls() {
      return numCalls;
    }
    
    int getCpuUsage() {
      return cpuUsage;
    }
    
    void reset() {
      numCalls = 0;
      cpuUsage = 0;
    }
    
    void setUnitUsage(int unitUsage) {
      this.unitUsage = unitUsage;
    }
  }
  
  // Creates a ResourceUsageMetrics object from the target usage
  static ResourceUsageMetrics createMetrics(long target) {
    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
    metrics.setCumulativeCpuUsage(target);
    metrics.setVirtualMemoryUsage(target);
    metrics.setPhysicalMemoryUsage(target);
    metrics.setHeapUsage(target);
    return metrics;
  }
  
  /**
   * Test {@link CumulativeCpuUsageEmulatorPlugin}.
   */
  @Test
  public void testCumulativeCpuUsageEmulatorPlugin() throws Exception {
    Configuration conf = new Configuration();
    long targetCpuUsage = 1000L;
    int unitCpuUsage = 50;
    
    // fake progress indicator
    FakeProgressive fakeProgress = new FakeProgressive();
    
    // fake cpu usage generator
    FakeCpuUsageEmulatorCore fakeCore = new FakeCpuUsageEmulatorCore();
    fakeCore.setUnitUsage(unitCpuUsage);
    
    // a cumulative cpu usage emulator with fake core
    CumulativeCpuUsageEmulatorPlugin cpuPlugin = 
      new CumulativeCpuUsageEmulatorPlugin(fakeCore);
    
    // test with invalid or missing resource usage value
    ResourceUsageMetrics invalidUsage = createMetrics(0);
    cpuPlugin.initialize(conf, invalidUsage, null, null);
    
    // test if disabled cpu emulation plugin's emulate() call is a no-operation
    // this will test if the emulation plugin is disabled or not
    int numCallsPre = fakeCore.getNumCalls();
    long cpuUsagePre = fakeCore.getCpuUsage();
    cpuPlugin.emulate();
    int numCallsPost = fakeCore.getNumCalls();
    long cpuUsagePost = fakeCore.getCpuUsage();
    
    //  test if no calls are made cpu usage emulator core
    assertEquals("Disabled cumulative CPU usage emulation plugin works!", 
                 numCallsPre, numCallsPost);
    
    //  test if no calls are made cpu usage emulator core
    assertEquals("Disabled cumulative CPU usage emulation plugin works!", 
                 cpuUsagePre, cpuUsagePost);
    
    // test with valid resource usage value
    ResourceUsageMetrics metrics = createMetrics(targetCpuUsage);
    
    // fake monitor
    ResourceCalculatorPlugin monitor = new FakeResourceUsageMonitor(fakeCore);
    
    // test with default emulation interval
    testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin, 
                          targetCpuUsage, targetCpuUsage / unitCpuUsage);
    
    // test with custom value for emulation interval of 20%
    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_PROGRESS_INTERVAL,
                  0.2F);
    testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin, 
                          targetCpuUsage, targetCpuUsage / unitCpuUsage);
    
    // test if emulation interval boundary is respected (unit usage = 1)
    //  test the case where the current progress is less than threshold
    fakeProgress = new FakeProgressive(); // initialize
    fakeCore.reset();
    fakeCore.setUnitUsage(1);
    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_PROGRESS_INTERVAL,
                  0.25F);
    cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
    // take a snapshot after the initialization
    long initCpuUsage = monitor.getCumulativeCpuTime();
    long initNumCalls = fakeCore.getNumCalls();
    // test with 0 progress
    testEmulationBoundary(0F, fakeCore, fakeProgress, cpuPlugin, initCpuUsage, 
                          initNumCalls, "[no-op, 0 progress]");
    // test with 24% progress
    testEmulationBoundary(0.24F, fakeCore, fakeProgress, cpuPlugin, 
                          initCpuUsage, initNumCalls, "[no-op, 24% progress]");
    // test with 25% progress
    //  target = 1000ms, target emulation at 25% = 250ms, 
    //  weighed target = 1000 * 0.25^4 (we are using progress^4 as the weight)
    //                 ~ 4
    //  but current usage = init-usage = 100, hence expected = 100
    testEmulationBoundary(0.25F, fakeCore, fakeProgress, cpuPlugin, 
                          initCpuUsage, initNumCalls, "[op, 25% progress]");
    
    // test with 80% progress
    //  target = 1000ms, target emulation at 80% = 800ms, 
    //  weighed target = 1000 * 0.25^4 (we are using progress^4 as the weight)
    //                 ~ 410
    //  current-usage = init-usage = 100, hence expected-usage = 410
    testEmulationBoundary(0.80F, fakeCore, fakeProgress, cpuPlugin, 410, 410, 
                          "[op, 80% progress]");
    
    // now test if the final call with 100% progress ramps up the CPU usage
    testEmulationBoundary(1F, fakeCore, fakeProgress, cpuPlugin, targetCpuUsage,
                          targetCpuUsage, "[op, 100% progress]");
    
    // test if emulation interval boundary is respected (unit usage = 50)
    //  test the case where the current progress is less than threshold
    fakeProgress = new FakeProgressive(); // initialize
    fakeCore.reset();
    fakeCore.setUnitUsage(unitCpuUsage);
    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_PROGRESS_INTERVAL,
                  0.40F);
    cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
    // take a snapshot after the initialization
    initCpuUsage = monitor.getCumulativeCpuTime();
    initNumCalls = fakeCore.getNumCalls();
    // test with 0 progress
    testEmulationBoundary(0F, fakeCore, fakeProgress, cpuPlugin, initCpuUsage, 
                          initNumCalls, "[no-op, 0 progress]");
    // test with 39% progress
    testEmulationBoundary(0.39F, fakeCore, fakeProgress, cpuPlugin, 
                          initCpuUsage, initNumCalls, "[no-op, 39% progress]");
    // test with 40% progress
    //  target = 1000ms, target emulation at 40% = 4000ms, 
    //  weighed target = 1000 * 0.40^4 (we are using progress^4 as the weight)
    //                 ~ 26
    // current-usage = init-usage = 100, hence expected-usage = 100
    testEmulationBoundary(0.40F, fakeCore, fakeProgress, cpuPlugin, 
                          initCpuUsage, initNumCalls, "[op, 40% progress]");
    
    // test with 90% progress
    //  target = 1000ms, target emulation at 90% = 900ms, 
    //  weighed target = 1000 * 0.90^4 (we are using progress^4 as the weight)
    //                 ~ 657
    //  current-usage = init-usage = 100, hence expected-usage = 657 but 
    //  the fake-core increases in steps of 50, hence final target = 700
    testEmulationBoundary(0.90F, fakeCore, fakeProgress, cpuPlugin, 700, 
                          700 / unitCpuUsage, "[op, 90% progress]");
    
    // now test if the final call with 100% progress ramps up the CPU usage
    testEmulationBoundary(1F, fakeCore, fakeProgress, cpuPlugin, targetCpuUsage,
                          targetCpuUsage / unitCpuUsage, "[op, 100% progress]");
  }
  
  // test whether the CPU usage emulator achieves the desired target using
  // desired calls to the underling core engine.
  private static void testEmulationAccuracy(Configuration conf, 
                        FakeCpuUsageEmulatorCore fakeCore,
                        ResourceCalculatorPlugin monitor,
                        ResourceUsageMetrics metrics,
                        CumulativeCpuUsageEmulatorPlugin cpuPlugin,
                        long expectedTotalCpuUsage, long expectedTotalNumCalls) 
  throws Exception {
    FakeProgressive fakeProgress = new FakeProgressive();
    fakeCore.reset();
    cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
    int numLoops = 0;
    while (fakeProgress.getProgress() < 1) {
      ++numLoops;
      float progress = (float)numLoops / 100;
      fakeProgress.setProgress(progress);
      cpuPlugin.emulate();
    }
    
    // test if the resource plugin shows the expected invocations
    assertEquals("Cumulative cpu usage emulator plugin failed (num calls)!", 
                 expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
    // test if the resource plugin shows the expected usage
    assertEquals("Cumulative cpu usage emulator plugin failed (total usage)!", 
                 expectedTotalCpuUsage, fakeCore.getCpuUsage(), 0L);
  }
  
  // tests if the CPU usage emulation plugin emulates only at the expected
  // progress gaps
  private static void testEmulationBoundary(float progress, 
      FakeCpuUsageEmulatorCore fakeCore, FakeProgressive fakeProgress, 
      CumulativeCpuUsageEmulatorPlugin cpuPlugin, long expectedTotalCpuUsage, 
      long expectedTotalNumCalls, String info) throws Exception {
    fakeProgress.setProgress(progress);
    cpuPlugin.emulate();
    
    assertEquals("Emulation interval test for cpu usage failed " + info + "!", 
                 expectedTotalCpuUsage, fakeCore.getCpuUsage(), 0L);
    assertEquals("Emulation interval test for num calls failed " + info + "!", 
                 expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
  }
}
