/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jena.tdb2.store.nodetable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.lib.Cache;
import org.apache.jena.atlas.lib.CacheFactory;
import org.apache.jena.atlas.lib.Pair;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.dboe.transaction.txn.Transaction;
import org.apache.jena.dboe.transaction.txn.TransactionListener;
import org.apache.jena.graph.Node;
import org.apache.jena.tdb2.TDBException;
import org.apache.jena.tdb2.params.StoreParams;
import org.apache.jena.tdb2.store.NodeId;

/**
 * Cache wrapper around a NodeTable. Assumes all access goes through this
 * wrapper. Read-cache - write caching is done via the object file used by the
 * base NodeTable.
 */
public class NodeTableCache implements NodeTable, TransactionListener {
    // These caches are updated together.
    // See synchronization in _retrieveNodeByNodeId and _idForNode
    // The cache is assumed to be single operation-thread-safe.

    // The buffering is for updates. Only the updating thread will see changes due to new nodes   
    // Case 1: Not in main "not-present"
    //  Add to local "not-present", flush down.

    // Case 2: In main "not-present"
    //   May be goes into local cache.
    //   Write back updates "not-present"
    //   Depends on "not-rpesent" used to protect the underlying "not-present"

    
    
    private ThreadBufferingCache<Node, NodeId> node2id_Cache = null;
    private ThreadBufferingCache<NodeId, Node> id2node_Cache = null;

    // A small cache of "known unknowns" to speed up searching for impossible things.
    // Cache update needed on NodeTable changes because a node may become "known"
    private Cache<Node, Object> notPresent    = null;
    private NodeTable           baseTable;
    private final Object        lock          = new Object();
    private final ThreadLocal<Transaction> txn = new ThreadLocal<>();
    private long maxDataVersion;
    private boolean hasActiveWriteTransaction;
    
    public static NodeTable create(NodeTable nodeTable, StoreParams params) {
        int nodeToIdCacheSize = params.getNode2NodeIdCacheSize();
        int idToNodeCacheSize = params.getNodeId2NodeCacheSize();
        if ( nodeToIdCacheSize <= 0 && idToNodeCacheSize <= 0 )
            return nodeTable;
        return create(nodeTable, nodeToIdCacheSize, idToNodeCacheSize, params.getNodeMissCacheSize());
    }

    private static NodeTable create(NodeTable nodeTable, int nodeToIdCacheSize, int idToNodeCacheSize, int nodeMissesCacheSize) {
        if ( nodeToIdCacheSize <= 0 && idToNodeCacheSize <= 0 )
            return nodeTable;
        return new NodeTableCache(nodeTable, nodeToIdCacheSize, idToNodeCacheSize, nodeMissesCacheSize);
    }

    private NodeTableCache(NodeTable baseTable, int nodeToIdCacheSize, int idToNodeCacheSize, int nodeMissesCacheSize) {
        this.baseTable = baseTable;
        if ( nodeToIdCacheSize > 0 )
            node2id_Cache = createCache("nodeToId", nodeToIdCacheSize, 1000);
        if ( idToNodeCacheSize > 0 )
            id2node_Cache = createCache("idToNode", idToNodeCacheSize, 1000);
        if ( nodeMissesCacheSize > 0 )
            notPresent = CacheFactory.createCache(nodeMissesCacheSize);
    }

    private static <Key, Value> ThreadBufferingCache<Key, Value> createCache(String label, int mainCachesize, int bufferSize) {
        Cache<Key, Value> cache = CacheFactory.createCache(mainCachesize);
        return new ThreadBufferingCache<>(label, cache, bufferSize);
    }
    
    // ---- Cache access, no going to underlying table.

    public Node getNodeForNodeIdCache(NodeId id) {
        return id2node_Cache.getIfPresent(id);
    }

    public NodeId getNodeIdForNodeCache(Node node) {
        return node2id_Cache.getIfPresent(node);
    }

    public boolean isCachedNodeId(NodeId id) {
        return getNodeForNodeIdCache(id) != null;
    }

    public boolean isCachedNode(Node node) {
        return getNodeIdForNodeCache(node) != null;
    }

    // ---- Cache access

    @Override
    public final NodeTable wrapped() {
        return baseTable;
    }

    /** Get the Node for this NodeId, or null if none */
    @Override
    public Node getNodeForNodeId(NodeId id) {
        return _retrieveNodeByNodeId(id);
    }

    /** Find the NodeId for a node, or return NodeId.NodeDoesNotExist */
    @Override
    public NodeId getNodeIdForNode(Node node) {
        return _idForNode(node, false);
    }

    /**
     * Find the NodeId for a node, allocating a new NodeId if the Node does not
     * yet have a NodeId
     */
    @Override
    public NodeId getAllocateNodeId(Node node) {
        return _idForNode(node, true);
    }

