blob: ffc78349cb3f1d9a2c2d721a3dc8abfb382ca41b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.examples;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.NullOutputFormat;
import org.apache.hama.bsp.sync.SyncException;
/**
 * Random communication benchmark for BSP.
 * <p>
 * Every task repeatedly sends fixed-size byte messages to randomly chosen
 * peers (possibly itself), synchronizes, and then drains and logs the size of
 * each message it received. The message size, the number of messages sent per
 * superstep and the number of supersteps are taken from the command line and
 * handed to the tasks through the job configuration.
 */
public class RandBench {
  /** Configuration key: payload size of a single message, in bytes. */
  private static final String SIZEOFMSG = "msg.size";
  /** Configuration key: number of messages each task sends per superstep. */
  private static final String N_COMMUNICATIONS = "communications.num";
  /** Configuration key: number of supersteps to run. */
  private static final String N_SUPERSTEPS = "supersteps.num";

  /**
   * BSP task that performs the random message exchange. It neither reads
   * input records nor writes output records; it only exchanges
   * {@link BytesWritable} messages.
   */
  public static class RandBSP
      extends
      BSP<NullWritable, NullWritable, NullWritable, NullWritable, BytesWritable> {
    public static final Log LOG = LogFactory.getLog(RandBSP.class);

    /** Source of randomness used to pick the destination peer. */
    private final Random r = new Random();
    private int sizeOfMsg;
    private int nCommunications;
    private int nSupersteps;

    /**
     * Runs the benchmark loop: in each superstep send {@code nCommunications}
     * messages of {@code sizeOfMsg} bytes to random peers, synchronize, then
     * consume every received message and log its length.
     *
     * @param peer the peer this task runs on; used to send, sync and receive
     * @throws IOException propagated from the peer's messaging calls
     * @throws SyncException propagated from {@code peer.sync()}
     * @throws InterruptedException propagated from {@code peer.sync()}
     */
    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, BytesWritable> peer)
        throws IOException, SyncException, InterruptedException {
      byte[] dummyData = new byte[sizeOfMsg];
      String[] peers = peer.getAllPeerNames();

      // Start counting at the peer's current superstep rather than 0
      // (presumably so a restarted task does not repeat finished supersteps
      // -- TODO confirm against the framework's recovery semantics).
      for (int i = (int) peer.getSuperstepCount(); i < nSupersteps; i++) {
        for (int j = 0; j < nCommunications; j++) {
          String tPeer = peers[r.nextInt(peers.length)];
          // A fresh writable per message; set() copies the payload into it.
          BytesWritable data = new BytesWritable();
          data.set(dummyData, 0, dummyData.length);
          peer.send(tPeer, data);
        }

        peer.sync();

        BytesWritable received;
        while ((received = peer.getCurrentMessage()) != null) {
          // Use getLength(): the backing array returned by getBytes() may be
          // larger than the valid data, so its length is not the message size.
          LOG.info(received.getLength());
        }
      }
    }

    /**
     * Reads the benchmark parameters from the job configuration. Each value
     * defaults to 1 when the corresponding key is not set.
     *
     * @param peer the peer whose configuration holds the parameters
     */
    @Override
    public void setup(
        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, BytesWritable> peer) {
      this.sizeOfMsg = peer.getConfiguration().getInt(SIZEOFMSG, 1);
      this.nCommunications = peer.getConfiguration()
          .getInt(N_COMMUNICATIONS, 1);
      this.nSupersteps = peer.getConfiguration().getInt(N_SUPERSTEPS, 1);
    }
  }

  /**
   * Configures and submits the benchmark job, then prints the elapsed
   * wall-clock time if the job completes successfully.
   *
   * @param args {@code <sizeOfMsg> <nCommunications> <nSupersteps>}
   * @throws Exception if an argument is not an integer or job submission fails
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 3) {
      System.out.println("Usage: <sizeOfMsg> <nCommunications> <nSupersteps>");
      System.exit(-1);
    }

    int sizeOfMsg = Integer.parseInt(args[0]);
    int nCommunications = Integer.parseInt(args[1]);
    int nSupersteps = Integer.parseInt(args[2]);

    // Fail fast here: a negative size would otherwise only surface inside
    // every task as a NegativeArraySizeException when the payload is allocated.
    if (sizeOfMsg < 0) {
      System.out.println("sizeOfMsg must not be negative: " + sizeOfMsg);
      System.out.println("Usage: <sizeOfMsg> <nCommunications> <nSupersteps>");
      System.exit(-1);
    }

    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();
    conf.setInt(SIZEOFMSG, sizeOfMsg);
    conf.setInt(N_COMMUNICATIONS, nCommunications);
    conf.setInt(N_SUPERSTEPS, nSupersteps);

    BSPJob bsp = new BSPJob(conf, RandBench.class);
    // Set the job name
    bsp.setJobName("Random Communication Benchmark");
    bsp.setBspClass(RandBSP.class);
    bsp.setInputFormat(NullInputFormat.class);
    bsp.setOutputFormat(NullOutputFormat.class);

    // Run as many BSP tasks as the cluster reports as its maximum task count
    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(false);
    bsp.setNumBspTask(cluster.getMaxTasks());

    long startTime = System.currentTimeMillis();
    if (bsp.waitForCompletion(true)) {
      System.out.println("Job Finished in "
          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
    }
  }
}