blob: 91104afde6c0bbe0e7c0d5d272506c85a96ca973 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.appdata.datastructs;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
/**
* This is an LRU cache.
* @param <KEY> The type of keys in the cache.
* @param <VALUE> The type of values in the cache.
* @since 3.0.0
*/
public class CacheLRUSynchronousFlush<KEY, VALUE>
{
public static final int DEFAULT_FLUSHED_SIZE = 50000;
private Map<KEY, VALUE> keyToValue = Maps.newHashMap();
private Long2ObjectSortedMap<Set<KEY>> timeStampToKey = new Long2ObjectAVLTreeMap<Set<KEY>>();
private Map<KEY, Long> keyToTimeStamp = new HashMap<KEY, Long>();
private Set<KEY> changed = Sets.newHashSet();
private int flushedSize = DEFAULT_FLUSHED_SIZE;
private CacheFlushListener<KEY, VALUE> flushListener;
public CacheLRUSynchronousFlush(CacheFlushListener<KEY, VALUE> flushListener)
{
setFlushListener(flushListener);
}
public CacheLRUSynchronousFlush(int flushedSize, CacheFlushListener<KEY, VALUE> flushListener)
{
setFlushedSizePri(flushedSize);
setFlushListener(flushListener);
}
private void setFlushListener(CacheFlushListener<KEY, VALUE> flushListener)
{
Preconditions.checkNotNull(flushListener);
this.flushListener = flushListener;
}
/**
* @return the flushedSize
*/
public int getFlushedSize()
{
return flushedSize;
}
/**
* @param flushedSize the flushedSize to set
*/
public void setFlushedSize(int flushedSize)
{
setFlushedSizePri(flushedSize);
}
private void setFlushedSizePri(int flushedSize)
{
Preconditions.checkArgument(flushedSize > 0, "The flushedSize must be positive.");
this.flushedSize = flushedSize;
}
public VALUE put(KEY key, VALUE value)
{
long timeStamp = System.currentTimeMillis();
return put(timeStamp, key, value);
}
public VALUE put(long timeStamp, KEY key, VALUE value)
{
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
changed.add(key);
Long oldTimeStamp = keyToTimeStamp.put(key, timeStamp);
if (oldTimeStamp == null || oldTimeStamp != timeStamp) {
Set<KEY> keys = timeStampToKey.get(timeStamp);
if (keys == null) {
keys = Sets.newHashSet();
timeStampToKey.put(timeStamp, keys);
}
keys.add(key);
}
if (oldTimeStamp != null) {
timeStampToKey.get(oldTimeStamp).remove(key);
}
return keyToValue.put(key, value);
}
public VALUE get(KEY key)
{
Preconditions.checkNotNull(key);
return keyToValue.get(key);
}
public VALUE remove(KEY key)
{
Preconditions.checkNotNull(key);
Long timeStamp = keyToTimeStamp.get(key);
if (timeStamp != null) {
keyToTimeStamp.remove(key);
Set<KEY> keys = timeStampToKey.get(timeStamp);
keys.remove(key);
}
changed.remove(key);
return keyToValue.remove(key);
}
public void removeExcess()
{
int currentSize = keyToValue.size();
while (currentSize > flushedSize) {
long firstKey = timeStampToKey.firstLongKey();
Set<KEY> keys = timeStampToKey.get(firstKey);
Iterator<KEY> keyIterator = keys.iterator();
while (keyIterator.hasNext() && currentSize > flushedSize) {
KEY key = keyIterator.next();
VALUE value = keyToValue.remove(key);
if (value == null) {
continue;
}
currentSize--;
changed.remove(key);
keyToTimeStamp.remove(key);
flushListener.flush(key, value);
}
if (keys.isEmpty()) {
timeStampToKey.remove(firstKey);
}
}
}
public void flushChanges()
{
for (KEY key : changed) {
VALUE value = keyToValue.get(key);
flushListener.flush(key, value);
}
changed.clear();
}
public void removeExcessAndFlushChanges()
{
removeExcess();
flushChanges();
}
/**
* An interface for a listener whose callback is called when the cache is flushed.
* @param <KEY> The type of keys in the cache.
* @param <VALUE> The type of values in the cache.
*/
public interface CacheFlushListener<KEY, VALUE>
{
/**
* The method that is called when a key-value pair is flushed from the cache.
* @param key The value of the key for the pair that is flushed from the cache.
* @param value The value of the value for the pair that is flushed from the cache.
*/
public void flush(KEY key, VALUE value);
}
}