/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
import org.apache.hadoop.chukwa.util.NagiosHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/**
 * Daemon that drives the demux pipeline in a loop.
 *
 * <p>Each pass of {@link #start()} does one of the following:
 * <ul>
 *   <li>deletes a left-over demux output directory,</li>
 *   <li>reprocesses a left-over demux input directory (moving it to the
 *       in-error directory once it has been retried more than 3 times), or</li>
 *   <li>moves new {@code *.done} dataSink files into the demux input directory.</li>
 * </ul>
 * When input is ready it runs {@link Demux}, moves the output to the
 * post-process directory and moves the consumed input to the archive directory.
 * Consecutive failures are counted; the loop stops once
 * {@link #maxPermittedErrorCount} is reached (unless it is {@code -1}).
 */
public class DemuxManager implements CHUKWA_CONSTANT {
  static Logger log = Logger.getLogger(DemuxManager.class);

  /** Number of consecutive errors; reset to 0 after a successful demux. */
  int globalErrorcounter = 0;
  /** Time of the first error of the current error streak, or null if none. */
  Date firstErrorTime = null;

  /** Seconds to sleep after an error before the next pass. */
  protected int ERROR_SLEEP_TIME = 60;
  /** Seconds to sleep when there is nothing to demux. */
  protected int NO_DATASINK_SLEEP_TIME = 20;

  protected int DEFAULT_MAX_ERROR_COUNT = 6;
  /** Upper bound on the number of dataSink files moved to the input dir per pass. */
  protected int DEFAULT_MAX_FILES_PER_DEMUX = 500;
  protected int DEFAULT_REDUCER_COUNT = 8;

  /** Error budget for the main loop; -1 disables the bail-out check. */
  protected int maxPermittedErrorCount = DEFAULT_MAX_ERROR_COUNT;
  protected int demuxReducerCount = 0;
  protected ChukwaConfiguration conf = null;
  protected FileSystem fs = null;
  /** How many passes in a row found a left-over demux input directory. */
  protected int reprocess = 0;
  /** Set to false by {@link #start()} when the Nagios settings are incomplete. */
  protected boolean sendAlert = true;

  protected SimpleDateFormat dayTextFormat = new java.text.SimpleDateFormat("yyyyMMdd");
  /** Loop flag for {@link #start()}; cleared by {@link #shutdown()}. */
  protected volatile boolean isRunning = true;

  /** Accepts only files whose name ends with ".done". */
  final private static PathFilter DATA_SINK_FILTER = new PathFilter() {
    public boolean accept(Path file) {
      return file.getName().endsWith(".done");
    }
  };

  /**
   * Creates a manager with a default {@link ChukwaConfiguration} and runs it.
   * @param args not used
   * @throws Exception if construction or the daemon loop fails
   */
  public static void main(String[] args) throws Exception {
    DemuxManager manager = new DemuxManager();
    manager.start();
  }

  /**
   * Builds a manager backed by a freshly created {@link ChukwaConfiguration}.
   * @throws Exception if the file system cannot be initialised
   */
  public DemuxManager() throws Exception {
    this.conf = new ChukwaConfiguration();
    init();
  }

  /**
   * Builds a manager backed by the given configuration.
   * @param conf configuration to read all settings from
   * @throws Exception if the file system cannot be initialised
   */
  public DemuxManager(ChukwaConfiguration conf) throws Exception {
    this.conf = conf;
    init();
  }

  /**
   * (Re)creates the {@link FileSystem} handle from the URI stored under
   * {@code HDFS_DEFAULT_NAME_FIELD} in the configuration.
   * @throws IOException if the file system cannot be obtained
   * @throws URISyntaxException if the configured name is not a valid URI
   */
  protected void init() throws IOException, URISyntaxException {
    String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
    fs = FileSystem.get(new URI(fsName), conf);
  }

  /** Asks the loop in {@link #start()} to stop after the current pass. */
  public void shutdown() {
    this.isRunning = false;
  }

  /**
   * @return the current reprocess counter
   */
  public int getReprocess() {
    return reprocess;
  }

