blob: 51dad419490a4d05cf9b25a47cbb5f2f4e0db9a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.app.environment.builder;
import com.google.common.base.Preconditions;
import org.apache.eagle.app.utils.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class CounterToRateFunction implements TransformFunction {
private static final Logger LOG = LoggerFactory.getLogger(CounterToRateFunction.class);
private final Map<String, CounterValue> cache;
private MetricDescriptor metricDescriptor;
private Collector collector;
public CounterToRateFunction(MetricDescriptor metricDescriptor, long heartbeat, TimeUnit unit, final Clock clock) {
final long heartbeatMillis = TimeUnit.MILLISECONDS.convert(heartbeat, unit);
this.cache = new LinkedHashMap<String, CounterValue>(16, 0.75f, true) {
protected boolean removeEldestEntry(Map.Entry<String, CounterValue> eldest) {
final long now = clock.now();
final long lastMod = eldest.getValue().getTimestamp();
final boolean expired = (now - lastMod) > heartbeatMillis;
if (expired) {
LOG.debug("heartbeat interval exceeded, expiring {}", eldest.getKey());
}
return expired;
}
};
this.metricDescriptor = metricDescriptor;
}
@Override
public String getName() {
return "CounterToRate";
}
@Override
public void open(Collector collector) {
this.collector = collector;
}
@Override
public void transform(Map event) {
Metric metric = toMetric(event);
LOG.debug("received {} metrics", metric);
if (new DefaultCountMetricFilter().apply(metric.getMetricName())) {
final String metricName = metric.getMetricName();
final CounterValue prev = cache.get(metricName);
if (prev != null) {
final double rate = prev.computeRate(metric);
event.put(metricDescriptor.getValueField(), rate);
collector.collect(event.toString(), event);
} else {
CounterValue current = new CounterValue(metric);
cache.put(metricName, current);
}
} else {
collector.collect(event.toString(), event);
}
}
@Override
public void close() {
cache.clear();
}
private Metric toMetric(Map event) {
String metricName = "";
for (String dimensionField : metricDescriptor.getDimensionFields()) {
metricName += event.get(dimensionField) + "-";
}
metricName += metricDescriptor.getMetricNameSelector().getMetricName(event);
long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event);
return new Metric(metricName, timestamp, getCurrentValue(event));
}
private double getCurrentValue(Map event) {
double[] values;
if (event.containsKey(metricDescriptor.getValueField())) {
values = new double[] {(double) event.get(metricDescriptor.getValueField())};
} else {
LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event);
values = new double[] {0};
}
return values[0];
}
protected static class CounterValue {
private long timestamp;
private double value;
public CounterValue(long timestamp, double value) {
this.timestamp = timestamp;
this.value = value;
}
public CounterValue(Metric m) {
this(m.getTimestamp(), m.getNumberValue().doubleValue());
}
public long getTimestamp() {
return timestamp;
}
public double computeRate(Metric m) {
final long currentTimestamp = m.getTimestamp();
final double currentValue = m.getNumberValue().doubleValue();
final long durationMillis = currentTimestamp - timestamp;
final double delta = currentValue - value;
timestamp = currentTimestamp;
value = currentValue;
return computeRate(durationMillis, delta);
}
private double computeRate(long durationMillis, double delta) {
final double millisPerSecond = 1000.0;
final double duration = durationMillis / millisPerSecond;
return (duration <= 0.0 || delta <= 0.0) ? 0.0 : delta / duration;
}
@Override
public String toString() {
return "CounterValue{" + "timestamp=" + timestamp + ", value=" + value + '}';
}
}
protected final class Metric {
private final String metricName;
private final long timestamp;
private final Object value;
public Metric(String metricName, long timestamp, Object value) {
this.metricName = Preconditions.checkNotNull(metricName, "metricName");
this.timestamp = timestamp;
this.value = Preconditions.checkNotNull(value, "value");
}
public String getMetricName() {
return metricName;
}
public long getTimestamp() {
return timestamp;
}
public Object getValue() {
return value;
}
public Number getNumberValue() {
return (Number) value;
}
public boolean hasNumberValue() {
return (value instanceof Number);
}
public boolean isCounter() {
return metricName.endsWith("count");
}
@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof Metric)) {
return false;
}
Metric m = (Metric) obj;
return metricName.equals(m.getMetricName())
&& timestamp == m.getTimestamp()
&& value.equals(m.getValue());
}
@Override
public int hashCode() {
int result = metricName.hashCode();
result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + value.hashCode();
return result;
}
@Override
public String toString() {
return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}';
}
}
private class DefaultCountMetricFilter implements CountMetricFilter {
@Override
public Boolean apply(String metricName) {
return metricName.endsWith("count");
}
}
}