blob: 86c711cf83fc7e8eba7cb1f092c97f8723cc82f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.graph;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import java.io.IOException;
/**
* This mapper that will execute the BSP graph tasks alloted to this worker.
* All tasks will be performed by calling the GraphTaskManager object managed by
* this GraphMapper wrapper classs. Since this mapper will
* not be passing data by key-value pairs through the MR framework, the
* Mapper parameter types are irrelevant, and set to <code>Object</code> type.
*
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
*/
@SuppressWarnings("rawtypes")
public class GraphMapper<I extends WritableComparable, V extends Writable,
E extends Writable> extends
Mapper<Object, Object, Object, Object> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(GraphMapper.class);
/** Manage the framework-agnostic Giraph tasks for this job run */
private GraphTaskManager<I, V, E> graphTaskManager;
@Override
public void setup(Context context)
throws IOException, InterruptedException {
// Execute all Giraph-related role(s) assigned to this compute node.
// Roles can include "master," "worker," "zookeeper," or . . . ?
graphTaskManager = new GraphTaskManager<I, V, E>(context);
graphTaskManager.setup(
DistributedCache.getLocalCacheArchives(context.getConfiguration()));
}
/**
* GraphMapper is designed to host the compute node in a Hadoop
* Mapper task. The GraphTaskManager owned by GraphMapper manages all
* framework-independent Giraph BSP operations for this job run.
* @param key unused arg required by Mapper API
* @param value unused arg required by Mapper API
* @param context the Mapper#Context required to report progress
* to the underlying cluster
*/
@Override
public void map(Object key, Object value, Context context)
throws IOException, InterruptedException {
// a no-op in Giraph
}
@Override
public void cleanup(Context context)
throws IOException, InterruptedException {
graphTaskManager.cleanup();
}
@Override
public void run(Context context) throws IOException, InterruptedException {
// Notify the master quicker if there is worker failure rather than
// waiting for ZooKeeper to timeout and delete the ephemeral znodes
try {
setup(context);
while (context.nextKeyValue()) {
graphTaskManager.execute();
}
cleanup(context);
// Checkstyle exception due to needing to dump ZooKeeper failure
// on exception
// CHECKSTYLE: stop IllegalCatch
} catch (RuntimeException e) {
// CHECKSTYLE: resume IllegalCatch
byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
graphTaskManager.getJobProgressTracker().logError(
"Exception occurred on mapper " +
graphTaskManager.getConf().getTaskPartition() + ": " +
ExceptionUtils.getStackTrace(e), exByteArray);
graphTaskManager.zooKeeperCleanup();
graphTaskManager.workerFailureCleanup();
throw new IllegalStateException(
"run: Caught an unrecoverable exception " + e.getMessage(), e);
}
}
}