blob: 9f6b9721539079300b9e4fd65a7b472f68c5bab5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
/**
* Thread-safe class that represents a TOP-N cache of items. It stores the top-N items of
* a generic type T based on a user-specified ranking function that returns a numeric
* value (long).
*
* The cache has a maximum capacity (N) of stored items. The implementation allows two
* policies with respect to the way new items are handled when maximum capacity is
* reached:
* a) Always evict policy: A new item will always replace the item with the lowest rank
* according to the specified ranking function even if the rank of the newly added
* function is lower than the one to be replaced.
* b) Rank-based eviction policy: A new item will be added to the cache iff its rank is
* higher than the smallest rank in the cache and the item with that rank will be evicted.
*
* TODO: Replace these two policies with an LFU cache with dynamic aging.
*/
public final class TopNCache<T, R extends Long> {
// Function used to rank items stored in this cache.
private final Function<T, R> function_;
// Maximum capacity of this cache.
private final int maxCapacity_;
// The cache is stored as a priority queue.
private final PriorityQueue<T> heap_;
// Determines the eviction policy to apply when the cache reaches maximum capacity.
// TODO: Convert to enum?
private final boolean alwaysEvictAtCapacity_;
/**
* Compares the ranks of two T objects, returning 0 if they are equal, < 0 if the rank
* of the first is smaller, or > 0 if the rank of the first object is larger.
*/
private int compareRanks(T t1, T t2) {
return function_.apply(t1).compareTo(function_.apply(t2));
}
public TopNCache(Function<T, R> f, int maxCapacity, boolean evictAtCapacity) {
Preconditions.checkNotNull(f);
Preconditions.checkState(maxCapacity > 0);
function_ = f;
maxCapacity_ = maxCapacity;
heap_ = new PriorityQueue<T>(maxCapacity_,
new Comparator<T>() {
@Override
public int compare(T t1, T t2) { return compareRanks(t1, t2); }
}
);
alwaysEvictAtCapacity_ = evictAtCapacity;
}
/**
* Adds or updates an item in the cache. If the item already exists, its rank position
* is refreshed by removing and adding the item back to the cache. If the item is not in
* the cache and maximum capacity hasn't been reached, the item is added to the cache.
* Otherwise, the eviction policy is applied and the item either replaces the cache item
* with the lowest rank or it is rejected from the cache if its rank is lower than the
* lowest rank in the cache.
*/
public synchronized void putOrUpdate(T item) {
if (!heap_.remove(item)) {
if (heap_.size() == maxCapacity_) {
if (!alwaysEvictAtCapacity_ && compareRanks(item, heap_.peek()) <= 0) {
return;
}
heap_.poll();
}
}
heap_.add(item);
}
/**
* Removes an item from the cache.
*/
public synchronized void remove(T item) { heap_.remove(item); }
/**
* Returns the list of all the items in the cache.
*/
public synchronized List<T> listEntries() { return ImmutableList.copyOf(heap_); }
}