blob: d1c1ab8c8ae035cda2d1413425c910be82b66611 [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.machinedata.operator;
import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.demos.machinedata.data.MachineInfo;
import com.datatorrent.demos.machinedata.data.MachineKey;
import com.datatorrent.demos.machinedata.data.AverageData;
import com.datatorrent.demos.machinedata.data.ResourceType;
import com.datatorrent.lib.util.KeyHashValPair;
import com.datatorrent.lib.util.KeyValPair;
import com.google.common.collect.Maps;
import java.math.BigDecimal;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This class calculates the average for various resources across different devices for a given key
* <p>MachineInfoAveragingOperator class.</p>
*
* @since 0.9.0
*/
@SuppressWarnings("unused")
public class MachineInfoAveragingOperator extends BaseOperator
{
private Map<MachineKey, List<Map<String, AverageData>>> dataMap = new HashMap<MachineKey, List<Map<String, AverageData>>>();
public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>> outputPort = new DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>>();
public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<String>();
private int threshold = 95;
private boolean genAlert;
private transient DateFormat dateFormat = new SimpleDateFormat();
/**
* Buffer all the tuples as is till end window gets called
*/
public final transient DefaultInputPort<KeyHashValPair<MachineKey, Map<String, AverageData>>> inputPort = new DefaultInputPort<KeyHashValPair<MachineKey, Map<String, AverageData>>>() {
@Override
public void process(KeyHashValPair<MachineKey, Map<String, AverageData>> tuple)
{
addTuple(tuple);
}
};
/**
* This method returns the threshold value
* @return
*/
public int getThreshold()
{
return threshold;
}
/**
* This method sets the threshold value. If the average usage for any Resource is above this for a given key, then the alert is sent
* @param threshold the threshold value
*/
public void setThreshold(int threshold)
{
this.threshold = threshold;
}
/**
* This adds the given tuple to the dataMap
* @param tuple input tuple
*/
private void addTuple(KeyHashValPair<MachineKey, Map<String, AverageData>> tuple)
{
MachineKey key = tuple.getKey();
List<Map<String, AverageData>> list = dataMap.get(key);
if (list == null) {
list = new ArrayList<Map<String, AverageData>>();
dataMap.put(key, list);
}
list.add(tuple.getValue());
}
@Override
public void endWindow()
{
for (Map.Entry<MachineKey, List<Map<String, AverageData>>> entry : dataMap.entrySet()) {
MachineKey key = entry.getKey();
List<Map<String, AverageData>> list = entry.getValue();
Map<ResourceType, AverageData> averageResultMap = Maps.newHashMap();
for (Map<String, AverageData> map : list) {
prepareAverageResult(map, ResourceType.CPU, averageResultMap);
prepareAverageResult(map, ResourceType.RAM, averageResultMap);
prepareAverageResult(map, ResourceType.HDD, averageResultMap);
}
Map<String, String> averageResult = Maps.newHashMap();
for (Map.Entry<ResourceType, AverageData> dataEntry : averageResultMap.entrySet()) {
ResourceType resourceType = dataEntry.getKey();
double average = dataEntry.getValue().getSum() / dataEntry.getValue().getCount();
averageResult.put(resourceType.toString(), average+"");
if (average > threshold) {
BigDecimal bd = new BigDecimal(average);
bd = bd.setScale(2, BigDecimal.ROUND_HALF_UP);
String stime = key.getDay()+key.getTimeKey();
String skey = getKeyInfo(key);
smtpAlert.emit(resourceType.toString().toUpperCase() + " alert at " + stime + " " + resourceType + " usage breached current usage: " + bd.doubleValue() + "% threshold: " + threshold + "%\n\n" + skey);
}
}
averageResult.put("day", key.getDay().toString());
outputPort.emit(new KeyValPair<MachineKey, Map<String, String>>(key, averageResult));
}
dataMap.clear();
}
private void prepareAverageResult(Map<String, AverageData> map, ResourceType valueKey, Map<ResourceType, AverageData> averageResultMap)
{
AverageData average = averageResultMap.get(valueKey);
if (average == null) {
average = new AverageData(map.get(valueKey.toString()).getSum(), map.get(valueKey.toString()).getCount());
averageResultMap.put(valueKey, average);
} else {
average.setSum(average.getSum() + map.get(valueKey.toString()).getSum());
average.setCount(average.getCount() + map.get(valueKey.toString()).getCount());
}
}
/**
* This method is used to artificially generate alerts
* @param genAlert
*/
public void setGenAlert(boolean genAlert)
{
Calendar calendar = Calendar.getInstance();
long timestamp = System.currentTimeMillis();
calendar.setTimeInMillis(timestamp);
DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
Date date = calendar.getTime();
String timeKey = minuteDateFormat.format(date);
DateFormat dayDateFormat = new SimpleDateFormat("dd");
String day = dayDateFormat.format(date);
MachineKey alertKey = new MachineKey(timeKey,day);
alertKey.setCustomer(1);
alertKey.setProduct(5);
alertKey.setOs(10);
alertKey.setSoftware1(12);
alertKey.setSoftware2(14);
alertKey.setSoftware3(6);
MachineInfo machineInfo = new MachineInfo();
machineInfo.setMachineKey(alertKey);
machineInfo.setCpu(threshold + 1);
machineInfo.setRam(threshold + 1);
machineInfo.setHdd(threshold + 1);
smtpAlert.emit("CPU Alert: CPU Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
smtpAlert.emit("RAM Alert: RAM Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
smtpAlert.emit("HDD Alert: HDD Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
}
/**
* This method returns the String for a given MachineKey instance
* @param key MachineKey instance that needs to be converted to string
* @return
*/
private String getKeyInfo(MachineKey key)
{
StringBuilder sb = new StringBuilder();
if (key instanceof MachineKey) {
MachineKey mkey = (MachineKey) key;
Integer customer = mkey.getCustomer();
if (customer != null) {
sb.append("customer: " + customer + "\n");
}
Integer product = mkey.getProduct();
if (product != null) {
sb.append("product version: " + product + "\n");
}
Integer os = mkey.getOs();
if (os != null) {
sb.append("os version: " + os + "\n");
}
Integer software1 = mkey.getSoftware1();
if (software1 != null) {
sb.append("software1 version: " + software1 + "\n");
}
Integer software2 = mkey.getSoftware2();
if (software2 != null) {
sb.append("software2 version: " + software2 + "\n");
}
Integer software3 = mkey.getSoftware3();
if (software3 != null) {
sb.append("software3 version: " + software3 + "\n");
}
}
return sb.toString();
}
}