blob: 10319d0d208128890c6e95eac24dad66a577bdb8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.util;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
public class AtlasMetricsCounter {
public enum Period { ALL, CURR_DAY, CURR_HOUR, PREV_HOUR, PREV_DAY };
private final String name;
private final Stats stats;
private Clock clock;
private Instant lastIncrTime;
private Instant dayStartTime;
private Instant dayEndTime;
private Instant hourStartTime;
private Instant hourEndTime;
public AtlasMetricsCounter(String name) {
this(name, Clock.systemUTC());
}
public AtlasMetricsCounter(String name, Clock clock) {
this.name = name;
this.stats = new Stats();
init(clock);
}
public String getName() { return name; }
public Instant getLastIncrTime() { return lastIncrTime; }
public void incr() {
incrByWithMeasure(1, 0);
}
public void incrBy(long count) {
incrByWithMeasure(count, 0);
}
public void incrWithMeasure(long measure) {
incrByWithMeasure(1, measure);
}
public void incrByWithMeasure(long count, long measure) {
Instant instant = clock.instant();
stats.addCount(ALL, count);
stats.addMeasure(ALL, measure);
if (instant.isAfter(dayStartTime)) { // ignore times earlier than start of current day
lastIncrTime = instant;
updateForTime(instant);
stats.addCount(CURR_DAY, count);
stats.addMeasure(CURR_DAY, measure);
if (instant.isAfter(hourStartTime)) { // ignore times earlier than start of current hour
stats.addCount(CURR_HOUR, count);
stats.addMeasure(CURR_HOUR, measure);
}
}
}
public StatsReport report() {
updateForTime(clock.instant());
return new StatsReport(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli());
}
// visible only for testing
void init(Clock clock) {
this.clock = clock;
this.lastIncrTime = Instant.ofEpochSecond(0);
this.dayStartTime = Instant.ofEpochSecond(0);
this.dayEndTime = Instant.ofEpochSecond(0);
this.hourStartTime = Instant.ofEpochSecond(0);
this.hourEndTime = Instant.ofEpochSecond(0);
updateForTime(clock.instant());
}
protected void updateForTime(Instant now) {
Instant dayEndTime = this.dayEndTime;
Instant hourEndTime = this.hourEndTime;
if (now.isAfter(dayEndTime)) {
rolloverDay(dayEndTime, now);
rolloverHour(hourEndTime, now);
} else if (now.isAfter(hourEndTime)) {
rolloverHour(hourEndTime, now);
}
}
protected synchronized void rolloverDay(Instant fromDayEndTime, Instant now) {
if (fromDayEndTime == dayEndTime) { // only if rollover was not done already
Instant dayStartTime = getDayStartTime(now);
if (dayStartTime.equals(dayEndTime)) {
stats.copy(CURR_DAY, PREV_DAY);
} else {
stats.reset(PREV_DAY);
}
stats.reset(CURR_DAY);
this.dayStartTime = dayStartTime;
this.dayEndTime = getNextDayStartTime(now);
}
}
protected synchronized void rolloverHour(Instant fromHourEndTime, Instant now) {
if (fromHourEndTime == hourEndTime) { // only if rollover was not done already
Instant hourStartTime = getHourStartTime(now);
if (hourStartTime.equals(hourEndTime)) {
stats.copy(CURR_HOUR, PREV_HOUR);
} else {
stats.reset(PREV_HOUR);
}
stats.reset(CURR_HOUR);
this.hourStartTime = hourStartTime;
this.hourEndTime = getNextHourStartTime(now);
}
}
public static LocalDateTime getLocalDateTime(Instant instant) {
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
}
public static Instant getHourStartTime(Instant instant) {
LocalDateTime time = getLocalDateTime(instant);
return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour()).toInstant(ZoneOffset.UTC);
}
public static Instant getNextHourStartTime(Instant instant) {
LocalDateTime time = getLocalDateTime(instant);
return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour() + 1).toInstant(ZoneOffset.UTC);
}
public static Instant getDayStartTime(Instant instant) {
LocalDateTime time = getLocalDateTime(instant);
return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).toInstant(ZoneOffset.UTC);
}
public static Instant getNextDayStartTime(Instant instant) {
LocalDateTime time = getLocalDateTime(instant);
return LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.MIN).toInstant(ZoneOffset.UTC);
}
public static class Stats {
private static final int NUM_PERIOD = Period.values().length;
private final long dayStartTimeMs;
private final long hourStartTimeMs;
private final AtomicLong[] count = new AtomicLong[NUM_PERIOD];
private final AtomicLong[] measureSum = new AtomicLong[NUM_PERIOD];
private final AtomicLong[] measureMin = new AtomicLong[NUM_PERIOD];
private final AtomicLong[] measureMax = new AtomicLong[NUM_PERIOD];
public Stats() {
dayStartTimeMs = 0;
hourStartTimeMs = 0;
for (Period period : Period.values()) {
reset(period);
}
}
public void addCount(Period period, long num) {
count[period.ordinal()].addAndGet(num);
}
public void addMeasure(Period period, long measure) {
int idx = period.ordinal();
measureSum[idx].addAndGet(measure);
if (measureMin[idx].get() > measure) {
measureMin[idx].set(measure);
}
if (measureMax[idx].get() < measure) {
measureMax[idx].set(measure);
}
}
private void copy(Period src, Period dest) {
int srcIdx = src.ordinal();
int destIdx = dest.ordinal();
count[destIdx].set(count[srcIdx].get());
measureSum[destIdx].set(measureSum[srcIdx].get());
measureMin[destIdx].set(measureMin[srcIdx].get());
measureMax[destIdx].set( measureMax[srcIdx].get());
}
private void reset(Period period) {
int idx = period.ordinal();
count[idx] = new AtomicLong(0);
measureSum[idx] = new AtomicLong(0);
measureMin[idx] = new AtomicLong(Long.MAX_VALUE);
measureMax[idx] = new AtomicLong(Long.MIN_VALUE);
}
}
public static class StatsReport {
private static final int NUM_PERIOD = Period.values().length;
private final long dayStartTimeMs;
private final long hourStartTimeMs;
private final long[] count = new long[NUM_PERIOD];
private final long[] measureSum = new long[NUM_PERIOD];
private final long[] measureMin = new long[NUM_PERIOD];
private final long[] measureMax = new long[NUM_PERIOD];
public StatsReport(Stats other, long dayStartTimeMs, long hourStartTimeMs) {
this.dayStartTimeMs = dayStartTimeMs;
this.hourStartTimeMs = hourStartTimeMs;
copy(other.count, this.count);
copy(other.measureSum, this.measureSum);
copy(other.measureMin, this.measureMin);
copy(other.measureMax, this.measureMax);
}
public long getDayStartTimeMs() { return dayStartTimeMs; }
public long getHourStartTimeMs() { return hourStartTimeMs; }
public long getCount(Period period) { return count[period.ordinal()]; }
public long getMeasureSum(Period period) { return measureSum[period.ordinal()]; }
public long getMeasureMin(Period period) { return measureMin[period.ordinal()]; }
public long getMeasureMax(Period period) { return measureMax[period.ordinal()]; }
public long getMeasureAvg(Period period) {
int idx = period.ordinal();
long c = count[idx];
return c != 0 ? (measureSum[idx] / c) : 0;
}
private void copy(AtomicLong[] src, long[] dest) {
for (int i = 0; i < dest.length; i++) {
dest[i] = src[i].get();
}
}
}
}