  /**
   * Start the Demux Manager daemon.
   *
   * <p>Reads directory, error-budget, reducer and Nagios settings from the
   * configuration, then loops until {@link #shutdown()} is called or the
   * number of consecutive errors reaches {@link #maxPermittedErrorCount}.
   *
   * @throws Exception if an error escapes the per-pass error handling
   */
  public void start() throws Exception {

    String chukwaRootDir = ensureTrailingSlash(
        conf.get(CHUKWA_ROOT_DIR_FIELD, DEFAULT_CHUKWA_ROOT_DIR_NAME));
    log.info("chukwaRootDir:" + chukwaRootDir);

    String demuxRootDir = chukwaRootDir + DEFAULT_DEMUX_PROCESSING_DIR_NAME;
    String demuxErrorDir = demuxRootDir + DEFAULT_DEMUX_IN_ERROR_DIR_NAME;
    String demuxInputDir = demuxRootDir + DEFAULT_DEMUX_MR_INPUT_DIR_NAME;
    String demuxOutputDir = demuxRootDir + DEFAULT_DEMUX_MR_OUTPUT_DIR_NAME;

    String dataSinkDir = ensureTrailingSlash(
        conf.get(CHUKWA_DATA_SINK_DIR_FIELD, chukwaRootDir + DEFAULT_CHUKWA_LOGS_DIR_NAME));
    log.info("dataSinkDir:" + dataSinkDir);

    String postProcessDir = ensureTrailingSlash(
        conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir + DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME));
    log.info("postProcessDir:" + postProcessDir);

    String archiveRootDir = ensureTrailingSlash(
        conf.get(CHUKWA_ARCHIVE_DIR_FIELD, chukwaRootDir + DEFAULT_CHUKWA_DATASINK_DIR_NAME));
    log.info("archiveRootDir:" + archiveRootDir);

    maxPermittedErrorCount = conf.getInt(CHUKWA_DEMUX_MAX_ERROR_COUNT_FIELD,
                                         DEFAULT_MAX_ERROR_COUNT);
    demuxReducerCount = conf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
    log.info("demuxReducerCount:" + demuxReducerCount);

    String nagiosHost = conf.get(CHUKWA_NAGIOS_HOST_FIELD);
    int nagiosPort = conf.getInt(CHUKWA_NAGIOS_PORT_FIELD, 0);
    String reportingHost = conf.get(CHUKWA_REPORTING_HOST_FIELD);

    log.info("Nagios information: nagiosHost:" + nagiosHost + ", nagiosPort:"
        + nagiosPort + ", reportingHost:" + reportingHost);

