[SYSTEMDS-2708] Performance buffer pool handling of frames w/ strings

This patch improves the performance of buffer pool handling for frames
with string columns. There are many calls to getInMemorySize() on the
path through the buffer pool. In case of string columns, these calls scan
all values to determine the lengths of individual strings. We now reuse
previously computed size estimates within the frame block. Furthermore,
this patch also generalizes the asynchronous file cleaning in the
buffer pool to now also accept serialization tasks (such as frames with
string columns). However, because asynchronous serialization provides
weaker guarantees, this second modification is not enabled by default.

On an example mini-batch scenario (with preprocessing in the individual
iterations), this patch improved performance from 91s (86s right indexing
incl bufferpool release) to 9.2s (5.1s right indexing). With the
additional async serialization, it further reduced to 7.9s (3.2s right
indexing).
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index cb7e096..97f211a 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -85,9 +85,20 @@
 	public static final RPolicy CACHING_BUFFER_POLICY = RPolicy.FIFO;
 	public static final boolean CACHING_BUFFER_PAGECACHE = false;
 	public static final boolean CACHING_WRITE_CACHE_ON_READ = false;
-	public static final String  CACHING_COUNTER_GROUP_NAME    = "SystemDS Caching Counters";
+	public static final String  CACHING_COUNTER_GROUP_NAME = "SystemDS Caching Counters";
 	public static final String  CACHING_EVICTION_FILEEXTENSION = ".dat";
 	public static final boolean CACHING_ASYNC_FILECLEANUP = true;
+	public static final boolean CACHING_ASYNC_SERIALIZE = false;
+	
+	//NOTE CACHING_ASYNC_SERIALIZE:
+	// The serialization of matrices and frames (ultra-sparse matrices or 
+	// frames with strings) into buffer pool byte arrays happens outside the 
+	// critical region of the global lock in LazyWriteBuffer. However, it still
+	// requires thread-local serialization (before returning from release) in 
+	// order to guarantee that not too many objects are pinned at the same time 
+	// which would violate the memory budget. Therefore, the new asynchronous 
+	// serialization (see CACHING_ASYNC_SERIALIZE) should be understood as
+	// optimistic with weaker guarantees.
 	
 	/**
 	 * Defines all possible cache status types for a data blob.
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
index cf6bf0e..9ca079f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.Executors;
 
 import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 
@@ -47,8 +48,8 @@
 	//for (1) queue semantics and (2) constant time get/insert/delete operations)
 	private static EvictionQueue _mQueue;
 	
-	//file cleaner for synchronous or asynchronous delete of evicted files
-	private static FileCleaner _fClean;
+	//maintenance service for synchronous or asynchronous delete of evicted files
+	private static MaintenanceService _fClean;
 	
 	static {
 		//obtain the logical buffer size in bytes
@@ -100,7 +101,7 @@
 			}
 			
 			//serialize matrix (outside synchronized critical path)
-			bbuff.serializeBlock(cb);
+			_fClean.serializeData(bbuff, cb);
 			
 			if( DMLScript.STATISTICS ) {
 				CacheStatistics.incrementFSBuffWrites();
@@ -180,7 +181,7 @@
 
 	public static void init() {
 		_mQueue = new EvictionQueue();
-		_fClean = new FileCleaner();
+		_fClean = new MaintenanceService();
 		_size = 0;
 		if( CacheableData.CACHING_BUFFER_PAGECACHE )
 			PageCache.init();
@@ -309,18 +310,19 @@
 	}
 	
 	/**
-	 * File delete service for abstraction of synchronous and asynchronous
-	 * file cleanup on rmvar/cpvar. The threadpool for asynchronous cleanup
-	 * may increase the number of threads temporarily to the number of concurrent
-	 * delete tasks (which is bounded to the parfor degree of parallelism).
+	 * Maintenance service for abstraction of synchronous and asynchronous
+	 * file cleanup on rmvar/cpvar as well as serialization of matrices and
+	 * frames. The thread pool for asynchronous cleanup may increase the 
+	 * number of threads temporarily to the number of concurrent delete tasks
+	 * (which is bounded to the parfor degree of parallelism).
 	 */
