[SYSTEMDS-411] Efficient multi-level lineage cache management

This patch improves the handling of multiple cache entries pointing to
the same data (due to multilevel caching).

1) All the entries with the same values are connected with a linkedlist.
Even though they output same data, they have different computation time.

2) Eviction logic marks an entry for deferred spilling/removal if other
 entries are linked to that. If all the entries in a list are marked for
 spilling or removal, only then we evict the item.

3) Disk write and read happen only once for all the items connected to a
 single matrix. This way single read and write restores multiple entries
 to cache and clears more space respectively.

4) Initial experiments show huge improvements in cache management. Now
 the cache can store many more entries (this patch fixes duplicate size
 calculations), need reduced number of disk I/O. These changes overall
 improve cache hit count.

Closes #932.
diff --git a/dev/docs/Tasks.txt b/dev/docs/Tasks.txt
index 6b5dbb0..9a51eb5 100644
--- a/dev/docs/Tasks.txt
+++ b/dev/docs/Tasks.txt
@@ -314,5 +314,8 @@
 SYSTEMDS-400 Spark Backend Improvements
  * 401 Fix output block indexes of rdiag (diagM2V)                    OK
 
+SYSTEMDS-410 Lineage Tracing, Reuse and Integration II
+ * 411 Improved handling of multi-level cache duplicates              OK 
+
 Others:
  * Break append instruction to cbind and rbind 
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index ac54b70..ca6b349 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -39,6 +39,7 @@
 import org.apache.sysds.runtime.instructions.cp.MMTSJCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
@@ -255,7 +256,7 @@
 			LineageItem boundLI = ec.getLineage().get(boundVarName);
 			if (boundLI != null)
 				boundLI.resetVisitStatus();