    @Override
    public boolean containsNode(Node node) {
        NodeId x = getNodeIdForNode(node);
        return NodeId.isDoesNotExist(x);
    }

    @Override
    public boolean containsNodeId(NodeId nodeId) {
        Node x = getNodeForNodeId(nodeId);
        return x == null;
    }

    @Override
    public List<NodeId> bulkNodeToNodeId(List<Node> required, boolean withAllocation) {
        synchronized(lock) {
            List<Node> nodes = new ArrayList<>();
            for ( Node n : required ) {
                //
                if ( getNodeIdForNodeCache(n) == null )
                    nodes.add(n);
            }
            // Check bulk access.
            List<NodeId> x = baseTable.bulkNodeToNodeId(nodes, true);
            for ( int i = 0; i < nodes.size() ; i++ ) {
                Node n = nodes.get(i);
                NodeId nid = x.get(i);
                cacheUpdate(n ,nid);
            }
            return x;
        }
    }

    @Override
    public List<Node> bulkNodeIdToNode(List<NodeId> nodeIds) {
        return NodeTableOps.bulkNodeIdToNodeImpl(this, nodeIds);
    }

    // ---- The worker functions
    // NodeId ==> Node
    private Node _retrieveNodeByNodeId(NodeId id) {
        if ( NodeId.isDoesNotExist(id) )
            return null;
        if ( NodeId.isAny(id) )
            return null;
        // Try once outside the synchronized
        // (Cache access is thread-safe)
        Node n = cacheLookup(id);
        if ( n != null )
            return n;

        synchronized (lock) {
            // Lock to update two caches consistently.
            // Verify cache miss
            n = cacheLookup(id);
            if ( n != null )
                return n;

            n = baseTable.getNodeForNodeId(id);
            cacheUpdate(n, id);
            return n;
        }
    }

    // Node ==> NodeId
    private NodeId _idForNode(Node node, boolean allocate) {
        if ( node == Node.ANY )
            return NodeId.NodeIdAny;
        // Try once outside the synchronized
        // (Cache access is thread-safe.)
        NodeId nodeId = cacheLookup(node);
        if ( nodeId != null )
            return nodeId;
        synchronized (lock) {
            // Update two caches inside synchronized.
            // Check still valid.
            nodeId = cacheLookup(node);
            if ( nodeId != null )
                return nodeId;

            if ( allocate )
                nodeId = baseTable.getAllocateNodeId(node);
            else {
                if ( notPresent(node) )
                    // Known not be in the baseTable. 
                    return NodeId.NodeDoesNotExist;
                else
                    nodeId = baseTable.getNodeIdForNode(node);
            }
            // Ensure caches have it. Includes recording "no such node"
            cacheUpdate(node, nodeId);
            return nodeId;
        }
    }

    // ----------------
    // ---- Only places that the caches are touched

    /** 
     * Test whether in the "not present" cache.
     * True means "known to be absent from the baseTable".
     */
    private boolean notPresent(Node node) {
        if ( notPresent == null )
            return false;
        return notPresent.containsKey(node);
    }

    /**
     * Check caches to see if we can map a NodeId to a Node. Returns null on no
     * cache entry.
     */
    private Node cacheLookup(NodeId id) {
        if ( id2node_Cache == null )
            return null;
        return id2node_Cache.getIfPresent(id);
    }

    /**
     * Check caches to see if we can map a Node to a NodeId. Returns null on no
     * cache entry.
     */
    private NodeId cacheLookup(Node node) {
        // Remember things known (currently) not to exist.
        // Does not matter if notPresent is being updated elsewhere.
        return node2id_Cache.getIfPresent(node);

        // XXX [467]
        // Pre JENA-1467 - can be deleted.
//        if ( notPresent != null && notPresent.containsKey(node) )
//            return null;
//        if ( node2id_Cache == null )
//            return null;
//        return node2id_Cache.getIfPresent(node);
    }

    /** Update the Node&lt;-&gt;NodeId caches */
    private void cacheUpdate(Node node, NodeId id) {
        if ( node == null )
            return;

        // synchronized is further out.
        // The "notPresent" cache is used to note whether a node
        // is known not to exist in the baseTable..
        // This must be specially handled later if the node is added.
        // Only top-level transactions can add nodes to the "notPresent" cache.
        if ( NodeId.isDoesNotExist(id) ) {
            if ( notPresent != null && inTopLevelTxn())
                notPresent.put(node, Boolean.TRUE);
            return;
        }

        if ( id == NodeId.NodeIdAny ) {
            Log.warn(this, "Attempt to cache NodeIdAny - ignored");
            return;
        }

        if ( node2id_Cache != null )
            node2id_Cache.put(node, id);
        if ( id2node_Cache != null )
            id2node_Cache.put(id, node);
        // Remove if previously marked "not present"
        if ( notPresent != null )
            notPresent.remove(node);
    }

