HAMA-982: Vertex.read/writeState() method throws NullPointerException
diff --git a/CHANGES.txt b/CHANGES.txt
index 043fcd3..48048c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
BUG FIXES
+ HAMA-982: Vertex.read/writeState() method throws NullPointerException (edwardyoon)
HAMA-965: Infinite loop because of recursive function call (JongYoon Lim via edwardyoon)
HAMA-966: NioServerListener doesn't throw any exceptions (JongYoon Lim via edwardyoon)
diff --git a/examples/src/test/java/org/apache/hama/examples/CustomVertexReadWriteStateTest.java b/examples/src/test/java/org/apache/hama/examples/CustomVertexReadWriteStateTest.java
new file mode 100644
index 0000000..f753ffb
--- /dev/null
+++ b/examples/src/test/java/org/apache/hama/examples/CustomVertexReadWriteStateTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.Vertex;
+
+public class CustomVertexReadWriteStateTest extends TestCase {
+ static int initialState = 2;
+ static int changedState = 4;
+
+ public static class TestVertex extends Vertex<Text, IntWritable, IntWritable> {
+
+ private static ArrayWritable test = new ArrayWritable(IntWritable.class);
+
+ @Override
+ public void setup(HamaConfiguration conf) {
+ // Sets the initial state
+ test.set(new Writable[] { new IntWritable(initialState) });
+ }
+
+ @Override
+ public void compute(Iterable<IntWritable> messages) throws IOException {
+ if (this.getSuperstepCount() == 3) {
+ // change the state
+ test.set(new Writable[] { new IntWritable(changedState) });
+ }
+
+ if (this.getSuperstepCount() < 3) {
+ assertEquals(initialState, ((IntWritable) test.get()[0]).get());
+ } else {
+ assertEquals(changedState, ((IntWritable) test.get()[0]).get());
+ }
+ }
+
+ public void readState(DataInput in) throws IOException {
+ test.readFields(in);
+ }
+
+ public void writeState(DataOutput out) throws IOException {
+ test.write(out);
+ }
+ }
+
+}
diff --git a/examples/src/test/java/org/apache/hama/examples/SSSPTest.java b/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
index 3d7f03f..116abbb 100644
--- a/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
+++ b/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
@@ -31,7 +31,16 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.NullOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.examples.CustomVertexReadWriteStateTest.TestVertex;
+import org.apache.hama.examples.SSSP.SSSPTextReader;
+import org.apache.hama.graph.GraphJob;
import org.junit.Test;
/**
@@ -61,18 +70,51 @@
protected void setUp() throws Exception {
super.setUp();
fs = FileSystem.get(conf);
+ generateTestData();
+ }
+
+ protected void tearDown() throws Exception {
+ deleteTempDirs();
+ fs.close();
}
@Test
public void testShortestPaths() throws IOException, InterruptedException,
ClassNotFoundException, InstantiationException, IllegalAccessException {
- generateTestData();
- try {
- SSSP.main(new String[] { "0", INPUT, OUTPUT, "3" });
- verifyResult();
- } finally {
- deleteTempDirs();
+ SSSP.main(new String[] { "0", INPUT, OUTPUT, "3" });
+ verifyResult();
+ }
+
+ @Test
+ public void testCustomReadWriteState() throws IOException,
+ InterruptedException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
+
+ HamaConfiguration conf = new HamaConfiguration();
+ GraphJob job = new GraphJob(conf, CustomVertexReadWriteStateTest.class);
+ // Set the job name
+ job.setJobName("test custom read/write state");
+ job.setInputPath(new Path(INPUT));
+ job.setNumBspTask(1);
+ job.setVertexClass(TestVertex.class);
+ job.setInputFormat(TextInputFormat.class);
+ job.setInputKeyClass(LongWritable.class);
+ job.setInputValueClass(Text.class);
+
+ job.setPartitioner(HashPartitioner.class);
+ job.setOutputFormat(NullOutputFormat.class);
+ job.setVertexInputReaderClass(SSSPTextReader.class);
+ // Iterate until all the nodes have been reached.
+ job.setMaxIteration(6);
+ job.setVertexIDClass(Text.class);
+ job.setVertexValueClass(IntWritable.class);
+ job.setEdgeValueClass(IntWritable.class);
+
+ long startTime = System.currentTimeMillis();
+ if (job.waitForCompletion(true)) {
+ System.out.println("Job Finished in "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
diff --git a/graph/src/main/java/org/apache/hama/graph/Vertex.java b/graph/src/main/java/org/apache/hama/graph/Vertex.java
index bf90187..cdbf6b5 100644
--- a/graph/src/main/java/org/apache/hama/graph/Vertex.java
+++ b/graph/src/main/java/org/apache/hama/graph/Vertex.java
@@ -17,8 +17,10 @@
*/
package org.apache.hama.graph;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -271,7 +273,11 @@
}
}
votedToHalt = in.readBoolean();
- readState(in);
+
+ boolean hasMoreContents = in.readBoolean();
+ if (hasMoreContents) {
+ readState(in);
+ }
}
@Override
@@ -308,8 +314,24 @@
}
}
out.writeBoolean(votedToHalt);
- writeState(out);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutput customOut = new DataOutputStream(baos);
+ boolean hasMoreContents = true;
+ try {
+ writeState(customOut);
+ } catch (NullPointerException e) {
+ // do nothing
+ }
+
+ // if all states are null, set hasContents to false.
+ if (baos.size() == 0) {
+ hasMoreContents = false;
+ }
+
+ out.writeBoolean(hasMoreContents);
+ if (hasMoreContents)
+ out.write(baos.toByteArray());
}
// compare across the vertex ID
diff --git a/src/site/xdoc/index.xml b/src/site/xdoc/index.xml
index a75a846..1602680 100644
--- a/src/site/xdoc/index.xml
+++ b/src/site/xdoc/index.xml
@@ -24,7 +24,7 @@
<section name=""></section>
<p>
- <div style="float:left;margin-right:15px;"><img src="./images/hama_paint_logo.png" style="width:130px" alt="" /></div>
+ <div style="float:left;margin-right:15px;margin-bottom: 10px;"><img src="./images/hama_paint_logo.png" style="width:120px" alt="" /></div>
Apache Hama<sup>TM</sup> is a framework for Big Data analytics which uses the Bulk Synchronous Parallel (BSP) computing model,
which was established in 2012 as a Top-Level Project of The Apache Software Foundation.
<br/><br/>It provides not only pure BSP programming model
@@ -49,8 +49,8 @@
<h3 align="center">Recent News</h3>
<ul>
+ <li>Jan 28, 2016: Behroz Sikander was added as a committer and PMC</li>
<li>Jun 14, 2015: release 0.7.0 available [<a href="downloads.html">downloads</a>]</li>
- <li>Jun 11, 2015: Minho Kim was added as a committer.</li>
</ul>
</div>
diff --git a/src/site/xdoc/team-list.xml b/src/site/xdoc/team-list.xml
index 57b6755..d4bc378 100644
--- a/src/site/xdoc/team-list.xml
+++ b/src/site/xdoc/team-list.xml
@@ -65,6 +65,12 @@
<td align="center">committer</td>
</tr>
<tr valign="top">
+ <td align="center">bsikander</td>
+ <td align="center">Behroz Sikander</td>
+ <td align="center">Technical University of Munich</td>
+ <td align="center">PMC member, committer</td>
+ </tr>
+ <tr valign="top">
<td align="center">bsmin</td>
<td align="center">Byungseok Min</td>
<td align="center">LG CNS</td>