-	private static class FileCleaner
+	private static class MaintenanceService
 	{
 		private ExecutorService _pool = null;
 		
-		public FileCleaner() {
+		public MaintenanceService() {
 			//create new threadpool for async cleanup
-			if( CacheableData.CACHING_ASYNC_FILECLEANUP )
+			if( isAsync() )
 				_pool = Executors.newCachedThreadPool();
 		}
 		
@@ -332,12 +334,32 @@
 				LocalFileUtils.deleteFileIfExists(fname, true);
 		}
 		
+		public void serializeData(ByteBuffer bbuff, CacheBlock cb) {
+			//sync or async serialization of the cache block into the byte buffer
+			if( CacheableData.CACHING_ASYNC_SERIALIZE )
+				_pool.submit(new DataSerializerTask(bbuff, cb));
+			else {
+				try {
+					bbuff.serializeBlock(cb);
+				}
+				catch(IOException ex) {
+					throw new DMLRuntimeException(ex);
+				}
+			}
+		}
+		
 		public void close() {
 			//execute pending tasks and shutdown pool
-			if( CacheableData.CACHING_ASYNC_FILECLEANUP )
+			if( isAsync() )
 				_pool.shutdown();
 		}
 		
+		@SuppressWarnings("unused")
+		public boolean isAsync() { //true if file cleanup or serialization is configured as asynchronous
+			return CacheableData.CACHING_ASYNC_FILECLEANUP 
+				|| CacheableData.CACHING_ASYNC_SERIALIZE;
+		}
+		
 		private static class FileCleanerTask implements Runnable {
 			private String _fname = null;
 			
@@ -350,5 +372,25 @@
 				LocalFileUtils.deleteFileIfExists(_fname, true);
 			}
 		}
+		
+		private static class DataSerializerTask implements Runnable { //runnable wrapper around ByteBuffer.serializeBlock
+			private ByteBuffer _bbuff = null;
+			private CacheBlock _cb = null;
+			
+			public DataSerializerTask(ByteBuffer bbuff, CacheBlock cb) {
+				_bbuff = bbuff;
+				_cb = cb;
+			}
+			
+			@Override
+			public void run() {
+				try {
+					_bbuff.serializeBlock(_cb);
+				}
+				catch(IOException ex) {
+					//NOTE(review): when run via ExecutorService.submit, this exception is held by the returned Future; serializeData discards that Future, so the failure is not propagated
+					throw new DMLRuntimeException(ex);
+				}
+			}
+		}
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index 75095b2..7b5e4d0 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -442,10 +442,10 @@
 		//size in modified UTF-8 as used by DataInput/DataOutput
 		int size = 2; //length in bytes
 		for (int i = 0; i < value.length(); i++) {
-            char c = value.charAt(i);
-            size += ( c>=0x0001 && c<=0x007F) ? 1 :
-            	(c >= 0x0800) ? 3 : 2;
-        }
+			char c = value.charAt(i);
+			size += ( c>=0x0001 && c<=0x007F) ? 1 :
+				(c >= 0x0800) ? 3 : 2;
+		}
 		return size;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 175d400..9c4284c 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -84,6 +84,9 @@
 	/** The data frame data as an ordered list of columns */
 	private Array[] _coldata = null;
 	
+	/** Cached size in memory to avoid repeated scans of string columns */
+	long _msize = -1;
+	
 	public FrameBlock() {
 		_numRows = 0;
 	}
