/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.cache;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.hugegraph.backend.cache.CachedBackendStore.QueryId;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.IdQuery;
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.iterator.ExtendableIterator;
import org.apache.hugegraph.iterator.ListIterator;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;

import com.google.common.collect.ImmutableSet;

public final class CachedGraphTransaction extends GraphTransaction {
private static final int MAX_CACHE_PROPS_PER_VERTEX = 10000;
private static final int MAX_CACHE_EDGES_PER_QUERY = 100;
private static final float DEFAULT_LEVEL_RATIO = 0.001f;
private static final long AVG_VERTEX_ENTRY_SIZE = 40L;
private static final long AVG_EDGE_ENTRY_SIZE = 100L;
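    /*
     * The vertex cache maps vertex id to HugeVertex, while the edge cache
     * maps a whole query (QueryId) to its collection of result edges.
     */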
private final Cache<Id, Object> verticesCache;
private final Cache<Id, Object> edgesCache;
private EventListener storeEventListener;
private EventListener cacheEventListener;
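    /*
     * Build the vertex and edge caches from the graph configuration and
     * start listening for store and cache events.
     */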
public CachedGraphTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
HugeConfig conf = graph.configuration();
String type = conf.get(CoreOptions.VERTEX_CACHE_TYPE);
long capacity = conf.get(CoreOptions.VERTEX_CACHE_CAPACITY);
int expire = conf.get(CoreOptions.VERTEX_CACHE_EXPIRE);
this.verticesCache = this.cache("vertex", type, capacity,
AVG_VERTEX_ENTRY_SIZE, expire);
type = conf.get(CoreOptions.EDGE_CACHE_TYPE);
capacity = conf.get(CoreOptions.EDGE_CACHE_CAPACITY);
expire = conf.get(CoreOptions.EDGE_CACHE_EXPIRE);
this.edgesCache = this.cache("edge", type, capacity,
AVG_EDGE_ENTRY_SIZE, expire);
this.listenChanges();
}
@Override
public void close() {
try {
super.close();
} finally {
this.unlistenChanges();
}
}
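    /*
     * Create a cache named "<prefix>-<graph>": type "l1" is a single-level
     * cache with the given capacity, type "l2" is a level cache whose first
     * (heap) level holds DEFAULT_LEVEL_RATIO of the total capacity.
     */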
private Cache<Id, Object> cache(String prefix, String type, long capacity,
long entrySize, long expire) {
String name = prefix + "-" + this.params().name();
Cache<Id, Object> cache;
switch (type) {
case "l1":
cache = CacheManager.instance().cache(name, capacity);
break;
case "l2":
long heapCapacity = (long) (DEFAULT_LEVEL_RATIO * capacity);
cache = CacheManager.instance().levelCache(super.graph(),
name, heapCapacity,
capacity, entrySize);
break;
default:
throw new NotSupportException("cache type '%s'", type);
}
// Convert the unit from seconds to milliseconds
cache.expire(expire * 1000L);
// Enable metrics for graph cache by default
cache.enableMetrics(true);
return cache;
}
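    /*
     * Register two listeners: one on the store provider to clear the caches
     * on store init/clear/truncate, and one on the graph event hub to
     * invalidate or clear cache entries on cache events.
     */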
private void listenChanges() {
// Listen store event: "store.init", "store.clear", ...
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
Events.STORE_CLEAR,
Events.STORE_TRUNCATE);
this.storeEventListener = event -> {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear graph cache on event '{}'",
this.graph(), event.name());
this.clearCache(null, true);
return true;
}
return false;
};
this.store().provider().listen(this.storeEventListener);
        // Listen cache event: "cache" (invalidate cache items)
this.cacheEventListener = event -> {
LOG.debug("Graph {} received graph cache event: {}",
this.graph(), event);
Object[] args = event.args();
E.checkArgument(args.length > 0 && args[0] instanceof String,
"Expect event action argument");
if (Cache.ACTION_INVALID.equals(args[0])) {
event.checkArgs(String.class, HugeType.class, Object.class);
HugeType type = (HugeType) args[1];
if (type.isVertex()) {
// Invalidate vertex cache
Object arg2 = args[2];
if (arg2 instanceof Id) {
Id id = (Id) arg2;
this.verticesCache.invalidate(id);
} else if (arg2 != null && arg2.getClass().isArray()) {
int size = Array.getLength(arg2);
for (int i = 0; i < size; i++) {
Object id = Array.get(arg2, i);
E.checkArgument(id instanceof Id,
"Expect instance of Id in array, " +
"but got '%s'", id.getClass());
this.verticesCache.invalidate((Id) id);
}
} else {
E.checkArgument(false,
"Expect Id or Id[], but got: %s",
arg2);
}
} else if (type.isEdge()) {
                    /*
                     * Invalidate the edge cache via clear() rather than
                     * invalidate(), because the cache key is a QueryId,
                     * not an EdgeId
                     */
// this.edgesCache.invalidate(id);
this.edgesCache.clear();
}
return true;
} else if (Cache.ACTION_CLEAR.equals(args[0])) {
event.checkArgs(String.class, HugeType.class);
HugeType type = (HugeType) args[1];
this.clearCache(type, false);
return true;
}
return false;
};
EventHub graphEventHub = this.params().graphEventHub();
if (!graphEventHub.containsListener(Events.CACHE)) {
graphEventHub.listen(Events.CACHE, this.cacheEventListener);
}
}
private void unlistenChanges() {
// Unlisten store event
this.store().provider().unlisten(this.storeEventListener);
// Unlisten cache event
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
}
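    // Publish a cache event so that cache listeners can invalidate or clear their entries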
private void notifyChanges(String action, HugeType type, Id[] ids) {
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.notify(Events.CACHE, action, type, ids);
}
private void notifyChanges(String action, HugeType type) {
EventHub graphEventHub = this.params().graphEventHub();
graphEventHub.notify(Events.CACHE, action, type);
}
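    // Clear the vertex and/or edge cache, optionally notifying cache listeners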
private void clearCache(HugeType type, boolean notify) {
if (type == null || type == HugeType.VERTEX) {
this.verticesCache.clear();
}
if (type == null || type == HugeType.EDGE) {
this.edgesCache.clear();
}
if (notify) {
this.notifyChanges(Cache.ACTION_CLEARED, null);
}
}
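    // Caching is enabled only when the configured capacity is positive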
private boolean enableCacheVertex() {
return this.verticesCache.capacity() > 0L;
}
private boolean enableCacheEdge() {
return this.edgesCache.capacity() > 0L;
}
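    // Only cache vertices whose property count is within the limit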
private boolean needCacheVertex(HugeVertex vertex) {
return vertex.sizeOfSubProperties() <= MAX_CACHE_PROPS_PER_VERTEX;
}
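    // Only pure id queries (ids present, no conditions) go through the vertex cache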
@Override
@Watched(prefix = "graphcache")
protected Iterator<HugeVertex> queryVerticesFromBackend(Query query) {
if (this.enableCacheVertex() &&
query.idsSize() > 0 && query.conditionsSize() == 0) {
return this.queryVerticesByIds((IdQuery) query);
} else {
return super.queryVerticesFromBackend(query);
}
}
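    /*
     * Single-id queries take a fast path; for multi-id queries, collect the
     * cache hits and query the backend only for missing or expired ids.
     */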
@Watched(prefix = "graphcache")
private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
if (query.idsSize() == 1) {
Id vertexId = query.ids().iterator().next();
HugeVertex vertex = (HugeVertex) this.verticesCache.get(vertexId);
if (vertex != null) {
if (!vertex.expired()) {
return QueryResults.iterator(vertex);
}
this.verticesCache.invalidate(vertexId);
}
Iterator<HugeVertex> rs = super.queryVerticesFromBackend(query);
vertex = QueryResults.one(rs);
if (vertex == null) {
return QueryResults.emptyIterator();
}
if (needCacheVertex(vertex)) {
this.verticesCache.update(vertex.id(), vertex);
}
return QueryResults.iterator(vertex);
}
IdQuery newQuery = new IdQuery(HugeType.VERTEX, query);
List<HugeVertex> vertices = new ArrayList<>();
for (Id vertexId : query.ids()) {
HugeVertex vertex = (HugeVertex) this.verticesCache.get(vertexId);
if (vertex == null) {
newQuery.query(vertexId);
} else if (vertex.expired()) {
newQuery.query(vertexId);
this.verticesCache.invalidate(vertexId);
} else {
vertices.add(vertex);
}
}
// Join results from cache and backend
ExtendableIterator<HugeVertex> results = new ExtendableIterator<>();
if (!vertices.isEmpty()) {
results.extend(vertices.iterator());
} else {
            // Just use the original query if nothing was found in the cache
newQuery = query;
}
if (!newQuery.empty()) {
Iterator<HugeVertex> rs = super.queryVerticesFromBackend(newQuery);
            // Generally an id query doesn't return too much data
ListIterator<HugeVertex> listIterator = QueryResults.toList(rs);
for (HugeVertex vertex : listIterator.list()) {
// Skip large vertex
if (needCacheVertex(vertex)) {
this.verticesCache.update(vertex.id(), vertex);
}
}
results.extend(listIterator);
}
return results;
}
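    /*
     * Serve the query from the RamTable if possible, otherwise look up the
     * edge cache by QueryId and fall back to the backend, caching small
     * result sets.
     */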
@Override
@Watched(prefix = "graphcache")
protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
RamTable ramtable = this.params().ramtable();
if (ramtable != null && ramtable.matched(query)) {
return ramtable.query(query);
}
if (!this.enableCacheEdge() || query.empty() ||
query.paging() || query.bigCapacity()) {
            // Don't cache the results of empty, paging or big-capacity queries
return super.queryEdgesFromBackend(query);
}
Id cacheKey = new QueryId(query);
Object value = this.edgesCache.get(cacheKey);
@SuppressWarnings("unchecked")
Collection<HugeEdge> edges = (Collection<HugeEdge>) value;
if (value != null) {
for (HugeEdge edge : edges) {
if (edge.expired()) {
this.edgesCache.invalidate(cacheKey);
value = null;
break;
}
}
}
if (value != null) {
            // Return the cached edges if present and not expired
return edges.iterator();
}
Iterator<HugeEdge> rs = super.queryEdgesFromBackend(query);
        /*
         * An iterator can't be cached, so cache a list instead.
         * There may be a super node with too many edges in a query,
         * so fetch a few head results first to decide whether to cache.
         */
final int tryMax = 1 + MAX_CACHE_EDGES_PER_QUERY;
edges = new ArrayList<>(tryMax);
for (int i = 0; rs.hasNext() && i < tryMax; i++) {
edges.add(rs.next());
}
        if (edges.isEmpty()) {
this.edgesCache.update(cacheKey, Collections.emptyList());
} else if (edges.size() <= MAX_CACHE_EDGES_PER_QUERY) {
this.edgesCache.update(cacheKey, edges);
}
return new ExtendableIterator<>(edges.iterator(), rs);
}
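    /*
     * After committing, refresh or invalidate the affected vertices in the
     * vertex cache, clear the edge cache if anything changed, and notify
     * cache listeners about the changes.
     */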
@Override
@Watched(prefix = "graphcache")
protected void commitMutation2Backend(BackendMutation... mutations) {
// Collect changes before commit
Collection<HugeVertex> updates = this.verticesInTxUpdated();
Collection<HugeVertex> deletions = this.verticesInTxRemoved();
Id[] vertexIds = new Id[updates.size() + deletions.size()];
int vertexOffset = 0;
int edgesInTxSize = this.edgesInTxSize();
try {
super.commitMutation2Backend(mutations);
// Update vertex cache
if (this.enableCacheVertex()) {
for (HugeVertex vertex : updates) {
vertexIds[vertexOffset++] = vertex.id();
if (needCacheVertex(vertex)) {
// Update cache
this.verticesCache.updateIfPresent(vertex.id(), vertex);
} else {
// Skip large vertex
this.verticesCache.invalidate(vertex.id());
}
}
}
} finally {
            // Invalidate removed vertices in the cache whether the commit succeeds or fails
if (this.enableCacheVertex()) {
for (HugeVertex vertex : deletions) {
vertexIds[vertexOffset++] = vertex.id();
this.verticesCache.invalidate(vertex.id());
}
if (vertexOffset > 0) {
this.notifyChanges(Cache.ACTION_INVALIDED,
HugeType.VERTEX, vertexIds);
}
}
            /*
             * Update the edge cache if any vertex or edge changed.
             * For a vertex change, the edges linked to it should also be
             * updated; until a more precise strategy is found, just clear
             * the whole edge cache for now.
             */
boolean invalidEdgesCache = (edgesInTxSize + updates.size() + deletions.size()) > 0;
if (invalidEdgesCache && this.enableCacheEdge()) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
}
}
}
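    // Removing an edge index invalidates cached edge query results, so clear the edge cache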
@Override
public void removeIndex(IndexLabel indexLabel) {
try {
super.removeIndex(indexLabel);
} finally {
            // Clear the edge cache if needed (i.e. when an edge index is removed)
if (indexLabel.baseType() == HugeType.EDGE_LABEL) {
// TODO: Use a more precise strategy to update the edge cache
this.edgesCache.clear();
this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
}
}
}
}