[SYSTEMDS-2784] Calculate checksums for Fed PUT requests

For lineage-based reuse, it is necessary to uniquely
identify each data item sent via PUT requests. Inspired
by Spark, this patch introduces Adler32 checksum calculation
for every CacheBlock, and materialize it in Federated request.
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
index 33dad44..05d7e57 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
@@ -19,14 +19,21 @@
 
 package org.apache.sysds.runtime.controlprogram.federated;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
 
+import org.apache.sysds.api.DMLException;
 import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.runtime.lineage.LineageItem;
+import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.controlprogram.caching.CacheDataOutput;
+import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.utils.Statistics;
 
 public class FederatedRequest implements Serializable {
@@ -47,7 +54,7 @@
 	private long _tid;
 	private List<Object> _data;
 	private boolean _checkPrivacy;
-	private List<Integer> _lineageHash;
+	private List<Long> _checksums;
 	
 	
 	public FederatedRequest(RequestType method) {
@@ -68,6 +75,8 @@
 		_id = id;
 		_data = data;
 		setCheckPrivacy();
+		if (DMLScript.LINEAGE)
+			setChecksum();
 	}
 	
 	public RequestType getType() {
@@ -120,14 +129,50 @@
 		return _checkPrivacy;
 	}
 	
-	public void setLineageHash(LineageItem[] liItems) {
-		// copy the hash of the corresponding lineage DAG
-		// TODO: copy both Adler32 checksum (on data) and hash (on lineage DAG)
-		_lineageHash = Arrays.stream(liItems).map(li -> li.hashCode()).collect(Collectors.toList());
+	public void setChecksum() {
+		// Calculate Adler32 checksum. This is used as a leaf node of Lineage DAGs
+		// in the workers, and helps to uniquely identify a node (tracing PUT)
+		// TODO: append lineageitem hash if checksum is not enough
+		_checksums = new ArrayList<>();
+		try {
+			calcChecksum();
+		}
+		catch (IOException e) {
+			throw new DMLException(e);
+		}
 	}
 	
-	public int getLineageHash(int i) {
-		return _lineageHash.get(i);
+	public long getChecksum(int i) {
+		return _checksums.get(i);
+	}
+	
+	private void calcChecksum() throws IOException {
+		for (Object ob : _data) {
+			if (!(ob instanceof CacheBlock) && !(ob instanceof ScalarObject))
+				continue;
+			
+			Checksum checksum = new Adler32();
+			if (ob instanceof ScalarObject) {
+				byte bytes[] = ((ScalarObject)ob).getStringValue().getBytes();
+				checksum.update(bytes, 0, bytes.length);
+				_checksums.add(checksum.getValue());
+			}
+			
+			if (ob instanceof CacheBlock) {
+				try {
+					CacheBlock cb = (CacheBlock)ob;
+					long cbsize = LazyWriteBuffer.getCacheBlockSize((CacheBlock)ob);
+					DataOutput dout = new CacheDataOutput(new byte[(int)cbsize]);
+					cb.write(dout);
+					byte bytes[] = ((CacheDataOutput) dout).getBytes();
+					checksum.update(bytes, 0, bytes.length);
+					_checksums.add(checksum.getValue());
+				}
+				catch(Exception ex) {
+					throw new IOException("Failed to serialize cache block.", ex);
+				}
+			}
+		}
 	}
 	
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 5c0a0bc..4a69a10 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -272,7 +272,7 @@
 		ec.setVariable(varname, data);
 		if (DMLScript.LINEAGE)
 			// TODO: Identify MO uniquely. Use Adler32 checksum.
-			ec.getLineage().set(varname, new LineageItem(String.valueOf(request.getLineageHash(0))));
+			ec.getLineage().set(varname, new LineageItem(String.valueOf(request.getChecksum(0))));
 
 		return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
index 4a8194b..6ed642e 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/AggregateBinaryFEDInstruction.java
@@ -21,7 +21,6 @@
 
 import java.util.concurrent.Future;
 
-import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -32,7 +31,6 @@
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 
@@ -80,10 +78,6 @@
 		else if(mo1.isFederated(FType.ROW)) { // MV + MM
 			//construct commands: broadcast rhs, fed mv, retrieve results
 			FederatedRequest fr1 = mo1.getFedMapping().broadcast(mo2);
-			if (DMLScript.LINEAGE)
-				//also copy the hash of the lineage DAG
-				fr1.setLineageHash(LineageItemUtils.getLineage(ec, input1));
-				//TODO: calculate Adler32 checksum on data, and move this code inside FederationMap.
 			FederatedRequest fr2 = FederationUtils.callInstruction(instString, output,
 				new CPOperand[]{input1, input2}, new long[]{mo1.getFedMapping().getID(), fr1.getID()});
 			if( mo2.getNumColumns() == 1 ) { //MV