    // Alerting needs all three Nagios settings; otherwise it is switched off.
    if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0
        || reportingHost == null || reportingHost.length() == 0) {
      sendAlert = false;
      log.warn("Alerting is OFF");
    }

    while (isRunning) {
      try {
        boolean demuxReady = false;

        if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
          log.warn("==================\nToo many errors (" + globalErrorcounter +
                   "), Bail out!\n==================");
          break;
        }

        // Check for anomalies
        if (checkDemuxOutputDir(demuxOutputDir)) {
          // A left-over output dir must be removed before anything else is done.
          if (!deleteDemuxOutputDir(demuxOutputDir)) {
            log.warn("Cannot delete an existing demux output directory!");
            throw new IOException("Cannot delete an existing demux output directory!");
          }
          continue;
        } else if (checkDemuxInputDir(demuxInputDir)) { // dataSink already there
          reprocess++;

          // Data has been processed more than 3 times ... move to InError directory
          if (reprocess > 3) {
            if (!moveDataSinkFilesToDemuxErrorDirectory(demuxInputDir, demuxErrorDir)) {
              log.warn("Cannot move dataSink files to DemuxErrorDir!");
              throw new IOException("Cannot move dataSink files to DemuxErrorDir!");
            }
            reprocess = 0;
            continue;
          }

          log.error("Demux inputDir aready contains some dataSink files,"
              + " going to reprocess, reprocessCount=" + reprocess);
          demuxReady = true;
        } else { // standard code path
          reprocess = 0;
          // Move new dataSink files; ready only if any were available.
          demuxReady = moveDataSinkFilesToDemuxInputDirectory(dataSinkDir, demuxInputDir);
        }

        // start a new demux ?
        if (demuxReady) {
          boolean demuxStatus = processData(dataSinkDir, demuxInputDir, demuxOutputDir,
              postProcessDir, archiveRootDir);
          sendDemuxStatusToNagios(nagiosHost, nagiosPort, reportingHost, demuxErrorDir,
              demuxStatus, null);

          // if demux succeeds, then we reset these.
          if (demuxStatus) {
            globalErrorcounter = 0;
            firstErrorTime = null;
          }
        } else {
          log.info("Demux not ready so going to sleep ...");
          Thread.sleep(NO_DATASINK_SLEEP_TIME * 1000L);
        }
      } catch (Throwable e) {
        globalErrorcounter++;
        if (firstErrorTime == null) {
          firstErrorTime = new Date();
        }

        log.warn("Consecutive error number " + globalErrorcounter +
                 " encountered since " + firstErrorTime, e);
        sendDemuxStatusToNagios(nagiosHost, nagiosPort, reportingHost, demuxErrorDir,
            false, e.getMessage());
        try {
          Thread.sleep(ERROR_SLEEP_TIME * 1000L);
        } catch (InterruptedException ignored) {
          // Deliberately best-effort: an interrupt only shortens the back-off;
          // the loop condition re-checks isRunning right after.
        }
        // Refresh the FileSystem handle. A failure here must not abort the
        // daemon outright: the next pass will fail and be counted against
        // maxPermittedErrorCount like any other error.
        try {
          init();
        } catch (Exception initError) {
          log.warn("Cannot re-initialize the file system after an error", initError);
        }
      }
    }
  }

  /**
   * Send NSCA status to Nagios: one message for the demux result and one for
   * the state of the demux in-error directory. Does nothing when
   * {@link #sendAlert} is false.
   *
   * @param nagiosHost not used by this method
   * @param nagiosPort not used by this method
   * @param reportingHost not used by this method
   * @param demuxInErrorDir directory checked for in-error data
   * @param demuxStatus true if the demux run succeeded
   * @param demuxException failure detail appended to the message, may be null
   */
  protected void sendDemuxStatusToNagios(String nagiosHost, int nagiosPort, String reportingHost,
        String demuxInErrorDir, boolean demuxStatus, String demuxException) {

    if (!sendAlert) {
      return;
    }

    // "In error" means: the directory holds something, or we could not check it.
    // A directory that does not exist holds nothing and is therefore OK.
    boolean demuxInErrorStatus = false;
    String demuxInErrorMsg = "";
    try {
      Path pDemuxInErrorDir = new Path(demuxInErrorDir);
      if (fs.exists(pDemuxInErrorDir)) {
        FileStatus[] demuxInErrorDirs = fs.listStatus(pDemuxInErrorDir);
        demuxInErrorStatus = demuxInErrorDirs != null && demuxInErrorDirs.length > 0;
      }
    } catch (Throwable e) {
      // State unknown: stay on the safe side and raise the alert.
      demuxInErrorStatus = true;
      demuxInErrorMsg = e.getMessage();
      log.warn("Cannot check DemuxInError directory " + demuxInErrorDir, e);
    }

    // send Demux status
    if (demuxStatus) {
      NagiosHelper.sendNsca("Demux OK", NagiosHelper.NAGIOS_OK);
    } else {
      // Avoid the literal text "null" when no failure detail is available.
      String failureMsg = (demuxException == null)
          ? "Demux failed."
          : "Demux failed. " + demuxException;
      NagiosHelper.sendNsca(failureMsg, NagiosHelper.NAGIOS_CRITICAL);
    }

    // send DemuxInErrorStatus
    if (!demuxInErrorStatus) {
      NagiosHelper.sendNsca("DemuxInError OK", NagiosHelper.NAGIOS_OK);
    } else {
      NagiosHelper.sendNsca("DemuxInError not empty -" + demuxInErrorMsg,
          NagiosHelper.NAGIOS_CRITICAL);
    }
  }

  /**
   * Process Data, i.e.
   * - run demux
   * - move demux output to postProcessDir
   * - move dataSink file to archiveDir
   *
   * @param dataSinkDir not used by this method
   * @param demuxInputDir input directory of the demux job
   * @param demuxOutputDir output directory of the demux job
   * @param postProcessDir where the demux output is moved on success
   * @param archiveDir where the consumed input is moved on success
   * @return True iff the demux job succeeded
   * @throws IOException if the output or the input cannot be moved after a
   *         successful demux
   */
  protected boolean processData(String dataSinkDir, String demuxInputDir,
      String demuxOutputDir, String postProcessDir, String archiveDir) throws IOException {

    long startTime = System.currentTimeMillis();
    boolean demuxStatus = runDemux(demuxInputDir, demuxOutputDir);
    log.info("Demux Duration: " + (System.currentTimeMillis() - startTime));

    if (!demuxStatus) {
      log.warn("Demux failed!");
      return false;
    }

    // Move demux output to postProcessDir
    if (checkDemuxOutputDir(demuxOutputDir)) {
      if (!moveDemuxOutputDirToPostProcessDirectory(demuxOutputDir, postProcessDir)) {
        log.warn("Cannot move demuxOutput to postProcess! bail out!");
        throw new IOException("Cannot move demuxOutput to postProcess! bail out!");
      }
    } else {
      log.warn("Demux processing OK but no output");
    }

    // Move DataSink Files to archiveDir
    if (!moveDataSinkFilesToArchiveDirectory(demuxInputDir, archiveDir)) {
      log.warn("Cannot move datasinkFile to archive! bail out!");
      throw new IOException("Cannot move datasinkFile to archive! bail out!");
    }

    return true;
  }

  /**
   * Submit and run the demux job.
   *
   * <p>The configuration is copied and reloaded first so that a changed
   * reducer count is picked up for this run. Any failure raised while running
   * the job is logged and counted in the consecutive-error counter.
   *
   * @param demuxInputDir input directory passed to the job
   * @param demuxOutputDir output directory passed to the job
   * @return true if the demux job returned exit code 0
   */
  protected boolean runDemux(String demuxInputDir, String demuxOutputDir) {
    // to reload the configuration, and demux's reduce number
    Configuration tempConf = new Configuration(conf);
    tempConf.reloadConfiguration();
    demuxReducerCount = tempConf.getInt(CHUKWA_DEMUX_REDUCER_COUNT_FIELD, DEFAULT_REDUCER_COUNT);
    Demux.addParsers(tempConf);

    String[] demuxParams = new String[] {
        "-r", String.valueOf(demuxReducerCount), demuxInputDir, demuxOutputDir
    };

    try {
      return 0 == ToolRunner.run(tempConf, new Demux(), demuxParams);
    } catch (Throwable e) {
      globalErrorcounter++;
      if (firstErrorTime == null) {
        firstErrorTime = new Date();
      }
      // The logger already records the stack trace; no printStackTrace needed.
      log.error("Failed to run demux. Consecutive error number " +
              globalErrorcounter + " encountered since " + firstErrorTime, e);
    }
    return false;
  }

  /**
   * Move dataSink files ({@code *.done}) to the demux input directory,
   * creating that directory first when there is something to move. At most
   * {@link #DEFAULT_MAX_FILES_PER_DEMUX} files are handled per call.
   *
   * @param dataSinkDir directory scanned for {@code *.done} files
   * @param demuxInputDir destination directory
   * @return true if there's any dataSink files ready to be processed
   * @throws IOException on file system failure
   */
  protected boolean moveDataSinkFilesToDemuxInputDirectory(
      String dataSinkDir, String demuxInputDir) throws IOException {
    Path pDataSinkDir = new Path(dataSinkDir);
    Path pDemuxInputDir = new Path(demuxInputDir);
    log.info("dataSinkDir: " + dataSinkDir);
    log.info("demuxInputDir: " + demuxInputDir);

    FileStatus[] dataSinkFiles = fs.listStatus(pDataSinkDir, DATA_SINK_FILTER);
    // Defensive: treat a null listing like an empty one.
    if (dataSinkFiles == null || dataSinkFiles.length == 0) {
      return false;
    }
    setup(pDemuxInputDir);

    int handledFiles = 0;
    for (FileStatus fstatus : dataSinkFiles) {
      boolean rename = fs.rename(fstatus.getPath(), pDemuxInputDir);
      log.info("Moving " + fstatus.getPath() + " to " + pDemuxInputDir + ", status is:" + rename);
      handledFiles++;
      if (handledFiles >= DEFAULT_MAX_FILES_PER_DEMUX) {
        log.info("Max File per Demux reached:" + handledFiles);
        break;
      }
    }
    return handledFiles > 0;
  }

  /**
   * Move the given directory to
   * {@code <demuxErrorDir>/<yyyyMMdd>/demuxInputDir_<currentTimeMillis>}.
   *
   * @param dataSinkDir directory to move (the demux input directory)
   * @param demuxErrorDir root of the in-error area
   * @return true if the rename succeeded
   * @throws IOException on file system failure
   */
  protected boolean moveDataSinkFilesToDemuxErrorDirectory(
      String dataSinkDir, String demuxErrorDir) throws IOException {
    demuxErrorDir += "/" + dayTextFormat.format(new Date());
    return moveFolder(dataSinkDir, demuxErrorDir, "demuxInputDir");
  }

  /**
   * Move the given directory to
   * {@code <archiveDirectory>/<yyyyMMdd>/dataSinkDir_<currentTimeMillis>}.
   *
   * @param demuxInputDir directory to move (the demux input directory)
   * @param archiveDirectory root of the archive area
   * @return true if the rename succeeded
   * @throws IOException on file system failure
   */
  protected boolean moveDataSinkFilesToArchiveDirectory(
      String demuxInputDir, String archiveDirectory) throws IOException {
    archiveDirectory += "/" + dayTextFormat.format(new Date());
    return moveFolder(demuxInputDir, archiveDirectory, "dataSinkDir");
  }

  /**
   * Move the given directory to
   * {@code <postProcessDirectory>/demuxOutputDir_<currentTimeMillis>}.
   *
   * @param demuxOutputDir directory to move (the demux output directory)
   * @param postProcessDirectory root of the post-process area
   * @return true if the rename succeeded
   * @throws IOException on file system failure
   */
  protected boolean moveDemuxOutputDirToPostProcessDirectory(
      String demuxOutputDir, String postProcessDirectory) throws IOException {
    return moveFolder(demuxOutputDir, postProcessDirectory, "demuxOutputDir");
  }

  /**
   * Test if demuxInputDir exists
   * @param demuxInputDir path to test
   * @return true if demuxInputDir exists and is a directory
   * @throws IOException on file system failure
   */
  protected boolean checkDemuxInputDir(String demuxInputDir)
      throws IOException {
    return dirExists(demuxInputDir);
  }

  /**
   * Test if demuxOutputDir exists
   * @param demuxOutputDir path to test
   * @return true if demuxOutputDir exists and is a directory
   * @throws IOException on file system failure
   */
  protected boolean checkDemuxOutputDir(String demuxOutputDir)
      throws IOException {
    return dirExists(demuxOutputDir);
  }

  /**
   * Recursively delete the DemuxOutput directory
   * @param demuxOutputDir directory to delete
   * @return true if succeed
   * @throws IOException on file system failure
   */
  protected boolean deleteDemuxOutputDir(String demuxOutputDir) throws IOException {
    return fs.delete(new Path(demuxOutputDir), true);
  }

  /**
   * Create directory if !exists
   * @param directory directory to create
   * @throws IOException on file system failure
   */
  protected void setup(Path directory) throws IOException {
    if (!fs.exists(directory)) {
      fs.mkdirs(directory);
    }
  }

  /**
   * Check if the given path exists and is a directory
   * @param directory path to test
   * @return true if the path exists and is a directory
   * @throws IOException on file system failure
   */
  protected boolean dirExists(String directory) throws IOException {
    Path pDirectory = new Path(directory);
    return fs.exists(pDirectory) && fs.getFileStatus(pDirectory).isDir();
  }

  /**
   * Move srcDir inside destDir under the name
   * {@code <prefix>_<currentTimeMillis>}; destDir is created when missing.
   *
   * @param srcDir directory to move
   * @param destDir parent directory of the new location
   * @param prefix first part of the new directory name
   * @return true if the rename succeeded
   * @throws IOException on file system failure
   */
  protected boolean moveFolder(String srcDir, String destDir, String prefix) throws IOException {
    String parentDir = ensureTrailingSlash(destDir);
    setup(new Path(parentDir));

    Path pSrcDir = new Path(srcDir);
    Path pFinalDestDir = new Path(parentDir + prefix + "_" + System.currentTimeMillis());
    return fs.rename(pSrcDir, pFinalDestDir);
  }

  /** Returns dir unchanged if it already ends with "/", otherwise dir + "/". */
  private static String ensureTrailingSlash(String dir) {
    return dir.endsWith("/") ? dir : dir + "/";
  }
}
