blob: 8a9855537054484de475d21fb4ddd5b96a861df2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.cache;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.concurrent.KeyLock;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.util.E;
public class RamCache extends AbstractCache<Id, Object> {
// Implement LRU cache
private final ConcurrentMap<Id, LinkNode<Id, Object>> map;
private final LinkedQueueNonBigLock<Id, Object> queue;
private final KeyLock keyLock;
private final long halfCapacity;
public RamCache() {
this(DEFAULT_SIZE);
}
public RamCache(long capacity) {
super(capacity);
if (capacity < 0L) {
capacity = 0L;
}
this.keyLock = new KeyLock();
this.halfCapacity = capacity >> 1;
long initialCapacity = capacity >= MB ? capacity >> 10 : 256;
if (initialCapacity > MAX_INIT_CAP) {
initialCapacity = MAX_INIT_CAP;
}
this.map = new ConcurrentHashMap<>((int) initialCapacity);
this.queue = new LinkedQueueNonBigLock<>();
}
@Override
@Watched(prefix = "ramcache")
protected final Object access(Id id) {
assert id != null;
if (this.map.size() <= this.halfCapacity) {
LinkNode<Id, Object> node = this.map.get(id);
if (node == null) {
return null;
}
assert id.equals(node.key());
return node.value();
}
// Avoid to catch lock if cache missed
if (!this.containsKey(id)) {
return null;
}
final Lock lock = this.keyLock.lock(id);
try {
// Maybe the id removed by other threads and returned null value
LinkNode<Id, Object> node = this.map.get(id);
if (node == null) {
return null;
}
// NOTE: update the queue only if the size > capacity/2
if (this.map.size() > this.halfCapacity) {
// Move the node from mid to tail
if (this.queue.remove(node) == null) {
// The node may be removed by others through dequeue()
return null;
}
this.queue.enqueue(node);
}
assert id.equals(node.key());
return node.value();
} finally {
lock.unlock();
}
}
@Override
@Watched(prefix = "ramcache")
protected final boolean write(Id id, Object value, long timeOffset) {
assert id != null;
long capacity = this.capacity();
assert capacity > 0;
final Lock lock = this.keyLock.lock(id);
try {
// The cache is full
this.removeOldestIfCacheFull(id, capacity);
// Remove the old node if exists
LinkNode<Id, Object> node = this.map.get(id);
if (node != null) {
this.queue.remove(node);
}
// Add the new item to tail of the queue, then map it
this.map.put(id, this.queue.enqueue(id, value, timeOffset));
return true;
} finally {
lock.unlock();
}
}
@Override
@Watched(prefix = "ramcache")
protected final void remove(Id id) {
if (id == null) {
return;
}
final Lock lock = this.keyLock.lock(id);
try {
/*
* Remove the id from map and queue
* NOTE: it maybe return null if other threads have removed the id
*/
LinkNode<Id, Object> node = this.map.remove(id);
if (node != null) {
this.queue.remove(node);
}
} finally {
lock.unlock();
}
}
@Override
protected Iterator<CacheNode<Id, Object>> nodes() {
Iterator<LinkNode<Id, Object>> iter = this.map.values().iterator();
@SuppressWarnings({ "unchecked", "rawtypes" })
Iterator<CacheNode<Id, Object>> iterSuper = (Iterator) iter;
return iterSuper;
}
@Override
public boolean containsKey(Id id) {
return this.map.containsKey(id);
}
@Watched(prefix = "ramcache")
@Override
public void traverse(Consumer<Object> consumer) {
E.checkNotNull(consumer, "consumer");
// NOTE: forEach is 20% faster than for-in with ConcurrentHashMap
this.map.values().forEach(node -> consumer.accept(node.value()));
}
@Watched(prefix = "ramcache")
@Override
public void clear() {
// TODO: synchronized
if (this.capacity() <= 0 || this.map.isEmpty()) {
return;
}
this.map.clear();
this.queue.clear();
}
@Override
public long size() {
return this.map.size();
}
@Override
public String toString() {
return this.map.toString();
}
private void removeOldestIfCacheFull(Id id, long capacity) {
while (this.map.size() >= capacity) {
/*
* Remove the oldest from the queue
* NOTE: it maybe return null if someone else (that's other
* threads) are doing dequeue() and the queue may be empty.
*/
LinkNode<Id, Object> removed = this.queue.dequeue();
if (removed == null) {
/*
* If at this time someone add some new items, these will
* be cleared in the map, but still stay in the queue, so
* the queue will have some more nodes than the map.
*/
this.map.clear();
break;
}
/*
* Remove the oldest from the map
* NOTE: it maybe return null if other threads are doing remove
*/
this.map.remove(removed.key());
if (LOG.isDebugEnabled()) {
LOG.debug("RamCache replaced '{}' with '{}' (capacity={})",
removed.key(), id, capacity);
}
/*
* Release the object
* NOTE: we can't reuse the removed node due to someone else
* may access the node (will do remove() -> enqueue())
*/
removed = null;
}
}
private static final class LinkNode<K, V> extends CacheNode<K, V> {
private LinkNode<K, V> prev;
private LinkNode<K, V> next;
public LinkNode(K key, V value) {
this(key, value, 0L);
}
public LinkNode(K key, V value, long timeOffset) {
super(key, value, timeOffset);
this.prev = this.next = null;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof LinkNode)) {
return false;
}
@SuppressWarnings("unchecked")
LinkNode<K, V> other = (LinkNode<K, V>) obj;
return this.key().equals(other.key());
}
public int hashCode() {
return this.key().hashCode();
}
}
private static final class LinkedQueueNonBigLock<K, V> {
private final KeyLock keyLock;
private final LinkNode<K, V> empty;
private final LinkNode<K, V> head;
private final LinkNode<K, V> rear;
// private volatile long size;
@SuppressWarnings("unchecked")
public LinkedQueueNonBigLock() {
this.keyLock = new KeyLock();
this.empty = new LinkNode<>((K) "<empty>", null);
this.head = new LinkNode<>((K) "<head>", null);
this.rear = new LinkNode<>((K) "<rear>", null);
this.reset();
}
/**
* Reset the head node and rear node
* NOTE:
* only called by LinkedQueueNonBigLock() without lock
* or called by clear() with lock(head, rear)
*/
private void reset() {
this.head.prev = this.empty;
this.head.next = this.rear;
this.rear.prev = this.head;
this.rear.next = this.empty;
assert this.head.next == this.rear;
assert this.rear.prev == this.head;
}
/**
* Dump keys of all nodes in this queue (just for debug)
*/
private List<K> dumpKeys() {
List<K> keys = new LinkedList<>();
LinkNode<K, V> node = this.head.next;
while (node != this.rear && node != this.empty) {
assert node != null;
keys.add(node.key());
node = node.next;
}
return keys;
}
/**
* Check whether a key not in this queue (just for debug)
*/
@SuppressWarnings("unused")
private boolean checkNotInQueue(K key) {
List<K> keys = this.dumpKeys();
if (keys.contains(key)) {
throw new RuntimeException(String.format(
"Expect %s should be not in %s", key, keys));
}
return true;
}
/**
* Check whether there is circular reference (just for debug)
* NOTE: but it is important to note that this is only key check
* rather than pointer check.
*/
@SuppressWarnings("unused")
private boolean checkPrevNotInNext(LinkNode<K, V> self) {
LinkNode<K, V> prev = self.prev;
if (prev.key() == null) {
assert prev == this.head || prev == this.empty : prev;
return true;
}
List<K> keys = this.dumpKeys();
int prevPos = keys.indexOf(prev.key());
int selfPos = keys.indexOf(self.key());
if (prevPos > selfPos && selfPos != -1) {
throw new RuntimeException(String.format(
"Expect %s should be before %s, actual %s",
prev.key(), self.key(), keys));
}
return true;
}
private List<Lock> lock(Object... nodes) {
return this.keyLock.lockAll(nodes);
}
private List<Lock> lock(Object node1, Object node2) {
return this.keyLock.lockAll(node1, node2);
}
private void unlock(List<Lock> locks) {
this.keyLock.unlockAll(locks);
}
/**
* Clear the queue
*/
public void clear() {
assert this.rear.prev != null : this.head.next;
while (true) {
/*
* If someone is removing the last node by remove(),
* it will update the rear.prev, so we should lock it.
*/
LinkNode<K, V> last = this.rear.prev;
List<Lock> locks = this.lock(this.head, last, this.rear);
try {
if (last != this.rear.prev) {
// The rear.prev has changed, try to get lock again
continue;
}
this.reset();
} finally {
this.unlock(locks);
}
return;
}
}
/**
* Add an item with key-value to the queue
*/
public LinkNode<K, V> enqueue(K key, V value, long timeOffset) {
return this.enqueue(new LinkNode<>(key, value, timeOffset));
}
/**
* Add a node to tail of the queue
*/
public LinkNode<K, V> enqueue(LinkNode<K, V> node) {
assert node != null;
assert node.prev == null || node.prev == this.empty;
assert node.next == null || node.next == this.empty;
while (true) {
LinkNode<K, V> last = this.rear.prev;
assert last != this.empty : last;
// TODO: should we lock the new `node`?
List<Lock> locks = this.lock(last, this.rear);
try {
if (last != this.rear.prev) {
// The rear.prev has changed, try to get lock again
continue;
}
/*
* Link the node to the `rear` before to the `last` if we
* have not locked the `node` itself, because dumpKeys()
* may get the new node with next=null.
* TODO: it also depends on memory barrier.
*/
// Build the link between `node` and the `rear`
node.next = this.rear;
assert this.rear.prev == last : this.rear.prev;
this.rear.prev = node;
// Build the link between `last` and `node`
node.prev = last;
last.next = node;
return node;
} finally {
this.unlock(locks);
}
}
}
/**
* Remove a node from head of the queue
*/
public LinkNode<K, V> dequeue() {
while (true) {
LinkNode<K, V> first = this.head.next;
if (first == this.rear) {
// Empty queue
return null;
}
List<Lock> locks = this.lock(this.head, first);
try {
if (first != this.head.next) {
// The head.next has changed, try to get lock again
continue;
}
// Break the link between the `head` and `first`
assert first.next != null && first.next != this.empty;
this.head.next = first.next;
first.next.prev = this.head;
// Clear the links of the `first` node
first.prev = this.empty;
first.next = this.empty;
return first;
} finally {
this.unlock(locks);
}
}
}
/**
* Remove a specified node from the queue
*/
public LinkNode<K, V> remove(LinkNode<K, V> node) {
assert node != this.empty;
assert node != this.head && node != this.rear;
while (true) {
LinkNode<K, V> prev = node.prev;
if (prev == this.empty || node.next == this.empty) {
// Ignore the `node` if it has been removed
return null;
}
List<Lock> locks = this.lock(prev, node);
try {
if (prev != node.prev) {
/*
* The previous node has changed (maybe it's lock
* released after it's removed, then we got the
* lock), so try again until it's not changed.
*/
continue;
}
assert node.next != null : node;
assert node.next != this.empty : node.next;
assert node.next != node.prev : node.next;
// Break `node` & Build the link between node.prev~node.next
node.prev.next = node.next;
node.next.prev = node.prev;
assert prev == node.prev : prev.key() + "!=" + node.prev;
// Clear the links of `node`
node.prev = this.empty;
node.next = this.empty;
return node;
} finally {
this.unlock(locks);
}
}
}
}
}