blob: ef18e84d8d5ac35f95b96d3f06dbe57681151d19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.tdb2.store.nodetable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.lib.Cache;
import org.apache.jena.atlas.lib.CacheFactory;
import org.apache.jena.atlas.lib.Pair;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.dboe.transaction.txn.Transaction;
import org.apache.jena.dboe.transaction.txn.TransactionListener;
import org.apache.jena.graph.Node;
import org.apache.jena.tdb2.TDBException;
import org.apache.jena.tdb2.params.StoreParams;
import org.apache.jena.tdb2.store.NodeId;
/**
 * Cache wrapper around a {@link NodeTable}. Assumes all access goes through this
 * wrapper. This is a read cache; write caching is done via the object file used
 * by the base NodeTable.
 * <p>
 * Each of the three caches (Node-to-NodeId, NodeId-to-Node and "not present") is
 * optional: a non-positive size passed to the factory leaves that cache
 * unallocated ({@code null}), and every access below tolerates a missing cache.
 */
public class NodeTableCache implements NodeTable, TransactionListener {
    // These caches are updated together.
    // See synchronization in _retrieveNodeByNodeId and _idForNode.
    // The cache is assumed to be single operation-thread-safe.
    // The buffering is for updates so that if a transaction aborts, the changes are not made;
    // the underlying node table, being transactional, also does not make the changes.
    //
    // It does not matter if readers can see nodes from a completed, now-finished
    // writer transaction. Nodes in the node table do not mean triples exist; only triples
    // determine the state of the data.
    //
    // Where there are only readers active the ThreadBufferingCache caches act as
    // pass-through and the not-present cache can be updated by any reader.
    //
    // When there is an active writer, the ThreadBufferingCache caches add a
    // writer-visible-only caching and only the writer can update the "not-present"
    // cache. Because the node table is append-only (nodes are not deleted), it can
    // mean a node which was not-present is added and the not-present cache now does
    // not catch that for a previous version reader. This does not matter, the small
    // not-present cache is only a speed-up and does not have to be correct
    // for missing nodes (it can't have entries for nodes that do exist in visible
    // data).
    private ThreadBufferingCache<Node, NodeId> node2id_Cache = null;
    private ThreadBufferingCache<NodeId, Node> id2node_Cache = null;
    // A small cache of "known unknowns" to speed up searching for impossible things.
    private Cache<Node, Object> notPresent = null;
    private NodeTable baseTable;
    private final Object lock = new Object();
    // The thread of the active write transaction, or null if there is none.
    private volatile Thread writingThread;

    /**
     * Wrap {@code nodeTable} with caches sized from {@code params}.
     * Returns {@code nodeTable} itself when both main caches are disabled.
     */
    public static NodeTable create(NodeTable nodeTable, StoreParams params) {
        int nodeToIdCacheSize = params.getNode2NodeIdCacheSize();
        int idToNodeCacheSize = params.getNodeId2NodeCacheSize();
        if ( nodeToIdCacheSize <= 0 && idToNodeCacheSize <= 0 )
            return nodeTable;
        return create(nodeTable, nodeToIdCacheSize, idToNodeCacheSize, params.getNodeMissCacheSize());
    }

    private static NodeTable create(NodeTable nodeTable, int nodeToIdCacheSize, int idToNodeCacheSize, int nodeMissesCacheSize) {
        if ( nodeToIdCacheSize <= 0 && idToNodeCacheSize <= 0 )
            return nodeTable;
        return new NodeTableCache(nodeTable, nodeToIdCacheSize, idToNodeCacheSize, nodeMissesCacheSize);
    }

    private NodeTableCache(NodeTable baseTable, int nodeToIdCacheSize, int idToNodeCacheSize, int nodeMissesCacheSize) {
        this.baseTable = baseTable;
        if ( nodeToIdCacheSize > 0 )
            node2id_Cache = createCache("nodeToId", nodeToIdCacheSize, 1000);
        if ( idToNodeCacheSize > 0 )
            id2node_Cache = createCache("idToNode", idToNodeCacheSize, 1000);
        if ( nodeMissesCacheSize > 0 )
            notPresent = CacheFactory.createCache(nodeMissesCacheSize);
    }

    private static <Key, Value> ThreadBufferingCache<Key, Value> createCache(String label, int mainCachesize, int bufferSize) {
        Cache<Key, Value> cache = CacheFactory.createCache(mainCachesize);
        return new ThreadBufferingCache<>(label, cache, bufferSize);
    }

    // ---- Cache access, no going to underlying table.

    /** Cached Node for this NodeId, or null if not cached (or no id-to-node cache). */
    public Node getNodeForNodeIdCache(NodeId id) {
        return cacheLookup(id);
    }

    /** Cached NodeId for this Node, or null if not cached (or no node-to-id cache). */
    public NodeId getNodeIdForNodeCache(Node node) {
        return cacheLookup(node);
    }

    public boolean isCachedNodeId(NodeId id) {
        return getNodeForNodeIdCache(id) != null;
    }

    public boolean isCachedNode(Node node) {
        return getNodeIdForNodeCache(node) != null;
    }

    // ---- Cache access

    @Override
    public final NodeTable wrapped() {
        return baseTable;
    }

    /** Get the Node for this NodeId, or null if none */
    @Override
    public Node getNodeForNodeId(NodeId id) {
        return _retrieveNodeByNodeId(id);
    }

    /** Find the NodeId for a node, or return NodeId.NodeDoesNotExist */
    @Override
    public NodeId getNodeIdForNode(Node node) {
        return _idForNode(node, false);
    }

    /**
     * Find the NodeId for a node, allocating a new NodeId if the Node does not
     * yet have a NodeId
     */
    @Override
    public NodeId getAllocateNodeId(Node node) {
        return _idForNode(node, true);
    }

    /** True if the node has a NodeId (without allocating one). */
    @Override
    public boolean containsNode(Node node) {
        NodeId x = getNodeIdForNode(node);
        return !NodeId.isDoesNotExist(x);
    }

    /** True if the NodeId maps to a Node. */
    @Override
    public boolean containsNodeId(NodeId nodeId) {
        Node x = getNodeForNodeId(nodeId);
        return x != null;
    }

    /**
     * Map a list of nodes to their NodeIds. Cached entries are answered from the
     * cache; the remaining nodes are resolved with one bulk call to the base table,
     * and the answers are added to the caches.
     *
     * @param required the nodes to look up
     * @param withAllocation whether nodes not yet in the table get a new NodeId
     * @return NodeIds positionally matching {@code required}
     */
    @Override
    public List<NodeId> bulkNodeToNodeId(List<Node> required, boolean withAllocation) {
        synchronized(lock) {
            // A null entry in 'result' marks a position still to be resolved by the base table.
            List<NodeId> result = new ArrayList<>(required.size());
            List<Node> misses = new ArrayList<>();
            for ( Node n : required ) {
                NodeId nid;
                if ( n == Node.ANY )
                    // Same treatment as the single-node path (_idForNode).
                    nid = NodeId.NodeIdAny;
                else {
                    nid = cacheLookup(n);
                    if ( nid == null && !withAllocation && notPresent(n) )
                        nid = NodeId.NodeDoesNotExist;
                }
                if ( nid == null )
                    misses.add(n);
                result.add(nid);
            }
            if ( misses.isEmpty() )
                return result;
            List<NodeId> fetched = baseTable.bulkNodeToNodeId(misses, withAllocation);
            // Misses were collected in input order, so walk them alongside the null slots.
            int j = 0;
            for ( int i = 0; i < result.size() ; i++ ) {
                if ( result.get(i) != null )
                    continue;
                Node n = misses.get(j);
                NodeId nid = fetched.get(j);
                j++;
                result.set(i, nid);
                cacheUpdate(n, nid);
            }
            return result;
        }
    }

    @Override
    public List<Node> bulkNodeIdToNode(List<NodeId> nodeIds) {
        return NodeTableOps.bulkNodeIdToNodeImpl(this, nodeIds);
    }

    // ---- The worker functions

    // NodeId ==> Node
    private Node _retrieveNodeByNodeId(NodeId id) {
        if ( NodeId.isDoesNotExist(id) )
            return null;
        if ( NodeId.isAny(id) )
            return null;
        // Try once outside the synchronized
        // (Cache access is thread-safe)
        Node n = cacheLookup(id);
        if ( n != null )
            return n;
        synchronized (lock) {
            // Lock to update two caches consistently.
            // Verify cache miss
            n = cacheLookup(id);
            if ( n != null )
                return n;
            n = baseTable.getNodeForNodeId(id);
            cacheUpdate(n, id);
            return n;
        }
    }

    // Node ==> NodeId
    private NodeId _idForNode(Node node, boolean allocate) {
        if ( node == Node.ANY )
            return NodeId.NodeIdAny;
        // Try once outside the synchronized
        // (Cache access is thread-safe.)
        NodeId nodeId = cacheLookup(node);
        if ( nodeId != null )
            return nodeId;
        synchronized (lock) {
            // Update two caches inside synchronized.
            // Check still valid.
            nodeId = cacheLookup(node);
            if ( nodeId != null )
                return nodeId;
            if ( allocate )
                nodeId = baseTable.getAllocateNodeId(node);
            else {
                if ( notPresent(node) )
                    // Known not to be in the baseTable.
                    return NodeId.NodeDoesNotExist;
                else
                    nodeId = baseTable.getNodeIdForNode(node);
            }
            // Ensure caches have it. Includes recording "no such node"
            cacheUpdate(node, nodeId);
            return nodeId;
        }
    }

    // ----------------
    // ---- Only places that the caches are touched

    /**
     * Test whether in the "not present" cache.
     * True means "known to be absent from the baseTable".
     */
    private boolean notPresent(Node node) {
        if ( notPresent == null )
            return false;
        return notPresent.containsKey(node);
    }

    /**
     * Check caches to see if we can map a NodeId to a Node. Returns null on no
     * cache entry.
     */
    private Node cacheLookup(NodeId id) {
        if ( id2node_Cache == null )
            return null;
        return id2node_Cache.getIfPresent(id);
    }

    /**
     * Check caches to see if we can map a Node to a NodeId. Returns null on no
     * cache entry. The "not present" cache is not consulted here; callers that
     * do not allocate check it separately.
     */
    private NodeId cacheLookup(Node node) {
        if ( node2id_Cache == null )
            return null;
        return node2id_Cache.getIfPresent(node);
    }

    /** Update the Node&lt;-&gt;NodeId caches */
    private void cacheUpdate(Node node, NodeId id) {
        if ( node == null )
            return;
        // synchronized is further out.
        // The "notPresent" cache is used to note whether a node
        // is known not to exist in the baseTable.
        // This must be specially handled later if the node is added.
        // Only top-level transactions can add nodes to the "notPresent" cache.
        if ( NodeId.isDoesNotExist(id) ) {
            if ( notPresent != null && inTopLevelTxn())
                notPresent.put(node, Boolean.TRUE);
            return;
        }
        if ( id == NodeId.NodeIdAny ) {
            Log.warn(this, "Attempt to cache NodeIdAny - ignored");
            return;
        }
        if ( node2id_Cache != null )
            node2id_Cache.put(node, id);
        if ( id2node_Cache != null )
            id2node_Cache.put(id, node);
        // Remove if previously marked "not present"
        if ( notPresent != null )
            notPresent.remove(node);
    }

