[MINOR] Minior fixes in lineage cache eviction.
This patch removes a bug where we were making space after
putting an entry in the cache -- this can evict the current
entry and introduce memory leak. Now we make space before
adding an entry to cache. Furthermore, this patch increases
the default I/O speed values to enable early starting of
spilling. The rolling adjustment logic anyway updates
the speed values according to the disk.
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index e3cb62b..71d55e3 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -277,18 +277,18 @@
LineageItem item = entry.getKey();
Data data = entry.getValue();
LineageCacheEntry centry = _cache.get(item);
- if (data instanceof MatrixObject)
- centry.setValue(((MatrixObject)data).acquireReadAndRelease(), computetime);
- else if (data instanceof ScalarObject)
- centry.setValue((ScalarObject)data, computetime);
- else {
+
+ if (!(data instanceof MatrixObject) && !(data instanceof ScalarObject)) {
// Reusable instructions can return a frame (rightIndex). Remove placeholders.
_cache.remove(item);
continue;
}
- long size = centry.getSize();
- //remove the entry if the entry is bigger than the cache.
+ MatrixBlock mb = (data instanceof MatrixObject) ?
+ ((MatrixObject)data).acquireReadAndRelease() : null;
+ long size = mb != null ? mb.getInMemorySize() : ((ScalarObject)data).getSize();
+
+ //remove the placeholder if the entry is bigger than the cache.
//FIXME: the resumed threads will enter into infinite wait as the entry
//is removed. Need to add support for graceful remove (placeholder) and resume.
if (size > LineageCacheEviction.getCacheLimit()) {
@@ -296,12 +296,20 @@
continue;
}
- //maintain order for eviction
- LineageCacheEviction.addEntry(centry);
-
+ //make space for the data
if (!LineageCacheEviction.isBelowThreshold(size))
LineageCacheEviction.makeSpace(_cache, size);
LineageCacheEviction.updateSize(size, true);
+
+ //place the data
+ if (data instanceof MatrixObject)
+ centry.setValue(mb, computetime);
+ else if (data instanceof ScalarObject)
+ centry.setValue((ScalarObject)data, computetime);
+
+ //maintain order for eviction
+ LineageCacheEviction.addEntry(centry);
+
}
}
}
@@ -358,7 +366,7 @@
// Create a new entry.
LineageCacheEntry newItem = new LineageCacheEntry(key, dt, Mval, Sval, computetime);
- // Make space by removing or spilling LRU entries.
+ // Make space by removing or spilling entries.
if( Mval != null || Sval != null ) {
long size = newItem.getSize();
if( size > LineageCacheEviction.getCacheLimit())
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 66972c4..6a235b6 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -79,10 +79,13 @@
// Minimum reliable data size for spilling estimate in MB.
public static final double MIN_SPILL_DATA = 2;
// Default I/O in MB per second for binary blocks
- public static double FSREAD_DENSE = 200;
- public static double FSREAD_SPARSE = 100;
- public static double FSWRITE_DENSE = 150;
- public static double FSWRITE_SPARSE = 75;
+ // NOTE: These defaults are tuned according to high
+ // speed disks, so that spilling starts early. These
+ // will anyway be adjusted as per the current disk.
+ public static double FSREAD_DENSE = 500;
+ public static double FSREAD_SPARSE = 400;
+ public static double FSWRITE_DENSE = 450;
+ public static double FSWRITE_SPARSE = 225;
private enum CachedItemHead {
TSMM,
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 553ca03..7025818 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -213,13 +213,12 @@
double exectime = ((double) e._computeTime) / 1000000; // in milliseconds
if (LineageCache.DEBUG) {
- if (exectime > LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
- System.out.print("LI " + e._key.getOpcode());
- System.out.print(" exec time " + ((double) e._computeTime) / 1000000);
- System.out.print(" spill time " + getDiskSpillEstimate(e) * 1000);
- System.out.print(" dim " + e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
- System.out.println(" size " + getDiskSizeEstimate(e));
- }
+ System.out.print("LI = " + e._key.getOpcode());
+ System.out.print(" exec time = " + ((double) e._computeTime) / 1000000);
+ System.out.println(" spill time = " + getDiskSpillEstimate(e) * 1000);
+ System.out.print("dim = " + e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
+ System.out.print(" size = " + getDiskSizeEstimate(e));
+ System.out.println(" DAG height = " + e._key.getDistLeaf2Node());
}
if (spilltime < LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {