[SYSTEMDS-2713] DAG height based eviction policy for lineage cache

This patch introduces a new eviction policy, DAGHEIGHT, for lineage cache
eviction. According to this policy, lineage items farthest from the
leaves get evicted first. We store the height of each item on construction.
For inputs having different heights, we find the maximum (linear time) and
increment it by one for the current node.
In addition, this patch updates the Comparator for the eviction queue.
Now, entries with same score are ordered by cost, size ratio in LRU and
DAGHEIGHT policies, and by timestamp in COSTNSIZE policy. For HYBRID,
entries with same score are still ordered by Ids.
diff --git a/src/main/java/org/apache/sysds/api/DMLOptions.java b/src/main/java/org/apache/sysds/api/DMLOptions.java
index 21b1ab8..7db0043 100644
--- a/src/main/java/org/apache/sysds/api/DMLOptions.java
+++ b/src/main/java/org/apache/sysds/api/DMLOptions.java
@@ -133,6 +133,8 @@
 							dmlOptions.linCachePolicy = LineageCachePolicy.LRU;
 						else if (lineageType.equalsIgnoreCase("policy_costnsize"))
 							dmlOptions.linCachePolicy = LineageCachePolicy.COSTNSIZE;
+						else if (lineageType.equalsIgnoreCase("policy_dagheight"))
+							dmlOptions.linCachePolicy = LineageCachePolicy.DAGHEIGHT;
 						else if (lineageType.equalsIgnoreCase("policy_hybrid"))
 							dmlOptions.linCachePolicy = LineageCachePolicy.HYBRID;
 						else
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 87a9749..e3cb62b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -163,6 +163,9 @@
 		for (int i=0; i<numOutputs; i++) {
 			String opcode = name + String.valueOf(i+1);
 			LineageItem li = new LineageItem(opcode, liInputs);
+			// set _distLeaf2Node for this special lineage item to 1
+			// to save it from early eviction if DAGHEIGHT policy is selected
+			li.setDistLeaf2Node(1);
 			LineageCacheEntry e = null;
 			synchronized(_cache) {
 				if (LineageCache.probe(li)) {
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index b873205..8e922d8 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -100,7 +100,7 @@
 
 	private static LineageCachePolicy _cachepolicy = null;
 	// Weights for scoring components (computeTime/size, LRU timestamp)
-	protected static double[] WEIGHTS = {0, 1};
+	protected static double[] WEIGHTS = {0, 1, 0};
 
 	protected enum LineageCacheStatus {
 		EMPTY,     //Placeholder with no data. Cannot be evicted.
@@ -118,13 +118,48 @@
 	public enum LineageCachePolicy {
 		LRU,
 		COSTNSIZE,
+		DAGHEIGHT,
 		HYBRID;
 	}
 	
 	protected static Comparator<LineageCacheEntry> LineageCacheComparator = (e1, e2) -> {
-		return e1.score == e2.score ?
+		/*return e1.score == e2.score ?
 			Long.compare(e1._key.getId(), e2._key.getId()) :
 			e1.score < e2.score ? -1 : 1;
+		*/
+		int ret = 0;
+		if (e1.score == e2.score) {
+			switch(_cachepolicy) {
+				case LRU:
+				case DAGHEIGHT:
+				{
+					// order entries with same score by cost, size ratio
+					double e1_cs = e1.getCostNsize();
+					double e2_cs = e2.getCostNsize();
+					ret = e1_cs == e2_cs ?
+						Long.compare(e1._key.getId(), e2._key.getId()) :
+						e1_cs < e2_cs ? -1 : 1;
+					break;
+				}
+				case COSTNSIZE:
+				{
+					// order entries with same score by last used time
+					double e1_ts = e1.getTimestamp();
+					double e2_ts = e2.getTimestamp();
+					ret = e1_ts == e2_ts ?
+						Long.compare(e1._key.getId(), e2._key.getId()) :
+						e1_ts < e2_ts ? -1 : 1;
+					break;
+				}
+				case HYBRID:
+					// order entries with same score by IDs
+					ret = Long.compare(e1._key.getId(), e2._key.getId());
+			}
+		}
+		else
+			ret = e1.score < e2.score ? -1 : 1;
+
+		return ret;
 	};
 
 	//----------------------------------------------------------------//
@@ -218,17 +253,21 @@
 	public static void setCachePolicy(LineageCachePolicy policy) {
 		switch(policy) {
 			case LRU:
-				WEIGHTS[0] = 0; WEIGHTS[1] = 1;
+				WEIGHTS[0] = 0; WEIGHTS[1] = 1; WEIGHTS[2] = 0;
 				break;
 			case COSTNSIZE:
-				WEIGHTS[0] = 1; WEIGHTS[1] = 0;
+				WEIGHTS[0] = 1; WEIGHTS[1] = 0; WEIGHTS[2] = 0;
+				break;
+			case DAGHEIGHT:
+				WEIGHTS[0] = 0; WEIGHTS[1] = 0; WEIGHTS[2] = 1;
 				break;
 			case HYBRID:
-				WEIGHTS[0] = 1; WEIGHTS[1] = 0.0033;
+				WEIGHTS[0] = 1; WEIGHTS[1] = 0.0033; WEIGHTS[2] = 0;
 				// FIXME: Relative timestamp fix reduces the absolute
 				// value of the timestamp component of the scoring function
 				// to a comparatively much smaller number. W[1] needs to be
 				// re-tuned accordingly.
+				// FIXME: Tune hybrid with a ratio of all three.
 				// TODO: Automatic tuning of weights.
 				break;
 		}
@@ -244,6 +283,11 @@
 		return (WEIGHTS[1] > 0);
 	}
 
+	public static boolean isDagHeightBased() {
+		// Check the DAGHEIGHT component of weights array.
+		return (WEIGHTS[2] > 0);
+	}
+
 	public static void setSpill(boolean toSpill) {
 		_allowSpill = toSpill;
 		// NOTE: _allowSpill only enables/disables disk spilling, but has 
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index 983572c..00d385e 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -142,11 +142,20 @@
 		return _timestamp;
 	}
 	
+	protected synchronized long getDagHeight() {
+		return _key.getDistLeaf2Node();
+	}
+	
+	protected synchronized double getCostNsize() {
+		return ((double)_computeTime)/getSize();
+	}
+	
 	private void recomputeScore() {
 		// Gather the weights for scoring components
 		double w1 = LineageCacheConfig.WEIGHTS[0];
 		double w2 = LineageCacheConfig.WEIGHTS[1];
+		double w3 = LineageCacheConfig.WEIGHTS[2];
 		// Generate scores
-		score = w1*(((double)_computeTime)/getSize()) + w2*getTimestamp();
+		score = w1*(((double)_computeTime)/getSize()) + w2*getTimestamp() + w3*(((double)1)/getDagHeight());
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
index 16c229e..90119cc 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
@@ -33,6 +33,7 @@
 	private final String _data;
 	private final LineageItem[] _inputs;
 	private int _hash = 0;
+	private long _distLeaf2Node;
 	// init visited to true to ensure visited items are
 	// not hidden when used as inputs to new items
 	private boolean _visited = true;
@@ -84,6 +85,8 @@
 		// materialize hash on construction 
 		// (constant time operation if input hashes constructed)
 		_hash = hashCode();
+		// store the distance of this node from the leaves. (O(#inputs)) operation
+		_distLeaf2Node = distLeaf2Node();
 	}
 	
 	public LineageItem[] getInputs() {
@@ -111,6 +114,25 @@
 		_visited = flag;
 	}
 	
+	private long distLeaf2Node() {
+		// Derive height only if the corresponding reuse
+		// policy is selected, otherwise set -1.
+		if (LineageCacheConfig.ReuseCacheType.isNone()
+			|| !LineageCacheConfig.isDagHeightBased())
+			return -1;
+
+		if (_inputs != null && _inputs.length > 0) {
+			// find the input with highest height
+			long maxDistance = _inputs[0].getDistLeaf2Node();
+			for (int i=1; i<_inputs.length; i++)
+				if (_inputs[i].getDistLeaf2Node() > maxDistance)
+					maxDistance = _inputs[i].getDistLeaf2Node();
+			return maxDistance + 1;
+		}
+		else
+			return 1;  //leaf node
+	}
+	
 	public long getId() {
 		return _id;
 	}
@@ -119,6 +141,14 @@
 		return _opcode;
 	}
 	
+	public void setDistLeaf2Node(long d) {
+		_distLeaf2Node = d;
+	}
+	
+	public long getDistLeaf2Node() {
+		return _distLeaf2Node;
+	}
+	
 	public LineageItemType getType() {
 		if (_opcode.startsWith(dedupItemOpcode))
 			return LineageItemType.Dedup;