/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ByteUnit;
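
/**
 * Fetches shuffle data for a single reduce task from remote shuffle servers via the
 * Uniffle {@link ShuffleReadClient}: it reads compressed blocks, decompresses them, and
 * hands each block to the MapReduce {@link MergeManager} as an in-memory or on-disk
 * {@link MapOutput}.
 *
 * <p>Usage sketch (assuming the shuffle plugin has already created the
 * {@code ShuffleReadClient}, {@code MergeManager} and the other collaborators):
 *
 * <pre>{@code
 * RssFetcher<K, V> fetcher = new RssFetcher<>(jobConf, reduceId, taskStatus, merger,
 *     progress, reporter, metrics, shuffleReadClient, totalBlockCount, rssConf);
 * fetcher.fetchAllRssBlocks();
 * }</pre>
 */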
public class RssFetcher<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(RssFetcher.class);
private final Reporter reporter;
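
// Error categories reported as counters under the "Shuffle Errors" counter group.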
private enum ShuffleErrors {
IO_ERROR,
WRONG_LENGTH,
BAD_ID,
WRONG_MAP,
CONNECTION,
WRONG_REDUCE
}

private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
private static final double BYTES_PER_MILLIS_TO_MBS = 1000d / (ByteUnit.MiB.toBytes(1));
private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
private final JobConf jobConf;
private final Counters.Counter connectionErrs;
private final Counters.Counter ioErrs;
private final Counters.Counter wrongLengthErrs;
private final Counters.Counter badIdErrs;
private final Counters.Counter wrongMapErrs;
private final Counters.Counter wrongReduceErrs;
private final TaskStatus status;
private final MergeManager<K, V> merger;
private final Progress progress;
private final ShuffleClientMetrics metrics;
private long totalBlockCount;
private long copyBlockCount = 0;
private volatile boolean stopped = false;
private ShuffleReadClient shuffleReadClient;
private long readTime = 0;
private long decompressTime = 0;
private long serializeTime = 0;
private long waitTime = 0;
private long copyTime = 0; // the sum of readTime + decompressTime + serializeTime + waitTime
private long unCompressionLength = 0;
private final TaskAttemptID reduceId;
private static int uniqueMapId = 0;
private boolean hasPendingData = false;
private long startWait;
private int waitCount = 0;
private byte[] uncompressedData = null;
private RssConf rssConf;
private Codec codec;

RssFetcher(
JobConf job,
TaskAttemptID reduceId,
TaskStatus status,
MergeManager<K, V> merger,
Progress progress,
Reporter reporter,
ShuffleClientMetrics metrics,
ShuffleReadClient shuffleReadClient,
long totalBlockCount,
RssConf rssConf) {
this.jobConf = job;
this.reporter = reporter;
this.status = status;
this.merger = merger;
this.progress = progress;
this.metrics = metrics;
this.reduceId = reduceId;
ioErrs =
reporter.getCounter(SHUFFLE_ERR_GRP_NAME, RssFetcher.ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs =
reporter.getCounter(SHUFFLE_ERR_GRP_NAME, RssFetcher.ShuffleErrors.WRONG_LENGTH.toString());
badIdErrs =
reporter.getCounter(SHUFFLE_ERR_GRP_NAME, RssFetcher.ShuffleErrors.BAD_ID.toString());
wrongMapErrs =
reporter.getCounter(SHUFFLE_ERR_GRP_NAME, RssFetcher.ShuffleErrors.WRONG_MAP.toString());
connectionErrs =
reporter.getCounter(SHUFFLE_ERR_GRP_NAME, RssFetcher.ShuffleErrors.CONNECTION.toString());
wrongReduceErrs =
reporter.getCounter(SHUFFLE_ERR_GRP_NAME, RssFetcher.ShuffleErrors.WRONG_REDUCE.toString());
this.shuffleReadClient = shuffleReadClient;
this.totalBlockCount = totalBlockCount;
this.rssConf = rssConf;
this.codec = Codec.newInstance(rssConf);
}
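
/**
 * Main fetch loop: waits for merge resources, then copies blocks from the shuffle
 * servers one at a time until all blocks have been read and {@link #stopFetch()} sets
 * the stopped flag.
 */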
public void fetchAllRssBlocks() throws IOException, InterruptedException {
while (!stopped) {
try {
// If merge is on, block
merger.waitForResource();
// Do shuffle
metrics.threadBusy();
copyFromRssServer();
} finally {
metrics.threadFree();
}
}
}
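
/**
 * Copies one block in three timed phases: read a compressed block from the
 * ShuffleReadClient, decompress it, and hand it to the merger. If the merger cannot
 * reserve space yet, the decompressed block is kept as pending data and retried on the
 * next call. Once no more blocks are available, the read client is closed, the
 * processed block ids are verified, and fetching stops.
 */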
@VisibleForTesting
public void copyFromRssServer() throws IOException {
CompressedShuffleBlock compressedBlock = null;
ByteBuffer compressedData = null;
// fetch a block
if (!hasPendingData) {
final long startFetch = System.currentTimeMillis();
compressedBlock = shuffleReadClient.readShuffleBlockData();
if (compressedBlock != null) {
compressedData = compressedBlock.getByteBuffer();
}
long fetchDuration = System.currentTimeMillis() - startFetch;
readTime += fetchDuration;
}
// uncompress the block
if (!hasPendingData && compressedData != null) {
final long startDecompress = System.currentTimeMillis();
int uncompressedLen = compressedBlock.getUncompressLength();
ByteBuffer decompressedBuffer = ByteBuffer.allocate(uncompressedLen);
codec.decompress(compressedData, uncompressedLen, decompressedBuffer, 0);
uncompressedData = decompressedBuffer.array();
unCompressionLength += uncompressedLen;
long decompressDuration = System.currentTimeMillis() - startDecompress;
decompressTime += decompressDuration;
}
if (uncompressedData != null) {
// start to merge
final long startSerialization = System.currentTimeMillis();
if (issueMapOutputMerge()) {
long serializationDuration = System.currentTimeMillis() - startSerialization;
serializeTime += serializationDuration;
// if reserve succeeds, reset state for the next fetch
if (hasPendingData) {
waitTime += System.currentTimeMillis() - startWait;
}
hasPendingData = false;
uncompressedData = null;
} else {
// if reserve fails, return and wait
startWait = System.currentTimeMillis();
return;
}
// update copy statistics and report progress
copyBlockCount++;
copyTime = readTime + decompressTime + serializeTime + waitTime;
updateStatus();
reporter.progress();
} else {
// all data has been read; close the reader and verify data consistency
shuffleReadClient.close();
shuffleReadClient.checkProcessedBlockIds();
shuffleReadClient.logStatics();
metrics.inputBytes(unCompressionLength);
LOG.info(
"reduce task "
+ reduceId.toString()
+ " cost "
+ readTime
+ " ms to fetch and "
+ decompressTime
+ " ms to decompress with unCompressionLength["
+ unCompressionLength
+ "] and "
+ serializeTime
+ " ms to serialize and "
+ waitTime
+ " ms to wait resource");
stopFetch();
}
}
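
/**
 * Reserves a {@link MapOutput} from the MergeManager for the current uncompressed block
 * and writes the block into it via {@link RssBypassWriter}. Returns true when the block
 * was committed to the merger, false when the merger asked the fetcher to wait.
 */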
private boolean issueMapOutputMerge() throws IOException {
// Allocate a MapOutput (either in-memory or on-disk) to hold the uncompressed block.
// In RSS, a map output is sent as multiple blocks, so the reducer treats each block
// as a separate fake map output.
// To avoid name conflicts, getNextUniqueTaskAttemptID() is used to generate a unique
// TaskAttemptID(uniqueMapId++, 0) for each block.
TaskAttemptID mapId = getNextUniqueTaskAttemptID();
MapOutput<K, V> mapOutput = null;
try {
mapOutput = merger.reserve(mapId, uncompressedData.length, 0);
} catch (IOException ioe) {
// kill this reduce attempt
ioErrs.increment(1);
throw ioe;
}
// Check if we can shuffle *now* ...
if (mapOutput == null) {
LOG.info("RssMRFetcher - MergeManager returned status WAIT ...");
// Not an error; the fetcher just has to wait before processing the data.
// Use a retry flag to avoid re-fetching and re-decompressing the block.
hasPendingData = true;
waitCount++;
return false;
}
// write data to mapOutput
try {
RssBypassWriter.write(mapOutput, uncompressedData);
// let the merger know this block is ready for merging
mapOutput.commit();
if (mapOutput instanceof OnDiskMapOutput) {
LOG.info(
"Reduce: "
+ reduceId
+ " allocates disk to accept block "
+ " with byte sizes: "
+ uncompressedData.length);
}
} catch (Throwable t) {
ioErrs.increment(1);
mapOutput.abort();
throw new RssException(
"Reduce: "
+ reduceId
+ " cannot write block to "
+ mapOutput.getClass().getSimpleName()
+ " due to: "
+ t.getClass().getName());
}
return true;
}
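
/**
 * Builds a synthetic map {@link TaskAttemptID}, one per fetched block, so that each
 * block can be registered with the MergeManager as if it were a separate map output.
 */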
private TaskAttemptID getNextUniqueTaskAttemptID() {
TaskID taskID = new TaskID(reduceId.getJobID(), TaskType.MAP, uniqueMapId++);
return new TaskAttemptID(taskID, 0);
}

private void stopFetch() {
stopped = true;
}
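
/**
 * Updates the task progress and status string with the number of copied blocks and the
 * average copy rate in MB/s.
 */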
private void updateStatus() {
progress.set((float) copyBlockCount / totalBlockCount);
String statusString = copyBlockCount + " / " + totalBlockCount + " copied.";
status.setStateString(statusString);
if (copyTime == 0) {
copyTime = 1;
}
double bytesPerMillis = (double) unCompressionLength / copyTime;
double transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
progress.setStatus(
"copy("
+ copyBlockCount
+ " of "
+ totalBlockCount
+ " at "
+ mbpsFormat.format(transferRate)
+ " MB/s)");
}

@VisibleForTesting
public int getRetryCount() {
return waitCount;
}
}