[SYSTEMDS-3510] Recycling pointers from GPU lineage cache

This patch reworks the GPU cache eviction (deletion). The free pointer
cache stays empty when lineage cache is enabled. This patch allows
recycling the cached pointers. We maintain two lists for live and
free pointers in the cache. When the GPU memory is full, we poll
the first entry from the free list (weighted queue) and recycle
the pointer if the size matches the requested size, otherwise we
deallocate the cached pointers till enough space is available.

Closes #1796
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 59c9be2..6fd2c60 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -790,9 +790,6 @@
 			for (GPUObject gObj : _gpuObjects.values())
 				if (gObj != null) {
 					gObj.clearData(null, DMLScript.EAGER_CUDA_FREE);
-					if (gObj.isLinCached())
-						// set rmVarPending which helps detecting liveness
-						gObj.setrmVarPending(true);
 				}
 		}
 		
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
index 278f23c..cdedfb9 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
@@ -51,7 +51,9 @@
 import org.apache.sysds.runtime.instructions.gpu.context.GPUContext;
 import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
 import org.apache.sysds.runtime.lineage.Lineage;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig;
 import org.apache.sysds.runtime.lineage.LineageDebugger;
+import org.apache.sysds.runtime.lineage.LineageGPUCacheEviction;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.Pair;
@@ -183,8 +185,12 @@
 	 */
 	public void setGPUContexts(List<GPUContext> gpuContexts){
 		_gpuContexts = gpuContexts;
+		// Set the single-GPU context in the lineage cache
+		if (!LineageCacheConfig.ReuseCacheType.isNone())
+			LineageGPUCacheEviction.setGPUContext(gpuContexts.get(0));
 	}
 
+
 	/**
 	 * Gets the list of GPUContexts
 	 * @return a list of GPUContexts
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContext.java b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContext.java
index 5ad9bc5..3ffbede 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContext.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContext.java
@@ -283,15 +283,6 @@
 		GPUObject ret = new GPUObject(this, source, mo);
 		getMemoryManager().getGPUMatrixMemoryManager().addGPUObject(ret);
 
-		// Maintain the linked list of GPUObjects that point to same memory region
-		if (!LineageCacheConfig.ReuseCacheType.isNone()) {
-			if (source.lineageCachedChainHead == null)
-				source.lineageCachedChainHead = source;
-			if (source.nextLineageCachedEntry != null)
-				ret.nextLineageCachedEntry = source.nextLineageCachedEntry;
-			source.nextLineageCachedEntry = ret;
-			ret.lineageCachedChainHead = source;
-		}
 		return ret;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMatrixMemoryManager.java b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMatrixMemoryManager.java
index 23c1be9..4a2f2a5 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMatrixMemoryManager.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMatrixMemoryManager.java
@@ -120,12 +120,11 @@
 	 * Get pointers from the first memory sections "Matrix Memory"
 	 * @param locked return locked pointers if true
 	 * @param dirty return dirty pointers if true
-	 * @param lineageCached return cached pointer if true
 	 * @return set of pointers
 	 */
