reformat the code in in r3269

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3270 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index af95064..0c09757 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -44,249 +44,233 @@
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
-	private static final long serialVersionUID = 1L;
-	private final IConfigurationFactory confFactory;
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
 
-	public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
-		this.confFactory = confFactory;
-	}
+    public ComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+        this.confFactory = confFactory;
+    }
 
-	@Override
-	public IUpdateFunction createFunction() {
-		return new IUpdateFunction() {
-			// for writing intermediate data
-			private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
-			private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
-			private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(
-					1);
-			private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(
-					1);
-			private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
-			private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // for writing intermediate data
+            private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
 
-			// for writing out to message channel
-			private IFrameWriter writerMsg;
-			private FrameTupleAppender appenderMsg;
-			private ByteBuffer bufferMsg;
+            // for writing out to message channel
+            private IFrameWriter writerMsg;
+            private FrameTupleAppender appenderMsg;
+            private ByteBuffer bufferMsg;
 
-			// for writing out to alive message channel
-			private IFrameWriter writerAlive;
-			private FrameTupleAppender appenderAlive;
-			private ByteBuffer bufferAlive;
-			private boolean pushAlive;
+            // for writing out to alive message channel
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+            private boolean pushAlive;
 
-			// for writing out termination detection control channel
-			private IFrameWriter writerTerminate;
-			private FrameTupleAppender appenderTerminate;
-			private ByteBuffer bufferTerminate;
-			private boolean terminate = true;
+            // for writing out termination detection control channel
+            private IFrameWriter writerTerminate;
+            private FrameTupleAppender appenderTerminate;
+            private ByteBuffer bufferTerminate;
+            private boolean terminate = true;
 
-			// for writing out termination detection control channel
-			private IFrameWriter writerGlobalAggregate;
-			private FrameTupleAppender appenderGlobalAggregate;
-			private ByteBuffer bufferGlobalAggregate;
-			private GlobalAggregator aggregator;
+            // for writing out termination detection control channel
+            private IFrameWriter writerGlobalAggregate;
+            private FrameTupleAppender appenderGlobalAggregate;
+            private ByteBuffer bufferGlobalAggregate;
+            private GlobalAggregator aggregator;
 
-			// for writing out to insert vertex channel
-			private IFrameWriter writerInsert;
-			private FrameTupleAppender appenderInsert;
-			private ByteBuffer bufferInsert;
+            // for writing out to insert vertex channel
+            private IFrameWriter writerInsert;
+            private FrameTupleAppender appenderInsert;
+            private ByteBuffer bufferInsert;
 
-			// for writing out to delete vertex channel
-			private IFrameWriter writerDelete;
-			private FrameTupleAppender appenderDelete;
-			private ByteBuffer bufferDelete;
+            // for writing out to delete vertex channel
+            private IFrameWriter writerDelete;
+            private FrameTupleAppender appenderDelete;
+            private ByteBuffer bufferDelete;
 
-			private Vertex vertex;
-			private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
-			private DataOutput output = new DataOutputStream(bbos);
+            private Vertex vertex;
+            private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+            private DataOutput output = new DataOutputStream(bbos);
 
-			private ArrayIterator msgIterator = new ArrayIterator();
-			private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
-			private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
-			private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
-			private Configuration conf;
-			private boolean dynamicStateLength;
+            private ArrayIterator msgIterator = new ArrayIterator();
+            private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+            private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+            private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+            private Configuration conf;
+            private boolean dynamicStateLength;
 
-			@Override
-			public void open(IHyracksTaskContext ctx, RecordDescriptor rd,
-					IFrameWriter... writers) throws HyracksDataException {
-				this.conf = confFactory.createConfiguration();
-				this.dynamicStateLength = BspUtils
-						.getDynamicVertexValueSize(conf);
-				this.aggregator = BspUtils.createGlobalAggregator(conf);
-				this.aggregator.init();
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                this.conf = confFactory.createConfiguration();
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+                this.aggregator = BspUtils.createGlobalAggregator(conf);
+                this.aggregator.init();
 
-				this.writerMsg = writers[0];
-				this.bufferMsg = ctx.allocateFrame();
-				this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
-				this.appenderMsg.reset(bufferMsg, true);
-				this.writers.add(writerMsg);
-				this.appenders.add(appenderMsg);
+                this.writerMsg = writers[0];
+                this.bufferMsg = ctx.allocateFrame();
+                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderMsg.reset(bufferMsg, true);
+                this.writers.add(writerMsg);
+                this.appenders.add(appenderMsg);
 
-				this.writerTerminate = writers[1];
-				this.bufferTerminate = ctx.allocateFrame();
-				this.appenderTerminate = new FrameTupleAppender(
-						ctx.getFrameSize());
-				this.appenderTerminate.reset(bufferTerminate, true);
+                this.writerTerminate = writers[1];
+                this.bufferTerminate = ctx.allocateFrame();
+                this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderTerminate.reset(bufferTerminate, true);
 
-				this.writerGlobalAggregate = writers[2];
-				this.bufferGlobalAggregate = ctx.allocateFrame();
-				this.appenderGlobalAggregate = new FrameTupleAppender(
-						ctx.getFrameSize());
-				this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+                this.writerGlobalAggregate = writers[2];
+                this.bufferGlobalAggregate = ctx.allocateFrame();
+                this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
 
-				this.writerInsert = writers[3];
-				this.bufferInsert = ctx.allocateFrame();
-				this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
-				this.appenderInsert.reset(bufferInsert, true);
-				this.writers.add(writerInsert);
-				this.appenders.add(appenderInsert);
+                this.writerInsert = writers[3];
+                this.bufferInsert = ctx.allocateFrame();
+                this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderInsert.reset(bufferInsert, true);
+                this.writers.add(writerInsert);
+                this.appenders.add(appenderInsert);
 
-				this.writerDelete = writers[4];
-				this.bufferDelete = ctx.allocateFrame();
-				this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
-				this.appenderDelete.reset(bufferDelete, true);
-				this.writers.add(writerDelete);
-				this.appenders.add(appenderDelete);
+                this.writerDelete = writers[4];
+                this.bufferDelete = ctx.allocateFrame();
+                this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderDelete.reset(bufferDelete, true);
+                this.writers.add(writerDelete);
+                this.appenders.add(appenderDelete);
 
-				if (writers.length > 5) {
-					this.writerAlive = writers[5];
-					this.bufferAlive = ctx.allocateFrame();
-					this.appenderAlive = new FrameTupleAppender(
-							ctx.getFrameSize());
-					this.appenderAlive.reset(bufferAlive, true);
-					this.pushAlive = true;
-					this.writers.add(writerAlive);
-					this.appenders.add(appenderAlive);
-				}
+                if (writers.length > 5) {
+                    this.writerAlive = writers[5];
+                    this.bufferAlive = ctx.allocateFrame();
+                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                    this.appenderAlive.reset(bufferAlive, true);
+                    this.pushAlive = true;
+                    this.writers.add(writerAlive);
+                    this.appenders.add(appenderAlive);
+                }
 
-				tbs.add(tbMsg);
-				tbs.add(tbInsert);
-				tbs.add(tbDelete);
-				tbs.add(tbAlive);
-			}
+                tbs.add(tbMsg);
+                tbs.add(tbInsert);
+                tbs.add(tbDelete);
+                tbs.add(tbAlive);
+            }
 
-			@Override
-			public void process(Object[] tuple) throws HyracksDataException {
-				// vertex Id, msg content List, vertex Id, vertex
-				tbMsg.reset();
-				tbAlive.reset();
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                // vertex Id, msg content List, vertex Id, vertex
+                tbMsg.reset();
+                tbAlive.reset();
 
-				vertex = (Vertex) tuple[3];
-				vertex.setOutputWriters(writers);
-				vertex.setOutputAppenders(appenders);
-				vertex.setOutputTupleBuilders(tbs);
+                vertex = (Vertex) tuple[3];
+                vertex.setOutputWriters(writers);
+                vertex.setOutputAppenders(appenders);
+                vertex.setOutputTupleBuilders(tbs);
 
-				ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
-				msgContentList.reset(msgIterator);
+                ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
+                msgContentList.reset(msgIterator);
 
-				if (!msgIterator.hasNext() && vertex.isHalted()) {
-					return;
-				}
-				if (vertex.isHalted()) {
-					vertex.activate();
-				}
+                if (!msgIterator.hasNext() && vertex.isHalted()) {
+                    return;
+                }
+                if (vertex.isHalted()) {
+                    vertex.activate();
+                }
 
-				try {
-					vertex.compute(msgIterator);
-					vertex.finishCompute();
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
+                try {
+                    vertex.compute(msgIterator);
+                    vertex.finishCompute();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
 
-				/**
-				 * this partition should not terminate
-				 */
-				if (terminate
-						&& (!vertex.isHalted() || vertex.hasMessage() || vertex
-								.createdNewLiveVertex()))
-					terminate = false;
+                /**
+                 * this partition should not terminate
+                 */
+                if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
+                    terminate = false;
 
-				aggregator.step(vertex);
-			}
+                aggregator.step(vertex);
+            }
 
-			@Override
-			public void close() throws HyracksDataException {
-				FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
-				FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
-				FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+                FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
 
-				if (pushAlive)
-					FrameTupleUtils
-							.flushTuplesFinal(appenderAlive, writerAlive);
-				if (!terminate) {
-					writeOutTerminationState();
-				}
+                if (pushAlive)
+                    FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+                if (!terminate) {
+                    writeOutTerminationState();
+                }
 
-				/** write out global aggregate value */
-				writeOutGlobalAggregate();
-			}
+                /** write out global aggregate value */
+                writeOutGlobalAggregate();
+            }
 
-			private void writeOutGlobalAggregate() throws HyracksDataException {
-				try {
-					/**
-					 * get partial aggregate result and flush to the final
-					 * aggregator
-					 */
-					Writable agg = aggregator.finishPartial();
-					agg.write(tbGlobalAggregate.getDataOutput());
-					tbGlobalAggregate.addFieldEndOffset();
-					appenderGlobalAggregate.append(
-							tbGlobalAggregate.getFieldEndOffsets(),
-							tbGlobalAggregate.getByteArray(), 0,
-							tbGlobalAggregate.getSize());
-					FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate,
-							writerGlobalAggregate);
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
+            private void writeOutGlobalAggregate() throws HyracksDataException {
+                try {
+                    /**
+                     * get partial aggregate result and flush to the final
+                     * aggregator
+                     */
+                    Writable agg = aggregator.finishPartial();
+                    agg.write(tbGlobalAggregate.getDataOutput());
+                    tbGlobalAggregate.addFieldEndOffset();
+                    appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+                    FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
 
-			private void writeOutTerminationState() throws HyracksDataException {
-				try {
-					tbTerminate.getDataOutput().writeLong(0);
-					tbTerminate.addFieldEndOffset();
-					appenderTerminate.append(tbTerminate.getFieldEndOffsets(),
-							tbTerminate.getByteArray(), 0,
-							tbTerminate.getSize());
-					FrameTupleUtils.flushTuplesFinal(appenderTerminate,
-							writerTerminate);
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
+            private void writeOutTerminationState() throws HyracksDataException {
+                try {
+                    tbTerminate.getDataOutput().writeLong(0);
+                    tbTerminate.addFieldEndOffset();
+                    appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
+                            tbTerminate.getSize());
+                    FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
 
-			@Override
-			public void update(ITupleReference tupleRef,
-					ArrayTupleBuilder cloneUpdateTb)
-					throws HyracksDataException {
-				try {
-					if (vertex != null && vertex.hasUpdate()) {
-						if (!dynamicStateLength) {
-							// in-place update
-							int fieldCount = tupleRef.getFieldCount();
-							for (int i = 1; i < fieldCount; i++) {
-								byte[] data = tupleRef.getFieldData(i);
-								int offset = tupleRef.getFieldStart(i);
-								bbos.setByteArray(data, offset);
-								vertex.write(output);
-							}
-						} else {
-							// write the vertex id
-							DataOutput tbOutput = cloneUpdateTb.getDataOutput();
-							vertex.getVertexId().write(tbOutput);
-							cloneUpdateTb.addFieldEndOffset();
+            @Override
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+                try {
+                    if (vertex != null && vertex.hasUpdate()) {
+                        if (!dynamicStateLength) {
+                            // in-place update
+                            int fieldCount = tupleRef.getFieldCount();
+                            for (int i = 1; i < fieldCount; i++) {
+                                byte[] data = tupleRef.getFieldData(i);
+                                int offset = tupleRef.getFieldStart(i);
+                                bbos.setByteArray(data, offset);
+                                vertex.write(output);
+                            }
+                        } else {
+                            // write the vertex id
+                            DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+                            vertex.getVertexId().write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
 
-							// write the vertex value
-							vertex.write(tbOutput);
-							cloneUpdateTb.addFieldEndOffset();
-						}
-					}
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
-		};
-	}
+                            // write the vertex value
+                            vertex.write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
 }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index a241c9c..1bf6a2b 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -43,255 +43,238 @@
 import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
-public class StartComputeUpdateFunctionFactory implements
-		IUpdateFunctionFactory {
-	private static final long serialVersionUID = 1L;
-	private final IConfigurationFactory confFactory;
+public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
+    private static final long serialVersionUID = 1L;
+    private final IConfigurationFactory confFactory;
 
-	public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
-		this.confFactory = confFactory;
-	}
+    public StartComputeUpdateFunctionFactory(IConfigurationFactory confFactory) {
+        this.confFactory = confFactory;
+    }
 
-	@Override
-	public IUpdateFunction createFunction() {
-		return new IUpdateFunction() {
-			// for writing intermediate data
-			private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
-			private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
-			private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(
-					1);
-			private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(
-					1);
-			private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
-			private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
+    @Override
+    public IUpdateFunction createFunction() {
+        return new IUpdateFunction() {
+            // for writing intermediate data
+            private final ArrayTupleBuilder tbMsg = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
 
-			// for writing out to message channel
-			private IFrameWriter writerMsg;
-			private FrameTupleAppender appenderMsg;
-			private ByteBuffer bufferMsg;
+            // for writing out to message channel
+            private IFrameWriter writerMsg;
+            private FrameTupleAppender appenderMsg;
+            private ByteBuffer bufferMsg;
 
-			// for writing out to alive message channel
-			private IFrameWriter writerAlive;
-			private FrameTupleAppender appenderAlive;
-			private ByteBuffer bufferAlive;
-			private boolean pushAlive;
+            // for writing out to alive message channel
+            private IFrameWriter writerAlive;
+            private FrameTupleAppender appenderAlive;
+            private ByteBuffer bufferAlive;
+            private boolean pushAlive;
 
-			// for writing out termination detection control channel
-			private IFrameWriter writerGlobalAggregate;
-			private FrameTupleAppender appenderGlobalAggregate;
-			private ByteBuffer bufferGlobalAggregate;
-			private GlobalAggregator aggregator;
+            // for writing out termination detection control channel
+            private IFrameWriter writerGlobalAggregate;
+            private FrameTupleAppender appenderGlobalAggregate;
+            private ByteBuffer bufferGlobalAggregate;
+            private GlobalAggregator aggregator;
 
-			// for writing out the global aggregate
-			private IFrameWriter writerTerminate;
-			private FrameTupleAppender appenderTerminate;
-			private ByteBuffer bufferTerminate;
-			private boolean terminate = true;
+            // for writing out the global aggregate
+            private IFrameWriter writerTerminate;
+            private FrameTupleAppender appenderTerminate;
+            private ByteBuffer bufferTerminate;
+            private boolean terminate = true;
 
-			// for writing out to insert vertex channel
-			private IFrameWriter writerInsert;
-			private FrameTupleAppender appenderInsert;
-			private ByteBuffer bufferInsert;
+            // for writing out to insert vertex channel
+            private IFrameWriter writerInsert;
+            private FrameTupleAppender appenderInsert;
+            private ByteBuffer bufferInsert;
 
-			// for writing out to delete vertex channel
-			private IFrameWriter writerDelete;
-			private FrameTupleAppender appenderDelete;
-			private ByteBuffer bufferDelete;
+            // for writing out to delete vertex channel
+            private IFrameWriter writerDelete;
+            private FrameTupleAppender appenderDelete;
+            private ByteBuffer bufferDelete;
 
-			// dummy empty msgList
-			private MsgList msgList = new MsgList();
-			private ArrayIterator msgIterator = new ArrayIterator();
+            // dummy empty msgList
+            private MsgList msgList = new MsgList();
+            private ArrayIterator msgIterator = new ArrayIterator();
 
-			private Vertex vertex;
-			private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
-			private DataOutput output = new DataOutputStream(bbos);
+            private Vertex vertex;
+            private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
+            private DataOutput output = new DataOutputStream(bbos);
 
-			private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
-			private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
-			private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
-			private Configuration conf;
-			private boolean dynamicStateLength;
+            private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
+            private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
+            private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+            private Configuration conf;
+            private boolean dynamicStateLength;
 
-			@Override
-			public void open(IHyracksTaskContext ctx, RecordDescriptor rd,
-					IFrameWriter... writers) throws HyracksDataException {
-				this.conf = confFactory.createConfiguration();
-				this.dynamicStateLength = BspUtils
-						.getDynamicVertexValueSize(conf);
-				this.aggregator = BspUtils.createGlobalAggregator(conf);
-				this.aggregator.init();
+            @Override
+            public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
+                    throws HyracksDataException {
+                this.conf = confFactory.createConfiguration();
+                this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
+                this.aggregator = BspUtils.createGlobalAggregator(conf);
+                this.aggregator.init();
 
-				this.writerMsg = writers[0];
-				this.bufferMsg = ctx.allocateFrame();
-				this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
-				this.appenderMsg.reset(bufferMsg, true);
-				this.writers.add(writerMsg);
-				this.appenders.add(appenderMsg);
+                this.writerMsg = writers[0];
+                this.bufferMsg = ctx.allocateFrame();
+                this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderMsg.reset(bufferMsg, true);
+                this.writers.add(writerMsg);
+                this.appenders.add(appenderMsg);
 
-				this.writerTerminate = writers[1];
-				this.bufferTerminate = ctx.allocateFrame();
-				this.appenderTerminate = new FrameTupleAppender(
-						ctx.getFrameSize());
-				this.appenderTerminate.reset(bufferTerminate, true);
+                this.writerTerminate = writers[1];
+                this.bufferTerminate = ctx.allocateFrame();
+                this.appenderTerminate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderTerminate.reset(bufferTerminate, true);
 
-				this.writerGlobalAggregate = writers[2];
-				this.bufferGlobalAggregate = ctx.allocateFrame();
-				this.appenderGlobalAggregate = new FrameTupleAppender(
-						ctx.getFrameSize());
-				this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+                this.writerGlobalAggregate = writers[2];
+                this.bufferGlobalAggregate = ctx.allocateFrame();
+                this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
 
-				this.writerInsert = writers[3];
-				this.bufferInsert = ctx.allocateFrame();
-				this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
-				this.appenderInsert.reset(bufferInsert, true);
-				this.writers.add(writerInsert);
-				this.appenders.add(appenderInsert);
+                this.writerInsert = writers[3];
+                this.bufferInsert = ctx.allocateFrame();
+                this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderInsert.reset(bufferInsert, true);
+                this.writers.add(writerInsert);
+                this.appenders.add(appenderInsert);
 
-				this.writerDelete = writers[4];
-				this.bufferDelete = ctx.allocateFrame();
-				this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
-				this.appenderDelete.reset(bufferDelete, true);
-				this.writers.add(writerDelete);
-				this.appenders.add(appenderDelete);
+                this.writerDelete = writers[4];
+                this.bufferDelete = ctx.allocateFrame();
+                this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderDelete.reset(bufferDelete, true);
+                this.writers.add(writerDelete);
+                this.appenders.add(appenderDelete);
 
-				if (writers.length > 5) {
-					this.writerAlive = writers[5];
-					this.bufferAlive = ctx.allocateFrame();
-					this.appenderAlive = new FrameTupleAppender(
-							ctx.getFrameSize());
-					this.appenderAlive.reset(bufferAlive, true);
-					this.pushAlive = true;
-					this.writers.add(writerAlive);
-					this.appenders.add(appenderAlive);
-				}
-				msgList.reset(msgIterator);
+                if (writers.length > 5) {
+                    this.writerAlive = writers[5];
+                    this.bufferAlive = ctx.allocateFrame();
+                    this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+                    this.appenderAlive.reset(bufferAlive, true);
+                    this.pushAlive = true;
+                    this.writers.add(writerAlive);
+                    this.appenders.add(appenderAlive);
+                }
+                msgList.reset(msgIterator);
 
-				tbs.add(tbMsg);
-				tbs.add(tbInsert);
-				tbs.add(tbDelete);
-				tbs.add(tbAlive);
-			}
+                tbs.add(tbMsg);
+                tbs.add(tbInsert);
+                tbs.add(tbDelete);
+                tbs.add(tbAlive);
+            }
 
-			@Override
-			public void process(Object[] tuple) throws HyracksDataException {
-				// vertex Id, vertex
-				tbMsg.reset();
-				tbAlive.reset();
+            @Override
+            public void process(Object[] tuple) throws HyracksDataException {
+                // vertex Id, vertex
+                tbMsg.reset();
+                tbAlive.reset();
 
-				vertex = (Vertex) tuple[1];
-				vertex.setOutputWriters(writers);
-				vertex.setOutputAppenders(appenders);
-				vertex.setOutputTupleBuilders(tbs);
+                vertex = (Vertex) tuple[1];
+                vertex.setOutputWriters(writers);
+                vertex.setOutputAppenders(appenders);
+                vertex.setOutputTupleBuilders(tbs);
 
-				if (!msgIterator.hasNext() && vertex.isHalted()) {
-					return;
-				}
-				if (vertex.isHalted()) {
-					vertex.activate();
-				}
+                if (!msgIterator.hasNext() && vertex.isHalted()) {
+                    return;
+                }
+                if (vertex.isHalted()) {
+                    vertex.activate();
+                }
 
-				try {
-					vertex.compute(msgIterator);
-					vertex.finishCompute();
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
+                try {
+                    vertex.compute(msgIterator);
+                    vertex.finishCompute();
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
 
-				/**
-				 * this partition should not terminate
-				 */
-				if (terminate
-						&& (!vertex.isHalted() || vertex.hasMessage() || vertex
-								.createdNewLiveVertex()))
-					terminate = false;
+                /**
+                 * this partition should not terminate
+                 */
+                if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
+                    terminate = false;
 
-				/**
-				 * call the global aggregator
-				 */
-				aggregator.step(vertex);
-			}
+                /**
+                 * call the global aggregator
+                 */
+                aggregator.step(vertex);
+            }
 
-			@Override
-			public void close() throws HyracksDataException {
-				FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
-				FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
-				FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+            @Override
+            public void close() throws HyracksDataException {
+                FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+                FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
 
-				if (pushAlive)
-					FrameTupleUtils
-							.flushTuplesFinal(appenderAlive, writerAlive);
-				if (!terminate) {
-					writeOutTerminationState();
-				}
+                if (pushAlive)
+                    FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
+                if (!terminate) {
+                    writeOutTerminationState();
+                }
 
-				/** write out global aggregate value */
-				writeOutGlobalAggregate();
-			}
+                /** write out global aggregate value */
+                writeOutGlobalAggregate();
+            }
 
-			private void writeOutGlobalAggregate() throws HyracksDataException {
-				try {
-					/**
-					 * get partial aggregate result and flush to the final
-					 * aggregator
-					 */
-					Writable agg = aggregator.finishPartial();
-					agg.write(tbGlobalAggregate.getDataOutput());
-					tbGlobalAggregate.addFieldEndOffset();
-					appenderGlobalAggregate.append(
-							tbGlobalAggregate.getFieldEndOffsets(),
-							tbGlobalAggregate.getByteArray(), 0,
-							tbGlobalAggregate.getSize());
-					FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate,
-							writerGlobalAggregate);
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
+            private void writeOutGlobalAggregate() throws HyracksDataException {
+                try {
+                    /**
+                     * get partial aggregate result and flush to the final
+                     * aggregator
+                     */
+                    Writable agg = aggregator.finishPartial();
+                    agg.write(tbGlobalAggregate.getDataOutput());
+                    tbGlobalAggregate.addFieldEndOffset();
+                    appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+                    FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
 
-			private void writeOutTerminationState() throws HyracksDataException {
-				try {
-					tbTerminate.getDataOutput().writeLong(0);
-					tbTerminate.addFieldEndOffset();
-					appenderTerminate.append(tbTerminate.getFieldEndOffsets(),
-							tbTerminate.getByteArray(), 0,
-							tbTerminate.getSize());
-					FrameTupleUtils.flushTuplesFinal(appenderTerminate,
-							writerTerminate);
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
+            private void writeOutTerminationState() throws HyracksDataException {
+                try {
+                    tbTerminate.getDataOutput().writeLong(0);
+                    tbTerminate.addFieldEndOffset();
+                    appenderTerminate.append(tbTerminate.getFieldEndOffsets(), tbTerminate.getByteArray(), 0,
+                            tbTerminate.getSize());
+                    FrameTupleUtils.flushTuplesFinal(appenderTerminate, writerTerminate);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
 
-			@Override
-			public void update(ITupleReference tupleRef,
-					ArrayTupleBuilder cloneUpdateTb)
-					throws HyracksDataException {
-				try {
-					if (vertex != null && vertex.hasUpdate()) {
-						if (!dynamicStateLength) {
-							// in-place update
-							int fieldCount = tupleRef.getFieldCount();
-							for (int i = 1; i < fieldCount; i++) {
-								byte[] data = tupleRef.getFieldData(i);
-								int offset = tupleRef.getFieldStart(i);
-								bbos.setByteArray(data, offset);
-								vertex.write(output);
-							}
-						} else {
-							// write the vertex id
-							DataOutput tbOutput = cloneUpdateTb.getDataOutput();
-							vertex.getVertexId().write(tbOutput);
-							cloneUpdateTb.addFieldEndOffset();
+            @Override
+            public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
+                try {
+                    if (vertex != null && vertex.hasUpdate()) {
+                        if (!dynamicStateLength) {
+                            // in-place update
+                            int fieldCount = tupleRef.getFieldCount();
+                            for (int i = 1; i < fieldCount; i++) {
+                                byte[] data = tupleRef.getFieldData(i);
+                                int offset = tupleRef.getFieldStart(i);
+                                bbos.setByteArray(data, offset);
+                                vertex.write(output);
+                            }
+                        } else {
+                            // write the vertex id
+                            DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+                            vertex.getVertexId().write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
 
-							// write the vertex value
-							vertex.write(tbOutput);
-							cloneUpdateTb.addFieldEndOffset();
-						}
-					}
-				} catch (IOException e) {
-					throw new HyracksDataException(e);
-				}
-			}
-		};
-	}
+                            // write the vertex value
+                            vertex.write(tbOutput);
+                            cloneUpdateTb.addFieldEndOffset();
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
 }