/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred.gridmix;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.gridmix.Gridmix;
import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;

/**
 * Verify the Gridmix generated input if compression emulation turn on.
 */
public class TestGridmixCompressedInputGeneration 
    extends GridmixSystemTestCase { 

  private static final Log LOG = 
      LogFactory.getLog("TestGridmixCompressedInputGeneration.class");

  /**
   * Generate input data and verify whether input files are compressed
   * or not.
   * @throws Exception - if an error occurs.
   */
  @Test
  public void testGridmixCompressionInputGeneration() throws Exception {
    final long inputSizeInMB = 1024 * 7;
    final String [] runtimeValues = {"LOADJOB",
                                     SubmitterUserResolver.class.getName(),
                                     "STRESS",
                                     inputSizeInMB  + "m",
                                     "file:///dev/null"};
    final String [] otherArgs = { 
        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true"
    };
    LOG.info("Verify the generated compressed input data.");
    runAndVerify(true, inputSizeInMB, runtimeValues, otherArgs);
  }

  /**
   * Disable compression emulation and verify whether input files are 
   * compressed or not.
   * @throws Exception
   */
  @Test
  public void testGridmixInputGenerationWithoutCompressionEnable() 
      throws Exception { 
    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
    final long inputSizeInMB = 1024 * 6;
    final String [] runtimeValues = {"LOADJOB",
                                     SubmitterUserResolver.class.getName(),
                                     "STRESS",
                                     inputSizeInMB + "m",
                                     "file:///dev/null"};
    final String [] otherArgs = {
        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=false"
    };

    LOG.info("Verify the generated uncompressed input data.");
    runAndVerify(false, inputSizeInMB, runtimeValues, otherArgs);
  }
  
  private void runAndVerify(boolean isCompressed, long INPUT_SIZE, 
      String [] runtimeValues, String [] otherArgs) throws Exception { 
    int exitCode = 
        UtilsForGridmix.runGridmixJob(gridmixDir, conf, 
                                      GridMixRunMode.DATA_GENERATION.getValue(),
                                      runtimeValues,otherArgs);
    Assert.assertEquals("Data generation has failed.", 0, exitCode);
    verifyJobStatus();
    verifyInputDataSize(INPUT_SIZE);
    verifyInputFiles(isCompressed);
  }
  
  private void verifyInputFiles(boolean isCompressed) throws IOException { 
    List<String> inputFiles = 
        getInputFiles(conf, Gridmix.getGridmixInputDataPath(gridmixDir));
    for (String inputFile: inputFiles) {
      boolean fileStatus = (inputFile.contains(".gz") 
                         || inputFile.contains(".tgz"))? true : false;
      if (isCompressed) { 
        Assert.assertTrue("Compressed input split file was not found.",
                          fileStatus);
      } else {
        Assert.assertFalse("Uncompressed input split file was not found.",
                           fileStatus);
      }
    }
  }

  private void verifyInputDataSize(long INPUT_SIZE) throws IOException {
    long actDataSize = 
        getInputDataSizeInMB(conf, Gridmix.getGridmixInputDataPath(gridmixDir));
    double ratio = ((double)actDataSize)/INPUT_SIZE;
    long expDataSize = (long)(INPUT_SIZE * ratio);
    Assert.assertEquals("Generated data has not matched with given size.", 
                        expDataSize, actDataSize);
  }

  private void verifyJobStatus() throws IOException { 
    JobClient jobClient = jtClient.getClient();
    int len = jobClient.getAllJobs().length;
    LOG.info("Verify the job status after completion of job...");
    Assert.assertEquals("Job has not succeeded.", JobStatus.SUCCEEDED, 
                        jobClient.getAllJobs()[len -1].getRunState());
  }

  private long getInputDataSizeInMB(Configuration conf, Path inputDir) 
      throws IOException { 
    FileSystem fs = inputDir.getFileSystem(conf);
    ContentSummary csmry = fs.getContentSummary(inputDir);
    long dataSize = csmry.getLength();
    dataSize = dataSize/(1024 * 1024);
    return dataSize;
  }

  private List<String> getInputFiles(Configuration conf, Path inputDir) 
      throws IOException {
    FileSystem fs = inputDir.getFileSystem(conf);
    FileStatus [] listStatus = fs.listStatus(inputDir);
    List<String> files = new ArrayList<String>();
    for (FileStatus fileStat : listStatus) {
      files.add(getInputFile(fileStat, conf));
    }
    return files;
  }

  private String getInputFile(FileStatus fstatus, Configuration conf) 
      throws IOException {
    String fileName = null;
    if (!fstatus.isDirectory()) {
      fileName = fstatus.getPath().getName();
    } else {
      FileSystem fs = fstatus.getPath().getFileSystem(conf);
      FileStatus [] listStatus = fs.listStatus(fstatus.getPath());
      for (FileStatus fileStat : listStatus) {
         return getInputFile(fileStat, conf);
      }
    }
    return fileName;
  }
}

