git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1679360 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
index ec2094e..cd45d95 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
@@ -471,7 +471,7 @@
         partitioningJob.getConfiguration().setClass(
             MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
             MessageQueue.class);
-        
+
         partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
         partitioningJob.setInputFormat(job.getInputFormat().getClass());
         partitioningJob.setInputKeyClass(job.getInputKeyClass());
@@ -578,6 +578,15 @@
       DataOutputBuffer buffer = new DataOutputBuffer();
       RawSplit rawSplit = new RawSplit();
       for (InputSplit split : splits) {
+
+        // set partitionID to rawSplit
+        if (split.getClass().getName().equals(FileSplit.class.getName())
+            && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
+            && job.get("bsp.partitioning.runner.job") == null) {
+          String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
+          rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
+        }
+
         rawSplit.setClassName(split.getClass().getName());
         buffer.reset();
         split.write(buffer);
@@ -629,7 +638,10 @@
     for (int i = 0; i < len; ++i) {
       RawSplit split = new RawSplit();
       split.readFields(in);
-      result[i] = split;
+      if (split.getPartitionID() != Integer.MIN_VALUE)
+        result[split.getPartitionID()] = split;
+      else
+        result[i] = split;
     }
     return result;
   }
@@ -1075,12 +1087,21 @@
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();
     private String[] locations;
+    private int partitionID = Integer.MIN_VALUE;
     long dataLength;
 
     public void setBytes(byte[] data, int offset, int length) {
       bytes.set(data, offset, length);
     }
 
+    public void setPartitionID(int id) {
+      this.partitionID = id;
+    }
+
+    public int getPartitionID() {
+      return partitionID;
+    }
+
     public void setClassName(String className) {
       splitClass = className;
     }
diff --git a/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java b/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
index 64ba941..6631b2f 100644
--- a/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
+++ b/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
@@ -144,7 +144,6 @@
       raw = new MapWritable();
       raw.put(rawRecord.getKey(), rawRecord.getValue());
       
-      System.out.println(peer.getPeerName(index) + ", " + rawRecord.getKey() + ", " + rawRecord.getValue());
       peer.send(peer.getPeerName(index), raw);
     }
 
diff --git a/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java b/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
index 495fa74..4b548fd 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
@@ -78,8 +78,6 @@
 
         int expectedPeerId = Math.abs(key.hashCode() % numTasks);
 
-        System.out.println(peer.getPeerName() + ": " + key + ", " + value + ", " + expectedPeerId);
-        /*
         if (expectedPeerId == peer.getPeerIndex()) {
           expectedKeys.put(new Text(key), new Text(value));
         } else {
@@ -88,7 +86,6 @@
               new BooleanWritable(true));
           break;
         }
-        */
       }
       message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES),
           expectedKeys);
@@ -106,7 +103,6 @@
         while ((msg = peer.getCurrentMessage()) != null) {
           blValue = (BooleanWritable) msg.get(new Text(
               KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
-          System.out.println(">>>>> " + peer.getPeerName() + ", " + blValue.get());
           assertEquals(false, blValue.get());
           values = (MapWritable) msg.get(new Text(
               KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));