HAMA-789: BspPeer launched fail because port is bound by others (Suraj Menon via edwardyoon)

git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1522784 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 0b34907..236ef08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
 
   BUG FIXES
   
+   HAMA-789: BspPeer launched fail because port is bound by others (Suraj Menon via edwardyoon)
    HAMA-791: Fix the problem that MultilayerPerceptron fails to learn a good hypothesis sometimes. (Yexi Jiang)
    HAMA-782: The arguments of DoubleVector.slice(int, int) method will mislead the user. (Yexi Jiang)
    HAMA-780: New launched child processes by fault tolerance may not be able to contact each other (kennethxian)
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java b/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
index 5560c2c..8023f90 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
@@ -167,6 +167,16 @@
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
 
+    // This function call may change the current peer address
+    initializeMessaging();
+    
+    conf.set(Constants.PEER_HOST, peerAddress.getHostName());
+    conf.setInt(Constants.PEER_PORT, peerAddress.getPort());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initialized Messaging service.");
+    }
+    
     initializeIO();
     initializeSyncService(superstep, state);
 
@@ -180,11 +190,6 @@
     setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f, state,
         stateString, peerAddress.getHostName(), phase, counters));
 
-    initilizeMessaging();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Initialized Messaging service.");
-    }
-
     final String combinerName = conf.get(Constants.COMBINER_CLASS);
     if (combinerName != null) {
       combiner = (Combiner<M>) ReflectionUtils.newInstance(
@@ -288,9 +293,10 @@
     return in.getPos();
   }
 
-  public final void initilizeMessaging() throws ClassNotFoundException {
+  public final void initializeMessaging() throws ClassNotFoundException {
     messenger = MessageManagerFactory.getMessageManager(conf);
     messenger.init(taskId, this, conf, peerAddress);
+    peerAddress = messenger.getListenerAddress();
   }
 
   public final void initializeSyncService(long superstep, TaskStatus.State state)
diff --git a/core/src/main/java/org/apache/hama/bsp/GroomServer.java b/core/src/main/java/org/apache/hama/bsp/GroomServer.java
index 03be1a4..32d73fc 100644
--- a/core/src/main/java/org/apache/hama/bsp/GroomServer.java
+++ b/core/src/main/java/org/apache/hama/bsp/GroomServer.java
@@ -126,7 +126,6 @@
   /** Map from taskId -> TaskInProgress. */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
-  Map<TaskAttemptID, Integer> assignedPeerNames = null;
   Map<BSPJobID, RunningJob> runningJobs = null;
 
   // new nexus between GroomServer and BSPMaster
@@ -162,17 +161,9 @@
       }
 
       if (actions != null) {
-        // assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
-        int prevPort = Constants.DEFAULT_PEER_PORT;
 
         for (GroomServerAction action : actions) {
           if (action instanceof LaunchTaskAction) {
-            Task t = ((LaunchTaskAction) action).getTask();
-
-            synchronized (assignedPeerNames) {
-              prevPort = BSPNetUtils.getNextAvailable(prevPort);
-              assignedPeerNames.put(t.getTaskID(), prevPort);
-            }
             LOG.info("Launch " + actions.length + " tasks.");
             startNewTask((LaunchTaskAction) action);
           } else if (action instanceof KillTaskAction) {
@@ -199,10 +190,6 @@
             RecoverTaskAction recoverAction = (RecoverTaskAction) action;
             Task t = recoverAction.getTask();
             LOG.info("Recovery action task." + t.getTaskID());
-            synchronized (assignedPeerNames) {
-              prevPort = BSPNetUtils.getNextAvailable(prevPort);
-              assignedPeerNames.put(t.getTaskID(), prevPort);
-            }
             try {
               startRecoveryTask(recoverAction);
             } catch (IOException e) {
@@ -336,8 +323,6 @@
     this.conf.set(Constants.PEER_HOST, localHostname);
     this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
     this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
-    this.assignedPeerNames = new HashMap<TaskAttemptID, Integer>(
-        2 * this.maxCurrentTasks);
 
     int rpcPort = -1;
     String rpcAddr = null;
@@ -1231,7 +1216,8 @@
           defaultConf);
 
       final BSPTask task = (BSPTask) umbilical.getTask(taskid);
-      int peerPort = umbilical.getAssignedPortNum(taskid);
+      int peerPort = Constants.DEFAULT_PEER_PORT;
+      peerPort = BSPNetUtils.getNextAvailable(peerPort);
 
       defaultConf.addResource(new Path(task.getJobFile()));
       BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());
@@ -1366,11 +1352,6 @@
   }
 
   @Override
-  public int getAssignedPortNum(TaskAttemptID taskid) {
-    return assignedPeerNames.get(taskid);
-  }
-
-  @Override
   public void process(WatchedEvent event) {
   }
 
diff --git a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
index 5a5738d..c5a6846 100644
--- a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
+++ b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
@@ -335,12 +335,14 @@
 
     @SuppressWarnings("rawtypes")
     private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> MANAGER_MAP = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
+    private InetSocketAddress selfAddress;
 
     @Override
     public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
         Configuration conf, InetSocketAddress peerAddress) {
       super.init(attemptId, peer, conf, peerAddress);
       MANAGER_MAP.put(peerAddress, this);
+      selfAddress = peerAddress;
     }
 
     @SuppressWarnings("unchecked")
@@ -353,6 +355,11 @@
             1L);
       }
     }