-	Set<Pointer> getPointers(boolean locked, boolean dirty, boolean lineageCached) {
+	Set<Pointer> getPointers(boolean locked, boolean dirty) {
 		return gpuObjects.stream()
-			.filter(gObj -> gObj.isLocked() == locked && gObj.isDirty() == dirty || gObj.isLinCached() == lineageCached)
+			.filter(gObj -> gObj.isLocked() == locked && gObj.isDirty() == dirty)
 			.flatMap(gObj -> getPointers(gObj).stream()).collect(Collectors.toSet());
 	}
 	
@@ -137,7 +136,7 @@
 	 */
 	void clearAllUnlocked(String opcode) throws DMLRuntimeException {
 		Set<GPUObject> unlockedGPUObjects = gpuObjects.stream()
-				.filter(gpuObj -> !gpuObj.isLocked() && !gpuObj.isLinCached()).collect(Collectors.toSet());
+				.filter(gpuObj -> !gpuObj.isLocked()).collect(Collectors.toSet());
 		if(unlockedGPUObjects.size() > 0) {
 			if(LOG.isWarnEnabled())
 				LOG.warn("Clearing all unlocked matrices (count=" + unlockedGPUObjects.size() + ").");
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryEviction.java b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryEviction.java
index 0264497..3676d33 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryEviction.java
@@ -50,7 +50,7 @@
 		// Stop if 1) Evicted the request number of entries, 2) The parallel
 		// CPU instruction is ended, and 3) No non-live entries left in the cache.
 		long t0 =  DMLScript.STATISTICS ? System.nanoTime() : 0;
-		while (!LineageGPUCacheEviction.isGPUCacheEmpty()) 
+		/*while (!LineageGPUCacheEviction.isGPUCacheEmpty())
 		{
 			if (LineageCacheConfig.STOPBACKGROUNDEVICTION)
 				// This logic reduces #evictions if the cpu instructions is so small
@@ -135,7 +135,7 @@
 				LineageCacheStatistics.incrementGpuAsyncEvicts();
 			}
 			count++;
-		}
+		}*/
 
 		// Add the locked entries back to the eviction queue
 		if (!lockedOrLiveEntries.isEmpty())
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
index 4aed4a7..4b0a67c 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
@@ -89,6 +89,7 @@
 	private Set<Pointer> getNonMatrixLockedPointers() {
 		Set<Pointer> managedPointers = matrixMemoryManager.getPointers();
 		managedPointers.addAll(lazyCudaFreeMemoryManager.getAllPointers());
+		managedPointers.addAll(LineageGPUCacheEviction.getAllCachedPointers());
 		return nonIn(allPointers.keySet(), managedPointers);
 	}
 	
@@ -298,80 +299,35 @@
 			long t0 =  DMLScript.STATISTICS ? System.nanoTime() : 0;
 			while (A == null && !LineageGPUCacheEviction.isGPUCacheEmpty()) {
 				LineageCacheEntry le = LineageGPUCacheEviction.pollFirstEntry();
-				GPUObject cachedGpuObj = le.getGPUObject();
-				GPUObject headGpuObj = cachedGpuObj.lineageCachedChainHead != null
-						? cachedGpuObj.lineageCachedChainHead : cachedGpuObj;
-				// Check and continue if any object in the linked list is locked
-				boolean locked = false;
-				GPUObject nextgpuObj = headGpuObj;
-				while (nextgpuObj!= null) {
-					if (nextgpuObj.isLocked())
-						locked = true;
-					nextgpuObj = nextgpuObj.nextLineageCachedEntry;
-				}
-				if (locked) {
-					lockedAndLiveList.add(le);
-					continue;
-				}
 
 				// First remove the gpuobj chains that don't contain any live and dirty objects.
-				// Continue if any object is live
-				boolean copied = false;
-				boolean live = false;
-				nextgpuObj = headGpuObj;
-				while (nextgpuObj!= null) {
-					// Keeping isLinCached as True here will save data deletion by copyFromDeviceToHost
-					if (!nextgpuObj.isrmVarPending()) { //live
-						//nextgpuObj.copyFromDeviceToHost(opcode, true, true);
-						//copied = true;
-						live = true;
-					}
-					//nextgpuObj.setIsLinCached(false);
-					nextgpuObj = nextgpuObj.nextLineageCachedEntry;
-				}
-				if (live) {
+				// TODO: Handle dirty objects separately. Copy them back to the host
+
+				// Check and continue if the pointer is live
+				// Note: all locked entries are live
+				Pointer ptr = le.getGPUPointer();
+				if (LineageGPUCacheEviction.probeLiveCachedPointers(ptr)) {
 					lockedAndLiveList.add(le);
 					continue;
 				}
-				// TODO: Handle dirty objects separately. Copy them back to the host
+				currentAvailableMemory += getSizeAllocatedGPUPointer(ptr);
 
-				currentAvailableMemory += headGpuObj.getSizeOnDevice();
-
-				if (!LineageCacheConfig.GPU2HOSTEVICTION)
-					LineageGPUCacheEviction.removeFromDeviceCache(le, opcode, copied);
-				else {
+				if (!LineageCacheConfig.GPU2HOSTEVICTION) {
+					LineageGPUCacheEviction.removeFromDeviceCache(le, opcode, false);
+					// Recycle the pointer if matches the required size
+					if (getSizeAllocatedGPUPointer(ptr) == size) {
+						A = ptr;
+						continue;
+					}
+					else
+						free(opcode, ptr, true);
+				}
+				/*else {
 					// Copy from device cache to CPU lineage cache if not already copied
 					LineageGPUCacheEviction.copyToHostCache(le, opcode, copied);
 					if(DMLScript.STATISTICS)
 						LineageCacheStatistics.incrementGpuSyncEvicts();
-				}
-
-				// For all the other objects, remove and clear data (only once)
-				nextgpuObj = headGpuObj;
-				boolean freed = false;
-				while (nextgpuObj!= null) {
-					// If not live or live but not dirty
-					if (nextgpuObj.isrmVarPending() || !nextgpuObj.isDirty()) {
-						if (!freed) {
-							nextgpuObj.setIsLinCached(false);
-							nextgpuObj.clearData(opcode, true);
-							freed = true;
-						}
-						else
-							nextgpuObj.clearGPUObject();
-					}
-					nextgpuObj = nextgpuObj.nextLineageCachedEntry;
-				}
-
-				// Clear the GPUOjects chain
-				GPUObject currgpuObj = headGpuObj;
-				while (currgpuObj.nextLineageCachedEntry != null) {
-					nextgpuObj = currgpuObj.nextLineageCachedEntry;
-					currgpuObj.lineageCachedChainHead = null;
-					currgpuObj.nextLineageCachedEntry = null;
-					nextgpuObj.lineageCachedChainHead = null;
-					currgpuObj = nextgpuObj;
-				}
+				}*/
 
 				if(currentAvailableMemory >= size)
 					// This doesn't guarantee allocation due to fragmented freed memory
@@ -393,7 +349,7 @@
 			long t0 =  DMLScript.STATISTICS ? System.nanoTime() : 0;
 			synchronized (matrixMemoryManager.gpuObjects) {
 				Optional<GPUObject> sizeBasedUnlockedGPUObjects = matrixMemoryManager.gpuObjects.stream()
-					.filter(gpuObj -> !gpuObj.isLocked() && !gpuObj.isLinCached()
+					.filter(gpuObj -> !gpuObj.isLocked()
 						&& matrixMemoryManager.getWorstCaseContiguousMemorySize(gpuObj) >= size)
 					.min((o1, o2) -> worstCaseContiguousMemorySizeCompare(o1, o2));
 				if(sizeBasedUnlockedGPUObjects.isPresent()) {
@@ -421,7 +377,7 @@
 			// Evict unlocked GPU objects one-by-one and try malloc
 			synchronized(matrixMemoryManager.gpuObjects) {
 				List<GPUObject> unlockedGPUObjects = matrixMemoryManager.gpuObjects.stream()
-						.filter(gpuObj -> !gpuObj.isLocked() && !gpuObj.isLinCached()).collect(Collectors.toList());
+						.filter(gpuObj -> !gpuObj.isLocked()).collect(Collectors.toList());
 				Collections.sort(unlockedGPUObjects, new EvictionPolicyBasedComparator(size));
 				while(A == null && unlockedGPUObjects.size() > 0) {
 					GPUObject evictedGPUObject = unlockedGPUObjects.remove(unlockedGPUObjects.size()-1);
@@ -511,7 +467,7 @@
 	 * 
 	 * @param toFree pointer to call cudaFree method on
 	 */
-	void guardedCudaFree(Pointer toFree) {
+	public void guardedCudaFree(Pointer toFree) {
 		synchronized(allPointers) {
 			if(allPointers.containsKey(toFree)) {
 				long size = allPointers.get(toFree).getSizeInBytes();
@@ -541,6 +497,11 @@
 	 * @throws DMLRuntimeException if error occurs
 	 */
 	public void free(String opcode, Pointer toFree, boolean eager) throws DMLRuntimeException {
+		if (!LineageCacheConfig.ReuseCacheType.isNone()
+			&& LineageGPUCacheEviction.probeLiveCachedPointers(toFree)) {
+			LineageGPUCacheEviction.decrementLiveCount(toFree);
+			return;
+		}
 		if(LOG.isTraceEnabled())
 			LOG.trace("Free-ing the pointer with eager=" + eager);
 		if (eager) {
@@ -626,7 +587,7 @@
 	 */
 	public void clearTemporaryMemory() {
 		// To record the cuda block sizes needed by allocatedGPUObjects, others are cleared up.
-		Set<Pointer> unlockedDirtyOrCachedPointers = matrixMemoryManager.getPointers(false, true, true);
+		Set<Pointer> unlockedDirtyOrCachedPointers = matrixMemoryManager.getPointers(false, true);
 		Set<Pointer> temporaryPointers = nonIn(allPointers.keySet(), unlockedDirtyOrCachedPointers);
 		for(Pointer tmpPtr : temporaryPointers) {
 			guardedCudaFree(tmpPtr);
@@ -659,18 +620,11 @@
 		long sizeOfLockedGPUObjects = 0; int numLockedGPUObjects = 0; int numLockedPointers = 0;
 		long sizeOfUnlockedDirtyGPUObjects = 0; int numUnlockedDirtyGPUObjects = 0; int numUnlockedDirtyPointers = 0;
 		long sizeOfUnlockedNonDirtyGPUObjects = 0; int numUnlockedNonDirtyGPUObjects = 0; int numUnlockedNonDirtyPointers = 0;
-		long sizeOfLockedCachedGPUObjects = 0; int numLockedCachedGPUObjects = 0; int numLockedCachedPointers = 0;
-		long sizeOfUnlockedCachedGPUObjects = 0; int numUnlockedCachedGPUObjects = 0; int numUnlockedCachedPointers = 0;
 		for(GPUObject gpuObj : matrixMemoryManager.gpuObjects) {
 			if(gpuObj.isLocked()) {
 				numLockedGPUObjects++;
 				sizeOfLockedGPUObjects += gpuObj.getSizeOnDevice();
 				numLockedPointers += matrixMemoryManager.getPointers(gpuObj).size();
-				if (gpuObj.isLinCached()) {
-					numLockedCachedGPUObjects++;
-					sizeOfLockedCachedGPUObjects += gpuObj.getSizeOnDevice();
-					numLockedCachedPointers += matrixMemoryManager.getPointers(gpuObj).size();
-				}
 			}
 			else {
 				if(gpuObj.isDirty()) {
@@ -683,11 +637,6 @@
 					sizeOfUnlockedNonDirtyGPUObjects += gpuObj.getSizeOnDevice();
 					numUnlockedNonDirtyPointers += matrixMemoryManager.getPointers(gpuObj).size();
 				}
-				if (gpuObj.isLinCached()) {
-					numUnlockedCachedGPUObjects++;
-					sizeOfUnlockedCachedGPUObjects += gpuObj.getSizeOnDevice();
-					numUnlockedCachedPointers += matrixMemoryManager.getPointers(gpuObj).size();
-				}
 			}
 		}
 		
@@ -696,7 +645,10 @@
 		for(PointerInfo ptrInfo : allPointers.values()) {
 			totalMemoryAllocated += ptrInfo.getSizeInBytes();
 		}
-		
+
+		int numCachedPointers = LineageGPUCacheEviction.numPointersCached();
+		long totalMemoryCached = LineageGPUCacheEviction.totalMemoryCached();
+
 		
 		Set<Pointer> potentiallyLeakyPointers = getNonMatrixLockedPointers();
 		List<Long> sizePotentiallyLeakyPointers = potentiallyLeakyPointers.stream().
@@ -719,11 +671,9 @@
 				numUnlockedNonDirtyGPUObjects, numUnlockedNonDirtyPointers, byteCountToDisplaySize(sizeOfUnlockedNonDirtyGPUObjects)));
 		ret.append(String.format("%-35s%-15s%-15s%-15s\n", "Locked GPU objects", 
 				numLockedGPUObjects, numLockedPointers, byteCountToDisplaySize(sizeOfLockedGPUObjects)));
-		ret.append(String.format("%-35s%-15s%-15s%-15s\n", "Locked Cached GPU objects", 
-				numLockedCachedGPUObjects, numLockedCachedPointers, byteCountToDisplaySize(sizeOfLockedCachedGPUObjects)));
-		ret.append(String.format("%-35s%-15s%-15s%-15s\n", "Unlocked Cached GPU objects", 
-				numUnlockedCachedGPUObjects, numUnlockedCachedPointers, byteCountToDisplaySize(sizeOfUnlockedCachedGPUObjects)));
-		ret.append(String.format("%-35s%-15s%-15s%-15s\n", "Cached rmvar-ed pointers", 
+		ret.append(String.format("%-35s%-15s%-15s%-15s\n", "All Cached Pointers",
+			"-", numCachedPointers, byteCountToDisplaySize(totalMemoryCached)));
+		ret.append(String.format("%-35s%-15s%-15s%-15s\n", "Cached rmvar-ed pointers",
 				"-", lazyCudaFreeMemoryManager.getNumPointers(), byteCountToDisplaySize(lazyCudaFreeMemoryManager.getTotalMemoryAllocated())));
 		ret.append(String.format("%-35s%-15s%-15s%-15s\n", "Non-matrix/non-cached pointers", 
 				"-", potentiallyLeakyPointers.size(), byteCountToDisplaySize(totalSizePotentiallyLeakyPointers)));
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUObject.java b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUObject.java
index 043243f..6dccee9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUObject.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUObject.java
@@ -102,27 +102,6 @@
 	 */
 	final ShadowBuffer shadowBuffer;
 	
-	/**
-	 * whether cached in lineage cache
-	 */
-	private boolean isLineageCached = false;
-	
-	/**
-	 * whether remove variable is called on this object.
-	 * True -> live; False -> not-live
-	 */
-	private boolean rmVarPending = false;
-	
-	/**
-	 * Next GPUObject that points to the same lineage cached GPU pointer
-	 */
-	public GPUObject lineageCachedChainHead = null;
-	
-	/**
-	 * Head of the linked list of GPUObjects that point to the same lineage cached GPU pointer
-	 */
-	public GPUObject nextLineageCachedEntry = null;
-	
 	// ----------------------------------------------------------------------
 	// Methods used to access, set and check jcudaDenseMatrixPtr
 	
@@ -466,13 +445,20 @@
 		this.shadowBuffer = new ShadowBuffer(this);
 	}
 
+	public GPUObject(GPUContext gCtx, MatrixObject mat, Pointer ptr) {
+		gpuContext = gCtx;
+		this.mat = mat;
+		setDensePointer(ptr);
+		isSparse = false;
+		this.shadowBuffer = new ShadowBuffer(this);
+	}
+
 	public GPUObject(GPUContext gCtx, GPUObject that, MatrixObject mat) {
 		dirty = that.dirty;
 		readLocks.reset();
 		writeLock = false;
 		timestamp = new AtomicLong(that.timestamp.get());
 		isSparse = that.isSparse;
-		isLineageCached = that.isLineageCached;
 		if (!that.isDensePointerNull())
 			setDensePointer(that.getDensePointer());
 		if (that.getJcudaSparseMatrixPtr() != null)
@@ -991,7 +977,7 @@
 			tmp.allocateDenseBlock();
 			LibMatrixCUDA.cudaSupportFunctions.deviceToHost(getGPUContext(),
 						getDensePointer(), tmp.getDenseBlockValues(), instName, isEviction);
-			if(eagerDelete && !isLinCached())
+			if(eagerDelete)
 				clearData(instName, true);
 			tmp.recomputeNonZeros();
 		} else {
@@ -1003,7 +989,7 @@
 			int[] rowPtr = new int[rows + 1];
 			int[] colInd = new int[nnz];
 			CSRPointer.copyPtrToHost(getJcudaSparseMatrixPtr(), rows, nnz, rowPtr, colInd);
-			if(eagerDelete && !isLinCached())
+			if(eagerDelete)
 				clearData(instName, true);
 			SparseBlockCSR sparseBlock = new SparseBlockCSR(rowPtr, colInd, values, nnz);
 			tmp = new MatrixBlock(rows, cols, nnz, sparseBlock);
@@ -1064,11 +1050,6 @@
 	 * @throws DMLRuntimeException if error occurs
 	 */
 	synchronized public void clearData(String opcode, boolean eager) throws DMLRuntimeException {
-		if (isLineageCached) {
-			setDirty(false);
-			return;
-		}
-
 		if(LOG.isTraceEnabled()) {
 			LOG.trace("GPU : clearData on " + this + ", GPUContext=" + getGPUContext());
 		}
@@ -1082,13 +1063,10 @@
 		shadowBuffer.clearShadowPointer();
 		jcudaSparseMatrixPtr = null;
 		resetReadWriteLock();
-		setrmVarPending(false);
 		getGPUContext().getMemoryManager().removeGPUObject(this);
 	}
 	
 	public void clearGPUObject() {
-		if (isLineageCached)
-			return;
 		if(LOG.isTraceEnabled())
 			LOG.trace("GPU : clearData on " + this + ", GPUContext=" + getGPUContext());
 
@@ -1096,7 +1074,6 @@
 		shadowBuffer.clearShadowPointer();
 		jcudaSparseMatrixPtr = null;
 		resetReadWriteLock();
-		setrmVarPending(false);
 		getGPUContext().getMemoryManager().removeGPUObject(this);
 	}
 
@@ -1118,22 +1095,6 @@
 		return dirty;
 	}
 	
-	public void setIsLinCached(boolean val) {
-		isLineageCached = val;
-	}
-
-	public boolean isLinCached() {
-		return isLineageCached;
-	}
-	
-	public void setrmVarPending(boolean val) {
-		rmVarPending = val;
-	}
-	
-	public boolean isrmVarPending() {
-		return rmVarPending;
-	}
-
 	@Override
 	public String toString() {
 		final StringBuilder sb = new StringBuilder("GPUObject{");
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index fb4f579..c38132a 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysds.runtime.lineage;
 
+import jcuda.Pointer;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -151,10 +152,14 @@
 					}
 					else { //TODO handle locks on gpu objects
 						//shallow copy the cached GPUObj to the output MatrixObject
-						ec.getMatrixObject(outName).setGPUObject(ec.getGPUContext(0), 
-								ec.getGPUContext(0).shallowCopyGPUObject(e._gpuObject, ec.getMatrixObject(outName)));
+						//Create a GPUObject with the cached pointer
+						GPUObject gpuObj = new GPUObject(ec.getGPUContext(0),
+							ec.getMatrixObject(outName), e.getGPUPointer());
+						ec.getMatrixObject(outName).setGPUObject(ec.getGPUContext(0), gpuObj);
 						//Set dirty to true, so that it is later copied to the host for write
 						ec.getMatrixObject(outName).getGPUObject(ec.getGPUContext(0)).setDirty(true);
+						//Increment the live count for this pointer
+						LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
 					}
 				}
 				maintainReuseStatistics(ec, inst, liList.get(0).getValue());
@@ -474,7 +479,7 @@
 		if (LineageCacheConfig.isReusable(inst, ec) ) {
 			//if (!isMarkedForCaching(inst, ec)) return;
 			List<Pair<LineageItem, Data>> liData = null;
-			GPUObject liGpuObj = null;
+			Pointer gpuPtr = null;
 			LineageItem instLI = ((LineageTraceable) inst).getLineageItem(ec).getValue();
 			if (inst instanceof MultiReturnBuiltinCPInstruction) {
 				liData = new ArrayList<>();
@@ -489,12 +494,13 @@
 			else if (inst instanceof GPUInstruction) {
 				// TODO: gpu multiretrun instructions
 				Data gpudata = ec.getVariable(((GPUInstruction) inst)._output);
-				liGpuObj = gpudata instanceof MatrixObject ? 
-						ec.getMatrixObject(((GPUInstruction)inst)._output).getGPUObject(ec.getGPUContext(0)) : null;
+				gpuPtr = gpudata instanceof MatrixObject ?
+						ec.getMatrixObject(((GPUInstruction)inst)._output).
+							getGPUObject(ec.getGPUContext(0)).getDensePointer() : null;
 
 				// Scalar gpu intermediates is always copied back to host. 
 				// No need to cache the GPUobj for scalar intermediates.
-				if (liGpuObj == null)
+				if (gpuPtr == null)
 					liData = Arrays.asList(Pair.of(instLI, ec.getVariable(((GPUInstruction)inst)._output)));
 			}
 			else if (inst instanceof ComputationSPInstruction
@@ -511,10 +517,10 @@
 				else if (inst instanceof ComputationSPInstruction) //collects or prefetches
 					liData = Arrays.asList(Pair.of(instLI, ec.getVariable(((ComputationSPInstruction) inst).output)));
 
-			if (liGpuObj == null)
+			if (gpuPtr == null)
 				putValueCPU(inst, liData, computetime);
 			else
-				putValueGPU(liGpuObj, instLI, computetime);
+				putValueGPU(gpuPtr, instLI, computetime);
 		}
 	}
 	
@@ -588,14 +594,14 @@
 		}
 	}
 	
-	private static void putValueGPU(GPUObject gpuObj, LineageItem instLI, long computetime) {
+	private static void putValueGPU(Pointer gpuPtr, LineageItem instLI, long computetime) {
 		synchronized( _cache ) {
 			LineageCacheEntry centry = _cache.get(instLI);
 			// Update the total size of lineage cached gpu objects
 			// The eviction is handled by the unified gpu memory manager
-			LineageGPUCacheEviction.updateSize(gpuObj.getSizeOnDevice(), true);
+			LineageGPUCacheEviction.updateSize(LineageGPUCacheEviction.getPointerSize(gpuPtr), true);
 			// Set the GPUOject in the cache
-			centry.setGPUValue(gpuObj, computetime);
+			centry.setGPUValue(gpuPtr, computetime);
 			// Maintain order for eviction
 			LineageGPUCacheEviction.addEntry(centry);
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index 8efe57a..e0b595e 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -21,6 +21,7 @@
 
 import java.util.Map;
 
+import jcuda.Pointer;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
@@ -42,7 +43,8 @@
 	protected LineageItem _origItem;
 	private String _outfile = null;
 	protected double score;
-	protected GPUObject _gpuObject;
+	protected Pointer _gpuPointer;
+
 	protected RDDObject _rddObject;
 	
 	public LineageCacheEntry(LineageItem key, DataType dt, MatrixBlock Mval, ScalarObject Sval, long computetime) {
@@ -55,7 +57,7 @@
 		_nextEntry = null;
 		_origItem = null;
 		_outfile = null;
-		_gpuObject = null;
+		_gpuPointer = null;
 	}
 	
 	protected synchronized void setCacheStatus(LineageCacheStatus st) {
@@ -140,13 +142,13 @@
 			size += _MBval.getInMemorySize();
 		if (_SOval != null)
 			size += _SOval.getSize();
-		if (_gpuObject != null)
-			size += _gpuObject.getSizeOnDevice();
+		if (_gpuPointer!= null)
+			size += LineageGPUCacheEviction.getPointerSize(_gpuPointer);
 		return size;
 	}
 	
 	public boolean isNullVal() {
-		return(_MBval == null && _SOval == null && _gpuObject == null && _serialBytes == null && _rddObject == null);
+		return(_MBval == null && _SOval == null && _gpuPointer == null && _serialBytes == null && _rddObject == null);
 	}
 	
 	public boolean isMatrixValue() {
@@ -162,7 +164,7 @@
 	}
 
 	public boolean isGPUObject() {
-		return _gpuObject != null;
+		return _gpuPointer != null;
 	}
 
 	public boolean isSerializedBytes() {
@@ -171,7 +173,7 @@
 
 	public synchronized void setValue(MatrixBlock val, long computetime) {
 		_MBval = val;
-		_gpuObject = null;  //Matrix block and gpu object cannot coexist
+		_gpuPointer = null;  //Matrix block and gpu pointer cannot coexist
 		_computeTime = computetime;
 		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
 		//resume all threads waiting for val
@@ -184,16 +186,15 @@
 
 	public synchronized void setValue(ScalarObject val, long computetime) {
 		_SOval = val;
-		_gpuObject = null;  //scalar and gpu object cannot coexist
+		_gpuPointer = null;  //scalar and gpu pointer cannot coexist
 		_computeTime = computetime;
 		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
 		//resume all threads waiting for val
 		notifyAll();
 	}
 	
-	public synchronized void setGPUValue(GPUObject gpuObj, long computetime) {
-		gpuObj.setIsLinCached(true);
-		_gpuObject = gpuObj;
+	public synchronized void setGPUValue(Pointer ptr, long computetime) {
+		_gpuPointer = ptr;
 		_computeTime = computetime;
 		_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.GPUCACHED;
 		//resume all threads waiting for val
@@ -216,8 +217,8 @@
 		notifyAll();
 	}
 	
-	public synchronized GPUObject getGPUObject() {
-		return _gpuObject;
+	public synchronized Pointer getGPUPointer() {
+		return _gpuPointer;
 	}
 	
 	protected synchronized void setNullValues() {
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
index b302359..ed0d85a 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
@@ -19,32 +19,66 @@
 
 package org.apache.sysds.runtime.lineage;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
+import jcuda.Pointer;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.instructions.gpu.context.GPUContext;
 import org.apache.sysds.runtime.instructions.gpu.context.GPUContextPool;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 public class LineageGPUCacheEviction 
 {
 	private static long _currentCacheSize = 0;
 	private static long GPU_CACHE_LIMIT; //limit in bytes
+	private static GPUContext _gpuContext = null;
 	private static long _startTimestamp = 0;
 	public static ExecutorService gpuEvictionThread = null;
+
+	// Weighted queue of freed pointers.
 	private static TreeSet<LineageCacheEntry> weightedQueue = new TreeSet<>(LineageCacheConfig.LineageCacheComparator);
+	private static HashMap<Pointer, Integer> livePointers = new HashMap<>();
+	private static HashMap<Pointer, LineageCacheEntry> GPUCacheEntries = new HashMap<>();
 
 	protected static void resetEviction() {
-		while(!weightedQueue.isEmpty()) {
-			LineageCacheEntry e = weightedQueue.pollFirst();
-			e._gpuObject.setIsLinCached(false);
-			e._gpuObject.clearData(null, true);
-		}
 		_currentCacheSize = 0;
 		gpuEvictionThread = null;
 		//LineageCacheConfig.CONCURRENTGPUEVICTION = false;
 		weightedQueue.clear();
+		livePointers.clear();
+		GPUCacheEntries.clear();
+	}
+
+	public static void setGPUContext(GPUContext gpuCtx) {
+		_gpuContext = gpuCtx;
+	}
+
+	protected static GPUContext getGPUContext() {
+		return _gpuContext;
+	}
+
+	protected static long getPointerSize(Pointer ptr) {
+		return _gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr);
+	}
+
+	protected static void incrementLiveCount(Pointer ptr) {
+		//TODO: move from free list to live list
+		if(livePointers.merge(ptr, 1, Integer::sum) == 1)
+			weightedQueue.remove(GPUCacheEntries.get(ptr));
+	}
+
+	public static void decrementLiveCount(Pointer ptr) {
+		// Decrement and remove if the live counte becomes 0
+		if(livePointers.compute(ptr, (k, v) -> v==1 ? null : v-1) == null)
+			weightedQueue.add(GPUCacheEntries.get(ptr));
+	}
+
+	public static boolean probeLiveCachedPointers(Pointer ptr) {
+		return livePointers.containsKey(ptr);
 	}
 
 	//---------------- COSTING RELATED METHODS -----------------
@@ -88,7 +122,9 @@
 
 		// TODO: Separate removelist, starttimestamp, score and weights from CPU cache
 		entry.computeScore(LineageCacheEviction._removelist);
-		weightedQueue.add(entry);
+		//weightedQueue.add(entry);
+		livePointers.put(entry.getGPUPointer(), 1);
+		GPUCacheEntries.put(entry.getGPUPointer(), entry);
 	}
 	
 	public static boolean isGPUCacheEmpty() {
@@ -127,8 +163,29 @@
 	protected static long getGPUCacheLimit() {
 		return GPU_CACHE_LIMIT;
 	}
+
+	public static int numPointersCached() {
+		return livePointers.size() + weightedQueue.size();
+	}
+
+	public static long totalMemoryCached() {
+		long totLive = livePointers.keySet().stream()
+			.mapToLong(ptr -> _gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr)).sum();
+		long totFree = weightedQueue.stream()
+			.mapToLong(en -> _gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(en.getGPUPointer())).sum();
+		return totLive + totFree;
+	}
+
+	public static Set<Pointer> getAllCachedPointers() {
+		//livePointers.keySet() + weightedQueue.stream().map()
+		Set<Pointer> cachedPointers = weightedQueue.stream()
+			.map(LineageCacheEntry::getGPUPointer)
+			.collect(Collectors.toSet());
+		cachedPointers.addAll(livePointers.keySet());
+		return cachedPointers;
+	}
 	
-	public static void copyToHostCache(LineageCacheEntry entry, String instName, boolean alreadyCopied) {
+	/*public static void copyToHostCache(LineageCacheEntry entry, String instName, boolean alreadyCopied) {
 		// TODO: move to the shadow buffer. Convert to double precision only when reused.
 		long t0 = System.nanoTime();
 		MatrixBlock mb = alreadyCopied ? entry._gpuObject.getMatrixObject().acquireReadAndRelease()
@@ -150,12 +207,14 @@
 		LineageCacheEviction.addEntry(entry);
 		// manage space in gpu cache
 		updateSize(size, false);
-	}
+	}*/
 
 	public static void removeFromDeviceCache(LineageCacheEntry entry, String instName, boolean alreadyCopied) {
-		long size = entry.getGPUObject().getSizeOnDevice();
+		//long size = entry.getGPUObject().getSizeOnDevice();
+		long size = _gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(entry.getGPUPointer());
 		LineageCache.removeEntry(entry._key);
 		updateSize(size, false);
+		GPUCacheEntries.remove(entry.getGPUPointer());
 	}
 
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
index 7536173..7cfbbd9 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
@@ -81,7 +81,7 @@
 		// reset clears the lineage cache held memory from the last run
 		Lineage.resetInternalState();
 		boolean gpu2Mem = LineageCacheConfig.GPU2HOSTEVICTION;
-		LineageCacheConfig.GPU2HOSTEVICTION = true;
+		//LineageCacheConfig.GPU2HOSTEVICTION = true;
 		//run the test
 		runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
 		HashMap<MatrixValue.CellIndex, Double> R_orig = readDMLMatrixFromOutputDir("R");