[SYSTEMDS-521] Add lineage option exposing eviction policies.
This patch exposes cache eviction policies. In addition to that
this patch tunes the weights for the scoring function, adds new
reusable instructions and sanity checks.
diff --git a/dev/Tasks.txt b/dev/Tasks.txt
index 3bf07c7..39cab22 100644
--- a/dev/Tasks.txt
+++ b/dev/Tasks.txt
@@ -357,4 +357,7 @@
* Break append instruction to cbind and rbind
SYSTEMDS-510 IO formats
- * 511 Add protobuf support to write and read FrameBlocks to HDFS OK
\ No newline at end of file
+ * 511 Add protobuf support to write and read FrameBlocks to HDFS OK
+
+SYSTEMDS-520 Lineage Tracing, Reuse and Integration III
+ * 521 New lineage option exposing cache policies OK
\ No newline at end of file
diff --git a/src/main/java/org/apache/sysds/api/DMLOptions.java b/src/main/java/org/apache/sysds/api/DMLOptions.java
index be0e266..5d8b360 100644
--- a/src/main/java/org/apache/sysds/api/DMLOptions.java
+++ b/src/main/java/org/apache/sysds/api/DMLOptions.java
@@ -31,6 +31,7 @@
import org.apache.commons.cli.PosixParser;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCachePolicy;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.utils.Explain;
import org.apache.sysds.utils.Explain.ExplainType;
@@ -60,7 +61,8 @@
public boolean help = false; // whether to print the usage option
public boolean lineage = false; // whether compute lineage trace
public boolean lineage_dedup = false; // whether deduplicate lineage items
- public ReuseCacheType linReuseType = ReuseCacheType.NONE;
+ public ReuseCacheType linReuseType = ReuseCacheType.NONE; // reuse type (full, partial, hybrid)
+ public LineageCachePolicy linCachePolicy= LineageCachePolicy.HYBRID; // lineage cache eviction policy
public boolean fedWorker = false;
public int fedWorkerPort = -1;
public boolean checkPrivacy = false; // Check which privacy constraints are loaded and checked during federated execution
@@ -127,6 +129,12 @@
dmlOptions.linReuseType = ReuseCacheType.REUSE_HYBRID;
else if (lineageType.equalsIgnoreCase("none"))
dmlOptions.linReuseType = ReuseCacheType.NONE;
+ else if (lineageType.equalsIgnoreCase("policy_lru"))
+ dmlOptions.linCachePolicy = LineageCachePolicy.LRU;
+ else if (lineageType.equalsIgnoreCase("policy_weighted"))
+ dmlOptions.linCachePolicy = LineageCachePolicy.WEIGHTED;
+ else if (lineageType.equalsIgnoreCase("policy_hybrid"))
+ dmlOptions.linCachePolicy = LineageCachePolicy.HYBRID;
else
throw new org.apache.commons.cli.ParseException(
"Invalid argument specified for -lineage option: " + lineageType);
diff --git a/src/main/java/org/apache/sysds/api/DMLScript.java b/src/main/java/org/apache/sysds/api/DMLScript.java
index 477f0ec..4f3f3f1 100644
--- a/src/main/java/org/apache/sysds/api/DMLScript.java
+++ b/src/main/java/org/apache/sysds/api/DMLScript.java
@@ -64,6 +64,7 @@
import org.apache.sysds.runtime.instructions.gpu.context.GPUContextPool;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCachePolicy;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.runtime.privacy.CheckedConstraintsLog;
import org.apache.sysds.runtime.util.LocalFileUtils;
@@ -93,6 +94,7 @@
public static boolean LINEAGE = DMLOptions.defaultOptions.lineage; // whether compute lineage trace
public static boolean LINEAGE_DEDUP = DMLOptions.defaultOptions.lineage_dedup; // whether deduplicate lineage items
public static ReuseCacheType LINEAGE_REUSE = DMLOptions.defaultOptions.linReuseType; // whether lineage-based reuse
+ public static LineageCachePolicy LINEAGE_POLICY = DMLOptions.defaultOptions.linCachePolicy; // lineage cache eviction policy
public static boolean CHECK_PRIVACY = DMLOptions.defaultOptions.checkPrivacy; // Check which privacy constraints are loaded and checked during federated execution
public static boolean USE_ACCELERATOR = DMLOptions.defaultOptions.gpu;
@@ -194,6 +196,7 @@
LINEAGE = dmlOptions.lineage;
LINEAGE_DEDUP = dmlOptions.lineage_dedup;
LINEAGE_REUSE = dmlOptions.linReuseType;
+ LINEAGE_POLICY = dmlOptions.linCachePolicy;
CHECK_PRIVACY = dmlOptions.checkPrivacy;
String fnameOptConfig = dmlOptions.configFile;
@@ -219,6 +222,7 @@
}
LineageCacheConfig.setConfig(LINEAGE_REUSE);
+ LineageCacheConfig.setCachePolicy(LINEAGE_POLICY);
String dmlScriptStr = readDMLScript(isFile, fileOrScript);
Map<String, String> argVals = dmlOptions.argVals;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 598deb7..7f53038 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -280,10 +280,18 @@
else
throw new DMLRuntimeException("Lineage Cache: unsupported data: "+data.getDataType());
+ long size = centry.getSize();
+ //remove the entry if the entry is bigger than the cache.
+ //FIXME: the resumed threads will enter into infinite wait as the entry
+ //is removed. Need to add support for graceful remove (placeholder) and resume.
+ if (size > LineageCacheEviction.getCacheLimit()) {
+ _cache.remove(item);
+ continue;
+ }
+
//maintain order for eviction
LineageCacheEviction.addEntry(centry);
- long size = centry.getSize();
if (!LineageCacheEviction.isBelowThreshold(size))
LineageCacheEviction.makeSpace(_cache, size);
LineageCacheEviction.updateSize(size, true);
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index f1e8632..924c5f2 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -39,7 +39,7 @@
"rightIndex", "leftIndex", "groupedagg", "r'", "solve", "spoof",
"uamean", "max", "min", "ifelse", "-", "sqrt", ">", "uak+", "<=",
"^", "uamax", "uark+", "uacmean", "eigen", "ctableexpand", "replace",
- "^2", "uack+", "tak+*", "uacsqk+", "uark+", "n+"
+ "^2", "uack+", "tak+*", "uacsqk+", "uark+", "n+", "uarimax"
//TODO: Reuse everything.
};
private static String[] REUSE_OPCODES = new String[] {};
@@ -223,7 +223,7 @@
WEIGHTS[0] = 1; WEIGHTS[1] = 0;
break;
case HYBRID:
- WEIGHTS[0] = 1; WEIGHTS[1] = 1;
+ WEIGHTS[0] = 1; WEIGHTS[1] = 0.0033;
break;
}
_cachepolicy = policy;
@@ -233,9 +233,9 @@
return _cachepolicy;
}
- public static boolean isLRU() {
+ public static boolean isTimeBased() {
// Check the LRU component of weights array.
- return (WEIGHTS[1] == 1);
+ return (WEIGHTS[1] > 0);
}
public static void setSpill(boolean toSpill) {
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 2ad18a5..43fa7c5 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -80,7 +80,7 @@
protected static void getEntry(LineageCacheEntry entry) {
// Reset the timestamp to maintain the LRU component of the scoring function
- if (!LineageCacheConfig.isLRU())
+ if (!LineageCacheConfig.isTimeBased())
return;
if (weightedQueue.remove(entry)) {
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/CacheEvictionTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/CacheEvictionTest.java
index 03604e4..417d0a9 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/CacheEvictionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/CacheEvictionTest.java
@@ -92,11 +92,11 @@
proArgs.add("-stats");
proArgs.add("-lineage");
proArgs.add(ReuseCacheType.REUSE_FULL.name().toLowerCase());
+ proArgs.add("policy_lru");
proArgs.add("-args");
proArgs.add(String.valueOf(cacheSize));
proArgs.add(output("R"));
programArgs = proArgs.toArray(new String[proArgs.size()]);
- LineageCacheConfig.setCachePolicy(LineageCacheConfig.LineageCachePolicy.LRU);
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
HashMap<MatrixValue.CellIndex, Double> R_lru = readDMLMatrixFromHDFS("R");
long expCount_lru = Statistics.getCPHeavyHitterCount("exp");
@@ -108,12 +108,12 @@
proArgs.add("-stats");
proArgs.add("-lineage");
proArgs.add(ReuseCacheType.REUSE_FULL.name().toLowerCase());
+ proArgs.add("policy_weighted");
proArgs.add("-args");
proArgs.add(String.valueOf(cacheSize));
proArgs.add(output("R"));
programArgs = proArgs.toArray(new String[proArgs.size()]);
Lineage.resetInternalState();
- LineageCacheConfig.setCachePolicy(LineageCacheConfig.LineageCachePolicy.WEIGHTED);
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
HashMap<MatrixValue.CellIndex, Double> R_weighted= readDMLMatrixFromHDFS("R");
long expCount_wt = Statistics.getCPHeavyHitterCount("exp");