/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
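
/**
 * A {@link MergeManagerImpl} variant for remote shuffle (Apache Uniffle) that keeps fetched map
 * outputs in memory and spills merged segments to a remote filesystem (e.g. HDFS) instead of
 * local disk. Memory-to-memory and on-disk merges are disabled; the final merge reads the
 * spilled segments back from the remote filesystem.
 */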
public class RssRemoteMergeManagerImpl<K, V> extends MergeManagerImpl<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(RssRemoteMergeManagerImpl.class);
private final String appId;
private final TaskAttemptID reduceId;
private final JobConf jobConf;
Set<InMemoryMapOutput<K, V>> inMemoryMapOutputs =
new TreeSet<InMemoryMapOutput<K, V>>(new MapOutput.MapOutputComparator<K, V>());
private final RssInMemoryRemoteMerger<K, V> inMemoryMerger;
/** Spilled segments on HDFS */
Set<Path> onHDFSMapOutputs = new TreeSet<Path>();
@VisibleForTesting final long memoryLimit;
private long usedMemory;
private long commitMemory;
private final long mergeThreshold;
private final Reporter reporter;
private final ExceptionReporter exceptionReporter;
/** Combiner class to run during in-memory merge, if defined. */
private final Class<? extends Reducer> combinerClass;
/** Resettable collector used for combine. */
private final Task.CombineOutputCollector<K, V> combineCollector;
private final Counters.Counter spilledRecordsCounter;
private final Counters.Counter reduceCombineInputCounter;
private final Counters.Counter mergedMapOutputsCounter;
private final CompressionCodec codec;
private final Progress mergePhase;
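// Base path and filesystem on the remote storage (e.g. HDFS) used for spilled merge output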
private String basePath;
private FileSystem remoteFS;
public RssRemoteMergeManagerImpl(
String appId,
TaskAttemptID reduceId,
JobConf jobConf,
String basePath,
int replication,
int retries,
FileSystem localFS,
LocalDirAllocator localDirAllocator,
Reporter reporter,
CompressionCodec codec,
Class<? extends Reducer> combinerClass,
Task.CombineOutputCollector<K, V> combineCollector,
Counters.Counter spilledRecordsCounter,
Counters.Counter reduceCombineInputCounter,
Counters.Counter mergedMapOutputsCounter,
ExceptionReporter exceptionReporter,
Progress mergePhase,
MapOutputFile mapOutputFile,
JobConf remoteConf) {
super(
reduceId,
jobConf,
localFS,
localDirAllocator,
reporter,
codec,
combinerClass,
combineCollector,
spilledRecordsCounter,
reduceCombineInputCounter,
mergedMapOutputsCounter,
exceptionReporter,
mergePhase,
mapOutputFile);
this.appId = appId;
this.reduceId = reduceId;
this.jobConf = jobConf;
this.exceptionReporter = exceptionReporter;
this.mergePhase = mergePhase;
this.reporter = reporter;
this.codec = codec;
this.combinerClass = combinerClass;
this.combineCollector = combineCollector;
this.reduceCombineInputCounter = reduceCombineInputCounter;
this.spilledRecordsCounter = spilledRecordsCounter;
this.mergedMapOutputsCounter = mergedMapOutputsCounter;
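// Configure replication and block-write retries for spill files on the remote filesystem,
// then obtain the filesystem for the configured base path.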
try {
remoteConf.setInt("dfs.replication", replication);
remoteConf.setInt("dfs.client.block.write.retries", retries); // origin=3
this.remoteFS = HadoopFilesystemProvider.getFilesystem(new Path(basePath), remoteConf);
} catch (Exception e) {
throw new RssException("Cannot init remoteFS on path:" + basePath);
}
this.basePath = basePath;
final float maxInMemCopyUse =
jobConf.getFloat(
MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
throw new IllegalArgumentException(
"Invalid value for " + MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " + maxInMemCopyUse);
}
// Allow unit tests to fix Runtime memory
this.memoryLimit =
(long)
(jobConf.getLong(
MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Runtime.getRuntime().maxMemory())
* maxInMemCopyUse);
this.usedMemory = 0L;
this.commitMemory = 0L;
this.mergeThreshold =
(long)
(this.memoryLimit
* jobConf.getFloat(
MRJobConfig.SHUFFLE_MERGE_PERCENT, MRJobConfig.DEFAULT_SHUFFLE_MERGE_PERCENT));
LOG.info("MergerManager: memoryLimit={}, mergeThreshold={}", memoryLimit, mergeThreshold);
this.inMemoryMerger = createRssInMemoryMerger();
this.inMemoryMerger.start();
}
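
/**
 * Creates the in-memory merger, which merges in-memory segments and spills the merged output
 * under {@code basePath/appId} on the remote filesystem.
 */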
protected RssInMemoryRemoteMerger<K, V> createRssInMemoryMerger() {
return new RssInMemoryRemoteMerger<K, V>(
this,
jobConf,
remoteFS,
new Path(basePath, appId),
reduceId.toString(),
codec,
reporter,
spilledRecordsCounter,
combinerClass,
exceptionReporter,
combineCollector,
reduceCombineInputCounter,
mergedMapOutputsCounter);
}
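
/** Blocks the caller until the in-memory merger has completed any in-progress merge. */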
@Override
public void waitForResource() throws InterruptedException {
inMemoryMerger.waitForMerge();
}
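
/**
 * Reserves memory for a fetched map output. Returns {@code null} to stall the fetcher when
 * {@code usedMemory} exceeds {@code memoryLimit}; otherwise the output is always kept in
 * memory, since on-disk map outputs are disabled.
 */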
@Override
public synchronized MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize, int fetcher)
throws IOException {
// we disable OnDiskMapOutput to avoid merging on-disk intermediate data
if (usedMemory > memoryLimit) {
if (LOG.isDebugEnabled()) {
LOG.debug(
mapId
+ ": Stalling shuffle since usedMemory ("
+ usedMemory
+ ") is greater than memoryLimit ("
+ memoryLimit
+ ")."
+ " CommitMemory is ("
+ commitMemory
+ ")");
}
return null;
}
// Allow the in-memory shuffle to progress
if (LOG.isDebugEnabled()) {
LOG.debug(
mapId
+ ": Proceeding with shuffle since usedMemory ("
+ usedMemory
+ ") is lesser than memoryLimit ("
+ memoryLimit
+ ")."
+ "CommitMemory is ("
+ commitMemory
+ ")");
}
usedMemory += requestedSize;
// use this rss merger as the callback
return new InMemoryMapOutput<K, V>(jobConf, mapId, this, (int) requestedSize, codec, true);
}
@Override
synchronized void unreserve(long size) {
usedMemory -= size;
}
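
/**
 * Called when a map output has been completely fetched into memory. Accumulates
 * {@code commitMemory} and starts an in-memory merge to the remote filesystem once
 * {@code mergeThreshold} is reached.
 */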
@Override
public synchronized void closeInMemoryFile(InMemoryMapOutput<K, V> mapOutput) {
inMemoryMapOutputs.add(mapOutput);
LOG.info(
"closeInMemoryFile -> map-output of size: "
+ mapOutput.getSize()
+ ", inMemoryMapOutputs.size() -> "
+ inMemoryMapOutputs.size()
+ ", commitMemory -> "
+ commitMemory
+ ", usedMemory ->"
+ usedMemory);
commitMemory += mapOutput.getSize();
// Can hang if mergeThreshold is really low.
if (commitMemory >= mergeThreshold) {
LOG.info(
"Starting inMemoryMerger's merge since commitMemory="
+ commitMemory
+ " > mergeThreshold="
+ mergeThreshold
+ ". Current usedMemory="
+ usedMemory);
inMemoryMergedMapOutputs.clear();
inMemoryMerger.startMerge(inMemoryMapOutputs);
commitMemory = 0L; // Reset commitMemory.
}
// we disable memToMemMerger to simplify design
}
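
/**
 * Records a merged segment that has been committed to the remote filesystem so it is included
 * in the final merge.
 */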
public synchronized void closeOnHDFSFile(Path file) {
// merged in-memory segments have been committed to HDFS
onHDFSMapOutputs.add(file);
}
@Override
public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K, V> mapOutput) {
throw new IllegalStateException("closeInMemoryMergedFile is unsupported for rss merger");
}
@Override
public synchronized void closeOnDiskFile(CompressAwarePath file) {
throw new IllegalStateException("closeOnDiskFile is unsupported for rss merger");
}
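
/**
 * Merges any remaining in-memory map outputs to the remote filesystem, shuts down the merger,
 * and returns an iterator over the final merge of all spilled segments.
 */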
@Override
public RawKeyValueIterator close() throws Throwable {
// Flush remaining in-memory outputs and wait for the merge to complete
inMemoryMerger.startMerge(inMemoryMapOutputs);
inMemoryMerger.close();
if (!inMemoryMapOutputs.isEmpty()) {
throw new RssException("InMemoryMapOutputs should be empty");
}
return finalMerge();
}
// Read HDFS segments for reduce
private RawKeyValueIterator finalMerge() throws IOException {
Class<K> keyClass = (Class<K>) jobConf.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>) jobConf.getMapOutputValueClass();
final RawComparator<K> comparator = (RawComparator<K>) jobConf.getOutputKeyComparator();
// Single-pass merge: with a merge factor of Integer.MAX_VALUE, all HDFS segments are merged at once
return Merger.merge(
jobConf,
remoteFS,
keyClass,
valueClass,
codec,
onHDFSMapOutputs.toArray(new Path[onHDFSMapOutputs.size()]),
true,
Integer.MAX_VALUE,
new Path("reduceId"),
comparator,
reporter,
spilledRecordsCounter,
null,
null,
null);
}
}