    // A top-level transaction can update the not-present cache.
    // It is either
    // - a write transaction or
    // - a read transaction and no active writer.
    private boolean inTopLevelTxn() {
        Thread writer = writingThread;
        return (writer == null) || (writer == Thread.currentThread());
    }

    // -- TransactionListener

    @Override
    public void notifyTxnStart(Transaction transaction) {
        if (transaction.isWriteTxn())
            updateStart();
    }

    @Override
    public void notifyPromoteFinish(Transaction transaction) {
        if(transaction.isWriteTxn())
            updateStart();
    }

    @Override
    public void notifyCommitFinish(Transaction transaction) {
        if(transaction.isWriteTxn()) {
            updateCommit();
        }
    }

    @Override
    public void notifyAbortStart(Transaction transaction) {
        if(transaction.isWriteTxn())
            updateAbort();
    }

    // -- TransactionListener
    // The cache is "optimistic" - nodes are added during the transaction.
    // The underlying file has them "transactionally".
    //
    // During a write transaction, cache updates are buffered
    // (ThreadBufferingCache.enableBuffering). On commit the buffer is flushed into
    // the main caches; on abort it is dropped, so entries for nodes the aborted
    // transaction added do not survive in the shared caches.

    private void updateStart() {
        if ( node2id_Cache != null )
            node2id_Cache.enableBuffering();
        if ( id2node_Cache != null )
            id2node_Cache.enableBuffering();
        writingThread = Thread.currentThread();
    }

    private void updateAbort() {
        writingThread = null;
        if ( node2id_Cache != null )
            node2id_Cache.dropBuffer();
        if ( id2node_Cache != null )
            id2node_Cache.dropBuffer();
    }

    private void updateCommit() {
        writingThread = null;
        if ( node2id_Cache != null )
            node2id_Cache.flushBuffer();
        if ( id2node_Cache != null )
            id2node_Cache.flushBuffer();
    }

    /**
     * Whether the node table is empty. The caches are created empty regardless
     * of the base table's contents, so an empty cache says nothing; the answer
     * always comes from the base table.
     */
    @Override
    public boolean isEmpty() {
        synchronized (lock) {
            return baseTable.isEmpty();
        }
    }

    /** Close the base table and release the caches. Calling again is a no-op. */
    @Override
    public void close() {
        // Same lock as the other cache-mutating operations.
        synchronized (lock) {
            if ( baseTable == null )
                // Already closed
                return;
            baseTable.close();
            node2id_Cache = null;
            id2node_Cache = null;
            notPresent = null;
            baseTable = null;
            writingThread = null;
        }
    }

    @Override
    public void sync() {
        baseTable.sync();
    }

    @Override
    public Iterator<Pair<NodeId, Node>> all() {
        if ( false )
            testForConsistency();
        return baseTable.all();
    }

    // Debug check: the two main caches mirror each other and no cached node is
    // also marked "not present". Skips any cache that is not allocated.
    private void testForConsistency() {
        if ( node2id_Cache != null ) {
            for ( Node n : Iter.toList(node2id_Cache.keys()) ) {
                NodeId nId = node2id_Cache.getIfPresent(n);
                if ( id2node_Cache != null && !id2node_Cache.containsKey(nId) )
                    throw new TDBException("Inconsistent: " + n + " => " + nId);
                if ( notPresent(n) )
                    throw new TDBException("Inconsistent: " + n + " in notPresent cache (1)");
            }
        }
        if ( id2node_Cache != null ) {
            for ( NodeId nId : Iter.toList(id2node_Cache.keys()) ) {
                Node n = id2node_Cache.getIfPresent(nId);
                if ( node2id_Cache != null && !node2id_Cache.containsKey(n) )
                    throw new TDBException("Inconsistent: " + nId + " => " + n);
                if ( notPresent(n) )
                    throw new TDBException("Inconsistent: " + n + " in notPresent cache (2)");
            }
        }
    }

    @Override
    public String toString() {
        return "Cache(" + baseTable + ")";
    }
}