Merge pull request #1 from apache/master
Merging Hama latest code into my cloned repo
diff --git a/CHANGES.txt b/CHANGES.txt
index c7e3a9b..56600ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,11 +1,28 @@
Hama Change Log
-Release 0.7.1 (unreleased changes)
+Release 0.7.2 (unreleased changes)
NEW FEATURES
+
+ HAMA-988: Allow adding additional no-input tasks (edwardyoon)
BUG FIXES
+ HAMA-986: There is a missing value when calculating hashcode of AsyncClient (JongYoon Lim via edwardyoon)
+
+ IMPROVEMENTS
+
+ HAMA-978: Netty native transport (JongYoon Lim via edwardyoon)
+
+Release 0.7.1 - March 14, 2016
+
+ NEW FEATURES
+
+ HAMA-900: Added a new strategy to schedule tasks based on round robin scheme (Behroz Sikander via edwardyoon)
+
+ BUG FIXES
+
+ HAMA-982: Vertex.read/writeState() method throws NullPointerException (edwardyoon)
HAMA-965: Infinite loop because of recursive function call (JongYoon Lim via edwardyoon)
HAMA-966: NioServerListener doesn't throw any exceptions (JongYoon Lim via edwardyoon)
diff --git a/README.md b/README.md
index fea4d49..b686f45 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
# Apache Hama
-Apache Hama is a framework for Big Data analytics which uses the Bulk Synchronous Parallel (BSP) computing model, which was established in 2012 as a Top-Level Project of The Apache Software Foundation.
+<img src="http://hama.apache.org/images/hama_paint_logo.png" width="130" align="left"> Apache Hama is a framework for Big Data analytics which uses the Bulk Synchronous Parallel (BSP) computing model, which was established in 2012 as a Top-Level Project of The Apache Software Foundation.
It provides not only pure BSP programming model but also SQL-like query interface (Apache MRQL) and vertex/neuron centric programming models, inspired by Google's Pregel and DistBelief (Apache Horn). For the latest information about Hama, please visit our website at: <https://hama.apache.org/> and our wiki at: <https://wiki.apache.org/hama/>
diff --git a/c++/pom.xml b/c++/pom.xml
index 358db37..07ecff0 100644
--- a/c++/pom.xml
+++ b/c++/pom.xml
@@ -20,13 +20,13 @@
<parent>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hama</groupId>
<artifactId>hama-pipes</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
<name>pipes</name>
<description>Apache Hama Pipes</description>
<packaging>pom</packaging>
diff --git a/commons/pom.xml b/commons/pom.xml
index 8625c2f..3277336 100644
--- a/commons/pom.xml
+++ b/commons/pom.xml
@@ -20,14 +20,14 @@
<parent>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hama</groupId>
<artifactId>hama-commons</artifactId>
<name>commons</name>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
diff --git a/core/pom.xml b/core/pom.xml
index 2adb920..1305efb 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -20,14 +20,14 @@
<parent>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hama</groupId>
<artifactId>hama-core</artifactId>
<name>core</name>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
diff --git a/core/src/main/java/org/apache/hama/Constants.java b/core/src/main/java/org/apache/hama/Constants.java
index c3563f9..351fb05 100644
--- a/core/src/main/java/org/apache/hama/Constants.java
+++ b/core/src/main/java/org/apache/hama/Constants.java
@@ -132,6 +132,8 @@
// If true, framework launches the number of tasks by user settings.
public static final String FORCE_SET_BSP_TASKS = "hama.force.set.bsp.tasks";
+ // Number of extra tasks without an input split that the framework launches
+ // in addition to the tasks created for the input splits (read with default 0).
+ public static final String ADDITIONAL_BSP_TASKS = "hama.additional.bsp.tasks";
// /////////////////////////////////////
// Constants for ZooKeeper
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJob.java b/core/src/main/java/org/apache/hama/bsp/BSPJob.java
index 293e6a6..8489f83 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJob.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJob.java
@@ -189,6 +189,11 @@
return info.progress();
}
+  /**
+   * Returns the counters of this job, as returned by
+   * {@code info.getCounters()}. The job must be in
+   * {@code JobState.RUNNING}; this is checked by {@code ensureState}.
+   *
+   * @return the job counters
+   * @throws IOException as declared by this accessor
+   */
+  public Counters getCounters() throws IOException {
+    ensureState(JobState.RUNNING);
+    final Counters counters = info.getCounters();
+    return counters;
+  }
+
public boolean isComplete() throws IOException {
ensureState(JobState.RUNNING);
return info.isComplete();
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
index 4ca7b61..c1ce0f4 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
@@ -219,6 +219,11 @@
public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) {
return jobSubmitClient.getTaskCompletionEvents(getID(), startFrom, 10);
}
+
+    /**
+     * Returns the counters held by {@code status}. This method does not
+     * refresh {@code status} itself.
+     *
+     * @return the counters of the last known job status
+     */
+    @Override
+    public Counters getCounters() {
+      final Counters counters = status.getCounter();
+      return counters;
+    }
}
public BSPJobClient(Configuration conf) throws IOException {
@@ -363,17 +368,22 @@
splits = job.getInputFormat().getSplits(job, maxTasks);
}
- if (maxTasks < splits.length) {
+ // the number of additional tasks to the number of input splits
+ int additionalTasks = job.getConfiguration().getInt(
+ Constants.ADDITIONAL_BSP_TASKS, 0);
+ if (maxTasks < splits.length + additionalTasks) {
throw new IOException(
"Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
- + splits.length + ", The number of max tasks: " + maxTasks);
+ + splits.length
+ + ", The number of additional tasks: "
+ + +additionalTasks + ", The number of max tasks: " + maxTasks);
}
int numOfSplits = writeSplits(job, splits, submitSplitFile, maxTasks);
if (numOfSplits > configured
|| !job.getConfiguration().getBoolean(Constants.FORCE_SET_BSP_TASKS,
false)) {
- job.setNumBspTask(numOfSplits);
+ job.setNumBspTask(numOfSplits + additionalTasks);
}
job.set("bsp.job.split.file", submitSplitFile.toString());
@@ -583,8 +593,9 @@
// set partitionID to rawSplit
if (split.getClass().getName().equals(FileSplit.class.getName())
&& job.getBoolean("input.has.partitioned", false)) {
- String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
- if(extractPartitionID.length > 1)
+ String[] extractPartitionID = ((FileSplit) split).getPath().getName()
+ .split("[-]");
+ if (extractPartitionID.length > 1)
rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
}
diff --git a/core/src/main/java/org/apache/hama/bsp/RunningJob.java b/core/src/main/java/org/apache/hama/bsp/RunningJob.java
index 95faba5..dcbabdd 100644
--- a/core/src/main/java/org/apache/hama/bsp/RunningJob.java
+++ b/core/src/main/java/org/apache/hama/bsp/RunningJob.java
@@ -53,6 +53,8 @@
*/
public String getJobFile();
+ /**
+ * Returns the {@link Counters} of this job.
+ *
+ * @return the job counters
+ */
+ public Counters getCounters();
+
/**
* Get the <i>progress</i> of the job's tasks, as a float between 0.0 and 1.0.
* When all bsp tasks have completed, the function returns 1.0.
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
new file mode 100644
index 0000000..0bf060b
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+
+/**
+ * <code>RoundRobinTaskAllocator</code> is a round robin based task allocator
+ * that spreads tasks evenly over the Grooms, balancing the load of the
+ * cluster. For example, if a cluster has 10 Grooms and 20 tasks are to be
+ * scheduled, each Groom gets 2 tasks.
+ * <p>
+ * {@link #getGroomToAllocate} does not use data locality: the groom is
+ * chosen only from the number of tasks already assigned to each groom and
+ * the groom's task capacity.
+ */
+public class RoundRobinTaskAllocator implements TaskAllocationStrategy {
+
+  private static final Log LOG = LogFactory
+      .getLog(RoundRobinTaskAllocator.class);
+
+  /**
+   * This allocator has no configurable state; nothing is read here.
+   */
+  @Override
+  public void initialize(Configuration conf) {
+  }
+
+  /**
+   * Returns the groom that currently holds the fewest tasks and still has
+   * room for another one.
+   * <p>
+   * Grooms that already have a count in {@code taskCountInGroomMap} are
+   * scanned first, in that map's iteration order, and the first one with the
+   * strictly smallest count wins. A groom of {@code groomStatuses} without a
+   * count is treated as holding 0 tasks and is chosen only when no tracked
+   * groom is already at 0; it is then registered in
+   * {@code taskCountInGroomMap} with a count of 0 so that the caller can
+   * increment it. Tracked grooms whose host name is no longer a key of
+   * {@code groomStatuses} are ignored.
+   *
+   * @param groomStatuses The map of groom host name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be
+   *          deprecated soon). May gain one entry with a count of 0.
+   * @return the status of the groom which should be allocated the next
+   *         task, or null if no groom has spare capacity.
+   */
+  private GroomServerStatus findGroomWithMinimumTasks(
+      Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap) {
+    GroomServerStatus selected = null;
+    int minimumTasks = Integer.MAX_VALUE;
+
+    // Grooms that already have a task count.
+    for (Entry<GroomServerStatus, Integer> entry : taskCountInGroomMap
+        .entrySet()) {
+      GroomServerStatus groom = entry.getKey();
+      Integer taskCount = entry.getValue();
+      if (groom == null || taskCount == null) {
+        continue; // a missing count is handled as "untracked" below
+      }
+      if (taskCount >= minimumTasks || taskCount >= groom.getMaxTasks()) {
+        continue; // not better than the current choice, or the groom is full
+      }
+      // Skip grooms that are no longer known, so that a stale entry cannot
+      // win the selection and leave the task unallocated.
+      GroomServerStatus live = groomStatuses.get(groom.getGroomHostName());
+      if (live != null) {
+        selected = live;
+        minimumTasks = taskCount;
+      }
+    }
+
+    // Grooms without a task count hold 0 tasks. Every one of them is
+    // considered (not only the first), so a full groom cannot hide idle ones.
+    boolean selectedIsUntracked = false;
+    for (GroomServerStatus groom : groomStatuses.values()) {
+      if (minimumTasks == 0) {
+        break; // nothing beats 0 tasks; keep the earlier choice
+      }
+      if (groom != null && taskCountInGroomMap.get(groom) == null
+          && groom.getMaxTasks() > 0) {
+        selected = groom;
+        minimumTasks = 0;
+        selectedIsUntracked = true;
+      }
+    }
+
+    if (selectedIsUntracked) {
+      // Start tracking the groom so that the caller can increment its count.
+      taskCountInGroomMap.put(selected, 0);
+    }
+    return selected;
+  }
+
+  /**
+   * Returns the hosts that store the task's input split locally.
+   *
+   * @return the split locations, an empty array if the task may not be
+   *         started, or null if the task has no input split.
+   */
+  @Override
+  public String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot start task based on id");
+      }
+      return new String[0];
+    }
+
+    RawSplit rawSplit = taskInProgress.getFileSplit();
+    if (rawSplit != null) {
+      return rawSplit.getLocations();
+    }
+    return null;
+  }
+
+  /**
+   * Chooses the groom with the fewest assigned tasks that still has free
+   * capacity. {@code selectedGrooms} and {@code resources} are not used.
+   *
+   * @return the groom to run the task on, or null if the task may not be
+   *         started or no groom has spare capacity.
+   */
+  @Override
+  public GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exceeded allowed attempts.");
+      }
+      return null;
+    }
+    return findGroomWithMinimumTasks(groomStatuses, taskCountInGroomMap);
+  }
+
+  /**
+   * This operation is not supported.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    throw new UnsupportedOperationException(
+        "This API is not supported for the called API function call.");
+  }
+}
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
index ba0266a..8bb3913 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
@@ -20,24 +20,29 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.*;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BSPNetUtils;
+import javax.net.SocketFactory;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ConnectException;
@@ -51,20 +56,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.util.BSPNetUtils;
-
/**
* A client for an IPC service using netty. IPC calls take a single
* {@link Writable} as a parameter, and return a {@link Writable} as their
@@ -186,7 +177,7 @@
* @throws IOException
*/
public Connection(ConnectionId remoteId) throws IOException {
- group = new NioEventLoopGroup();
+ group = new EpollEventLoopGroup();
bootstrap = new Bootstrap();
this.remoteId = remoteId;
this.serverAddress = remoteId.getAddress();
@@ -280,12 +271,12 @@
}
// Configure the client.
- // NioEventLoopGroup is a multithreaded event loop that handles I/O
+ // EpollEventLoopGroup is a multithreaded event loop that handles I/O
// operation
- group = new NioEventLoopGroup();
+ group = new EpollEventLoopGroup();
// Bootstrap is a helper class that sets up a client
bootstrap = new Bootstrap();
- bootstrap.group(group).channel(NioSocketChannel.class)
+ bootstrap.group(group).channel(EpollSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, this.tcpNoDelay)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, pingInterval)
@@ -1127,7 +1118,7 @@
result = PRIME * result + maxIdleTime;
result = PRIME * result + pingInterval;
result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
- result = PRIME * rpcTimeout;
+ result = PRIME * result + rpcTimeout;
result = PRIME * result
+ ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
index 93627aa..67ad5d0 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
@@ -20,12 +20,12 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.*;
+import io.netty.util.concurrent.GenericFutureListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -48,7 +48,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
-import java.util.concurrent.Future;
/**
* An abstract IPC service using netty. IPC calls take a single {@link Writable}
@@ -82,8 +81,8 @@
private int port; // port we listen on
private Class<? extends Writable> paramClass; // class of call parameters
// Configure the server.(constructor is thread num)
- private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- private EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
+ private EventLoopGroup workerGroup = new EpollEventLoopGroup();
private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
@@ -192,7 +191,7 @@
// ServerBootstrap is a helper class that sets up a server
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
+ .channel(EpollServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, backlogLength)
.childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
diff --git a/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java b/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java
new file mode 100644
index 0000000..e50e4f9
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.util.KeyValuePair;
+
+/**
+ * Verifies that {@link Constants#ADDITIONAL_BSP_TASKS} launches extra tasks
+ * without input on top of the tasks created for the input splits.
+ */
+public class TestAdditionalTasks extends HamaCluster {
+
+  public static final Log LOG = LogFactory.getLog(TestAdditionalTasks.class);
+
+  protected HamaConfiguration configuration;
+
+  public TestAdditionalTasks() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.peer.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * Runs a job over one small text file with one additional task and checks
+   * that two tasks were launched: one for the input split plus the extra one.
+   */
+  public void testAdditionalTasks() throws Exception {
+    BSPJob bsp = new BSPJob(new HamaConfiguration(new Configuration()));
+    // Configure the job's own configuration: HamaConfiguration(Configuration)
+    // copies its argument, so later changes to the source would be lost.
+    bsp.getConfiguration().setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+    bsp.setBspClass(TestBSP.class);
+    bsp.setInputFormat(TextInputFormat.class);
+    bsp.setOutputFormat(NullOutputFormat.class);
+    bsp.setInputPath(new Path("../CHANGES.txt"));
+
+    bsp.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
+    bsp.setNumBspTask(2);
+
+    assertTrue(bsp.waitForCompletion(true));
+    Counters counter = bsp.getCounters();
+    // CHANGES.txt is expected to yield a single split, so 1 + 1 tasks.
+    assertEquals(2L,
+        counter.getCounter(JobInProgress.JobCounter.LAUNCHED_TASKS));
+  }
+
+  /**
+   * Reads all of its input. The task that owns the CHANGES.txt split sees
+   * more than two lines; the additional task has no input and sees none.
+   */
+  public static class TestBSP extends
+      BSP<LongWritable, Text, NullWritable, NullWritable, NullWritable> {
+
+    @Override
+    public void bsp(
+        BSPPeer<LongWritable, Text, NullWritable, NullWritable, NullWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+      long numOfPairs = 0;
+      KeyValuePair<LongWritable, Text> readNext = null;
+      while ((readNext = peer.readNext()) != null) {
+        LOG.debug(readNext.getKey().get() + " / "
+            + readNext.getValue().toString());
+        numOfPairs++;
+      }
+
+      assertTrue(numOfPairs > 2 || numOfPairs == 0);
+    }
+  }
+
+}
diff --git a/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
new file mode 100644
index 0000000..e471fe0
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.RoundRobinTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * Unit tests for {@link RoundRobinTaskAllocator} on a simulated cluster of
+ * 10 grooms that can run 3 tasks each.
+ */
+public class TestTaskAllocationRoundRobin extends TestCase {
+
+  public static final Log LOG = LogFactory
+      .getLog(TestTaskAllocationRoundRobin.class);
+
+  private static final int NUM_GROOMS = 10;
+  private static final int MAX_TASKS_PER_GROOM = 3;
+
+  Configuration conf = new Configuration();
+  Map<String, GroomServerStatus> groomStatuses;
+  Map<GroomServerStatus, Integer> taskCountInGroomMap;
+  BSPResource[] resources;
+  TaskInProgress taskInProgress;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    String[] locations = new String[] { "host6", "host4", "host3" };
+    String value = "data";
+    RawSplit split = new RawSplit();
+    split.setLocations(locations);
+    split.setBytes(value.getBytes(), 0, value.getBytes().length);
+    split.setDataLength(value.getBytes().length);
+
+    assertEquals(value.getBytes().length, (int) split.getDataLength());
+
+    taskCountInGroomMap = new LinkedHashMap<GroomServerStatus, Integer>(
+        NUM_GROOMS);
+    resources = new BSPResource[0];
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf);
+    taskInProgress = new TaskInProgress(job.getJobID(), "job.xml", split, conf,
+        jobProgress, 1);
+
+    // Both maps are filled in the same order (host0 .. host9) and every groom
+    // starts with no tasks, so the allocation order is deterministic.
+    groomStatuses = new LinkedHashMap<String, GroomServerStatus>(NUM_GROOMS);
+    for (int i = 0; i < NUM_GROOMS; ++i) {
+      String name = "host" + i;
+      GroomServerStatus status = new GroomServerStatus(name,
+          new ArrayList<TaskStatus>(), 0, MAX_TASKS_PER_GROOM, "", name);
+      groomStatuses.put(name, status);
+      taskCountInGroomMap.put(status, 0);
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /** Creates a RoundRobinTaskAllocator through ReflectionUtils. */
+  private TaskAllocationStrategy newStrategy() {
+    return ReflectionUtils.newInstance(RoundRobinTaskAllocator.class, conf);
+  }
+
+  /**
+   * Allocates 30 tasks on 10 grooms with room for 3 tasks each. The tasks
+   * must rotate over host0 .. host9, and once every slot is taken no groom
+   * may be returned anymore.
+   */
+  @Test
+  public void testRoundRobinAllocation() {
+    TaskAllocationStrategy strategy = newStrategy();
+    int totalSlots = NUM_GROOMS * MAX_TASKS_PER_GROOM;
+
+    for (int i = 0; i < totalSlots; i++) {
+      GroomServerStatus groomStatus = strategy.getGroomToAllocate(
+          groomStatuses, null, taskCountInGroomMap, resources, taskInProgress);
+      assertNotNull("No groom allocated for task " + i, groomStatus);
+      assertEquals("Unexpected groom for task " + i,
+          "host" + (i % NUM_GROOMS), groomStatus.getGroomHostName());
+      // Record the allocation so that the next call sees it.
+      taskCountInGroomMap.put(groomStatus,
+          taskCountInGroomMap.get(groomStatus) + 1);
+    }
+
+    assertNull("All slots are taken, no groom should be returned",
+        strategy.getGroomToAllocate(groomStatuses, null, taskCountInGroomMap,
+            resources, taskInProgress));
+  }
+
+  /** The locations of the task's input split are reported as-is. */
+  @Test
+  public void testRoundRobinDataLocality() throws Exception {
+    String[] hosts = newStrategy().selectGrooms(groomStatuses,
+        taskCountInGroomMap, resources, taskInProgress);
+    assertNotNull(hosts);
+    List<String> list = Arrays.asList(hosts);
+    assertTrue(list.contains("host6"));
+    assertTrue(list.contains("host3"));
+    assertTrue(list.contains("host4"));
+  }
+
+}
diff --git a/dist/pom.xml b/dist/pom.xml
index f8cae37..a4b70cc 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
</parent>
<artifactId>hama-dist</artifactId>
diff --git a/examples/pom.xml b/examples/pom.xml
index 49b7227..c5fc1aa 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -20,14 +20,14 @@
<parent>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hama</groupId>
<artifactId>hama-examples</artifactId>
<name>examples</name>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
diff --git a/examples/src/test/java/org/apache/hama/examples/CustomVertexReadWriteStateTest.java b/examples/src/test/java/org/apache/hama/examples/CustomVertexReadWriteStateTest.java
new file mode 100644
index 0000000..f753ffb
--- /dev/null
+++ b/examples/src/test/java/org/apache/hama/examples/CustomVertexReadWriteStateTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.Vertex;
+
+/**
+ * Holder for the vertex used by {@code SSSPTest#testCustomReadWriteState} to
+ * exercise custom vertex state serialization (readState/writeState).
+ * <p>
+ * This class deliberately does not extend {@code junit.framework.TestCase}:
+ * it has no test methods, and JUnit reports a TestCase without test methods
+ * as a failing "No tests found" test.
+ */
+public class CustomVertexReadWriteStateTest {
+  static int initialState = 2;
+  static int changedState = 4;
+
+  /**
+   * Vertex whose state starts at {@code initialState}, switches to
+   * {@code changedState} in superstep 3, and asserts the expected value in
+   * every superstep.
+   */
+  public static class TestVertex extends Vertex<Text, IntWritable, IntWritable> {
+
+    // NOTE(review): this state is static, i.e. shared by all vertices, so the
+    // assertions below may pass even if readState/writeState are never
+    // invoked; confirm whether an instance field is intended.
+    private static ArrayWritable test = new ArrayWritable(IntWritable.class);
+
+    @Override
+    public void setup(HamaConfiguration conf) {
+      // Sets the initial state
+      test.set(new Writable[] { new IntWritable(initialState) });
+    }
+
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+      if (this.getSuperstepCount() == 3) {
+        // change the state
+        test.set(new Writable[] { new IntWritable(changedState) });
+      }
+
+      if (this.getSuperstepCount() < 3) {
+        assertEquals(initialState, ((IntWritable) test.get()[0]).get());
+      } else {
+        assertEquals(changedState, ((IntWritable) test.get()[0]).get());
+      }
+    }
+
+    public void readState(DataInput in) throws IOException {
+      test.readFields(in);
+    }
+
+    public void writeState(DataOutput out) throws IOException {
+      test.write(out);
+    }
+  }
+
+}
diff --git a/examples/src/test/java/org/apache/hama/examples/SSSPTest.java b/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
index 3d7f03f..116abbb 100644
--- a/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
+++ b/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
@@ -31,7 +31,16 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.NullOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.examples.CustomVertexReadWriteStateTest.TestVertex;
+import org.apache.hama.examples.SSSP.SSSPTextReader;
+import org.apache.hama.graph.GraphJob;
import org.junit.Test;
/**
@@ -61,18 +70,51 @@
protected void setUp() throws Exception {
super.setUp();
fs = FileSystem.get(conf);
+    generateTestData();
+  }
+
+  /** Deletes the generated test data and closes the file system opened in setUp(). */
+  @Override
+  protected void tearDown() throws Exception {
+    try {
+      deleteTempDirs();
+    } finally {
+      // Close the file system even if cleanup fails, so the handle is not leaked.
+      fs.close();
+      super.tearDown();
+    }
}
@Test
public void testShortestPaths() throws IOException, InterruptedException,
ClassNotFoundException, InstantiationException, IllegalAccessException {
- generateTestData();
- try {
- SSSP.main(new String[] { "0", INPUT, OUTPUT, "3" });
- verifyResult();
- } finally {
- deleteTempDirs();
+ // Test data is created in setUp() (generateTestData) and cleaned up in tearDown() (deleteTempDirs).
+ SSSP.main(new String[] { "0", INPUT, OUTPUT, "3" });
+ verifyResult();
+ }
+
+  /**
+   * Runs {@link TestVertex} over the generated input; the vertex checks its own
+   * custom state with assertions in compute().
+   */
+  @Test
+  public void testCustomReadWriteState() throws IOException,
+      InterruptedException, ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
+
+    HamaConfiguration conf = new HamaConfiguration();
+    GraphJob job = new GraphJob(conf, CustomVertexReadWriteStateTest.class);
+    // Set the job name
+    job.setJobName("test custom read/write state");
+    job.setInputPath(new Path(INPUT));
+    job.setNumBspTask(1);
+    job.setVertexClass(TestVertex.class);
+    job.setInputFormat(TextInputFormat.class);
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(Text.class);
+
+    job.setPartitioner(HashPartitioner.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setVertexInputReaderClass(SSSPTextReader.class);
+    // Run past superstep 3, where TestVertex switches its state, so that both
+    // the initial and the changed state get checked.
+    job.setMaxIteration(6);
+    job.setVertexIDClass(Text.class);
+    job.setVertexValueClass(IntWritable.class);
+    job.setEdgeValueClass(IntWritable.class);
+
+    long startTime = System.currentTimeMillis();
+    // The state assertions run inside the job, not in this method, so the job
+    // result must be checked or a failed run would pass silently.
+    if (job.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    } else {
+      fail("Job \"test custom read/write state\" did not complete successfully");
}
}
diff --git a/graph/pom.xml b/graph/pom.xml
index 14fd3ef..d56fe57 100644
--- a/graph/pom.xml
+++ b/graph/pom.xml
@@ -20,14 +20,14 @@
<parent>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hama</groupId>
<artifactId>hama-graph</artifactId>
<name>graph</name>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
diff --git a/graph/src/main/java/org/apache/hama/graph/Vertex.java b/graph/src/main/java/org/apache/hama/graph/Vertex.java
index bf90187..cdbf6b5 100644
--- a/graph/src/main/java/org/apache/hama/graph/Vertex.java
+++ b/graph/src/main/java/org/apache/hama/graph/Vertex.java
@@ -17,8 +17,10 @@
*/
package org.apache.hama.graph;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -271,7 +273,11 @@
}
}
votedToHalt = in.readBoolean();
- readState(in);
+
+ // write() emits a flag before the custom state; the state bytes follow only when it is true.
+ boolean hasMoreContents = in.readBoolean();
+ if (hasMoreContents) {
+ readState(in);
+ }
}
@Override
@@ -308,8 +314,24 @@
}
}
out.writeBoolean(votedToHalt);
- writeState(out);
+    // Serialize the custom state into a scratch buffer first, so a vertex
+    // without state can be recorded as a single false flag.
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput customOut = new DataOutputStream(baos);
+    try {
+      writeState(customOut);
+    } catch (NullPointerException e) {
+      // An NPE is taken to mean the state is not initialized yet (HAMA-982).
+      // Drop any bytes written before the failure; otherwise a truncated
+      // state would be emitted and readState() would read garbage.
+      baos.reset();
+    }
+
+    // An empty buffer means there is no state, and readFields() then skips readState().
+    boolean hasMoreContents = baos.size() > 0;
+    out.writeBoolean(hasMoreContents);
+    if (hasMoreContents) {
+      out.write(baos.toByteArray());
+    }
}
// compare across the vertex ID
diff --git a/mesos/pom.xml b/mesos/pom.xml
index c0af443..00bafdb 100644
--- a/mesos/pom.xml
+++ b/mesos/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/ml/pom.xml b/ml/pom.xml
index 6701df0..b6cf3cf 100644
--- a/ml/pom.xml
+++ b/ml/pom.xml
@@ -20,14 +20,14 @@
<parent>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hama</groupId>
<artifactId>hama-ml</artifactId>
<name>machine learning</name>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
diff --git a/pom.xml b/pom.xml
index 7ed4d99..51677fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,14 +20,14 @@
<parent>
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
- <version>8</version>
+ <version>10</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
<name>Apache Hama parent POM</name>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
<url>http://hama.apache.org</url>
<packaging>pom</packaging>
<prerequisites>
@@ -197,7 +197,7 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
+ <artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
@@ -390,10 +390,17 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
- <version>2.2</version>
+ <version>2.5.3</version>
<configuration>
<mavenExecutorId>forked-path</mavenExecutorId>
</configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.scm</groupId>
+ <artifactId>maven-scm-provider-gitexe</artifactId>
+ <version>1.8.1</version>
+ </dependency>
+ </dependencies>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
diff --git a/src/site/xdoc/index.xml b/src/site/xdoc/index.xml
index a75a846..1602680 100644
--- a/src/site/xdoc/index.xml
+++ b/src/site/xdoc/index.xml
@@ -24,7 +24,7 @@
<section name=""></section>
<p>
- <div style="float:left;margin-right:15px;"><img src="./images/hama_paint_logo.png" style="width:130px" alt="" /></div>
+ <div style="float:left;margin-right:15px;margin-bottom: 10px;"><img src="./images/hama_paint_logo.png" style="width:120px" alt="" /></div>
Apache Hama<sup>TM</sup> is a framework for Big Data analytics which uses the Bulk Synchronous Parallel (BSP) computing model,
which was established in 2012 as a Top-Level Project of The Apache Software Foundation.
<br/><br/>It provides not only pure BSP programming model
@@ -49,8 +49,8 @@
<h3 align="center">Recent News</h3>
<ul>
+ <li>Jan 28, 2016: Behroz Sikander was added as a committer and PMC member.</li>
<li>Jun 14, 2015: release 0.7.0 available [<a href="downloads.html">downloads</a>]</li>
- <li>Jun 11, 2015: Minho Kim was added as a committer.</li>
</ul>
</div>
diff --git a/src/site/xdoc/team-list.xml b/src/site/xdoc/team-list.xml
index 57b6755..d4bc378 100644
--- a/src/site/xdoc/team-list.xml
+++ b/src/site/xdoc/team-list.xml
@@ -65,6 +65,12 @@
<td align="center">committer</td>
</tr>
<tr valign="top">
+ <td align="center">bsikander</td>
+ <td align="center">Behroz Sikander</td>
+ <td align="center">Technical University of Munich</td>
+ <td align="center">PMC member, committer</td>
+ </tr>
+ <tr valign="top">
<td align="center">bsmin</td>
<td align="center">Byungseok Min</td>
<td align="center">LG CNS</td>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index dede15f..7129321 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -19,14 +19,14 @@
<parent>
<groupId>org.apache.hama</groupId>
<artifactId>hama-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hama</groupId>
<artifactId>hama-yarn</artifactId>
<name>yarn</name>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.7.2-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
index 4d23384..2c2cb23 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
@@ -123,6 +123,12 @@
public String getJobFile() {
return null;
}
+
+ @Override
+ public Counters getCounters() {
+ // TODO: counters are not reported by this client yet; this always returns null.
+ return null;
+ }
}
public YARNBSPJobClient(HamaConfiguration conf) {