blob: a7e4c452578eefe877a461e65d6a382c851ca095 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import java.util.Arrays;
import org.apache.beam.sdk.transforms.Combine;
/**
 * Keep track of the moving minimum/maximum/sum of sampled long values. The minimum/maximum/sum is
 * over at most the user-specified last {@code samplePeriodMs}, and is updated every {@code
 * sampleUpdateMs}.
 *
 * <p>Values are accumulated into a ring of {@code samplePeriodMs / sampleUpdateMs} buckets, each
 * covering {@code sampleUpdateMs} of time. As time advances, buckets that fall out of the window
 * are reset to the combine function's identity. Timestamps passed to {@link #add} and {@link #get}
 * must never move back before the start of the current bucket.
 *
 * <p>This class does no internal synchronization.
 */
public class MovingFunction {
  /** How frequently to update the moving function, in ms. */
  private final long sampleUpdateMs;

  /** How many buckets are considered 'significant'? */
  private final int numSignificantBuckets;

  /** How many samples are considered 'significant'? */
  private final int numSignificantSamples;

  /** Function for combining sample values. */
  private final Combine.BinaryCombineLongFn function;

  /** Minimum/maximum/sum of all values per bucket. */
  private final long[] buckets;

  /** How many samples have been added to each bucket. */
  private final int[] numSamples;

  /**
   * Start time of the current bucket (aligned down to a multiple of {@code sampleUpdateMs}), or -1
   * if no timestamp has been seen yet.
   */
  private long currentMsSinceEpoch;

  /** Index into {@link #buckets} of the bucket that starts at {@link #currentMsSinceEpoch}. */
  private int currentIndex;

  /**
   * Creates a moving function over a window of {@code samplePeriodMs}, divided into buckets of
   * {@code sampleUpdateMs}.
   *
   * @param samplePeriodMs length of the window over which values are combined, in ms
   * @param sampleUpdateMs width of each bucket, in ms; must be positive and no larger than {@code
   *     samplePeriodMs}
   * @param numSignificantBuckets number of non-empty buckets at which {@link #isSignificant()}
   *     reports true
   * @param numSignificantSamples number of samples at which {@link #isSignificant()} reports true
   * @param function combine function applied to sample values; its identity fills empty buckets
   * @throws IllegalArgumentException if {@code sampleUpdateMs <= 0} or {@code samplePeriodMs <
   *     sampleUpdateMs}
   * @throws ArithmeticException if {@code samplePeriodMs / sampleUpdateMs} does not fit in an
   *     {@code int}
   */
  public MovingFunction(
      long samplePeriodMs,
      long sampleUpdateMs,
      int numSignificantBuckets,
      int numSignificantSamples,
      Combine.BinaryCombineLongFn function) {
    checkArgument(sampleUpdateMs > 0, "sampleUpdateMs must be positive, got %s", sampleUpdateMs);
    // A window shorter than one bucket would yield a zero-length ring that add() cannot write to.
    checkArgument(
        samplePeriodMs >= sampleUpdateMs,
        "samplePeriodMs (%s) must be at least sampleUpdateMs (%s)",
        samplePeriodMs,
        sampleUpdateMs);
    this.sampleUpdateMs = sampleUpdateMs;
    this.numSignificantBuckets = numSignificantBuckets;
    this.numSignificantSamples = numSignificantSamples;
    this.function = function;
    // toIntExact fails loudly instead of silently truncating an oversized bucket count.
    int n = Math.toIntExact(samplePeriodMs / sampleUpdateMs);
    buckets = new long[n];
    Arrays.fill(buckets, function.identity());
    // New int arrays are zero-initialized, so every bucket starts with no samples.
    numSamples = new int[n];
    currentMsSinceEpoch = -1;
    currentIndex = 0;
  }

  /**
   * Advances the window to {@code nowMsSinceEpoch}, resetting every bucket that has fallen out of
   * it.
   *
   * @throws IllegalArgumentException if {@code nowMsSinceEpoch} is negative or earlier than the
   *     start of the current bucket
   */
  private void flush(long nowMsSinceEpoch) {
    checkArgument(nowMsSinceEpoch >= 0, "Only positive timestamps supported");
    long alignedMsSinceEpoch = nowMsSinceEpoch - (nowMsSinceEpoch % sampleUpdateMs);
    if (currentMsSinceEpoch < 0) {
      // First timestamp seen: every bucket is still empty, so just anchor the window here.
      currentMsSinceEpoch = alignedMsSinceEpoch;
      return;
    }
    checkArgument(nowMsSinceEpoch >= currentMsSinceEpoch, "Attempting to move backwards");
    // Clamp in long arithmetic before narrowing: casting first can wrap to a negative value for
    // very long gaps, which would skip the reset loop and leave stale buckets in the window.
    int newBuckets =
        (int) Math.min((nowMsSinceEpoch - currentMsSinceEpoch) / sampleUpdateMs, buckets.length);
    currentMsSinceEpoch = alignedMsSinceEpoch;
    while (newBuckets > 0) {
      currentIndex = (currentIndex + 1) % buckets.length;
      buckets[currentIndex] = function.identity();
      numSamples[currentIndex] = 0;
      newBuckets--;
    }
  }

  /**
   * Add {@code value} at {@code nowMsSinceEpoch}.
   *
   * @throws IllegalArgumentException if {@code nowMsSinceEpoch} is negative or earlier than the
   *     start of the current bucket
   */
  public void add(long nowMsSinceEpoch, long value) {
    flush(nowMsSinceEpoch);
    buckets[currentIndex] = function.apply(buckets[currentIndex], value);
    numSamples[currentIndex]++;
  }

  /**
   * Return the minimum/maximum/sum of all retained values within samplePeriodMs of {@code
   * nowMsSinceEpoch}. Returns the combine function's identity if no values are retained.
   *
   * @throws IllegalArgumentException if {@code nowMsSinceEpoch} is negative or earlier than the
   *     start of the current bucket
   */
  public long get(long nowMsSinceEpoch) {
    flush(nowMsSinceEpoch);
    long result = function.identity();
    for (long bucket : buckets) {
      result = function.apply(result, bucket);
    }
    return result;
  }

  /**
   * Is the current result 'significant'? Ie is it drawn from enough buckets or from enough samples?
   *
   * <p>This does not advance the window; it reflects the buckets as of the most recent {@link
   * #add} or {@link #get} call.
   */
  public boolean isSignificant() {
    int totalSamples = 0;
    int activeBuckets = 0;
    for (int i = 0; i < buckets.length; i++) {
      totalSamples += numSamples[i];
      if (numSamples[i] > 0) {
        activeBuckets++;
      }
    }
    return activeBuckets >= numSignificantBuckets || totalSamples >= numSignificantSamples;
  }
}