/*
 * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datatorrent.demos.machinedata.operator;

import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.demos.machinedata.data.MachineInfo;
import com.datatorrent.demos.machinedata.data.MachineKey;
import com.datatorrent.demos.machinedata.data.AverageData;
import com.datatorrent.demos.machinedata.data.ResourceType;
import com.datatorrent.lib.util.KeyHashValPair;
import com.datatorrent.lib.util.KeyValPair;
import com.google.common.collect.Maps;

import java.math.BigDecimal;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * This class calculates the average for various resources across different devices for a given key
 * <p>MachineInfoAveragingOperator class.</p>
 *
 * @since 0.9.0
 */
@SuppressWarnings("unused")
public class MachineInfoAveragingOperator extends BaseOperator
{

  private Map<MachineKey, List<Map<String, AverageData>>> dataMap = new HashMap<MachineKey, List<Map<String, AverageData>>>();

  public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>> outputPort = new DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>>();

  public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<String>();

  private int threshold = 95;

  private boolean genAlert;
  private transient DateFormat dateFormat = new SimpleDateFormat();

  /**
   * Buffer all the tuples as is till end window gets called
   */
  public final transient DefaultInputPort<KeyHashValPair<MachineKey, Map<String, AverageData>>> inputPort = new DefaultInputPort<KeyHashValPair<MachineKey, Map<String, AverageData>>>() {

    @Override
    public void process(KeyHashValPair<MachineKey, Map<String, AverageData>> tuple)
    {
      addTuple(tuple);
    }
  };

  /**
   * This method returns the threshold value
   * @return
   */
  public int getThreshold()
  {
    return threshold;
  }

  /**
   * This method sets the threshold value. If the average usage for any Resource is above this for a given key, then the alert is sent 
   * @param threshold the threshold value
   */
  public void setThreshold(int threshold)
  {
    this.threshold = threshold;
  }

  /**
   * This adds the given tuple to the dataMap
   * @param tuple input tuple
   */
  private void addTuple(KeyHashValPair<MachineKey, Map<String, AverageData>> tuple)
  {
    MachineKey key = tuple.getKey();
    List<Map<String, AverageData>> list = dataMap.get(key);
    if (list == null) {
      list = new ArrayList<Map<String, AverageData>>();
      dataMap.put(key, list);
    }
    list.add(tuple.getValue());
  }

  @Override
  public void endWindow()
  {
    for (Map.Entry<MachineKey, List<Map<String, AverageData>>> entry : dataMap.entrySet()) {
      MachineKey key = entry.getKey();
      List<Map<String, AverageData>> list = entry.getValue();

      Map<ResourceType, AverageData> averageResultMap = Maps.newHashMap();

      for (Map<String, AverageData> map : list) {
        prepareAverageResult(map, ResourceType.CPU, averageResultMap);
        prepareAverageResult(map, ResourceType.RAM, averageResultMap);
        prepareAverageResult(map, ResourceType.HDD, averageResultMap);
      }
      Map<String, String> averageResult = Maps.newHashMap();

      for (Map.Entry<ResourceType, AverageData> dataEntry : averageResultMap.entrySet()) {
        ResourceType resourceType = dataEntry.getKey();
        double average = dataEntry.getValue().getSum() / dataEntry.getValue().getCount();
        averageResult.put(resourceType.toString(), average+"");

        if (average > threshold) {
          BigDecimal bd = new BigDecimal(average);
          bd = bd.setScale(2, BigDecimal.ROUND_HALF_UP);
          String stime = key.getDay()+key.getTimeKey();
          String skey = getKeyInfo(key);

          smtpAlert.emit(resourceType.toString().toUpperCase() + " alert at " + stime + " " + resourceType + " usage breached current usage: " + bd.doubleValue() + "% threshold: " + threshold + "%\n\n" + skey);
        }
      }
      averageResult.put("day", key.getDay().toString());
      outputPort.emit(new KeyValPair<MachineKey, Map<String, String>>(key, averageResult));
    }
    dataMap.clear();
  }

  private void prepareAverageResult(Map<String, AverageData> map, ResourceType valueKey, Map<ResourceType, AverageData> averageResultMap)
  {
    AverageData average = averageResultMap.get(valueKey);
    if (average == null) {
      average = new AverageData(map.get(valueKey.toString()).getSum(), map.get(valueKey.toString()).getCount());
      averageResultMap.put(valueKey, average);
    } else {
      average.setSum(average.getSum() + map.get(valueKey.toString()).getSum());
      average.setCount(average.getCount() + map.get(valueKey.toString()).getCount());
    }
  }

  /**
   * This method is used to artificially generate alerts
   * @param genAlert
   */
  public void setGenAlert(boolean genAlert)
  {
    Calendar calendar = Calendar.getInstance();
    long timestamp = System.currentTimeMillis();
    calendar.setTimeInMillis(timestamp);
    DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
    Date date = calendar.getTime();
    String timeKey = minuteDateFormat.format(date);
    DateFormat dayDateFormat = new SimpleDateFormat("dd");
    String day = dayDateFormat.format(date);

    MachineKey alertKey = new MachineKey(timeKey,day);
    alertKey.setCustomer(1);
    alertKey.setProduct(5);
    alertKey.setOs(10);
    alertKey.setSoftware1(12);
    alertKey.setSoftware2(14);
    alertKey.setSoftware3(6);

    MachineInfo machineInfo = new MachineInfo();
    machineInfo.setMachineKey(alertKey);
    machineInfo.setCpu(threshold + 1);
    machineInfo.setRam(threshold + 1);
    machineInfo.setHdd(threshold + 1);

    smtpAlert.emit("CPU Alert: CPU Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
    smtpAlert.emit("RAM Alert: RAM Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
    smtpAlert.emit("HDD Alert: HDD Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
  }

  /**
   * This method returns the String for a given MachineKey instance
   * @param key MachineKey instance that needs to be converted to string
   * @return
   */
  private String getKeyInfo(MachineKey key)
  {
    StringBuilder sb = new StringBuilder();
    if (key instanceof MachineKey) {
      MachineKey mkey = (MachineKey) key;
      Integer customer = mkey.getCustomer();
      if (customer != null) {
        sb.append("customer: " + customer + "\n");
      }
      Integer product = mkey.getProduct();
      if (product != null) {
        sb.append("product version: " + product + "\n");
      }
      Integer os = mkey.getOs();
      if (os != null) {
        sb.append("os version: " + os + "\n");
      }
      Integer software1 = mkey.getSoftware1();
      if (software1 != null) {
        sb.append("software1 version: " + software1 + "\n");
      }
      Integer software2 = mkey.getSoftware2();
      if (software2 != null) {
        sb.append("software2 version: " + software2 + "\n");
      }
      Integer software3 = mkey.getSoftware3();
      if (software3 != null) {
        sb.append("software3 version: " + software3 + "\n");
      }
    }
    return sb.toString();
  }
}
