/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.hadoop.mapred.lib; |
| |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.mapred.MapRunnable; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.Mapper; |
| import org.apache.hadoop.mapred.RecordReader; |
| import org.apache.hadoop.mapred.OutputCollector; |
| import org.apache.hadoop.mapred.Reporter; |
| import org.apache.hadoop.mapred.SkipBadRecords; |
| import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| import java.io.IOException; |
| import java.util.concurrent.*; |
| |
| /** |
| * Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable. |
| * <p> |
| * It can be used instead of the default implementation, |
| * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU |
| * bound in order to improve throughput. |
| * <p> |
| * Map implementations using this MapRunnable must be thread-safe. |
| * <p> |
| * The Map-Reduce job has to be configured to use this MapRunnable class (using |
| * the JobConf.setMapRunnerClass method) and |
| * the number of thread the thread-pool can use with the |
| * <code>mapred.map.multithreadedrunner.threads</code> property, its default |
| * value is 10 threads. |
| * <p> |
| * @deprecated Use {@link MultithreadedMapper} instead. |
| */ |
| @Deprecated |
| @InterfaceAudience.Public |
| @InterfaceStability.Stable |
| public class MultithreadedMapRunner<K1, V1, K2, V2> |
| implements MapRunnable<K1, V1, K2, V2> { |
| |
| private static final Log LOG = |
| LogFactory.getLog(MultithreadedMapRunner.class.getName()); |
| |
| private JobConf job; |
| private Mapper<K1, V1, K2, V2> mapper; |
| private ExecutorService executorService; |
| private volatile IOException ioException; |
| private volatile RuntimeException runtimeException; |
| private boolean incrProcCount; |
| |
| @SuppressWarnings("unchecked") |
| public void configure(JobConf jobConf) { |
| int numberOfThreads = |
| jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Configuring jobConf " + jobConf.getJobName() + |
| " to use " + numberOfThreads + " threads"); |
| } |
| |
| this.job = jobConf; |
| //increment processed counter only if skipping feature is enabled |
| this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && |
| SkipBadRecords.getAutoIncrMapperProcCount(job); |
| this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), |
| jobConf); |
| |
| // Creating a threadpool of the configured size to execute the Mapper |
| // map method in parallel. |
| executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, |
| 0L, TimeUnit.MILLISECONDS, |
| new BlockingArrayQueue |
| (numberOfThreads)); |
| } |
| |
| /** |
| * A blocking array queue that replaces offer and add, which throws on a full |
| * queue, to a put, which waits on a full queue. |
| */ |
| private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> { |
| |
| private static final long serialVersionUID = 1L; |
| public BlockingArrayQueue(int capacity) { |
| super(capacity); |
| } |
| public boolean offer(Runnable r) { |
| return add(r); |
| } |
| public boolean add(Runnable r) { |
| try { |
| put(r); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| return true; |
| } |
| } |
| |
| private void checkForExceptionsFromProcessingThreads() |
| throws IOException, RuntimeException { |
| // Checking if a Mapper.map within a Runnable has generated an |
| // IOException. If so we rethrow it to force an abort of the Map |
| // operation thus keeping the semantics of the default |
| // implementation. |
| if (ioException != null) { |
| throw ioException; |
| } |
| |
| // Checking if a Mapper.map within a Runnable has generated a |
| // RuntimeException. If so we rethrow it to force an abort of the Map |
| // operation thus keeping the semantics of the default |
| // implementation. |
| if (runtimeException != null) { |
| throw runtimeException; |
| } |
| } |
| |
| public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output, |
| Reporter reporter) |
| throws IOException { |
| try { |
| // allocate key & value instances these objects will not be reused |
| // because execution of Mapper.map is not serialized. |
| K1 key = input.createKey(); |
| V1 value = input.createValue(); |
| |
| while (input.next(key, value)) { |
| |
| executorService.execute(new MapperInvokeRunable(key, value, output, |
| reporter)); |
| |
| checkForExceptionsFromProcessingThreads(); |
| |
| // Allocate new key & value instances as mapper is running in parallel |
| key = input.createKey(); |
| value = input.createValue(); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Finished dispatching all Mappper.map calls, job " |
| + job.getJobName()); |
| } |
| |
| // Graceful shutdown of the Threadpool, it will let all scheduled |
| // Runnables to end. |
| executorService.shutdown(); |
| |
| try { |
| |
| // Now waiting for all Runnables to end. |
| while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Awaiting all running Mappper.map calls to finish, job " |
| + job.getJobName()); |
| } |
| |
| // NOTE: while Mapper.map dispatching has concluded there are still |
| // map calls in progress and exceptions would be thrown. |
| checkForExceptionsFromProcessingThreads(); |
| |
| } |
| |
| // NOTE: it could be that a map call has had an exception after the |
| // call for awaitTermination() returing true. And edge case but it |
| // could happen. |
| checkForExceptionsFromProcessingThreads(); |
| |
| } catch (IOException ioEx) { |
| // Forcing a shutdown of all thread of the threadpool and rethrowing |
| // the IOException |
| executorService.shutdownNow(); |
| throw ioEx; |
| } catch (InterruptedException iEx) { |
| throw new RuntimeException(iEx); |
| } |
| |
| } finally { |
| mapper.close(); |
| } |
| } |
| |
| |
| /** |
| * Runnable to execute a single Mapper.map call from a forked thread. |
| */ |
| private class MapperInvokeRunable implements Runnable { |
| private K1 key; |
| private V1 value; |
| private OutputCollector<K2, V2> output; |
| private Reporter reporter; |
| |
| /** |
| * Collecting all required parameters to execute a Mapper.map call. |
| * <p> |
| * |
| * @param key |
| * @param value |
| * @param output |
| * @param reporter |
| */ |
| public MapperInvokeRunable(K1 key, V1 value, |
| OutputCollector<K2, V2> output, |
| Reporter reporter) { |
| this.key = key; |
| this.value = value; |
| this.output = output; |
| this.reporter = reporter; |
| } |
| |
| /** |
| * Executes a Mapper.map call with the given Mapper and parameters. |
| * <p> |
| * This method is called from the thread-pool thread. |
| * |
| */ |
| public void run() { |
| try { |
| // map pair to output |
| MultithreadedMapRunner.this.mapper.map(key, value, output, reporter); |
| if(incrProcCount) { |
| reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, |
| SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1); |
| } |
| } catch (IOException ex) { |
| // If there is an IOException during the call it is set in an instance |
| // variable of the MultithreadedMapRunner from where it will be |
| // rethrown. |
| synchronized (MultithreadedMapRunner.this) { |
| if (MultithreadedMapRunner.this.ioException == null) { |
| MultithreadedMapRunner.this.ioException = ex; |
| } |
| } |
| } catch (RuntimeException ex) { |
| // If there is a RuntimeException during the call it is set in an |
| // instance variable of the MultithreadedMapRunner from where it will be |
| // rethrown. |
| synchronized (MultithreadedMapRunner.this) { |
| if (MultithreadedMapRunner.this.runtimeException == null) { |
| MultithreadedMapRunner.this.runtimeException = ex; |
| } |
| } |
| } |
| } |
| } |
| |
| } |