blob: 95620b3ccb8aebbc5886c361d789d7df7d2bbdc1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.lib;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.concurrent.*;
/**
 * Multithreaded implementation of
 * {@link org.apache.hadoop.mapred.MapRunnable}.
 * <p>
 * It can be used instead of the default implementation,
 * {@link org.apache.hadoop.mapred.MapRunner}, when the Map operation is not
 * CPU bound, in order to improve throughput.
 * <p>
 * Map implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured to use this MapRunnable class (using
 * the JobConf.setMapRunnerClass method) and
 * the number of threads the thread-pool can use with the
 * <code>mapred.map.multithreadedrunner.threads</code> property, its default
 * value is 10 threads.
 * <p>
 * @deprecated Use {@link MultithreadedMapper} instead.
 */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultithreadedMapRunner<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {

  private static final Log LOG =
      LogFactory.getLog(MultithreadedMapRunner.class.getName());

  private JobConf job;
  private Mapper<K1, V1, K2, V2> mapper;
  private ExecutorService executorService;

  // First failure of each kind reported by a worker thread. Written by the
  // pool threads, read by the thread executing run(); never reset to null.
  private volatile IOException ioException;
  private volatile RuntimeException runtimeException;

  private boolean incrProcCount;

  /**
   * Instantiates the job's Mapper and creates the fixed-size thread pool
   * that will execute the Mapper.map calls.
   * <p>
   * The pool size is read from the {@link MultithreadedMapper#NUM_THREADS}
   * configuration key and defaults to 10.
   *
   * @param jobConf the job configuration.
   * @throws IllegalArgumentException if the configured number of threads is
   *         smaller than 1.
   */
  @Override
  @SuppressWarnings("unchecked")
  public void configure(JobConf jobConf) {
    int numberOfThreads =
        jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                " to use " + numberOfThreads + " threads");
    }

    this.job = jobConf;
    //increment processed counter only if skipping feature is enabled
    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job) > 0 &&
        SkipBadRecords.getAutoIncrMapperProcCount(job);
    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
        jobConf);

    // Both the queue and the pool reject a non-positive size with an
    // IllegalArgumentException; fail with a message that names the culprit.
    if (numberOfThreads < 1) {
      throw new IllegalArgumentException(
          "Number of map threads must be at least 1 but was "
          + numberOfThreads + " (" + MultithreadedMapper.NUM_THREADS + ")");
    }

    // Creating a threadpool of the configured size to execute the Mapper
    // map method in parallel. The bounded queue throttles the dispatching
    // thread once all workers are busy and the queue is full.
    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
                                             0L, TimeUnit.MILLISECONDS,
                                             new BlockingArrayQueue
                                               (numberOfThreads));
  }

  /**
   * A blocking array queue that replaces offer and add, which fail
   * immediately on a full queue, with a put, which waits on a full queue.
   * <p>
   * If the waiting thread is interrupted the element is NOT enqueued: the
   * interrupt status is restored and the failure is reported to the caller
   * instead of pretending that the element was accepted.
   */
  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    public BlockingArrayQueue(int capacity) {
      super(capacity);
    }

    /**
     * Waits for free space and enqueues the given runnable.
     *
     * @param r the runnable to enqueue.
     * @return <code>true</code> if the runnable was enqueued,
     *         <code>false</code> if the thread was interrupted while waiting
     *         (the runnable is not in the queue in that case).
     */
    @Override
    public boolean offer(Runnable r) {
      try {
        put(r);
        return true;
      } catch (InterruptedException ie) {
        // Keep the interrupt visible to the caller and report that nothing
        // was queued, so the executor rejects the task instead of losing it.
        Thread.currentThread().interrupt();
        return false;
      }
    }

    /**
     * Waits for free space and enqueues the given runnable.
     *
     * @param r the runnable to enqueue.
     * @return always <code>true</code>.
     * @throws IllegalStateException if the thread was interrupted while
     *         waiting, in which case the runnable was not enqueued.
     */
    @Override
    public boolean add(Runnable r) {
      if (!offer(r)) {
        throw new IllegalStateException(
            "Interrupted while waiting for space in the queue");
      }
      return true;
    }
  }

  /**
   * Rethrows, in the calling thread, the first failure recorded by a worker
   * thread, if any. An IOException takes precedence over a RuntimeException.
   *
   * @throws IOException if a Mapper.map call failed with an IOException.
   * @throws RuntimeException if a Mapper.map call failed with a
   *         RuntimeException.
   */
  private void checkForExceptionsFromProcessingThreads()
      throws IOException, RuntimeException {
    // Checking if a Mapper.map within a Runnable has generated an
    // IOException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (ioException != null) {
      throw ioException;
    }

    // Checking if a Mapper.map within a Runnable has generated a
    // RuntimeException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (runtimeException != null) {
      throw runtimeException;
    }
  }

  /** Records the given exception unless an IOException was already stored. */
  private synchronized void recordIOException(IOException ex) {
    if (ioException == null) {
      ioException = ex;
    }
  }

  /**
   * Records the given exception unless a RuntimeException was already stored.
   */
  private synchronized void recordRuntimeException(RuntimeException ex) {
    if (runtimeException == null) {
      runtimeException = ex;
    }
  }

  /**
   * Reads all records from the input and executes Mapper.map for each of
   * them in the thread pool, then waits for all the calls to finish.
   * <p>
   * If anything fails (reading the input, a Mapper.map call, or an
   * interruption of the calling thread) the thread pool is shut down
   * immediately so that no worker thread is left running. The Mapper is
   * closed in every case.
   *
   * @param input the reader providing the records to map.
   * @param output the collector handed to every Mapper.map call.
   * @param reporter the reporter handed to every Mapper.map call.
   * @throws IOException if reading the input or a Mapper.map call failed
   *         with an IOException.
   * @throws RuntimeException if a Mapper.map call failed with a
   *         RuntimeException, or wrapping the InterruptedException if the
   *         calling thread was interrupted while waiting for completion.
   */
  @Override
  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
      throws IOException {
    boolean completed = false;
    try {
      dispatchRecords(input, output, reporter);

      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished dispatching all Mappper.map calls, job "
                  + job.getJobName());
      }

      // Graceful shutdown of the Threadpool, it will let all scheduled
      // Runnables to end.
      executorService.shutdown();

      awaitCompletion();
      completed = true;
    } finally {
      try {
        // Whatever went wrong, do not leave worker threads running map
        // calls for a task that has already failed.
        if (!completed && executorService != null) {
          executorService.shutdownNow();
        }
      } finally {
        mapper.close();
      }
    }
  }

  /**
   * Submits one Mapper.map call per input record to the thread pool,
   * aborting as soon as a worker thread has reported a failure.
   */
  private void dispatchRecords(RecordReader<K1, V1> input,
                               OutputCollector<K2, V2> output,
                               Reporter reporter)
      throws IOException {
    // allocate key & value instances these objects will not be reused
    // because execution of Mapper.map is not serialized.
    K1 key = input.createKey();
    V1 value = input.createValue();

    while (input.next(key, value)) {
      executorService.execute(new MapperInvokeRunable(key, value, output,
                                                      reporter));

      checkForExceptionsFromProcessingThreads();

      // Allocate new key & value instances as mapper is running in parallel
      key = input.createKey();
      value = input.createValue();
    }
  }

  /**
   * Waits for the already shut down thread pool to terminate, rethrowing
   * any failure reported by a worker thread in the meantime.
   */
  private void awaitCompletion() throws IOException {
    try {
      // Now waiting for all Runnables to end.
      while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                    + job.getJobName());
        }

        // NOTE: while Mapper.map dispatching has concluded there are still
        // map calls in progress and exceptions would be thrown.
        checkForExceptionsFromProcessingThreads();
      }

      // NOTE: it could be that a map call has had an exception after the
      // call for awaitTermination() returning true. An edge case but it
      // could happen.
      checkForExceptionsFromProcessingThreads();
    } catch (InterruptedException iEx) {
      // Preserve the interrupt status for the code further up the stack;
      // the exception type is kept for backward compatibility.
      Thread.currentThread().interrupt();
      throw new RuntimeException(iEx);
    }
  }

  /**
   * Runnable to execute a single Mapper.map call from a forked thread.
   */
  private class MapperInvokeRunable implements Runnable {

    private final K1 key;
    private final V1 value;
    private final OutputCollector<K2, V2> output;
    private final Reporter reporter;

    /**
     * Collecting all required parameters to execute a Mapper.map call.
     *
     * @param key the key of the record to map.
     * @param value the value of the record to map.
     * @param output the collector passed to Mapper.map.
     * @param reporter the reporter passed to Mapper.map.
     */
    public MapperInvokeRunable(K1 key, V1 value,
                               OutputCollector<K2, V2> output,
                               Reporter reporter) {
      this.key = key;
      this.value = value;
      this.output = output;
      this.reporter = reporter;
    }

    /**
     * Executes a Mapper.map call with the given Mapper and parameters.
     * <p>
     * This method is called from the thread-pool thread. An IOException or
     * RuntimeException thrown by the call is stored in the enclosing
     * MultithreadedMapRunner, from where it is rethrown by the thread
     * executing MultithreadedMapRunner.run.
     */
    @Override
    public void run() {
      try {
        // map pair to output
        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
        if (incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
        }
      } catch (IOException ex) {
        // Only the first IOException is kept.
        recordIOException(ex);
      } catch (RuntimeException ex) {
        // Only the first RuntimeException is kept.
        recordRuntimeException(ex);
      }
    }
  }
}