    // A top-level transaction is either
    // - a write transaction or
    // - a read transaction with most recent data version given that there's no active write transaction.
    private boolean inTopLevelTxn() {
        Transaction t = txn.get();
        return (t != null) && (t.isWriteTxn() || (!hasActiveWriteTransaction && t.getDataVersion() == maxDataVersion));
    }

    @Override
    public void notifyTxnStart(Transaction transaction) {
        txn.set(transaction);

        synchronized (lock) {
            maxDataVersion = Math.max(maxDataVersion, transaction.getDataVersion());
            hasActiveWriteTransaction |= transaction.isWriteTxn();
        }

        if (transaction.isWriteTxn()) {
            updateStart();
        }

    }

    @Override
    public void notifyPromoteFinish(Transaction transaction) {
        if(transaction.isWriteTxn()) {
            updateStart();
            synchronized (lock) {
                hasActiveWriteTransaction = true;
            }
        }
    }

    @Override
    public void notifyCommitStart(Transaction transaction) {
        if(transaction.isWriteTxn()) {
            synchronized (lock) {
                hasActiveWriteTransaction = false;
            }
        }
    }

    @Override
    public void notifyCompleteFinish(Transaction transaction) {
        if(transaction.isWriteTxn()) {
            updateCommit();
        }
    }

    @Override
    public void notifyAbortStart(Transaction transaction) {
        if(transaction.isWriteTxn()) {
            synchronized (lock) {
                hasActiveWriteTransaction = false;
            }
            updateAbort();
        }
    }

    @Override
    public void notifyTxnFinish(Transaction transaction) {
        txn.remove();
    }

    // ----

    // The cache is "optimistic" - nodes are added during the transaction.
    // It does not matter if they get added (and visible earlier) 
    // because this is nothing more than "preallocation". Triples (Tuple of NodeIds) don't match.
    
    // Underlying file has them "transactionally".
    
    // On abort, it does need to be undone because the underlying NodeTable
    // being cached will not have them.
    
    private void updateStart() {
        //System.out.println("updateStart: "+baseTable.toString());
        node2id_Cache.enableBuffering();
        id2node_Cache.enableBuffering();
    }
    
    private void updateAbort() {
        //System.out.println("updateAbort: "+baseTable.toString());
        node2id_Cache.dropBuffer();
        id2node_Cache.dropBuffer();
    }
    
    private void updateCommit() {
        //System.out.println("updateCommit: "+baseTable.toString());
        // Already in the baseTable.
        // Write to main caches.
        node2id_Cache.flushBuffer();
        id2node_Cache.flushBuffer();
    }
    
    @Override
    public boolean isEmpty() {
        synchronized (lock) {
            if ( node2id_Cache != null )
                return node2id_Cache.isEmpty();
            if ( id2node_Cache != null )
                id2node_Cache.isEmpty();
            // Write through.
            return baseTable.isEmpty();
        }
    }

    @Override
    public synchronized void close() {
        if ( baseTable == null )
            // Already closed
            return;
        baseTable.close();
        node2id_Cache = null;
        id2node_Cache = null;
        notPresent = null;
        baseTable = null;
    }

    @Override
    public void sync() {
        baseTable.sync();
    }

    @Override
    public Iterator<Pair<NodeId, Node>> all() {
        if ( false )
            testForConsistency();
        return baseTable.all();
    }

    private void testForConsistency() {
        Iterator<Node> iter1 = Iter.toList(node2id_Cache.keys()).iterator();

        for (; iter1.hasNext() ; ) {
            Node n = iter1.next();

            NodeId nId = node2id_Cache.getIfPresent(n);
            if ( !id2node_Cache.containsKey(nId) )
                throw new TDBException("Inconsistent: " + n + " => " + nId);
            if ( notPresent.containsKey(n) )
                throw new TDBException("Inconsistent: " + n + " in notPresent cache (1)");
        }
        Iterator<NodeId> iter2 = Iter.toList(id2node_Cache.keys()).iterator();
        for (; iter2.hasNext() ; ) {
            NodeId nId = iter2.next();
            Node n = id2node_Cache.getIfPresent(nId);
            if ( !node2id_Cache.containsKey(n) )
                throw new TDBException("Inconsistent: " + nId + " => " + n);
            if ( notPresent.containsKey(n) )
                throw new TDBException("Inconsistent: " + n + " in notPresent cache (2)");
        }
    }

    @Override
    public String toString() {
        return "Cache(" + baseTable.toString() + ")";
    }
}
