/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hugegraph.backend.cache;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.serializer.AbstractSerializer;
import org.apache.hugegraph.backend.serializer.BinaryBackendEntry;
import org.apache.hugegraph.backend.serializer.BinarySerializer;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.DataType;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;

import org.caffinitas.ohc.CacheSerializer;
import org.caffinitas.ohc.CloseableIterator;
import org.caffinitas.ohc.Eviction;
import org.caffinitas.ohc.OHCache;
import org.caffinitas.ohc.OHCacheBuilder;
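
/*
 * An off-heap cache backed by OHC, keyed by Id. A minimal usage sketch,
 * assuming the get()/update() entry points inherited from AbstractCache
 * (the sizes below are illustrative only):
 *
 *   // ~10000 entries of ~64 bytes each, plus per-entry overhead
 *   OffheapCache cache = new OffheapCache(graph, 10000, 64);
 *   cache.update(id, vertex);
 *   HugeVertex cached = (HugeVertex) cache.get(id);
 */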
public class OffheapCache extends AbstractCache<Id, Object> {

    private static final long VALUE_SIZE_TO_SKIP = 100 * Bytes.KB;

    private final OHCache<Id, Value> cache;
    private final HugeGraph graph;
    private final AbstractSerializer serializer;

    public OffheapCache(HugeGraph graph, long capacity, long avgEntryBytes) {
        // NOTE: the OHC capacity is in bytes, while the super class expects
        // a number of elements, so convert elements to an estimated byte size
        super(capacity);
        long capacityInBytes = capacity * (avgEntryBytes + 64L);
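        /*
         * Illustrative arithmetic: capacity=10000 with avgEntryBytes=64
         * reserves 10000 * (64 + 64) = 1280000 bytes (~1.28 MB) off-heap;
         * the extra 64 bytes per entry is headroom for entry overhead.
         */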
        if (capacityInBytes <= 0L) {
            capacityInBytes = 1L;
        }
        this.graph = graph;
        this.cache = this.builder().capacity(capacityInBytes).build();
        this.serializer = new BinarySerializer();
    }

    private HugeGraph graph() {
        return this.graph;
    }

    private AbstractSerializer serializer() {
        return this.serializer;
    }

    @Override
    public void traverse(Consumer<Object> consumer) {
        CloseableIterator<Id> iter = this.cache.keyIterator();
        try {
            while (iter.hasNext()) {
                Id key = iter.next();
                Value value = this.cache.get(key);
                // The entry may expire or be evicted concurrently
                if (value != null) {
                    consumer.accept(value.value());
                }
            }
        } finally {
            // Close the iterator to release its off-heap resources
            try {
                iter.close();
            } catch (IOException e) {
                throw new HugeException("Failed to close cache key iterator", e);
            }
        }
    }

    @Override
    public void clear() {
        this.cache.clear();
    }

    @Override
    public long size() {
        return this.cache.size();
    }

    @Override
    public boolean containsKey(Id id) {
        return this.cache.containsKey(id);
    }

    @Override
    protected Object access(Id id) {
        Value value = this.cache.get(id);
        return value == null ? null : value.value();
    }

    @Override
    protected boolean write(Id id, Object value, long timeOffset) {
        Value serializedValue = new Value(value);
        int serializedSize;
        try {
            serializedSize = serializedValue.serializedSize();
        } catch (Throwable e) {
            // Values that fail to serialize (like a 0x00 byte) can't be cached
            LOG.warn("Can't cache '{}' due to {}", id, e.toString());
            return false;
        }
        if (serializedSize > VALUE_SIZE_TO_SKIP) {
            LOG.info("Skip caching '{}' because value size {} > limit {}",
                     id, serializedSize, VALUE_SIZE_TO_SKIP);
            return false;
        }
        long expireTime = this.expire();
        boolean success;
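        /*
         * Example (illustrative): with expire() = 30000 ms and
         * timeOffset = 0, the entry expires at absolute time
         * now() + 30000; OHC enforces this because the builder
         * enables timeouts(true).
         */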
        if (expireTime <= 0L) {
            success = this.cache.put(id, serializedValue);
        } else {
            expireTime += now() + timeOffset;
            /*
             * NOTE: only the linked OHC implementation seems to support
             * expiring entries; the chunked implementation does not.
             */
            success = this.cache.put(id, serializedValue, expireTime);
        }
        assert success;
        return success;
    }

    @Override
    protected void remove(Id id) {
        this.cache.remove(id);
    }

    @Override
    protected Iterator<CacheNode<Id, Object>> nodes() {
        // No need to expire entries by timer here, rely on OHCache TTL
        // instead, so just return an empty iterator
        return Collections.emptyIterator();
    }
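
    /*
     * Build the underlying OHC cache: LRU eviction, throwOOME(true) to
     * fail fast when off-heap allocation fails, and timeouts(true) to
     * enable the per-entry TTL used by write().
     */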
    private OHCacheBuilder<Id, Value> builder() {
        return OHCacheBuilder.<Id, Value>newBuilder()
                             .keySerializer(new IdSerializer())
                             .valueSerializer(new ValueSerializer())
                             .eviction(Eviction.LRU)
                             .throwOOME(true)
                             .timeouts(true);
    }
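
    /*
     * Id <-> off-heap bytes codec for OHC: OHC calls serializedSize()
     * to size the allocation, then serialize() writes the bytes into
     * the provided ByteBuffer (see the NOTE comments below).
     */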
    private class IdSerializer implements CacheSerializer<Id> {

        @Override
        public Id deserialize(ByteBuffer input) {
            return BytesBuffer.wrap(input).readId(true);
        }

        @Override
        public void serialize(Id id, ByteBuffer output) {
            BytesBuffer.wrap(output).writeId(id, true);
        }

        @Override
        public int serializedSize(Id id) {
            // NOTE: the returned size must be == the actual bytes written
            return BytesBuffer.allocate(id.length() + 2)
                              .writeId(id, true).position();
        }
    }

    private class ValueSerializer implements CacheSerializer<Value> {

        @Override
        public Value deserialize(ByteBuffer input) {
            return new Value(input);
        }

        @Override
        public void serialize(Value value, ByteBuffer output) {
            output.put(value.asBuffer());
        }

        @Override
        public int serializedSize(Value value) {
            // NOTE: the returned size must be >= the actual bytes written
            return value.serializedSize();
        }
    }
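
    /*
     * Wrapper that lazily serializes a cached value into a BytesBuffer.
     * Wire format sketch:
     *   [1-byte ValueType code][payload]
     *   LIST        : VInt element count, then each element recursively
     *   VERTEX/EDGE : backend column name bytes + big-value bytes
     *   others      : property bytes via BytesBuffer.writeProperty()
     */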
    private class Value {

        private final Object value;
        private BytesBuffer svalue = null;
        private int serializedSize = 0;

        public Value(Object value) {
            E.checkNotNull(value, "value");
            this.value = value;
        }

        public Value(ByteBuffer input) {
            this.value = this.deserialize(BytesBuffer.wrap(input));
        }

        public Object value() {
            return this.value;
        }

        public int serializedSize() {
            this.asBuffer();
            return this.serializedSize;
        }

        public ByteBuffer asBuffer() {
            if (this.svalue == null) {
                int listSize = 1;
                if (this.value instanceof List) {
                    listSize = ((List<?>) this.value).size();
                }
                BytesBuffer buffer = BytesBuffer.allocate(64 * listSize);
                // Serialization may fail and throw an exception here
                this.serialize(this.value, buffer);
                this.serializedSize = buffer.position();
                buffer.forReadWritten();
                this.svalue = buffer;
            }
            return this.svalue.asByteBuffer();
        }

        private void serialize(Object element, BytesBuffer buffer) {
            ValueType type = ValueType.valueOf(element);
            buffer.write(type.code());
            switch (type) {
                case LIST:
                    @SuppressWarnings("unchecked")
                    Collection<Object> list = (Collection<Object>) element;
                    serializeList(buffer, list);
                    break;
                case VERTEX:
                case EDGE:
                    serializeElement(buffer, type, element);
                    break;
                case UNKNOWN:
                    throw unsupported(this.value);
                default:
                    buffer.writeProperty(type.dataType(), element);
                    break;
            }
        }

        private Object deserialize(BytesBuffer buffer) {
            ValueType type = ValueType.valueOf(buffer.read());
            switch (type) {
                case LIST:
                    return deserializeList(buffer);
                case VERTEX:
                case EDGE:
                    return deserializeElement(type, buffer);
                case UNKNOWN:
                    throw unsupported(type);
                default:
                    return buffer.readProperty(type.dataType());
            }
        }

        private void serializeList(BytesBuffer buffer,
                                   Collection<Object> list) {
            // Write the list size, then write each element recursively
            buffer.writeVInt(list.size());
            for (Object i : list) {
                this.serialize(i, buffer);
            }
        }

        private List<Object> deserializeList(BytesBuffer buffer) {
            // Read the list size, then read each element recursively
            int length = buffer.readVInt();
            List<Object> list = InsertionOrderUtil.newList();
            for (int i = 0; i < length; i++) {
                list.add(this.deserialize(buffer));
            }
            return list;
        }

        private void serializeElement(BytesBuffer buffer,
                                      ValueType type, Object value) {
            E.checkNotNull(value, "serialize value");
            BackendEntry entry;
            if (type == ValueType.VERTEX) {
                entry = serializer().writeVertex((HugeVertex) value);
            } else if (type == ValueType.EDGE) {
                entry = serializer().writeEdge((HugeEdge) value);
            } else {
                throw unsupported(type);
            }
            assert entry.columnsSize() == 1;
            BackendColumn column = entry.columns().iterator().next();
            buffer.writeBytes(column.name);
            buffer.writeBigBytes(column.value);
        }

        private Object deserializeElement(ValueType type, BytesBuffer buffer) {
            byte[] key = buffer.readBytes();
            byte[] value = buffer.readBigBytes();
            BinaryBackendEntry entry;
            if (type == ValueType.VERTEX) {
                entry = new BinaryBackendEntry(HugeType.VERTEX, key);
                entry.column(key, value);
                return serializer().readVertex(graph(), entry);
            } else if (type == ValueType.EDGE) {
                entry = new BinaryBackendEntry(HugeType.EDGE, key);
                entry.column(key, value);
                return serializer().readEdge(graph(), entry);
            } else {
                throw unsupported(type);
            }
        }

        private HugeException unsupported(ValueType type) {
            throw new HugeException(
                      "Unsupported deserialize type: %s", type);
        }

        private HugeException unsupported(Object value) {
            throw new HugeException(
                      "Unsupported type of serialize value: '%s'(%s)",
                      value, value.getClass());
        }
    }
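
    /*
     * NOTE: code() is based on ordinal(), so the declaration order of
     * these constants is part of the cache's serialized format: appending
     * new constants is safe, reordering or removing existing ones is not.
     */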
    private enum ValueType {

        UNKNOWN,
        LIST,
        VERTEX,
        EDGE,
        BOOLEAN(DataType.BOOLEAN),
        BYTE(DataType.BYTE),
        BLOB(DataType.BLOB),
        STRING(DataType.TEXT),
        INT(DataType.INT),
        LONG(DataType.LONG),
        FLOAT(DataType.FLOAT),
        DOUBLE(DataType.DOUBLE),
        DATE(DataType.DATE),
        UUID(DataType.UUID);

        private final DataType dataType;

        ValueType() {
            this(DataType.UNKNOWN);
        }

        ValueType(DataType dataType) {
            this.dataType = dataType;
        }

        public int code() {
            return this.ordinal();
        }

        public DataType dataType() {
            return this.dataType;
        }

        public static ValueType valueOf(int index) {
            ValueType[] values = values();
            E.checkArgument(0 <= index && index < values.length,
                            "Invalid ValueType index %s", index);
            return values[index];
        }

        public static ValueType valueOf(Object object) {
            E.checkNotNull(object, "object");
            Class<?> clazz = object.getClass();
            if (Collection.class.isAssignableFrom(clazz)) {
                return ValueType.LIST;
            } else if (HugeVertex.class.isAssignableFrom(clazz)) {
                return ValueType.VERTEX;
            } else if (HugeEdge.class.isAssignableFrom(clazz)) {
                return ValueType.EDGE;
            } else {
                for (ValueType type : values()) {
                    if (clazz == type.dataType().clazz()) {
                        return type;
                    }
                }
            }
            return ValueType.UNKNOWN;
        }
    }
}