blob: 3a7bd4313606e649681806f65a76daf2da20542a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.net.InetSocketAddress;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.BSPNetUtils;
public class BSPRunner {
private static final Log LOG = LogFactory.getLog(BSPRunner.class);
private Configuration conf;
private TaskAttemptID id;
private BSPPeerImpl<?, ?, ?, ?, ? extends Writable> peer;
private Counters counters = new Counters();
@SuppressWarnings("rawtypes")
Class<? extends BSP> bspClass;
@SuppressWarnings({ "unchecked", "rawtypes" })
public BSPRunner(String jobId, int taskAttemptId, Path confPath)
throws Exception {
conf = new HamaConfiguration();
conf.addResource(confPath);
this.id = new TaskAttemptID(jobId, 0, taskAttemptId, 0);
this.id.id = taskAttemptId;
// use a calculatory trick to prevent port collision on the same host
int port = BSPNetUtils.getFreePort(taskAttemptId * 2 + 16000);
conf.setInt(Constants.PEER_PORT, port);
conf.set(Constants.PEER_HOST, BSPNetUtils.getCanonicalHostname());
String umbilicalAddress = conf.get("hama.umbilical.address");
if (umbilicalAddress == null || umbilicalAddress.isEmpty()
|| !umbilicalAddress.contains(":")) {
throw new IllegalArgumentException(
"Umbilical address must contain a colon and must be non-empty and not-null! Property \"hama.umbilical.address\" was: "
+ umbilicalAddress);
}
String[] hostPort = umbilicalAddress.split(":");
InetSocketAddress address = new InetSocketAddress(hostPort[0],
Integer.valueOf(hostPort[1]));
BSPPeerProtocol umbilical = RPC.getProxy(BSPPeerProtocol.class,
HamaRPCProtocolVersion.versionID, address, conf);
BSPJob job = new BSPJob(new HamaConfiguration(conf));
BSPTask task = (BSPTask) umbilical.getTask(id);
peer = new BSPPeerImpl(job, conf, id, umbilical, id.id, task.splitClass,
task.split, counters);
// this is a checked cast because we can only set a class via the BSPJob
// class which only allows derivates of BSP.
bspClass = (Class<? extends BSP>) conf.getClassByName(conf
.get("bsp.work.class"));
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void startComputation() throws Exception {
BSP bspInstance = ReflectionUtils.newInstance(bspClass, conf);
// Throw the first exception and log the remaining.
Exception firstException = null;
try {
bspInstance.setup(peer);
bspInstance.bsp(peer);
} catch (Exception e) {
LOG.error("Error occured while running bsp function.", e);
firstException = e;
} finally {
try {
bspInstance.cleanup(peer);
} catch (Exception e) {
LOG.error("Cleaning up after bsp function.", e);
if (firstException == null) {
firstException = e;
}
} finally {
try {
peer.close();
} catch (Exception e) {
LOG.error("Error closing the bsp peer", e);
if (firstException == null)
firstException = e;
} finally {
if (firstException != null)
throw firstException;
}
}
}
}
/**
* Main entry point after a container has launched.
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
LOG.info("Starting task with arguments: " + Arrays.toString(args));
if (args.length != 3) {
throw new IllegalArgumentException("Expected 3 args given, but found: "
+ Arrays.toString(args));
}
// jobid is the first of the args (string)
// taskid is the second arg (int)
// third arg is the qualified path of the job configuration
BSPRunner bspRunner = new BSPRunner(args[0], Integer.valueOf(args[1]),
new Path(args[2]));
bspRunner.startComputation();
LOG.info("Task successfully ended!");
}
}