HAMA-789: BspPeer launched fail because port is bound by others (Suraj Menon via edwardyoon)
git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1522784 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 0b34907..236ef08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
BUG FIXES
+ HAMA-789: BspPeer launched fail because port is bound by others (Suraj Menon via edwardyoon)
HAMA-791: Fix the problem that MultilayerPerceptron fails to learn a good hypothesis sometimes. (Yexi Jiang)
HAMA-782: The arguments of DoubleVector.slice(int, int) method will mislead the user. (Yexi Jiang)
HAMA-780: New launched child processes by fault tolerance may not be able to contact each other (kennethxian)
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java b/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
index 5560c2c..8023f90 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
@@ -167,6 +167,16 @@
.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
peerAddress = new InetSocketAddress(bindAddress, bindPort);
+ // This function call may change the current peer address
+ initializeMessaging();
+
+ conf.set(Constants.PEER_HOST, peerAddress.getHostName());
+ conf.setInt(Constants.PEER_PORT, peerAddress.getPort());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initialized Messaging service.");
+ }
+
initializeIO();
initializeSyncService(superstep, state);
@@ -180,11 +190,6 @@
setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f, state,
stateString, peerAddress.getHostName(), phase, counters));
- initilizeMessaging();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initialized Messaging service.");
- }
-
final String combinerName = conf.get(Constants.COMBINER_CLASS);
if (combinerName != null) {
combiner = (Combiner<M>) ReflectionUtils.newInstance(
@@ -288,9 +293,10 @@
return in.getPos();
}
- public final void initilizeMessaging() throws ClassNotFoundException {
+ public final void initializeMessaging() throws ClassNotFoundException {
messenger = MessageManagerFactory.getMessageManager(conf);
messenger.init(taskId, this, conf, peerAddress);
+ peerAddress = messenger.getListenerAddress();
}
public final void initializeSyncService(long superstep, TaskStatus.State state)
diff --git a/core/src/main/java/org/apache/hama/bsp/GroomServer.java b/core/src/main/java/org/apache/hama/bsp/GroomServer.java
index 03be1a4..32d73fc 100644
--- a/core/src/main/java/org/apache/hama/bsp/GroomServer.java
+++ b/core/src/main/java/org/apache/hama/bsp/GroomServer.java
@@ -126,7 +126,6 @@
/** Map from taskId -> TaskInProgress. */
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
- Map<TaskAttemptID, Integer> assignedPeerNames = null;
Map<BSPJobID, RunningJob> runningJobs = null;
// new nexus between GroomServer and BSPMaster
@@ -162,17 +161,9 @@
}
if (actions != null) {
- // assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
- int prevPort = Constants.DEFAULT_PEER_PORT;
for (GroomServerAction action : actions) {
if (action instanceof LaunchTaskAction) {
- Task t = ((LaunchTaskAction) action).getTask();
-
- synchronized (assignedPeerNames) {
- prevPort = BSPNetUtils.getNextAvailable(prevPort);
- assignedPeerNames.put(t.getTaskID(), prevPort);
- }
LOG.info("Launch " + actions.length + " tasks.");
startNewTask((LaunchTaskAction) action);
} else if (action instanceof KillTaskAction) {
@@ -199,10 +190,6 @@
RecoverTaskAction recoverAction = (RecoverTaskAction) action;
Task t = recoverAction.getTask();
LOG.info("Recovery action task." + t.getTaskID());
- synchronized (assignedPeerNames) {
- prevPort = BSPNetUtils.getNextAvailable(prevPort);
- assignedPeerNames.put(t.getTaskID(), prevPort);
- }
try {
startRecoveryTask(recoverAction);
} catch (IOException e) {
@@ -336,8 +323,6 @@
this.conf.set(Constants.PEER_HOST, localHostname);
this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
- this.assignedPeerNames = new HashMap<TaskAttemptID, Integer>(
- 2 * this.maxCurrentTasks);
int rpcPort = -1;
String rpcAddr = null;
@@ -1231,7 +1216,8 @@
defaultConf);
final BSPTask task = (BSPTask) umbilical.getTask(taskid);
- int peerPort = umbilical.getAssignedPortNum(taskid);
+ int peerPort = Constants.DEFAULT_PEER_PORT;
+ peerPort = BSPNetUtils.getNextAvailable(peerPort);
defaultConf.addResource(new Path(task.getJobFile()));
BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());
@@ -1366,11 +1352,6 @@
}
@Override
- public int getAssignedPortNum(TaskAttemptID taskid) {
- return assignedPeerNames.get(taskid);
- }
-
- @Override
public void process(WatchedEvent event) {
}
diff --git a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
index 5a5738d..c5a6846 100644
--- a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
+++ b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
@@ -335,12 +335,14 @@
@SuppressWarnings("rawtypes")
private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> MANAGER_MAP = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
+ private InetSocketAddress selfAddress;
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
Configuration conf, InetSocketAddress peerAddress) {
super.init(attemptId, peer, conf, peerAddress);
MANAGER_MAP.put(peerAddress, this);
+ selfAddress = peerAddress;
}
@SuppressWarnings("unchecked")
@@ -353,6 +355,11 @@
1L);
}
}
+
+ @Override
+ public InetSocketAddress getListenerAddress() {
+ return selfAddress;
+ }
}
public static class LocalUmbilical implements BSPPeerProtocol {
@@ -400,12 +407,6 @@
throws IOException, InterruptedException {
return true;
}
-
- @Override
- public int getAssignedPortNum(TaskAttemptID taskid) {
- return 0;
- }
-
}
public static class LocalSyncClient extends BSPPeerSyncClient {
diff --git a/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
index 07789dd..9cf37d3 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
@@ -65,8 +65,7 @@
protected SynchronizedQueue<M> localQueueForNextIteration;
// this peer object is just used for counter incrementation
protected BSPPeer<?, ?, ?, ?, M> peer;
- // the peer address of this peer
- protected InetSocketAddress peerAddress;
+
// the task attempt id
protected TaskAttemptID attemptId;
@@ -89,7 +88,6 @@
this.attemptId = attemptId;
this.peer = peer;
this.conf = conf;
- this.peerAddress = peerAddress;
this.localQueue = getReceiverQueue();
this.localQueueForNextIteration = getSynchronizedReceiverQueue();
this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
diff --git a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
index 98ed935..322ff01 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
@@ -25,14 +25,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.ipc.RPC;
-import org.apache.hama.ipc.RPC.Server;
+import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.RPC.Server;
import org.apache.hama.util.LRUCache;
/**
@@ -45,7 +46,7 @@
private static final Log LOG = LogFactory
.getLog(HadoopMessageManagerImpl.class);
- private Server server = null;
+ private Server server;
private LRUCache<InetSocketAddress, HadoopMessageManager<M>> peersLRUCache = null;
@@ -74,11 +75,16 @@
private final void startRPCServer(Configuration conf,
InetSocketAddress peerAddress) {
try {
- this.server = RPC.getServer(this, peerAddress.getHostName(),
- peerAddress.getPort(), conf);
+ String bindAddress = conf.get(Constants.PEER_HOST,
+ Constants.DEFAULT_PEER_HOST);
+ InetSocketAddress selfAddress = new InetSocketAddress(bindAddress, 0);
+
+ this.server = RPC.getServer(this, selfAddress.getHostName(),
+ selfAddress.getPort(), conf);
server.start();
- LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
- + peerAddress.getPort());
+
+ LOG.info(" BSPPeer address:" + server.getListenerAddress().getHostName()
+ + " port:" + server.getListenerAddress().getPort());
} catch (IOException e) {
LOG.error("Fail to start RPC server!", e);
throw new RuntimeException("RPC Server could not be launched!");
@@ -158,4 +164,11 @@
return versionID;
}
+ @Override
+ public InetSocketAddress getListenerAddress() {
+ if (this.server != null) {
+ return this.server.getListenerAddress();
+ }
+ return null;
+ }
}
diff --git a/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
index e26771c..856c147 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
@@ -45,7 +45,12 @@
/**
* Init can be used to start servers and initialize internal state. If you are
- * implementing a subclass, please call the super version of this method.
+ * implementing a subclass, please call the super version of this method. The
+ * socket address provided may be used for initializing the server connection.
+ * If it is used or not used, the message manager should provide a unique
+ * InetSocketAddress that identifies the server for the peer listening on the
+ * socket. This socket address should be returned in
+ * {@link MessageManager#getListenerAddress()}
*
*/
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
@@ -123,4 +128,9 @@
public void registerListener(MessageEventListener<M> listener)
throws IOException;
+ /**
+ * Returns the server address on which the incoming connections are listening
+ * on.
+ */
+ public InetSocketAddress getListenerAddress();
}
diff --git a/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java b/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
index 9a46721..0776016 100644
--- a/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
+++ b/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
@@ -67,10 +67,4 @@
boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException;
- /**
- * @param taskid
- * @return assigned port number
- */
- int getAssignedPortNum(TaskAttemptID taskid);
-
}
diff --git a/core/src/main/java/org/apache/hama/util/BSPNetUtils.java b/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
index cc4f3e5..6c4e69f 100644
--- a/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
+++ b/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
@@ -98,14 +98,15 @@
* @return the InetSocketAddress of the given BSP peer
*/
public static InetSocketAddress getAddress(String peerName) {
- String[] peerAddrParts = peerName.split(":");
- if (peerAddrParts.length != 2) {
+ int index = peerName.lastIndexOf(':');
+ if (index <= 0 || index == peerName.length() - 1) {
throw new ArrayIndexOutOfBoundsException(
- "Peername must consist of exactly ONE \":\"! Given peername was: "
- + peerName);
+ "Invalid host and port information. "
+ + "Peername must consist of atleast ONE \":\"! "
+ + "Given peername was: " + peerName);
}
- return new InetSocketAddress(peerAddrParts[0],
- Integer.valueOf(peerAddrParts[1]));
+ return new InetSocketAddress(peerName.substring(0, index),
+ Integer.valueOf(peerName.substring(index + 1)));
}
/**
diff --git a/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java b/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
index 852a723..2e79d16 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
@@ -148,11 +148,6 @@
return true;
}
- @Override
- public int getAssignedPortNum(TaskAttemptID taskid) {
- return 0;
- }
-
public synchronized int getPingCount() {
return pingCount;
}
diff --git a/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java b/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
index b8994d5..085114d 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
@@ -150,6 +150,12 @@
this.listener = listener;
}
+ @Override
+ public InetSocketAddress getListenerAddress() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
public static class TestBSPPeer implements
diff --git a/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java b/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
index 498e21d..42294fb 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
@@ -41,13 +41,13 @@
bsp.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
- bsp.setNumBspTask(2);
+ bsp.setNumBspTask(3);
bsp.setInputFormat(NullInputFormat.class);
FileSystem fileSys = FileSystem.get(conf);
if (bsp.waitForCompletion(true)) {
- TestBSPMasterGroomServer.checkOutput(fileSys, conf, 2);
+ TestBSPMasterGroomServer.checkOutput(fileSys, conf, 3);
}
}
diff --git a/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java b/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
index 42d8819..706f216 100644
--- a/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
+++ b/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
@@ -73,12 +74,16 @@
InetSocketAddress peer = new InetSocketAddress(
BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort()
+ (increment++));
+ conf.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+ conf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+
BSPPeer<?, ?, ?, ?, IntWritable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable, NullWritable, NullWritable, IntWritable>(
conf, FileSystem.get(conf), new Counters());
TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1);
messageManager.init(id, dummyPeer, conf, peer);
+ peer = messageManager.getListenerAddress();
String peerName = peer.getHostName() + ":" + peer.getPort();
-
+ System.out.println("Peer is " + peerName);
messageManager.send(peerName, new IntWritable(1337));
Iterator<Entry<InetSocketAddress, MessageQueue<IntWritable>>> messageIterator = messageManager
diff --git a/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java b/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
index 91a78d0..f979ce9 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
@@ -405,9 +405,4 @@
}
- @Override
- public int getAssignedPortNum(TaskAttemptID taskid) {
- return 0;
- }
-
}