git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1679360 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
index ec2094e..cd45d95 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
@@ -471,7 +471,7 @@
partitioningJob.getConfiguration().setClass(
MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
MessageQueue.class);
-
+
partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
partitioningJob.setInputFormat(job.getInputFormat().getClass());
partitioningJob.setInputKeyClass(job.getInputKeyClass());
@@ -578,6 +578,15 @@
DataOutputBuffer buffer = new DataOutputBuffer();
RawSplit rawSplit = new RawSplit();
for (InputSplit split : splits) {
+
+ // set partitionID to rawSplit
+ if (split.getClass().getName().equals(FileSplit.class.getName())
+ && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
+ && job.get("bsp.partitioning.runner.job") == null) {
+ String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
+ rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
+ }
+
rawSplit.setClassName(split.getClass().getName());
buffer.reset();
split.write(buffer);
@@ -629,7 +638,10 @@
for (int i = 0; i < len; ++i) {
RawSplit split = new RawSplit();
split.readFields(in);
- result[i] = split;
+ if (split.getPartitionID() != Integer.MIN_VALUE)
+ result[split.getPartitionID()] = split;
+ else
+ result[i] = split;
}
return result;
}
@@ -1075,12 +1087,21 @@
private String splitClass;
private BytesWritable bytes = new BytesWritable();
private String[] locations;
+ private int partitionID = Integer.MIN_VALUE;
long dataLength;
public void setBytes(byte[] data, int offset, int length) {
bytes.set(data, offset, length);
}
+ public void setPartitionID(int id) {
+ this.partitionID = id;
+ }
+
+ public int getPartitionID() {
+ return partitionID;
+ }
+
public void setClassName(String className) {
splitClass = className;
}
diff --git a/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java b/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
index 64ba941..6631b2f 100644
--- a/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
+++ b/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
@@ -144,7 +144,6 @@
raw = new MapWritable();
raw.put(rawRecord.getKey(), rawRecord.getValue());
- System.out.println(peer.getPeerName(index) + ", " + rawRecord.getKey() + ", " + rawRecord.getValue());
peer.send(peer.getPeerName(index), raw);
}
diff --git a/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java b/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
index 495fa74..4b548fd 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
@@ -78,8 +78,6 @@
int expectedPeerId = Math.abs(key.hashCode() % numTasks);
- System.out.println(peer.getPeerName() + ": " + key + ", " + value + ", " + expectedPeerId);
- /*
if (expectedPeerId == peer.getPeerIndex()) {
expectedKeys.put(new Text(key), new Text(value));
} else {
@@ -88,7 +86,6 @@
new BooleanWritable(true));
break;
}
- */
}
message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES),
expectedKeys);
@@ -106,7 +103,6 @@
while ((msg = peer.getCurrentMessage()) != null) {
blValue = (BooleanWritable) msg.get(new Text(
KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
- System.out.println(">>>>> " + peer.getPeerName() + ", " + blValue.get());
assertEquals(false, blValue.get());
values = (MapWritable) msg.get(new Text(
KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));