-			if (boundLI == null || !LineageCache.probe(li)) {
+			if (boundLI == null || !LineageCache.probe(li) || !LineageCache.probe(boundLI)) {
 				AllOutputsCacheable = false;
 			}
 			FuncLIMap.put(li, boundLI);
@@ -282,7 +283,7 @@
 	
 	//----------------- INTERNAL CACHE LOGIC IMPLEMENTATION --------------//
 	
-	protected static void putIntern(LineageItem key, DataType dt, MatrixBlock Mval, ScalarObject Sval, long computetime) {
+	private static void putIntern(LineageItem key, DataType dt, MatrixBlock Mval, ScalarObject Sval, long computetime) {
 		if (_cache.containsKey(key))
 			//can come here if reuse_partial option is enabled
 			return;
@@ -300,7 +301,7 @@
 			LineageCacheEviction.updateSize(size, true);
 		}
 		
-		// Place the entry at head position.
+		// Place the entry in the weighted queue.
 		LineageCacheEviction.addEntry(newItem);
 		
 		_cache.put(key, newItem);
@@ -310,8 +311,7 @@
 	
 	private static LineageCacheEntry getIntern(LineageItem key) {
 		// This method is called only when entry is present either in cache or in local FS.
-		if (_cache.containsKey(key)) {
-			// Read and put the entry at head.
+		if (_cache.containsKey(key) && _cache.get(key).getCacheStatus() != LineageCacheStatus.SPILLED) {
 			LineageCacheEntry e = _cache.get(key);
 			// Maintain order for eviction
 			LineageCacheEviction.getEntry(e);
@@ -336,14 +336,16 @@
 			else
 				e.setValue(oe.getSOValue(), computetime);
 			e._origItem = probeItem; 
+			// Add the SB/func entry to the end of the list of items pointing to the same data.
+			// No cache size update is necessary.
+			LineageCacheEntry tmp = oe;
+			// Maintain _origItem as head.
+			while (tmp._nextEntry != null)
+				tmp = tmp._nextEntry;
+			tmp._nextEntry = e;
 			
 			//maintain order for eviction
 			LineageCacheEviction.addEntry(e);
-
-			long size = oe.getSize();
-			if(!LineageCacheEviction.isBelowThreshold(size)) 
-				LineageCacheEviction.makeSpace(_cache, size);
-			LineageCacheEviction.updateSize(size, true);
 		}
 		else
 			_cache.remove(item);    //remove the placeholder
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 2a3c426..7fce53b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -33,8 +33,9 @@
 	//-------------CACHING LOGIC RELATED CONFIGURATIONS--------------//
 
 	private static final String[] REUSE_OPCODES = new String[] {
-		"tsmm", "ba+*", "*", "/", "+", "nrow", "ncol", "round", "exp", "log",
+		"tsmm", "ba+*", "*", "/", "+", "||", "nrow", "ncol", "round", "exp", "log",
 		"rightIndex", "leftIndex", "groupedagg", "r'", "solve", "spoof"
+		//TODO: Reuse everything. 
 	};
 	
 	public enum ReuseCacheType {
@@ -97,9 +98,11 @@
 	protected enum LineageCacheStatus {
 		EMPTY,     //Placeholder with no data. Cannot be evicted.
 		CACHED,    //General cached data. Can be evicted.
-		EVICTED,   //Data is in disk. Empty value. Cannot be evicted.
+		SPILLED,   //Data is in disk. Empty value. Cannot be evicted.
 		RELOADED,  //Reloaded from disk. Can be evicted.
-		PINNED;    //Pinned to memory. Cannot be evicted.
+		PINNED,    //Pinned to memory. Cannot be evicted.
+		TOSPILL,   //To be spilled lazily 
+		TODELETE;  //TO be removed lazily
 		public boolean canEvict() {
 			return this == CACHED || this == RELOADED;
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index 485cac6..256d85f 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -33,6 +33,7 @@
 	protected long _computeTime;
 	protected long _timestamp = 0;
 	protected LineageCacheStatus _status;
+	protected LineageCacheEntry _nextEntry;
 	protected LineageItem _origItem;
 	
 	public LineageCacheEntry(LineageItem key, DataType dt, MatrixBlock Mval, ScalarObject Sval, long computetime) {
@@ -42,6 +43,7 @@
 		_SOval = Sval;
 		_computeTime = computetime;
 		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
+		_nextEntry = null;
 		_origItem = null;
 	}
 	
@@ -100,6 +102,10 @@
 		//resume all threads waiting for val
 		notifyAll();
 	}
+	
+	public synchronized void setValue(MatrixBlock val) {
+		setValue(val, _computeTime);
+	}
 
 	public synchronized void setValue(ScalarObject val, long computetime) {
 		_SOval = val;
@@ -109,6 +115,12 @@
 		notifyAll();
 	}
 	
+	protected synchronized void setNullValues() {
+		_MBval = null;
+		_SOval = null;
+		_status = LineageCacheStatus.EMPTY;
+	}
+	
 	protected synchronized void setTimestamp() {
 		_timestamp = System.currentTimeMillis();
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 127e152..b83a78b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -26,7 +26,6 @@
 import java.util.TreeSet;
 
 import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
@@ -89,22 +88,68 @@
 		}
 	}
 
-	protected static void removeEntry(Map<LineageItem, LineageCacheEntry> cache, LineageItem key) {
-		if (!cache.containsKey(key))
-			return;
-		weightedQueue.remove(cache.get(key));
-		cache.remove(key);
-	}
-
 	private static void removeEntry(Map<LineageItem, LineageCacheEntry> cache, LineageCacheEntry e) {
-		if (DMLScript.STATISTICS)
+		if (cache.remove(e._key) != null)
+			_cachesize -= e.getSize();
+
+		if (DMLScript.STATISTICS) {
 			_removelist.add(e._key);
-
-		_cachesize -= e.getSize();
-		// NOTE: The caller of this method maintains the cache and the eviction queue.
-
-		if (DMLScript.STATISTICS)
 			LineageCacheStatistics.incrementMemDeletes();
+		}
+		// NOTE: The caller of this method maintains the eviction queue.
+	}
+	private static void removeOrSpillEntry(Map<LineageItem, LineageCacheEntry> cache, LineageCacheEntry e, boolean spill) {
+		if (e._origItem == null) {
+			// Single entry. Remove or spill.
+			if (spill)
+				spillToLocalFS(cache, e);
+			else
+				removeEntry(cache, e);
+			return;
+		}
+		
+		// Defer the eviction till all the entries with the same matrix are evicted.
+		e.setCacheStatus(spill ? LineageCacheStatus.TOSPILL : LineageCacheStatus.TODELETE);
+
+		// If all the entries with the same data are evicted, check if deferred spilling 
+		// is set for any of those. If so, spill the matrix to disk and set null in the 
+		// cache entries. Keeping the spilled entries removes the need to use another 
+		// data structure and also maintains the connections between items pointing to the 
+		// same data. Delete all the entries if all are set to be deleted.
+		boolean write = false;
+		LineageCacheEntry tmp = cache.get(e._origItem); //head
+		while (tmp != null) {
+			if (tmp.getCacheStatus() != LineageCacheStatus.TOSPILL
+				&& tmp.getCacheStatus() != LineageCacheStatus.TODELETE)
+				return; //do nothing
+
+			write |= (tmp.getCacheStatus() == LineageCacheStatus.TOSPILL);
+			tmp = tmp._nextEntry;
+		}
+		if (write) {
+			// Spill to disk if at least one entry has status TOSPILL. 
+			spillToLocalFS(cache, cache.get(e._origItem));
+			LineageCacheEntry h = cache.get(e._origItem);
+			while (h != null) {
+				// Set values to null for all the entries.
+				h.setNullValues();
+				// Set status to spilled for all the entries.
+				h.setCacheStatus(LineageCacheStatus.SPILLED);
+				h = h._nextEntry;
+			}
+			// Keep them in cache.
+			return;
+		}
+		// All are set to be deleted.
+		else {
+			// Remove all the entries from cache.
+			LineageCacheEntry h = cache.get(e._origItem);
+			while (h != null) {
+				removeEntry(cache, h);
+				h = h._nextEntry;
+			}
+		}
+		// NOTE: The callers of this method maintain the eviction queue.
 	}
 
 	//---------------- CACHE SPACE MANAGEMENT METHODS -----------------//
@@ -113,6 +158,7 @@
 		CACHE_LIMIT = limit;
 	}
 
+	//Note: public for spilling tests
 	public static long getCacheLimit() {
 		return CACHE_LIMIT;
 	}
@@ -139,8 +185,7 @@
 
 			if (!LineageCacheConfig.isSetSpill()) {
 				// If eviction is disabled, just delete the entries.
-				if (cache.remove(e._key) != null)
-					removeEntry(cache, e);
+				removeOrSpillEntry(cache, e, false);
 				e = weightedQueue.pollFirst();
 				continue;
 			}
@@ -157,8 +202,7 @@
 			if (!e.isMatrixValue()) {
 				// No spilling for scalar entries. Just delete those.
 				// Note: scalar entries with higher computation time are pinned.
-				if (cache.remove(e._key) != null)
-					removeEntry(cache, e);
+				removeOrSpillEntry(cache, e, false);
 				e = weightedQueue.pollFirst();
 				continue;
 			}
@@ -180,17 +224,21 @@
 				// Can't trust the estimate if less than 100ms.
 				// Spill if it takes longer to recompute.
 				if (exectime >= LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE)
-					spillToLocalFS(e);
+					//spillToLocalFS(e);
+					removeOrSpillEntry(cache, e, true);  //spill
+				else
+					removeOrSpillEntry(cache, e, false); //delete
 			}
 			else {
 				// Spill if it takes longer to recompute than spilling.
 				if (exectime > spilltime)
-					spillToLocalFS(e);
+					//spillToLocalFS(e);
+					removeOrSpillEntry(cache, e, true);  //spill
+				else
+					removeOrSpillEntry(cache, e, false); //delete
 			}
 
 			// Remove the entry from cache.
-			if (cache.remove(e._key) != null)
-				removeEntry(cache, e);
 			e = weightedQueue.pollFirst();
 		}
 	}
@@ -259,7 +307,7 @@
 
 	// ---------------- I/O METHODS TO LOCAL FS -----------------
 	
-	private static void spillToLocalFS(LineageCacheEntry entry) {
+	private static void spillToLocalFS(Map<LineageItem, LineageCacheEntry> cache, LineageCacheEntry entry) {
 		if (!entry.isMatrixValue())
 			throw new DMLRuntimeException ("Spilling scalar objects to disk is not allowd. Key: "+entry._key);
 		if (entry.isNullVal())
@@ -280,15 +328,28 @@
 		// Adjust disk writing speed
 		adjustReadWriteSpeed(entry, ((double)(t1-t0))/1000000000, false);
 		
+		// Add all the entries associated with this matrix to spillList.
+		if (entry._origItem == null) {
+			_spillList.put(entry._key, new SpilledItem(outfile));
+		}
+		else {
+			LineageCacheEntry h = cache.get(entry._origItem); //head
+			while (h != null) {
+				_spillList.put(h._key, new SpilledItem(outfile));
+				h = h._nextEntry;
+			}
+		}
+
 		if (DMLScript.STATISTICS) {
 			LineageCacheStatistics.incrementFSWriteTime(t1-t0);
 			LineageCacheStatistics.incrementFSWrites();
 		}
-
-		_spillList.put(entry._key, new SpilledItem(outfile, entry._computeTime));
 	}
 
 	protected static LineageCacheEntry readFromLocalFS(Map<LineageItem, LineageCacheEntry> cache, LineageItem key) {
+		if (cache.get(key) == null)
+			throw new DMLRuntimeException ("Spilled item should present in cache. Key: "+key);
+
 		long t0 = System.nanoTime();
 		MatrixBlock mb = null;
 		// Read from local FS
@@ -297,12 +358,23 @@
 		} catch (IOException e) {
 			throw new DMLRuntimeException ("Read from " + _spillList.get(key)._outfile + " failed.", e);
 		}
-		// Restore to cache
 		LocalFileUtils.deleteFileIfExists(_spillList.get(key)._outfile, true);
 		long t1 = System.nanoTime();
-		LineageCache.putIntern(key, DataType.MATRIX, mb, null, _spillList.get(key)._computeTime);
+
+		// Restore to cache
+		LineageCacheEntry e = cache.get(key);
+		e.setValue(mb);
+		if (e._origItem != null) {
+			// Restore to all the entries having the same data.
+			LineageCacheEntry h = cache.get(e._origItem); //head
+			while (h != null) {
+				h.setValue(mb);
+				h = h._nextEntry;
+			}
+		}
+
 		// Adjust disk reading speed
-		adjustReadWriteSpeed(cache.get(key), ((double)(t1-t0))/1000000000, true);
+		adjustReadWriteSpeed(e, ((double)(t1-t0))/1000000000, true);
 		// TODO: set cache status as RELOADED for this entry
 		_spillList.remove(key);
 		if (DMLScript.STATISTICS) {
@@ -318,13 +390,16 @@
 
 	// ---------------- INTERNAL DATA STRUCTURES FOR EVICTION -----------------
 
+	// TODO: Remove this class, and add outfile to LineageCacheEntry.
 	private static class SpilledItem {
 		String _outfile;
-		long _computeTime;
+		//long _computeTime;
+		//protected LineageItem _origItem;
 
-		public SpilledItem(String outfile, long computetime) {
+		public SpilledItem(String outfile) {
 			_outfile = outfile;
-			_computeTime = computetime;
+			//_computeTime = computetime;
+			//_origItem = origItem;
 		}
 	}
 }