Fix DeltaIteration Bug with Immutable Types

The CompactingHashTable did not obey the new serialization contract that
allows for immutable objects. We must always use the return value of the
deserialization methods and cannot assume that the data was put into the
reuse object.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 8590b78..912cbb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -136,8 +136,9 @@
 		
 		while (this.running && probeSideInput.nextKey()) {
 			IT2 current = probeSideInput.getCurrent();
-			
-			if (prober.getMatchFor(current, buildSideRecord)) {
+
+			buildSideRecord = prober.getMatchFor(current, buildSideRecord);
+			if (buildSideRecord != null) {
 				siIter.set(buildSideRecord);
 				coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector);
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index b3c0ece..a6b747a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -135,8 +135,9 @@
 		
 		while (this.running && probeSideInput.nextKey()) {
 			IT1 current = probeSideInput.getCurrent();
-			
-			if (prober.getMatchFor(current, buildSideRecord)) {
+
+			buildSideRecord = prober.getMatchFor(current, buildSideRecord);
+			if (buildSideRecord != null) {
 				siIter.set(buildSideRecord);
 				coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index 342f307..2735fd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -137,11 +137,8 @@
 			
 		final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(probeSideComparator, pairComparator);
 		while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-			if (prober.getMatchFor(probeSideRecord, buildSideRecord)) {
-				joinFunction.join(buildSideRecord, probeSideRecord, collector);
-			} else {
-				joinFunction.join(null, probeSideRecord, collector);
-			}
+			buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+			joinFunction.join(buildSideRecord, probeSideRecord, collector);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index c38a81a..2d834b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -137,11 +137,8 @@
 			
 		final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(probeSideComparator, pairComparator);
 		while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-			if (prober.getMatchFor(probeSideRecord, buildSideRecord)) {
-				joinFunction.join(probeSideRecord, buildSideRecord, collector);
-			} else {
-				joinFunction.join(probeSideRecord, null, collector);
-			}
+			buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+			joinFunction.join(probeSideRecord, buildSideRecord, collector);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractHashTableProber.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractHashTableProber.java
index 642f7fd..3ecd911 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractHashTableProber.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractHashTableProber.java
@@ -38,7 +38,7 @@
 		this.pairComparator = pairComparator;
 	}
 	
-	public abstract boolean getMatchFor(PT probeSideRecord, BT targetForMatch);
+	public abstract BT getMatchFor(PT probeSideRecord, BT targetForMatch);
 	
 	public abstract void updateMatch(BT record) throws IOException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 239786d..bfef3d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -462,7 +462,7 @@
 					
 					// deserialize the key to check whether it is really equal, or whether we had only a hash collision
 					try {
-						partition.readRecordAt(pointer, tempHolder);
+						tempHolder = partition.readRecordAt(pointer, tempHolder);
 						if (this.buildSideComparator.equalToReference(tempHolder)) {
 							long newPointer = partition.appendRecord(record);
 							bucket.putLong(pointerOffset, newPointer);
@@ -1115,7 +1115,7 @@
 				while (true) {
 					while (numInSegment < countInSegment) {
 						pointer = segment.getLong(pointerOffset);
-						partition.readRecordAt(pointer, tempHolder);
+						tempHolder = partition.readRecordAt(pointer, tempHolder);
 						pointer = this.compactionMemory.appendRecord(tempHolder);
 						segment.putLong(pointerOffset, pointer);
 						pointerOffset += POINTER_LEN;
@@ -1267,7 +1267,7 @@
 					numInSegment++;
 					T target = table.buildSideSerializer.createInstance();
 					try {
-						partition.readRecordAt(pointer, target);
+						target = partition.readRecordAt(pointer, target);
 						cache.add(target);
 					} catch (IOException e) {
 							throw new RuntimeException("Error deserializing record from the Hash Table: " + e.getMessage(), e);
@@ -1311,9 +1311,9 @@
 			super(probeTypeComparator, pairComparator);
 		}
 		
-		public boolean getMatchFor(PT probeSideRecord, T targetForMatch) {
+		public T getMatchFor(PT probeSideRecord, T targetForMatch) {
 			if(closed.get()) {
-				return false;
+				return null;
 			}
 			final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
 			
@@ -1351,13 +1351,13 @@
 						
 						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
 						try {
-							p.readRecordAt(pointer, targetForMatch);
+							targetForMatch = p.readRecordAt(pointer, targetForMatch);
 							
 							if (this.pairComparator.equalToReference(targetForMatch)) {
 								this.partition = p;
 								this.bucket = bucket;
 								this.pointerOffsetInBucket = pointerOffset;
-								return true;
+								return targetForMatch;
 							}
 						}
 						catch (IOException e) {
@@ -1372,7 +1372,7 @@
 				// this segment is done. check if there is another chained bucket
 				final long forwardPointer = bucket.getLong(bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
 				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
-					return false;
+					return null;
 				}
 				
 				final int overflowSegNum = (int) (forwardPointer >>> 32);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
index b90ca1a..a46842f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
@@ -231,9 +231,9 @@
 		}
 	}
 	
-	public void readRecordAt(long pointer, T record) throws IOException {
+	public T readRecordAt(long pointer, T reuse) throws IOException {
 		this.readView.setReadPosition(pointer);
-		this.serializer.deserialize(record, this.readView);
+		return this.serializer.deserialize(reuse, this.readView);
 	}
 	
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index 3996bf9..9bbf123 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -43,6 +39,8 @@
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class HashTablePerformanceComparison {
 		
 	private static final int PAGE_SIZE = 16 * 1024;
@@ -96,7 +94,7 @@
 			AbstractHashTableProber<IntPair, IntPair> prober = table.getProber(comparator, pairComparator);
 			IntPair temp = new IntPair();
 			while(probeTester.next(target) != null) {
-				assertTrue(prober.getMatchFor(target, temp));
+				assertNotNull(prober.getMatchFor(target, temp));
 				assertEquals(temp.getValue(), target.getValue());
 			}
 			end = System.currentTimeMillis();
@@ -114,7 +112,7 @@
 			System.out.println("Starting second probing run...");
 			start = System.currentTimeMillis();
 			while (updateTester.next(target) != null) {
-				assertTrue(prober.getMatchFor(target, temp));
+				assertNotNull(prober.getMatchFor(target, temp));
 				assertEquals(target.getValue(), temp.getValue());
 			}
 			end = System.currentTimeMillis();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
index b644da1..ce9e469 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
@@ -54,6 +48,8 @@
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import static org.junit.Assert.*;
+
 
 public class MemoryHashTableTest {
 	
@@ -134,7 +130,7 @@
 			IntPair target = new IntPair();
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertNotNull(prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -197,8 +193,8 @@
 			
 			IntList target = new IntList();
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(pairProber.getMatchFor(pairs[i], target));
-				assertTrue(listProber.getMatchFor(lists[i], target));
+				assertNotNull(pairProber.getMatchFor(pairs[i], target));
+				assertNotNull(listProber.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			table.close();
@@ -232,7 +228,7 @@
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -245,7 +241,7 @@
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+				assertNotNull("" + i, prober.getMatchFor(overwriteLists[i], target));
 				assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
 			}
 			
@@ -275,7 +271,7 @@
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -291,7 +287,7 @@
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue("" + i, prober.getMatchFor(lists[i], target));
+				assertNotNull("" + i, prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -324,7 +320,7 @@
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -339,7 +335,7 @@
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -374,7 +370,7 @@
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -389,7 +385,7 @@
 				}
 			
 				for (int i = 0; i < NUM_LISTS; i++) {
-					assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+					assertNotNull("" + i, prober.getMatchFor(overwriteLists[i], target));
 					assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
 				}
 			}
@@ -426,13 +422,13 @@
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(""+i,prober.getMatchFor(lists[i], target));
+				assertNotNull(""+i,prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 				prober.updateMatch(overwriteLists[i]);
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+				assertNotNull("" + i, prober.getMatchFor(overwriteLists[i], target));
 				assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
 			}
 			
@@ -462,7 +458,7 @@
 			IntPair target = new IntPair();
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertNotNull(prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -472,7 +468,7 @@
 			assertTrue(b.booleanValue());
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -502,7 +498,7 @@
 			IntPair target = new IntPair();
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertNotNull(prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -512,7 +508,7 @@
 			assertTrue(b.booleanValue());
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -522,7 +518,7 @@
 			assertTrue(b.booleanValue());
 						
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 						
@@ -552,7 +548,7 @@
 			IntPair target = new IntPair();
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertNotNull(prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -562,7 +558,7 @@
 			assertTrue(b.booleanValue());
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -572,7 +568,7 @@
 			assertTrue(b.booleanValue());
 						
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -582,7 +578,7 @@
 			assertTrue(b.booleanValue());
 									
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 						
@@ -617,7 +613,7 @@
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -627,7 +623,7 @@
 			assertTrue(b.booleanValue());
 						
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -653,7 +649,7 @@
 			assertTrue(b.booleanValue());									
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+				assertNotNull("" + i, prober.getMatchFor(overwriteLists[i], target));
 				assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
 			}
 			
@@ -689,7 +685,7 @@
 			AbstractHashTableProber<StringPair, StringPair> prober = table.getProber(comparatorS, pairComparatorS);
 			StringPair temp = new StringPair();
 			while(probeTester.next(target) != null) {
-				assertTrue("" + target.getKey(), prober.getMatchFor(target, temp));
+				assertNotNull("" + target.getKey(), prober.getMatchFor(target, temp));
 				assertEquals(temp.getValue(), target.getValue());
 			}
 			
@@ -699,7 +695,7 @@
 			}
 			
 			while (updateTester.next(target) != null) {
-				assertTrue(prober.getMatchFor(target, temp));
+				assertNotNull(prober.getMatchFor(target, temp));
 				assertEquals(target.getValue(), temp.getValue());
 			}