[FLINK-2659] [runtime] Fix object reuse in UnionWithTempOperator

This closes #1130
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
index d8437a9..098686c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
@@ -60,15 +60,16 @@
 	public void run() throws Exception {
 		
 		final Collector<T> output = this.taskContext.getOutputCollector();
-		T record = this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance();
+		T reuse = this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance();
+		T record;
 		
 		final MutableObjectIterator<T> input = this.taskContext.getInput(STREAMED_INPUT);
-		while (this.running && ((record = input.next(record)) != null)) {
+		while (this.running && ((record = input.next(reuse)) != null)) {
 			output.collect(record);
 		}
 		
 		final MutableObjectIterator<T> cache = this.taskContext.getInput(CACHED_INPUT);
-		while (this.running && ((record = cache.next(record)) != null)) {
+		while (this.running && ((record = cache.next(reuse)) != null)) {
 			output.collect(record);
 		}
 	}