+
+    @Override
+    public InetSocketAddress getListenerAddress() {
+      return selfAddress;
+    }
   }
 
   public static class LocalUmbilical implements BSPPeerProtocol {
@@ -400,12 +407,6 @@
         throws IOException, InterruptedException {
       return true;
     }
-
-    @Override
-    public int getAssignedPortNum(TaskAttemptID taskid) {
-      return 0;
-    }
-
   }
 
   public static class LocalSyncClient extends BSPPeerSyncClient {
diff --git a/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
index 07789dd..9cf37d3 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
@@ -65,8 +65,7 @@
   protected SynchronizedQueue<M> localQueueForNextIteration;
   // this peer object is just used for counter incrementation
   protected BSPPeer<?, ?, ?, ?, M> peer;
-  // the peer address of this peer
-  protected InetSocketAddress peerAddress;
+  
   // the task attempt id
   protected TaskAttemptID attemptId;
 
@@ -89,7 +88,6 @@
     this.attemptId = attemptId;
     this.peer = peer;
     this.conf = conf;
-    this.peerAddress = peerAddress;
     this.localQueue = getReceiverQueue();
     this.localQueueForNextIteration = getSynchronizedReceiverQueue();
     this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
diff --git a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
index 98ed935..322ff01 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
@@ -25,14 +25,15 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.ipc.RPC;
-import org.apache.hama.ipc.RPC.Server;
+import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.hama.ipc.RPC;
+import org.apache.hama.ipc.RPC.Server;
 import org.apache.hama.util.LRUCache;
 
 /**
@@ -45,7 +46,7 @@
   private static final Log LOG = LogFactory
       .getLog(HadoopMessageManagerImpl.class);
 
-  private Server server = null;
+  private Server server;
 
   private LRUCache<InetSocketAddress, HadoopMessageManager<M>> peersLRUCache = null;
 
@@ -74,11 +75,16 @@
   private final void startRPCServer(Configuration conf,
       InetSocketAddress peerAddress) {
     try {
-      this.server = RPC.getServer(this, peerAddress.getHostName(),
-          peerAddress.getPort(), conf);
+      String bindAddress = conf.get(Constants.PEER_HOST,
+          Constants.DEFAULT_PEER_HOST);
+      InetSocketAddress selfAddress = new InetSocketAddress(bindAddress, 0);
+
+      this.server = RPC.getServer(this, selfAddress.getHostName(),
+          selfAddress.getPort(), conf);
       server.start();
-      LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
-          + peerAddress.getPort());
+      
+      LOG.info(" BSPPeer address:" + server.getListenerAddress().getHostName()
+          + " port:" + server.getListenerAddress().getPort());
     } catch (IOException e) {
       LOG.error("Fail to start RPC server!", e);
       throw new RuntimeException("RPC Server could not be launched!");
@@ -158,4 +164,11 @@
     return versionID;
   }
 
+  @Override
+  public InetSocketAddress getListenerAddress() {
+    if (this.server != null) {
+      return this.server.getListenerAddress();
+    }
+    return null;
+  }
 }
diff --git a/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
index e26771c..856c147 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
@@ -45,7 +45,12 @@
 
   /**
    * Init can be used to start servers and initialize internal state. If you are
-   * implementing a subclass, please call the super version of this method.
+   * implementing a subclass, please call the super version of this method. The
+   * socket address provided may be used for initializing the server connection.
+   * If it is used or not used, the message manager should provide a unique
+   * InetSocketAddress that identifies the server for the peer listening on the
+   * socket. This socket address should be returned in
+   * {@link MessageManager#getListenerAddress()}
    * 
    */
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
@@ -123,4 +128,9 @@
   public void registerListener(MessageEventListener<M> listener)
       throws IOException;
 
+  /**
+   * Returns the server address on which the incoming connections are listening
+   * on.
+   */
+  public InetSocketAddress getListenerAddress();
 }
