blob: 331b038df81f5d85f7f3e072d5eec84b45f9f9f2 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.backend.cache;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.ram.IntObjectMap;
import com.baidu.hugegraph.backend.tx.SchemaTransaction;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.event.EventHub;
import com.baidu.hugegraph.event.EventListener;
import com.baidu.hugegraph.perf.PerfUtil.Watched;
import com.baidu.hugegraph.schema.SchemaElement;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Events;
import com.google.common.collect.ImmutableSet;
public final class CachedSchemaTransaction extends SchemaTransaction {
private final Cache<Id, Object> idCache;
private final Cache<Id, Object> nameCache;
private final SchemaCaches<SchemaElement> arrayCaches;
private EventListener storeEventListener;
private EventListener cacheEventListener;
public CachedSchemaTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
final long capacity = graph.configuration()
.get(CoreOptions.SCHEMA_CACHE_CAPACITY);
this.idCache = this.cache("schema-id", capacity);
this.nameCache = this.cache("schema-name", capacity);
SchemaCaches<SchemaElement> attachment = this.idCache.attachment();
if (attachment == null) {
int acSize = (int) (capacity >> 3);
attachment = this.idCache.attachment(new SchemaCaches<>(acSize));
}
this.arrayCaches = attachment;
this.listenChanges();
}
@Override
public void close() {
try {
super.close();
} finally {
this.clearCache(false);
this.unlistenChanges();
}
}
private Cache<Id, Object> cache(String prefix, long capacity) {
final String name = prefix + "-" + this.graphName();
// NOTE: must disable schema cache-expire due to getAllSchema()
return CacheManager.instance().cache(name, capacity);
}
private void listenChanges() {
// Listen store event: "store.init", "store.clear", ...
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
Events.STORE_CLEAR,
Events.STORE_TRUNCATE);
this.storeEventListener = event -> {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear schema cache on event '{}'",
this.graph(), event.name());
this.clearCache(true);
return true;
}
return false;
};
this.store().provider().listen(this.storeEventListener);
// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
LOG.debug("Graph {} received schema cache event: {}",
this.graph(), event);
Object[] args = event.args();
E.checkArgument(args.length > 0 && args[0] instanceof String,
"Expect event action argument");
if (Cache.ACTION_INVALID.equals(args[0])) {
event.checkArgs(String.class, HugeType.class, Id.class);
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
this.invalidateCache(type, id);
this.resetCachedAll(type);
return true;
} else if (Cache.ACTION_CLEAR.equals(args[0])) {
event.checkArgs(String.class, HugeType.class);
this.clearCache(false);
return true;
}
return false;
};
EventHub schemaEventHub = this.params().schemaEventHub();
if (!schemaEventHub.containsListener(Events.CACHE)) {
schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
}
}
private void unlistenChanges() {
// Unlisten store event
this.store().provider().unlisten(this.storeEventListener);
// Unlisten cache event
EventHub schemaEventHub = this.params().schemaEventHub();
schemaEventHub.unlisten(Events.CACHE, this.cacheEventListener);
}
private final void notifyChanges(String action, HugeType type, Id id) {
EventHub graphEventHub = this.params().schemaEventHub();
graphEventHub.notify(Events.CACHE, action, type, id);
}
private final void notifyChanges(String action, HugeType type) {
EventHub graphEventHub = this.params().schemaEventHub();
graphEventHub.notify(Events.CACHE, action, type);
}
private final void resetCachedAll(HugeType type) {
// Set the cache all flag of the schema type to false
this.cachedTypes().put(type, false);
}
private final void resetCachedAllIfReachedCapacity() {
if (this.idCache.size() >= this.idCache.capacity()) {
LOG.warn("Schema cache reached capacity({}): {}",
this.idCache.capacity(), this.idCache.size());
this.cachedTypes().clear();
}
}
private final CachedTypes cachedTypes() {
return this.arrayCaches.cachedTypes();
}
private final void clearCache(boolean notify) {
this.idCache.clear();
this.nameCache.clear();
this.arrayCaches.clear();
if (notify) {
this.notifyChanges(Cache.ACTION_CLEARED, null);
}
}
private final void updateCache(SchemaElement schema) {
this.resetCachedAllIfReachedCapacity();
// update id cache
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);
// update name cache
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
// update optimized array cache
this.arrayCaches.updateIfNeeded(schema);
}
private final void invalidateCache(HugeType type, Id id) {
// remove from id cache and name cache
Id prefixedId = generateId(type, id);
Object value = this.idCache.get(prefixedId);
if (value != null) {
this.idCache.invalidate(prefixedId);
SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.invalidate(prefixedName);
}
// remove from optimized array cache
this.arrayCaches.remove(type, id);
}
private static Id generateId(HugeType type, Id id) {
// NOTE: it's slower performance to use:
// String.format("%x-%s", type.code(), name)
return IdGenerator.of(type.string() + "-" + id.asString());
}
private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
}
@Override
protected void updateSchema(SchemaElement schema,
Consumer<SchemaElement> updateCallback) {
super.updateSchema(schema, updateCallback);
this.updateCache(schema);
this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
}
@Override
protected void addSchema(SchemaElement schema) {
super.addSchema(schema);
this.updateCache(schema);
this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
}
@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
// try get from optimized array cache
if (id.number() && id.asLong() > 0L) {
SchemaElement value = this.arrayCaches.get(type, id);
if (value != null) {
return (T) value;
}
}
Id prefixedId = generateId(type, id);
Object value = this.idCache.get(prefixedId);
if (value == null) {
value = super.getSchema(type, id);
if (value != null) {
SchemaElement schema = (SchemaElement) value;
// update id cache, name cache and optimized array cache
this.updateCache(schema);
}
} else {
// update optimized array cache for the result from id cache
this.arrayCaches.updateIfNeeded((SchemaElement) value);
}
return (T) value;
}
@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type,
String name) {
Id prefixedName = generateId(type, name);
Object value = this.nameCache.get(prefixedName);
if (value == null) {
value = super.getSchema(type, name);
if (value != null) {
SchemaElement schema = (SchemaElement) value;
this.updateCache(schema);
}
}
return (T) value;
}
@Override
protected void removeSchema(SchemaElement schema) {
super.removeSchema(schema);
this.invalidateCache(schema.type(), schema.id());
this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id());
}
@Override
protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
Boolean cachedAll = this.cachedTypes().getOrDefault(type, false);
if (cachedAll) {
List<T> results = new ArrayList<>();
// Get from cache
this.idCache.traverse(value -> {
@SuppressWarnings("unchecked")
T schema = (T) value;
if (schema.type() == type) {
results.add(schema);
}
});
return results;
} else {
this.cachedTypes().remove(type);
List<T> results = super.getAllSchema(type);
long free = this.idCache.capacity() - this.idCache.size();
if (results.size() <= free) {
// Update cache
for (T schema : results) {
this.updateCache(schema);
}
this.cachedTypes().putIfAbsent(type, true);
}
return results;
}
}
private static final class SchemaCaches<V extends SchemaElement> {
private final int size;
private final IntObjectMap<V> pks;
private final IntObjectMap<V> vls;
private final IntObjectMap<V> els;
private final IntObjectMap<V> ils;
private final CachedTypes cachedTypes;
public SchemaCaches(int size) {
// TODO: improve size of each type for optimized array cache
this.size = size;
this.pks = new IntObjectMap<>(size);
this.vls = new IntObjectMap<>(size);
this.els = new IntObjectMap<>(size);
this.ils = new IntObjectMap<>(size);
this.cachedTypes = new CachedTypes();
}
public void updateIfNeeded(V schema) {
if (schema == null) {
return;
}
Id id = schema.id();
if (id.number() && id.asLong() > 0L) {
this.set(schema.type(), id, schema);
}
}
@Watched
public V get(HugeType type, Id id) {
assert id.number();
long longId = id.asLong();
if (longId <= 0L) {
assert false : id;
return null;
}
int key = (int) longId;
if (key >= this.size) {
return null;
}
switch (type) {
case PROPERTY_KEY:
return this.pks.get(key);
case VERTEX_LABEL:
return this.vls.get(key);
case EDGE_LABEL:
return this.els.get(key);
case INDEX_LABEL:
return this.ils.get(key);
default:
return null;
}
}
public void set(HugeType type, Id id, V value) {
assert id.number();
long longId = id.asLong();
if (longId <= 0L) {
assert false : id;
return;
}
int key = (int) longId;
if (key >= this.size) {
return;
}
switch (type) {
case PROPERTY_KEY:
this.pks.set(key, value);
break;
case VERTEX_LABEL:
this.vls.set(key, value);
break;
case EDGE_LABEL:
this.els.set(key, value);
break;
case INDEX_LABEL:
this.ils.set(key, value);
break;
default:
// pass
break;
}
}
public void remove(HugeType type, Id id) {
assert id.number();
long longId = id.asLong();
if (longId <= 0L) {
return;
}
int key = (int) longId;
V value = null;
if (key >= this.size) {
return;
}
switch (type) {
case PROPERTY_KEY:
this.pks.set(key, value);
break;
case VERTEX_LABEL:
this.vls.set(key, value);
break;
case EDGE_LABEL:
this.els.set(key, value);
break;
case INDEX_LABEL:
this.ils.set(key, value);
break;
default:
// pass
break;
}
}
public void clear() {
this.pks.clear();
this.vls.clear();
this.els.clear();
this.ils.clear();
this.cachedTypes.clear();
}
public CachedTypes cachedTypes() {
return this.cachedTypes;
}
}
private static class CachedTypes
extends ConcurrentHashMap<HugeType, Boolean> {
private static final long serialVersionUID = -2215549791679355996L;
}
}