blob: 29c92e632dfa95108912689437604d5fb55822a4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.apps.logstream;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map.Entry;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang3.tuple.MutablePair;
import com.datatorrent.lib.io.WidgetOutputOperator;
import com.datatorrent.lib.logs.DimensionObject;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.apps.logstream.PropertyRegistry.LogstreamPropertyRegistry;
import com.datatorrent.apps.logstream.PropertyRegistry.PropertyRegistry;
import java.lang.reflect.Array;
/**
* Output operator to send outputs to websocket to display on UI widgets.
*
* @since 0.9.4
*/
public class LogstreamWidgetOutputOperator extends WidgetOutputOperator
{
@NotNull
private PropertyRegistry<String> registry;
public final transient LogstreamTopNInputPort logstreamTopNInput = new LogstreamTopNInputPort(LogstreamWidgetOutputOperator.this);
/**
* supply the registry object which is used to store and retrieve meta information about each tuple
*
* @param registry
*/
public void setRegistry(PropertyRegistry<String> registry)
{
this.registry = registry;
}
@Override
public void setup(OperatorContext context)
{
super.setup(context);
LogstreamPropertyRegistry.setInstance(registry);
}
/**
* Creates the widget output object from input tuple, populates all the meta information and sends to websocket
*/
public class LogstreamTopNInputPort extends DefaultInputPort<HashMap<String, ArrayList<DimensionObject<String>>>>
{
NumberFormat formatter = new DecimalFormat("#0.00");
private LogstreamWidgetOutputOperator operator;
public LogstreamTopNInputPort(LogstreamWidgetOutputOperator oper)
{
operator = oper;
}
@Override
public void process(HashMap<String, ArrayList<DimensionObject<String>>> tuple)
{
for (Entry<String, ArrayList<DimensionObject<String>>> entry : tuple.entrySet()) {
String keyString = entry.getKey();
ArrayList<DimensionObject<String>> arrayList = entry.getValue();
HashMap<String, Object> schemaObj = new HashMap<String, Object>();
String[] keyInfo = keyString.split("\\|");
HashMap<String, String> tupleMeta = new HashMap<String, String>();
tupleMeta.put("timeBucket", keyInfo[0]);
//appMeta.put("timeStamp", key[1]);
tupleMeta.put("logType", registry.lookupValue(new Integer(keyInfo[2])));
tupleMeta.put("filter", registry.lookupValue(new Integer(keyInfo[3])));
tupleMeta.put("dimension", registry.lookupValue(new Integer(keyInfo[4])));
String[] val = keyInfo[5].split("\\.");
tupleMeta.put("value", val[0]);
tupleMeta.put("metric", val[1]);
schemaObj.put("tupleMeta", tupleMeta);
String keyTitle = tupleMeta.get("dimension");
String valueTitle = tupleMeta.get("metric") + "(" + tupleMeta.get("value") + ")";
schemaObj.put("keyTitle", keyTitle);
schemaObj.put("valueTitle", valueTitle);
String topic = keyInfo[0] + "|" + keyInfo[2] + "|" + keyInfo[3] + "|" + keyInfo[4] + "|" + keyInfo[5];
setTopic(topic);
HashMap<String, Number> topNMap = new HashMap<String, Number>();
for (DimensionObject<String> dimensionObject : arrayList) {
topNMap.put(dimensionObject.getVal(), dimensionObject.getCount());
}
this.processTopN(topNMap, schemaObj);
}
}
private void processTopN(HashMap<String, Number> topNMap, HashMap<String, Object> schemaObj)
{
@SuppressWarnings("unchecked")
HashMap<String, Object>[] result = (HashMap<String, Object>[])Array.newInstance(HashMap.class, topNMap.size());
int j = 0;
for (Entry<String, Number> e : topNMap.entrySet()) {
result[j] = new HashMap<String, Object>();
result[j].put("name", e.getKey());
String val = formatter.format(e.getValue());
result[j++].put("value", val);
}
if (operator.isWebSocketConnected) {
schemaObj.put("type", "topN");
schemaObj.put("n", operator.nInTopN);
operator.wsoo.input.process(new MutablePair<String, Object>(operator.getFullTopic(operator.topNTopic, schemaObj), result));
}
else {
operator.coo.input.process(topNMap);
}
}
public LogstreamTopNInputPort setN(int n)
{
operator.nInTopN = n;
return this;
}
public LogstreamTopNInputPort setTopic(String topic)
{
operator.topNTopic = topic;
return this;
}
}
}