diff --git a/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java b/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
index 9a46721..0776016 100644
--- a/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
+++ b/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
@@ -67,10 +67,4 @@
   boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
       throws IOException, InterruptedException;
 
-  /**
-   * @param taskid
-   * @return assigned port number
-   */
-  int getAssignedPortNum(TaskAttemptID taskid);
-
 }
diff --git a/core/src/main/java/org/apache/hama/util/BSPNetUtils.java b/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
index cc4f3e5..6c4e69f 100644
--- a/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
+++ b/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
@@ -98,14 +98,15 @@
    * @return the InetSocketAddress of the given BSP peer
    */
   public static InetSocketAddress getAddress(String peerName) {
-    String[] peerAddrParts = peerName.split(":");
-    if (peerAddrParts.length != 2) {
+    int index = peerName.lastIndexOf(':');
+    if (index <= 0 || index == peerName.length() - 1) {
       throw new ArrayIndexOutOfBoundsException(
-          "Peername must consist of exactly ONE \":\"! Given peername was: "
-              + peerName);
+          "Invalid host and port information. "
+              + "Peername must consist of atleast ONE \":\"! "
+              + "Given peername was: " + peerName);
     }
-    return new InetSocketAddress(peerAddrParts[0],
-        Integer.valueOf(peerAddrParts[1]));
+    return new InetSocketAddress(peerName.substring(0, index),
+        Integer.valueOf(peerName.substring(index + 1)));
   }
 
   /**
diff --git a/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java b/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
index 852a723..2e79d16 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
@@ -148,11 +148,6 @@
       return true;
     }
 
-    @Override
-    public int getAssignedPortNum(TaskAttemptID taskid) {
-      return 0;
-    }
-
     public synchronized int getPingCount() {
       return pingCount;
     }
diff --git a/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java b/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
index b8994d5..085114d 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
@@ -150,6 +150,12 @@
       this.listener = listener;
     }
 
+	@Override
+	public InetSocketAddress getListenerAddress() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
   }
 
   public static class TestBSPPeer implements
diff --git a/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java b/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
index 498e21d..42294fb 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
@@ -41,13 +41,13 @@
     bsp.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
 
     conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
-    bsp.setNumBspTask(2);
+    bsp.setNumBspTask(3);
     bsp.setInputFormat(NullInputFormat.class);
 
     FileSystem fileSys = FileSystem.get(conf);
 
     if (bsp.waitForCompletion(true)) {
-      TestBSPMasterGroomServer.checkOutput(fileSys, conf, 2);
+      TestBSPMasterGroomServer.checkOutput(fileSys, conf, 3);
     }
   }
 
diff --git a/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java b/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
index 42d8819..706f216 100644
--- a/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
+++ b/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
@@ -73,12 +74,16 @@
     InetSocketAddress peer = new InetSocketAddress(
         BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort()
             + (increment++));
+    conf.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    conf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+
     BSPPeer<?, ?, ?, ?, IntWritable> dummyPeer = new BSPPeerImpl<NullWritable, NullWritable, NullWritable, NullWritable, IntWritable>(
         conf, FileSystem.get(conf), new Counters());
     TaskAttemptID id = new TaskAttemptID("1", 1, 1, 1);
     messageManager.init(id, dummyPeer, conf, peer);
+    peer = messageManager.getListenerAddress();
     String peerName = peer.getHostName() + ":" + peer.getPort();
-
+    System.out.println("Peer is " + peerName);
     messageManager.send(peerName, new IntWritable(1337));
 
     Iterator<Entry<InetSocketAddress, MessageQueue<IntWritable>>> messageIterator = messageManager
diff --git a/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java b/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
index 91a78d0..f979ce9 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
@@ -405,9 +405,4 @@
 
   }
 
-  @Override
-  public int getAssignedPortNum(TaskAttemptID taskid) {
-    return 0;
-  }
-
 }