blob: 715cb6bd960918e28fc9c09be76711b6f264f914 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package com.datatorrent.apps.logstream;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map.Entry;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.logs.DimensionObject;
import org.apache.commons.lang3.tuple.MutablePair;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.apps.logstream.PropertyRegistry.LogstreamPropertyRegistry;
import com.datatorrent.apps.logstream.PropertyRegistry.PropertyRegistry;
import java.lang.reflect.Array;
* Output operator to send outputs to websocket to display on UI widgets.
* @since 0.9.4
public class LogstreamWidgetOutputOperator extends WidgetOutputOperator
private PropertyRegistry<String> registry;
public final transient LogstreamTopNInputPort logstreamTopNInput = new LogstreamTopNInputPort(LogstreamWidgetOutputOperator.this);
* supply the registry object which is used to store and retrieve meta information about each tuple
* @param registry
public void setRegistry(PropertyRegistry<String> registry)
this.registry = registry;
public void setup(OperatorContext context)
* Creates the widget output object from input tuple, populates all the meta information and sends to websocket
public class LogstreamTopNInputPort extends DefaultInputPort<HashMap<String, ArrayList<DimensionObject<String>>>>
NumberFormat formatter = new DecimalFormat("#0.00");
private LogstreamWidgetOutputOperator operator;
public LogstreamTopNInputPort(LogstreamWidgetOutputOperator oper)
operator = oper;
public void process(HashMap<String, ArrayList<DimensionObject<String>>> tuple)
for (Entry<String, ArrayList<DimensionObject<String>>> entry : tuple.entrySet()) {
String keyString = entry.getKey();
ArrayList<DimensionObject<String>> arrayList = entry.getValue();
HashMap<String, Object> schemaObj = new HashMap<String, Object>();
String[] keyInfo = keyString.split("\\|");
HashMap<String, String> tupleMeta = new HashMap<String, String>();
tupleMeta.put("timeBucket", keyInfo[0]);
//appMeta.put("timeStamp", key[1]);
tupleMeta.put("logType", registry.lookupValue(new Integer(keyInfo[2])));
tupleMeta.put("filter", registry.lookupValue(new Integer(keyInfo[3])));
tupleMeta.put("dimension", registry.lookupValue(new Integer(keyInfo[4])));
String[] val = keyInfo[5].split("\\.");
tupleMeta.put("value", val[0]);
tupleMeta.put("metric", val[1]);
schemaObj.put("tupleMeta", tupleMeta);
String keyTitle = tupleMeta.get("dimension");
String valueTitle = tupleMeta.get("metric") + "(" + tupleMeta.get("value") + ")";
schemaObj.put("keyTitle", keyTitle);
schemaObj.put("valueTitle", valueTitle);
String topic = keyInfo[0] + "|" + keyInfo[2] + "|" + keyInfo[3] + "|" + keyInfo[4] + "|" + keyInfo[5];
HashMap<String, Number> topNMap = new HashMap<String, Number>();
for (DimensionObject<String> dimensionObject : arrayList) {
topNMap.put(dimensionObject.getVal(), dimensionObject.getCount());
this.processTopN(topNMap, schemaObj);
private void processTopN(HashMap<String, Number> topNMap, HashMap<String, Object> schemaObj)
HashMap<String, Object>[] result = (HashMap<String, Object>[])Array.newInstance(HashMap.class, topNMap.size());
int j = 0;
for (Entry<String, Number> e : topNMap.entrySet()) {
result[j] = new HashMap<String, Object>();
result[j].put("name", e.getKey());
String val = formatter.format(e.getValue());
result[j++].put("value", val);
if (operator.isWebSocketConnected) {
schemaObj.put("type", "topN");
schemaObj.put("n", operator.nInTopN);
operator.wsoo.input.process(new MutablePair<String, Object>(operator.getFullTopic(operator.topNTopic, schemaObj), result));
else {
public LogstreamTopNInputPort setN(int n)
operator.nInTopN = n;
return this;
public LogstreamTopNInputPort setTopic(String topic)
operator.topNTopic = topic;
return this;