@@ -273,6 +276,7 @@
 	 * @param numRows number of rows
 	 */
 	public void ensureAllocatedColumns(int numRows) {
+		_msize = -1;
 		//early abort if already allocated
 		if( _coldata != null && _schema.length == _coldata.length ) {
 			//handle special case that to few rows allocated
@@ -376,6 +380,7 @@
 	 */
 	public void set(int r, int c, Object val) {
 		_coldata[c].set(r, UtilFunctions.objectToObject(_schema[c], val));
+		_msize = -1;
 	}
 
 	public void reset(int nrow, boolean clearMeta) {
@@ -392,6 +397,7 @@
 			for( int i=0; i < _coldata.length; i++ )
 				_coldata[i].reset(nrow);
 		}
+		_msize = -1;
 	}
 
 	public void reset() {
@@ -440,7 +446,7 @@
 		_coldata = (_coldata==null) ? new Array[]{new StringArray(col)} :
 			(Array[]) ArrayUtils.add(_coldata, new StringArray(col));
 		_numRows = col.length;
-
+		_msize = -1;
 	}
 	
 	/**
@@ -458,6 +464,7 @@
 		_coldata = (_coldata==null) ? new Array[]{new BooleanArray(col)} :
 			(Array[]) ArrayUtils.add(_coldata, new BooleanArray(col));	
 		_numRows = col.length;
+		_msize = -1;
 	}
 	
 	/**
@@ -475,6 +482,7 @@
 		_coldata = (_coldata==null) ? new Array[]{new IntegerArray(col)} :
 			(Array[]) ArrayUtils.add(_coldata, new IntegerArray(col));
 		_numRows = col.length;
+		_msize = -1;
 	}
 	/**
 	 * Append a column of value type LONG as the last column of
@@ -491,6 +499,7 @@
 		_coldata = (_coldata==null) ? new Array[]{new LongArray(col)} :
 			(Array[]) ArrayUtils.add(_coldata, new LongArray(col));
 		_numRows = col.length;
+		_msize = -1;
 	}
 	
 	/**
@@ -508,6 +517,7 @@
 		_coldata = (_coldata==null) ? new Array[]{new FloatArray(col)} :
 				(Array[]) ArrayUtils.add(_coldata, new FloatArray(col));
 		_numRows = col.length;
+		_msize = -1;
 	}
 	/**
 	 * Append a column of value type DOUBLE as the last column of
@@ -524,6 +534,7 @@
 		_coldata = (_coldata==null) ? new Array[]{new DoubleArray(col)} :
 			(Array[]) ArrayUtils.add(_coldata, new DoubleArray(col));
 		_numRows = col.length;
+		_msize = -1;
 	}
 	
 	/**
@@ -545,6 +556,7 @@
 		_schema = empty ? tmpSchema : (ValueType[]) ArrayUtils.addAll(_schema, tmpSchema); 
 		_coldata = empty ? tmpData : (Array[]) ArrayUtils.addAll(_coldata, tmpData);
 		_numRows = cols[0].length;
+		_msize = -1;
 	}
 
 	public Object getColumnData(int c) {
@@ -564,12 +576,13 @@
 	public void setColumn(int c, Array column) {
 		if( _coldata == null )
 			_coldata = new Array[getNumColumns()];
-		_coldata[c] = column; 
+		_coldata[c] = column;
+		_msize = -1;
 	}
 	
 	/**
 	 * Get a row iterator over the frame where all fields are encoded
-	 * as strings independent of their value types.  
+	 * as strings independent of their value types.
 	 * 
 	 * @return string array iterator
 	 */
@@ -738,6 +751,7 @@
 					(mvvalue==null || mvvalue.isEmpty()) ? null : mvvalue);
 			_coldata[j] = arr;
 		}
+		_msize = -1;
 	}
 
 	@Override
@@ -757,6 +771,10 @@
 	
 	@Override
 	public long getInMemorySize() {
+		//reuse previously computed size
+		if( _msize > 0 )
+			return _msize;
+		
 		//frame block header
 		long size = 16 + 4; //object, num rows
 		
@@ -788,11 +806,11 @@
 					for( int i=0; i<_numRows; i++ )
 						size += getInMemoryStringSize(arr.get(i));
 					break;
-				default: //not applicable	
+				default: //not applicable
 			}
 		}
 		
-		return size;
+		return _msize = size;
 	}
 	
 	@Override
@@ -819,7 +837,7 @@
 					for( int i=0; i<_numRows; i++ )
 						size += IOUtilFunctions.getUTFSize(arr.get(i));
 					break;
-				default: //not applicable	
+				default: //not applicable
 			}
 		}
 		
@@ -965,7 +983,7 @@
 		//allocate output frame (incl deep copy schema)
 		if( ret == null )
 			ret = new FrameBlock();
-		ret._numRows = _numRows;								
+		ret._numRows = _numRows;
 		ret._schema = _schema.clone();
 		ret._colnames = (_colnames != null) ? _colnames.clone() : null;
 		ret._colmeta = _colmeta.clone();