blob: e505a1f9e25286402378884fd23cb550cd381b77 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.common.metric.old.window;
import com.alibaba.jstorm.callback.Callback;
import com.alibaba.jstorm.common.metric.old.operator.Sampling;
import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor;
import com.alibaba.jstorm.common.metric.old.operator.merger.Merger;
import com.alibaba.jstorm.common.metric.old.operator.updater.Updater;
import com.alibaba.jstorm.utils.IntervalCheck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class Metric<T, V> implements Sampling<Map<Integer, T>> {
private static final long serialVersionUID = -1362345159511508074L;
private static final Logger LOG = LoggerFactory.getLogger(Metric.class);
protected static boolean enable;
public static void setEnable(boolean e) {
enable = e;
}
protected List<RollingWindow<V>> rollingWindows;
protected AllWindow<V> allWindow;
protected int[] windowSeconds = { StatBuckets.MINUTE_WINDOW, StatBuckets.HOUR_WINDOW, StatBuckets.DAY_WINDOW };
protected int bucketSize = StatBuckets.NUM_STAT_BUCKETS;
protected V defaultValue;
protected Updater<V> updater;
protected Merger<V> merger;
protected Convertor<V, T> convertor;
protected Callback callback;
protected int interval; // unit is second
protected IntervalCheck intervalCheck;
protected V unflushed;
public Metric() {
}
public int getInterval() {
if (windowSeconds == null || windowSeconds.length == 0) {
return StatBuckets.NUM_STAT_BUCKETS;
}
int intervals[] = new int[windowSeconds.length];
int smallest = Integer.MAX_VALUE;
for (int i = 0; i < windowSeconds.length; i++) {
int interval = windowSeconds[i] / bucketSize;
intervals[i] = interval;
if (interval < smallest) {
smallest = interval;
}
}
for (int goodInterval = smallest; goodInterval > 1; goodInterval--) {
boolean good = true;
for (int interval : intervals) {
if (interval % goodInterval != 0) {
good = false;
break;
}
}
if (good == true) {
return goodInterval;
}
}
return 1;
}
public void init() {
if (defaultValue == null || updater == null || merger == null || convertor == null) {
throw new IllegalArgumentException("Invalid argements");
}
rollingWindows = new ArrayList<RollingWindow<V>>();
if (windowSeconds != null) {
rollingWindows.clear();
for (int windowSize : windowSeconds) {
RollingWindow<V> rollingWindow = new RollingWindow<V>(defaultValue, windowSize / bucketSize, windowSize, updater, merger);
rollingWindows.add(rollingWindow);
}
}
allWindow = new AllWindow<V>(defaultValue, updater, merger);
this.interval = getInterval();
this.intervalCheck = new IntervalCheck();
this.intervalCheck.setInterval(interval);
}
/**
* In order to improve performance Do
*/
@Override
public void update(Number obj) {
if (enable == false) {
return;
}
if (intervalCheck.check()) {
flush();
}
synchronized (this) {
unflushed = updater.update(obj, unflushed);
}
}
public synchronized void flush() {
if (unflushed == null) {
return;
}
for (RollingWindow<V> rollingWindow : rollingWindows) {
rollingWindow.updateBatch(unflushed);
}
allWindow.updateBatch(unflushed);
unflushed = null;
}
@Override
public Map<Integer, T> getSnapshot() {
// TODO Auto-generated method stub
flush();
Map<Integer, T> ret = new TreeMap<Integer, T>();
for (RollingWindow<V> rollingWindow : rollingWindows) {
V value = rollingWindow.getSnapshot();
ret.put(rollingWindow.getWindowSecond(), convertor.convert(value));
}
ret.put(StatBuckets.ALL_TIME_WINDOW, convertor.convert(allWindow.getSnapshot()));
if (callback != null) {
callback.execute(this);
}
return ret;
}
public T getAllTimeValue() {
return convertor.convert(allWindow.getSnapshot());
}
public int[] getWindowSeconds() {
return windowSeconds;
}
public void setWindowSeconds(int[] windowSeconds) {
this.windowSeconds = windowSeconds;
}
public int getBucketSize() {
return bucketSize;
}
public void setBucketSize(int bucketSize) {
this.bucketSize = bucketSize;
}
public V getDefaultValue() {
return defaultValue;
}
public void setDefaultValue(V defaultValue) {
this.defaultValue = defaultValue;
}
public Updater<V> getUpdater() {
return updater;
}
public void setUpdater(Updater<V> updater) {
this.updater = updater;
}
public Merger<V> getMerger() {
return merger;
}
public void setMerger(Merger<V> merger) {
this.merger = merger;
}
public Convertor<V, T> getConvertor() {
return convertor;
}
public void setConvertor(Convertor<V, T> convertor) {
this.convertor = convertor;
}
public Callback getCallback() {
return callback;
}
public void setCallback(Callback callback) {
this.callback = callback;
}
}