blob: 65611f3bacd67c87a9c11c0baebfec3ea29fdbbe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.top.window;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A class for exposing a rolling window view on the events that occur over
 * time. Events are reported based on occurrence time. The total number of
 * events in the last period covered by the rolling window can be retrieved by
 * the {@link #getSum(long)} method.
 * <p>
 *
 * Assumptions:
 * <p>
 *
 * (1) Concurrent invocations of the {@link #incAt} method are possible
 * <p>
 *
 * (2) The time parameters of two consecutive invocations of {@link #incAt}
 * could be in any given order
 * <p>
 *
 * (3) The buffering delays are not more than the window length, i.e., after
 * two consecutive invocations {@code incAt(time1, delta)} and
 * {@code incAt(time2, delta)}, it holds that
 * {@code time1 < time2 || time1 - time2 < windowLenMs}.
 * This assumption helps avoiding unnecessary synchronizations.
 * <p>
 *
 * Thread-safety is built in the {@link RollingWindow.Bucket}
 */
@InterfaceAudience.Private
public class RollingWindow {
  private static final Logger LOG = LoggerFactory.getLogger(RollingWindow.class);

  /**
   * Each window is composed of buckets, which offer a trade-off between
   * accuracy and space complexity: the lower the number of buckets, the less
   * memory is required by the rolling window but more inaccuracy is possible in
   * reading window total values.
   */
  Bucket[] buckets;
  /** Length of the period covered by the window, in milliseconds. */
  final int windowLenMs;
  /** Length of the period covered by a single bucket, in milliseconds. */
  final int bucketSize;

  /**
   * @param windowLenMs The period that is covered by the window. This period
   *          must be more than the buffering delays. Must be positive and
   *          evenly divisible by {@code numBuckets}.
   * @param numBuckets number of buckets in the window; must be positive
   * @throws IllegalArgumentException if either argument is not positive, or
   *           if {@code windowLenMs} is not a multiple of {@code numBuckets}
   */
  RollingWindow(int windowLenMs, int numBuckets) {
    // Reject non-positive arguments up front: they would otherwise surface as
    // NegativeArraySizeException / ArithmeticException, or as a window that
    // silently maps events to nonsensical buckets.
    if (windowLenMs <= 0 || numBuckets <= 0) {
      throw new IllegalArgumentException(
          "The window length and the number of buckets must be positive:"
              + " windowLenMs= " + windowLenMs + " numBuckets= " + numBuckets);
    }
    // The bucket size is an integer only if the window length divides evenly
    // by the number of buckets.
    if (windowLenMs % numBuckets != 0) {
      throw new IllegalArgumentException(
          "The bucket size in the rolling window is not integer: windowLenMs= "
              + windowLenMs + " numBuckets= " + numBuckets);
    }
    buckets = new Bucket[numBuckets];
    for (int i = 0; i < numBuckets; i++) {
      buckets[i] = new Bucket();
    }
    this.windowLenMs = windowLenMs;
    this.bucketSize = windowLenMs / numBuckets;
  }

  /**
   * When an event occurs at the specified time, this method reflects that in
   * the rolling window.
   *
   * @param time the time at which the event occurred
   * @param delta the delta that will be added to the window
   */
  public void incAt(long time, long delta) {
    int bi = computeBucketIndex(time);
    Bucket bucket = buckets[bi];
    // If the last time the bucket was updated is out of the scope of the
    // rolling window, reset the bucket.
    if (bucket.isStaleNow(time)) {
      bucket.safeReset(time);
    }
    bucket.inc(delta);
  }

  /**
   * Map a time to the index of the bucket that covers it.
   *
   * @param time the time of the event
   * @return an index in the range {@code [0, buckets.length)}
   */
  private int computeBucketIndex(long time) {
    // Java's % keeps the sign of the dividend; shift a negative remainder
    // back into [0, windowLenMs) so the index can never be negative.
    long positionOnWindow = time % windowLenMs;
    if (positionOnWindow < 0) {
      positionOnWindow += windowLenMs;
    }
    // The product is computed in long arithmetic: in int it overflows once
    // windowLenMs * buckets.length exceeds Integer.MAX_VALUE.
    return (int) (positionOnWindow * buckets.length / windowLenMs);
  }

  /**
   * Thread-safety is provided by synchronization when resetting the update time
   * as well as atomic fields.
   */
  private class Bucket {
    private final AtomicLong value = new AtomicLong(0);
    private final AtomicLong updateTime = new AtomicLong(-1); // -1 = never updated.

    /**
     * Check whether the last time that the bucket was updated is no longer
     * covered by rolling window.
     *
     * @param time the current time
     * @return true if the bucket state is stale
     */
    boolean isStaleNow(long time) {
      long utime = updateTime.get();
      return (utime == -1) || (time - utime >= windowLenMs);
    }

    /**
     * Safely reset the bucket state considering concurrent updates (inc) and
     * resets.
     *
     * @param time the current time
     */
    void safeReset(long time) {
      // At any point in time, only one thread is allowed to reset the
      // bucket
      synchronized (this) {
        if (isStaleNow(time)) {
          // reset the value before setting the time, it allows other
          // threads to safely assume that the value is updated if the
          // time is not stale
          value.set(0);
          updateTime.set(time);
        }
        // else a concurrent thread has already reset it: do nothing
      }
    }

    /**
     * Increment the bucket. It assumes that staleness check is already
     * performed. We do not need to update the {@link #updateTime} because as
     * long as the {@link #updateTime} belongs to the current view of the
     * rolling window, the algorithm works fine.
     *
     * @param delta the amount to add to the bucket value
     */
    void inc(long delta) {
      value.addAndGet(delta);
    }
  }

  /**
   * Get value represented by this window at the specified time.
   * <p>
   *
   * If time lags behind the latest update time, the new updates are still
   * included in the sum.
   *
   * @param time the time at which the window is read; buckets whose last
   *          reset is at least {@code windowLenMs} older than this time, or
   *          that were never updated, are excluded
   * @return number of events occurred in the past period
   */
  public long getSum(long time) {
    long sum = 0;
    for (Bucket bucket : buckets) {
      boolean stale = bucket.isStaleNow(time);
      if (!stale) {
        sum += bucket.value.get();
      }
      // Keep the guard: it avoids building the Date string when debug
      // logging is disabled.
      if (LOG.isDebugEnabled()) {
        long bucketTime = bucket.updateTime.get();
        String timeStr = new Date(bucketTime).toString();
        LOG.debug("Sum: + {} Bucket: updateTime: {} ({}) isStale {} at {}",
            sum, timeStr, bucketTime, stale, time);
      }
    }
    return sum;
  }
}