blob: e7b626cd5e9e4c7c480245c1c9e28aa50cb0ef12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
/**
 * A map backed by a {@link ConcurrentHashMap} whose entries are removed by a periodically
 * scheduled reaper task once they have outlived their timeout.
 *
 * Expiry is only enforced when the reaper runs: {@link #get}, {@link #containsKey} and the
 * other read methods do not check timeouts, so an entry can remain visible for some time
 * after its timeout has elapsed.
 *
 * All instances share one static scheduled executor for their reaper tasks.
 *
 * @param <K> key type
 * @param <V> value type; values must be non-null
 */
public class ExpiringMap<K, V>
{
    private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);

    // NOTE(review): nothing in this class ever sets this flag to true; it is only cleared by
    // reset() and read by put(). Confirm whether shutdownBlocking() was meant to set it.
    private volatile boolean shutdown;

    /**
     * A stored value together with its timeout and the time at which it was created.
     */
    public static class CacheableObject<T>
    {
        /** The wrapped value; never null (enforced by an assertion at construction). */
        public final T value;
        /** Time-to-live of this entry, in milliseconds. */
        public final long timeout;
        /** {@link System#nanoTime()} reading taken when this object was constructed. */
        private final long createdAt;

        private CacheableObject(T value, long timeout)
        {
            assert value != null;
            this.value = value;
            this.timeout = timeout;
            this.createdAt = System.nanoTime();
        }

        /**
         * @param atNano a {@link System#nanoTime()} reading
         * @return true if strictly more than {@code timeout} milliseconds separate
         *         {@code atNano} from the creation time
         */
        private boolean isReadyToDieAt(long atNano)
        {
            return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
        }
    }

    // if we use more ExpiringMaps we may want to add multiple threads to this executor
    private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");

    private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
    private final long defaultExpiration;

    /**
     * Creates a map with no post-expiry hook.
     *
     * @param defaultExpiration the TTL for objects in the cache in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code defaultExpiration} is not positive
     */
    public ExpiringMap(long defaultExpiration)
    {
        this(defaultExpiration, null);
    }

    /**
     * Creates a map and schedules its reaper task on the shared executor.
     *
     * @param defaultExpiration the TTL for objects in the cache in milliseconds; must be positive
     * @param postExpireHook    invoked by the reaper with each (key, entry) pair it removes;
     *                          may be null, in which case nothing is invoked
     * @throws IllegalArgumentException if {@code defaultExpiration} is not positive
     */
    public ExpiringMap(long defaultExpiration, final Function<Pair<K,CacheableObject<V>>, ?> postExpireHook)
    {
        if (defaultExpiration <= 0)
        {
            throw new IllegalArgumentException("Argument specified must be a positive number");
        }
        this.defaultExpiration = defaultExpiration;

        Runnable reaper = new Runnable()
        {
            public void run()
            {
                long start = System.nanoTime();
                int n = 0;
                for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
                {
                    K key = entry.getKey();
                    CacheableObject<V> candidate = entry.getValue();
                    if (!candidate.isReadyToDieAt(start))
                        continue;

                    // Remove only if the key is still mapped to the object we judged expired.
                    // CacheableObject does not override equals, so this is an identity match: a
                    // value re-put concurrently under the same key is left alone, and the hook
                    // always receives the entry that was actually removed.
                    if (!cache.remove(key, candidate))
                        continue;

                    n++;
                    // NOTE(review): an exception thrown by the hook propagates out of run();
                    // per the ScheduledExecutorService contract that may suppress further runs
                    // unless the executor implementation handles it - confirm.
                    if (postExpireHook != null)
                        postExpireHook.apply(Pair.create(key, candidate));
                }
                logger.trace("Expired {} entries", n);
            }
        };

        // scheduleWithFixedDelay rejects a non-positive delay, and integer division turns a
        // 1 ms expiration into a 0 ms period; clamp so every valid expiration is schedulable.
        long reapPeriod = Math.max(1L, defaultExpiration / 2);
        service.scheduleWithFixedDelay(reaper, reapPeriod, reapPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts down the shared reaper executor and waits up to twice the default expiration
     * (in milliseconds) for it to terminate. Because the executor is static, this stops the
     * reaper tasks of every ExpiringMap, not only this one.
     *
     * @throws AssertionError if the calling thread is interrupted while waiting; the thread's
     *                        interrupt status is restored before the error is thrown
     */
    public void shutdownBlocking()
    {
        service.shutdown();
        try
        {
            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            // keep the interruption observable to whoever handles the error further up
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
    }

    /**
     * Clears the shutdown flag and removes every entry. The post-expiry hook is not invoked
     * for entries dropped this way.
     */
    public void reset()
    {
        shutdown = false;
        cache.clear();
    }

    /**
     * Stores {@code value} under {@code key} using the default expiration.
     *
     * @return the value previously stored under {@code key}, or null if there was none
     */
    public V put(K key, V value)
    {
        return put(key, value, this.defaultExpiration);
    }

    /**
     * Stores {@code value} under {@code key} with an explicit timeout.
     *
     * @param key     the key to store under
     * @param value   the value to store; must be non-null
     * @param timeout time-to-live for this entry in milliseconds
     * @return the value previously stored under {@code key}, or null if there was none
     */
    public V put(K key, V value, long timeout)
    {
        if (shutdown)
        {
            // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
            // So we'll just sit on this thread until the rest of the server shutdown completes.
            //
            // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
            Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
        CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
        return (previous == null) ? null : previous.value;
    }

    /**
     * @return the value stored under {@code key}, or null if there is none. The entry's
     *         timeout is not checked here.
     */
    public V get(K key)
    {
        CacheableObject<V> co = cache.get(key);
        return co == null ? null : co.value;
    }

    /**
     * Removes the entry for {@code key} without invoking the post-expiry hook.
     *
     * @return the removed value, or null if there was no entry
     */
    public V remove(K key)
    {
        CacheableObject<V> co = cache.remove(key);
        return co == null ? null : co.value;
    }

    /**
     * Despite the name, this returns a creation timestamp rather than an elapsed duration.
     *
     * @return System.nanoTime() when key was put into the map, or 0 if the key is absent.
     */
    public long getAge(K key)
    {
        CacheableObject<V> co = cache.get(key);
        return co == null ? 0 : co.createdAt;
    }

    /** @return the number of entries currently held, including any not yet reaped. */
    public int size()
    {
        return cache.size();
    }

    /** @return true if an entry is currently held for {@code key}. */
    public boolean containsKey(K key)
    {
        return cache.containsKey(key);
    }

    /** @return true if no entries are currently held. */
    public boolean isEmpty()
    {
        return cache.isEmpty();
    }

    /** @return the key set view of the backing map (not a copy). */
    public Set<K> keySet()
    {
        return cache.keySet();
    }
}