| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; |
| |
| import java.io.IOException; |
| import java.lang.reflect.UndeclaredThrowableException; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.io.compress.DefaultCodec; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.tez.common.CallableWithNdc; |
| import org.apache.tez.common.RssTezConfig; |
| import org.apache.tez.common.TezRuntimeFrameworkConfigs; |
| import org.apache.tez.common.TezUtilsInternal; |
| import org.apache.tez.common.counters.TaskCounter; |
| import org.apache.tez.common.counters.TezCounter; |
| import org.apache.tez.dag.api.TezException; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.api.InputContext; |
| import org.apache.tez.runtime.api.TaskFailureType; |
| import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; |
| import org.apache.tez.runtime.library.common.ConfigUtils; |
| import org.apache.tez.runtime.library.common.TezRuntimeUtils; |
| import org.apache.tez.runtime.library.common.combine.Combiner; |
| import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; |
| import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; |
| import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.uniffle.common.RemoteStorageInfo; |
| |
| /** Usage: create an instance, call run(), then waitForInput(); call shutdown() when done. */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public class RssShuffle implements ExceptionReporter { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(RssShuffle.class); |
| |
| private final Configuration conf; |
| private final InputContext inputContext; |
| |
| private final ShuffleInputEventHandlerOrderedGrouped eventHandler; |
| @VisibleForTesting final RssShuffleScheduler rssScheduler; |
| @VisibleForTesting final MergeManager merger; |
| |
| private final CompressionCodec codec; |
| private final boolean ifileReadAhead; |
| private final int ifileReadAheadLength; |
| |
| private AtomicReference<Throwable> throwable = new AtomicReference<Throwable>(); |
| private String throwingThreadName = null; |
| |
| private final RssRunShuffleCallable rssRunShuffleCallable; |
| private volatile ListenableFuture<TezRawKeyValueIterator> rssRunShuffleFuture; |
| private final ListeningExecutorService executor; |
| |
| private final String srcNameTrimmed; |
| |
| private AtomicBoolean isShutDown = new AtomicBoolean(false); |
| private AtomicBoolean fetchersClosed = new AtomicBoolean(false); |
| private AtomicBoolean schedulerClosed = new AtomicBoolean(false); |
| private AtomicBoolean mergerClosed = new AtomicBoolean(false); |
| |
| private final long startTime; |
| private final TezCounter mergePhaseTime; |
| private final TezCounter shufflePhaseTime; |
| private Configuration remoteConf; |
| |
| /** Usage: Create instance, RssShuffle */ |
| public RssShuffle( |
| InputContext inputContext, |
| Configuration conf, |
| int numInputs, |
| long initialMemoryAvailable, |
| int shuffleId, |
| ApplicationAttemptId applicationAttemptId) |
| throws IOException { |
| this.inputContext = inputContext; |
| this.conf = conf; |
| |
| this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); |
| |
| if (ConfigUtils.isIntermediateInputCompressed(conf)) { |
| Class<? extends CompressionCodec> codecClass = |
| ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class); |
| codec = ReflectionUtils.newInstance(codecClass, conf); |
| // Work around needed for HADOOP-12191. Avoids the native initialization synchronization race |
| codec.getDecompressorType(); |
| } else { |
| codec = null; |
| } |
| this.ifileReadAhead = |
| conf.getBoolean( |
| TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, |
| TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT); |
| if (this.ifileReadAhead) { |
| this.ifileReadAheadLength = |
| conf.getInt( |
| TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, |
| TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT); |
| } else { |
| this.ifileReadAheadLength = 0; |
| } |
| this.remoteConf = getRemoteConf(conf); |
| |
| LOG.info( |
| srcNameTrimmed |
| + ": " |
| + "Shuffle assigned with " |
| + numInputs |
| + " inputs" |
| + ", codec: " |
| + (codec == null ? "None" : codec.getClass().getName()) |
| + ", ifileReadAhead: " |
| + ifileReadAhead); |
| |
| startTime = System.currentTimeMillis(); |
| merger = createMergeManager(initialMemoryAvailable, applicationAttemptId); |
| rssScheduler = |
| new RssShuffleScheduler( |
| this.inputContext, |
| this.conf, |
| numInputs, |
| this, |
| merger, |
| merger, |
| startTime, |
| codec, |
| ifileReadAhead, |
| ifileReadAheadLength, |
| srcNameTrimmed, |
| shuffleId, |
| applicationAttemptId); |
| |
| this.mergePhaseTime = inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME); |
| this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME); |
| |
| eventHandler = |
| new ShuffleInputEventHandlerOrderedGrouped( |
| inputContext, rssScheduler, ShuffleUtils.isTezShuffleHandler(conf)); |
| |
| ExecutorService rawExecutor = |
| Executors.newFixedThreadPool( |
| 1, |
| new ThreadFactoryBuilder() |
| .setDaemon(true) |
| .setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}") |
| .build()); |
| |
| executor = MoreExecutors.listeningDecorator(rawExecutor); |
| rssRunShuffleCallable = new RssRunShuffleCallable(); |
| } |
| |
| protected MergeManager createMergeManager( |
| long initialMemoryAvailable, ApplicationAttemptId appAttemptId) throws IOException { |
| Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext); |
| FileSystem localFS = FileSystem.getLocal(this.conf); |
| LocalDirAllocator localDirAllocator = |
| new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); |
| TezCounter spilledRecordsCounter = |
| inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS); |
| TezCounter reduceCombineInputCounter = |
| inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); |
| TezCounter mergedMapOutputsCounter = |
| inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS); |
| |
| boolean useRemoteSpill = |
| conf.getBoolean( |
| RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, |
| RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT); |
| if (useRemoteSpill) { |
| // Use minimized replica, because spilled data can be recomputed by reduce task. |
| // Instead, we use more retries on HDFS client. |
| int replication = |
| conf.getInt( |
| RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION, |
| RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT); |
| int retries = |
| conf.getInt( |
| RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES, |
| RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT); |
| LOG.info("Tez RssShuffle will use RssMergeManager!"); |
| return new RssMergeManager( |
| this.conf, |
| localFS, |
| inputContext, |
| combiner, |
| spilledRecordsCounter, |
| reduceCombineInputCounter, |
| mergedMapOutputsCounter, |
| this, |
| initialMemoryAvailable, |
| codec, |
| ifileReadAhead, |
| ifileReadAheadLength, |
| this.remoteConf, |
| replication, |
| retries, |
| appAttemptId.toString()); |
| } else { |
| return new MergeManager( |
| this.conf, |
| localFS, |
| localDirAllocator, |
| inputContext, |
| combiner, |
| spilledRecordsCounter, |
| reduceCombineInputCounter, |
| mergedMapOutputsCounter, |
| this, |
| initialMemoryAvailable, |
| codec, |
| ifileReadAhead, |
| ifileReadAheadLength); |
| } |
| } |
| |
| private static Configuration getRemoteConf(Configuration conf) { |
| String basePath = conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH); |
| String remoteStorageConf = conf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF); |
| RemoteStorageInfo remoteStorageInfo = new RemoteStorageInfo(basePath, remoteStorageConf); |
| Configuration remoteConf = new Configuration(conf); |
| if (!remoteStorageInfo.isEmpty()) { |
| for (Map.Entry<String, String> entry : remoteStorageInfo.getConfItems().entrySet()) { |
| remoteConf.set(entry.getKey(), entry.getValue()); |
| } |
| } |
| return remoteConf; |
| } |
| |
| public void handleEvents(List<Event> events) throws IOException { |
| if (!isShutDown.get()) { |
| eventHandler.handleEvents(events); |
| } else { |
| LOG.info( |
| srcNameTrimmed |
| + ": " |
| + "Ignoring events since already shutdown. EventCount: " |
| + events.size()); |
| } |
| } |
| |
| /** |
| * Indicates whether the Shuffle and Merge processing is complete. |
| * |
| * @return false if not complete, true if complete or if an error occurred. |
| * @throws InterruptedException |
| * @throws IOException |
| * @throws InputAlreadyClosedException |
| */ |
| public boolean isInputReady() throws IOException, InterruptedException, TezException { |
| if (isShutDown.get()) { |
| throw new InputAlreadyClosedException(); |
| } |
| if (throwable.get() != null) { |
| handleThrowable(throwable.get()); |
| } |
| if (rssRunShuffleFuture == null) { |
| return false; |
| } |
| // Don't need to check merge status, since runShuffleFuture will only |
| // complete once merge is complete. |
| return rssRunShuffleFuture.isDone(); |
| } |
| |
| private void handleThrowable(Throwable t) throws IOException, InterruptedException { |
| if (t instanceof IOException) { |
| throw (IOException) t; |
| } else if (t instanceof InterruptedException) { |
| throw (InterruptedException) t; |
| } else { |
| throw new UndeclaredThrowableException(t); |
| } |
| } |
| |
| /** |
| * Waits for the Shuffle and Merge to complete, and returns an iterator over the input. |
| * |
| * @return an iterator over the fetched input. |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| public TezRawKeyValueIterator waitForInput() |
| throws IOException, InterruptedException, TezException { |
| Preconditions.checkState( |
| rssRunShuffleFuture != null, "waitForInput can only be called after run"); |
| TezRawKeyValueIterator kvIter = null; |
| try { |
| kvIter = rssRunShuffleFuture.get(); |
| } catch (ExecutionException e) { |
| Throwable cause = e.getCause(); |
| // Processor interrupted while waiting for errors, will see an InterruptedException. |
| handleThrowable(cause); |
| } |
| if (isShutDown.get()) { |
| throw new InputAlreadyClosedException(); |
| } |
| if (throwable.get() != null) { |
| handleThrowable(throwable.get()); |
| } |
| return kvIter; |
| } |
| |
| public void run() throws IOException { |
| merger.configureAndStart(); |
| rssRunShuffleFuture = executor.submit(rssRunShuffleCallable); |
| Futures.addCallback( |
| rssRunShuffleFuture, new RssShuffleRunnerFutureCallback(), MoreExecutors.directExecutor()); |
| executor.shutdown(); |
| } |
| |
| public void shutdown() { |
| if (!isShutDown.getAndSet(true)) { |
| // Interrupt so that the scheduler / merger sees this interrupt. |
| LOG.info("Shutting down Shuffle for source: " + srcNameTrimmed); |
| rssRunShuffleFuture.cancel(true); |
| cleanupIgnoreErrors(); |
| } |
| } |
| |
| // Not handling any shutdown logic here. That's handled by the callback from this invocation. |
| private class RssRunShuffleCallable extends CallableWithNdc<TezRawKeyValueIterator> { |
| @Override |
| protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedException { |
| |
| if (!isShutDown.get()) { |
| try { |
| rssScheduler.start(); |
| } catch (Throwable e) { |
| throw new RssShuffleError("Error during shuffle", e); |
| } finally { |
| cleanupShuffleScheduler(); |
| } |
| } |
| // The ShuffleScheduler may have exited cleanly as a result of a shutdown invocation |
| // triggered by a previously reportedException. Check before proceeding further.s |
| synchronized (RssShuffle.this) { |
| if (throwable.get() != null) { |
| throw new RssShuffleError("error in shuffle in " + throwingThreadName, throwable.get()); |
| } |
| } |
| |
| shufflePhaseTime.setValue(System.currentTimeMillis() - startTime); |
| |
| // stop the scheduler |
| cleanupShuffleScheduler(); |
| |
| // Finish the on-going merges... |
| TezRawKeyValueIterator kvIter = null; |
| inputContext.notifyProgress(); |
| try { |
| kvIter = merger.close(true); |
| } catch (Throwable e) { |
| // Set the throwable so that future.get() sees the reported errror. |
| throwable.set(e); |
| throw new RssShuffleError("Error while doing final merge ", e); |
| } |
| mergePhaseTime.setValue(System.currentTimeMillis() - startTime); |
| |
| inputContext.notifyProgress(); |
| // Sanity check |
| synchronized (RssShuffle.this) { |
| if (throwable.get() != null) { |
| throw new RssShuffleError("error in shuffle in " + throwingThreadName, throwable.get()); |
| } |
| } |
| |
| inputContext.inputIsReady(); |
| LOG.info("merge complete for input vertex : " + srcNameTrimmed); |
| return kvIter; |
| } |
| } |
| |
| private void cleanupShuffleSchedulerIgnoreErrors() { |
| try { |
| cleanupShuffleScheduler(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| LOG.info( |
| srcNameTrimmed |
| + ": " |
| + "Interrupted while attempting to close the scheduler during cleanup. Ignoring"); |
| } |
| } |
| |
| private void cleanupShuffleScheduler() throws InterruptedException { |
| if (!schedulerClosed.getAndSet(true)) { |
| rssScheduler.close(); |
| } |
| } |
| |
| private void cleanupMerger(boolean ignoreErrors) throws Throwable { |
| if (!mergerClosed.getAndSet(true)) { |
| try { |
| merger.close(false); |
| } catch (InterruptedException e) { |
| if (ignoreErrors) { |
| // Reset the status |
| Thread.currentThread().interrupt(); |
| LOG.info( |
| srcNameTrimmed |
| + ": " |
| + "Interrupted while attempting to close the merger during cleanup. Ignoring"); |
| } else { |
| throw e; |
| } |
| } catch (Throwable e) { |
| if (ignoreErrors) { |
| LOG.info("{}: Exception while trying to shutdown merger, Ignoring", srcNameTrimmed, e); |
| } else { |
| throw e; |
| } |
| } |
| } |
| } |
| |
| private void cleanupIgnoreErrors() { |
| try { |
| if (eventHandler != null) { |
| eventHandler.logProgress(true); |
| } |
| try { |
| cleanupShuffleSchedulerIgnoreErrors(); |
| } catch (Exception e) { |
| LOG.warn( |
| "Error cleaning up shuffle scheduler. Ignoring and continuing with shutdown. Message={}", |
| e.getMessage()); |
| } |
| cleanupMerger(true); |
| } catch (Throwable t) { |
| LOG.info("{}: Error in cleaning up.., ", srcNameTrimmed, t); |
| } |
| } |
| |
| @Private |
| @Override |
| public synchronized void reportException(Throwable t) { |
| // RunShuffleCallable onFailure deals with ignoring errors on shutdown. |
| if (throwable.get() == null) { |
| LOG.info( |
| srcNameTrimmed |
| + ": " |
| + "Setting throwable in reportException with message [" |
| + t.getMessage() |
| + "] from thread [" |
| + Thread.currentThread().getName()); |
| throwable.set(t); |
| throwingThreadName = Thread.currentThread().getName(); |
| // Notify the scheduler so that the reporting thread finds the |
| // exception immediately. |
| cleanupShuffleSchedulerIgnoreErrors(); |
| } |
| } |
| |
| @Private |
| @Override |
| public synchronized void killSelf(Exception exception, String message) { |
| if (!isShutDown.get() && throwable.get() == null) { |
| shutdown(); |
| inputContext.killSelf(exception, message); |
| } |
| } |
| |
| public static class RssShuffleError extends IOException { |
| private static final long serialVersionUID = 5753909320586607881L; |
| |
| RssShuffleError(String msg, Throwable t) { |
| super(msg, t); |
| } |
| } |
| |
| @Private |
| public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) { |
| return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory); |
| } |
| |
| private class RssShuffleRunnerFutureCallback implements FutureCallback<TezRawKeyValueIterator> { |
| @Override |
| public void onSuccess(TezRawKeyValueIterator result) { |
| LOG.info(srcNameTrimmed + ": RSSShuffle Runner thread complete"); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) { |
| if (isShutDown.get()) { |
| LOG.info(srcNameTrimmed + ": Already shutdown. Ignoring error"); |
| } else { |
| LOG.error(srcNameTrimmed + ": RSSShuffleRunner failed with error", t); |
| // In case of an abort / Interrupt - the runtime makes sure that this is ignored. |
| inputContext.reportFailure(TaskFailureType.NON_FATAL, t, "RSSShuffle Runner Failed"); |
| cleanupIgnoreErrors(); |
| } |
| } |
| } |
| } |