blob: ad76c2800425a2954dba3df3ef39c4765d1387d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.diagnostics;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.apache.samza.util.TimestampedValue;
/**
* A {@link BoundedList} buffers multiple instances of a type T in a list.
* {@link BoundedList}s are useful for maintaining, recording, or collecting values over time.
* For example, a set of specific logging-events (e.g., errors).
*
* Eviction is controlled by parameters (maxNumberOfItems and maxStaleness), which are set during instantiation.
* Eviction happens during element addition or during reads of the BoundedList (getValues).
*
* All public methods are thread-safe.
*
*/
public class BoundedList<T> {
private final String name;
private final Queue<TimestampedValue<T>> elements;
private final int maxNumberOfItems;
private final Duration maxStaleness;
private final static int DEFAULT_MAX_NITEMS = 1000;
private final static Duration DEFAULT_MAX_STALENESS = Duration.ofMinutes(60);
/**
* Create a new {@link BoundedList} that auto evicts based on the given maxNumberOfItems, maxStaleness, and period parameters.
*
* @param name Name to be assigned
* @param maxNumberOfItems The max number of items that can remain in the list
* @param maxStaleness The max staleness of items permitted in the list
*/
public BoundedList(String name, int maxNumberOfItems, Duration maxStaleness) {
this.name = name;
this.elements = new ConcurrentLinkedQueue<TimestampedValue<T>>();
this.maxNumberOfItems = maxNumberOfItems;
this.maxStaleness = maxStaleness;
}
/**
* Create a new {@link BoundedList} that auto evicts upto a max of 100 items and a max-staleness of 60 minutes.
* @param name Name to be assigned
*/
public BoundedList(String name) {
this(name, DEFAULT_MAX_NITEMS, DEFAULT_MAX_STALENESS);
}
/**
* Get the name assigned to this {@link BoundedList}
* @return the assigned name
*/
public String getName() {
return this.name;
}
/**
* Get the Collection of values currently in the list.
* @return the collection of values
*/
public Collection<T> getValues() {
this.evict();
return Collections.unmodifiableList(this.elements.stream().map(x -> x.getValue()).collect(Collectors.toList()));
}
/**
* Add a value to the list.
* (Timestamp assigned to this value is the current timestamp.)
* @param value The Gauge value to be added
*/
public void add(T value) {
this.elements.add(new TimestampedValue<T>(value, Instant.now().toEpochMilli()));
// perform any evictions that may be needed.
this.evict();
}
/**
* Removes the given elements from the list-gauge.
* @param elementsToRemove collection of elements to remove.
*/
public void remove(Collection<T> elementsToRemove) {
Iterator<TimestampedValue<T>> iterator = this.elements.iterator();
while (iterator.hasNext()) {
TimestampedValue<T> value = iterator.next();
if (elementsToRemove.contains(value.getValue())) {
iterator.remove();
}
}
}
/**
* Evicts entries from the elements list, based on the given item-size and durationThreshold.
* Concurrent eviction threads can cause incorrectness (when reading elements.size or elements.peek).
*/
private synchronized void evict() {
this.evictBasedOnSize();
this.evictBasedOnTimestamp();
}
/**
* Evicts entries from elements in FIFO order until it has maxNumberOfItems
*/
private void evictBasedOnSize() {
int numToEvict = this.elements.size() - this.maxNumberOfItems;
while (numToEvict > 0) {
this.elements.poll(); // remove head
numToEvict--;
}
}
/**
* Removes entries from elements to ensure no element has a timestamp more than maxStaleness before current timestamp.
*/
private void evictBasedOnTimestamp() {
Instant currentTimestamp = Instant.now();
TimestampedValue<T> valueInfo = this.elements.peek();
// continue remove-head if currenttimestamp - head-element's timestamp > durationThreshold
while (valueInfo != null
&& currentTimestamp.toEpochMilli() - valueInfo.getTimestamp() > this.maxStaleness.toMillis()) {
this.elements.poll();
valueInfo = this.elements.peek();
}
}
}