blob: 1de676f3d47b352d6242685f76aa72e74dc63ea8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.examples.machinedata.operator;
import java.math.BigDecimal;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.apex.examples.machinedata.data.AverageData;
import org.apache.apex.examples.machinedata.data.MachineInfo;
import org.apache.apex.examples.machinedata.data.MachineKey;
import com.google.common.collect.Maps;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyHashValPair;
import com.datatorrent.lib.util.KeyValPair;
/**
* This class calculates the average for various resources across different devices for a given key
* <p>MachineInfoAveragingOperator class.</p>
*
* @since 0.9.0
*/
@SuppressWarnings("unused")
public class MachineInfoAveragingOperator extends BaseOperator
{
public static final String CPU = "cpu";
public static final String RAM = "ram";
public static final String HDD = "hdd";
public static final String DAY = "day";
private final transient Map<MachineKey, AverageData> dataMap = new HashMap<>();
public final transient DefaultOutputPort<KeyValPair<MachineKey, Map<String, String>>> outputPort = new DefaultOutputPort<>();
public transient DefaultOutputPort<String> smtpAlert = new DefaultOutputPort<>();
private int threshold = 95;
/**
* Buffer all the tuples as is till end window gets called
*/
public final transient DefaultInputPort<KeyHashValPair<MachineKey, AverageData>> inputPort = new DefaultInputPort<KeyHashValPair<MachineKey, AverageData>>()
{
@Override
public void process(KeyHashValPair<MachineKey, AverageData> tuple)
{
addTuple(tuple);
}
};
/**
* This method returns the threshold value
*
* @return
*/
public int getThreshold()
{
return threshold;
}
/**
* This method sets the threshold value. If the average usage for any Resource is above this for a given key, then the alert is sent
*
* @param threshold the threshold value
*/
public void setThreshold(int threshold)
{
this.threshold = threshold;
}
/**
* This adds the given tuple to the dataMap
*
* @param tuple input tuple
*/
private void addTuple(KeyHashValPair<MachineKey, AverageData> tuple)
{
MachineKey key = tuple.getKey();
dataMap.put(key, tuple.getValue());
}
@Override
public void endWindow()
{
for (Map.Entry<MachineKey, AverageData> entry : dataMap.entrySet()) {
MachineKey key = entry.getKey();
AverageData averageResultMap = entry.getValue();
Map<String, String> averageResult = Maps.newHashMap();
long count = averageResultMap.getCount();
double average = averageResultMap.getCpu() / count;
averageResult.put(CPU, average + "");
emitAlert(average, CPU, key);
average = averageResultMap.getHdd() / count;
averageResult.put(HDD, average + "");
emitAlert(average, HDD, key);
average = averageResultMap.getRam() / count;
averageResult.put(RAM, average + "");
emitAlert(average, RAM, key);
averageResult.put(DAY, key.getDay());
outputPort.emit(new KeyValPair<>(key, averageResult));
}
dataMap.clear();
}
private void emitAlert(double average, String resourceType, MachineKey key)
{
if (average > threshold) {
BigDecimal bd = new BigDecimal(average);
bd = bd.setScale(2, BigDecimal.ROUND_HALF_UP);
String stime = key.getDay() + key.getTimeKey();
String skey = getKeyInfo(key);
smtpAlert.emit(resourceType.toUpperCase() + " alert at " + stime + " " + resourceType + " usage breached current usage: " + bd.doubleValue() + "% threshold: " + threshold + "%\n\n" + skey);
}
}
/**
* This method is used to artificially generate alerts
*
* @param genAlert
*/
public void setGenAlert(boolean genAlert)
{
Calendar calendar = Calendar.getInstance();
long timestamp = System.currentTimeMillis();
calendar.setTimeInMillis(timestamp);
DateFormat minuteDateFormat = new SimpleDateFormat("HHmm");
Date date = calendar.getTime();
String timeKey = minuteDateFormat.format(date);
DateFormat dayDateFormat = new SimpleDateFormat("dd");
String day = dayDateFormat.format(date);
MachineKey alertKey = new MachineKey(timeKey, day);
alertKey.setCustomer(1);
alertKey.setProduct(5);
alertKey.setOs(10);
alertKey.setSoftware1(12);
alertKey.setSoftware2(14);
alertKey.setSoftware3(6);
MachineInfo machineInfo = new MachineInfo();
machineInfo.setMachineKey(alertKey);
machineInfo.setCpu(threshold + 1);
machineInfo.setRam(threshold + 1);
machineInfo.setHdd(threshold + 1);
smtpAlert.emit("CPU Alert: CPU Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
smtpAlert.emit("RAM Alert: RAM Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
smtpAlert.emit("HDD Alert: HDD Usage threshold (" + threshold + ") breached: current % usage: " + getKeyInfo(alertKey));
}
/**
* This method returns the String for a given MachineKey instance
*
* @param key MachineKey instance that needs to be converted to string
* @return
*/
private String getKeyInfo(MachineKey key)
{
StringBuilder sb = new StringBuilder();
if (key instanceof MachineKey) {
MachineKey mkey = (MachineKey)key;
Integer customer = mkey.getCustomer();
if (customer != null) {
sb.append("customer: " + customer + "\n");
}
Integer product = mkey.getProduct();
if (product != null) {
sb.append("product version: " + product + "\n");
}
Integer os = mkey.getOs();
if (os != null) {
sb.append("os version: " + os + "\n");
}
Integer software1 = mkey.getSoftware1();
if (software1 != null) {
sb.append("software1 version: " + software1 + "\n");
}
Integer software2 = mkey.getSoftware2();
if (software2 != null) {
sb.append("software2 version: " + software2 + "\n");
}
Integer software3 = mkey.getSoftware3();
if (software3 != null) {
sb.append("software3 version: " + software3 + "\n");
}
}
return sb.toString();
}
}