/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
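
/**
* Fetches ordered-grouped shuffle data for a Tez reduce task from remote shuffle servers via
* Uniffle's {@link ShuffleReadClient}. Each compressed block is read, decompressed, and handed
* to the {@link MergeManager} as an in-memory or on-disk {@link MapOutput}. The fetcher loops
* until the read client returns no more blocks, then verifies that every expected block id was
* processed.
*
* <p>A rough usage sketch (the executor wiring here is an assumption, not part of this class):
*
* <pre>{@code
* RssTezShuffleDataFetcher fetcher =
*     new RssTezShuffleDataFetcher(
*         inputAttemptIdentifier, partitionId, merger, tezCounters,
*         shuffleReadClient, totalBlockCount, rssConf, exceptionReporter);
* // CallableWithNdc implements Callable, so the fetcher can run on any executor.
* Future<Void> result = fetcherExecutor.submit(fetcher);
* }</pre>
*/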
public class RssTezShuffleDataFetcher extends CallableWithNdc<Void> {
private static final Logger LOG = LoggerFactory.getLogger(RssTezShuffleDataFetcher.class);
private enum ShuffleErrors {
IO_ERROR,
WRONG_LENGTH,
BAD_ID,
WRONG_MAP,
CONNECTION,
WRONG_REDUCE
}
private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
private final TezCounter ioErrs;
private final MergeManager merger;
private final long totalBlockCount;
private long copyBlockCount = 0;
private volatile boolean stopped = false;
private final ShuffleReadClient shuffleReadClient;
private long readTime = 0;
private long decompressTime = 0;
private long serializeTime = 0;
private long waitTime = 0;
private long copyTime = 0; // the sum of readTime + decompressTime + serializeTime + waitTime
private long unCompressionLength = 0;
private final InputAttemptIdentifier inputAttemptIdentifier;
private static int uniqueMapId = 0;
private boolean hasPendingData = false;
private long startWait;
private int waitCount = 0;
private byte[] uncompressedData = null;
private final Codec rssCodec;
private Integer partitionId;
private final ExceptionReporter exceptionReporter;
private final AtomicInteger issuedCnt = new AtomicInteger(0);
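/**
* @param inputAttemptIdentifier identifier of the source input attempt, used for logging
* @param partitionId the reduce partition this fetcher reads
* @param merger merge manager that reserves memory or disk space for fetched blocks
* @param tezCounters counter group used to record shuffle errors
* @param shuffleReadClient Uniffle client that streams the compressed shuffle blocks
* @param totalBlockCount total number of blocks expected for this partition
* @param rssConf Uniffle configuration, used to pick the decompression codec
* @param exceptionReporter sink for fatal errors raised on the fetcher thread
*/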
public RssTezShuffleDataFetcher(
InputAttemptIdentifier inputAttemptIdentifier,
Integer partitionId,
MergeManager merger,
TezCounters tezCounters,
ShuffleReadClient shuffleReadClient,
long totalBlockCount,
RssConf rssConf,
ExceptionReporter exceptionReporter) {
this.merger = merger;
this.partitionId = partitionId;
this.inputAttemptIdentifier = inputAttemptIdentifier;
this.exceptionReporter = exceptionReporter;
ioErrs =
tezCounters.findCounter(
SHUFFLE_ERR_GRP_NAME, RssTezShuffleDataFetcher.ShuffleErrors.IO_ERROR.toString());
this.shuffleReadClient = shuffleReadClient;
this.totalBlockCount = totalBlockCount;
this.rssCodec = Codec.newInstance(rssConf);
LOG.info(
"RssTezShuffleDataFetcher, partitionId:{}, inputAttemptIdentifier:{}.",
this.partitionId,
this.inputAttemptIdentifier);
}
@Override
public Void callInternal() {
try {
fetchAllRssBlocks();
} catch (InterruptedException ie) {
// might not be respected when fetcher is in progress / server is busy. TEZ-711
// Restore the interrupted status so callers can observe the interrupt.
LOG.warn(ie.getMessage(), ie);
Thread.currentThread().interrupt();
return null;
} catch (Throwable t) {
LOG.warn(t.getMessage(), t);
exceptionReporter.reportException(t);
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
}
return null;
}
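/**
* Main fetch loop: blocks while an in-memory merge is running, then copies the next block from
* the shuffle servers. Runs until {@link #stopFetch()} flips {@code stopped}, which happens
* once the read client has no more data.
*/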
public void fetchAllRssBlocks() throws IOException, InterruptedException {
while (!stopped) {
try {
// If merge is on, block
merger.waitForInMemoryMerge();
// Do shuffle
copyFromRssServer();
} catch (Exception e) {
LOG.warn(e.getMessage(), e);
throw e;
}
}
}
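/**
* Performs one fetch step: read a compressed block, decompress it, and try to merge it. If the
* MergeManager cannot reserve space yet, {@code hasPendingData} is set so the next call retries
* the merge with the already-decompressed data instead of re-fetching it. When the read client
* returns no more blocks, the client is closed and the fetcher stops.
*/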
@VisibleForTesting
public void copyFromRssServer() throws IOException {
CompressedShuffleBlock compressedBlock = null;
ByteBuffer compressedData = null;
// fetch a block
if (!hasPendingData) {
final long startFetch = System.currentTimeMillis();
compressedBlock = shuffleReadClient.readShuffleBlockData();
if (compressedBlock != null) {
compressedData = compressedBlock.getByteBuffer();
}
long fetchDuration = System.currentTimeMillis() - startFetch;
readTime += fetchDuration;
}
// uncompress the block
if (!hasPendingData && compressedData != null) {
final long startDecompress = System.currentTimeMillis();
int uncompressedLen = compressedBlock.getUncompressLength();
ByteBuffer decompressedBuffer = ByteBuffer.allocate(uncompressedLen);
rssCodec.decompress(compressedData, uncompressedLen, decompressedBuffer, 0);
uncompressedData = decompressedBuffer.array();
unCompressionLength += compressedBlock.getUncompressLength();
long decompressDuration = System.currentTimeMillis() - startDecompress;
decompressTime += decompressDuration;
}
if (uncompressedData != null) {
// start to merge
final long startSerialization = System.currentTimeMillis();
if (issueMapOutputMerge()) {
long serializationDuration = System.currentTimeMillis() - startSerialization;
serializeTime += serializationDuration;
// if the reserve succeeded, reset state for the next fetch
if (hasPendingData) {
waitTime += System.currentTimeMillis() - startWait;
}
hasPendingData = false;
uncompressedData = null;
} else {
// if the reserve failed, return and wait for the merger to free memory
startWait = System.currentTimeMillis();
return;
}
// update copy counters and status
copyBlockCount++;
copyTime = readTime + decompressTime + serializeTime + waitTime;
updateStatus();
} else {
// no more data: close the reader and verify that all blocks were processed
shuffleReadClient.close();
shuffleReadClient.checkProcessedBlockIds();
shuffleReadClient.logStatics();
LOG.info(
"Reduce task {} read {} blocks; cost {} ms to fetch, {} ms to decompress "
+ "(uncompressed length: {} bytes), {} ms to serialize, and {} ms to wait for resources; "
+ "total copy time: {} ms",
inputAttemptIdentifier,
copyBlockCount,
readTime,
decompressTime,
unCompressionLength,
serializeTime,
waitTime,
copyTime);
stopFetch();
}
}
public Integer getPartitionId() {
return partitionId;
}
public void setPartitionId(Integer partitionId) {
this.partitionId = partitionId;
}
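/**
* Reserves a {@link MapOutput} from the {@link MergeManager} and writes the decompressed block
* into it. Returns {@code false} when the merger answers WAIT (memory is not yet available),
* in which case the caller keeps the data and retries later.
*/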
private boolean issueMapOutputMerge() throws IOException {
// Allocate a MapOutput (either in-memory or on-disk) to hold the uncompressed block.
// In RSS, a map output is sent as multiple blocks, so the reducer needs to
// treat each block as a fake "map output".
// To avoid name conflicts, we use getNextUniqueInputAttemptIdentifier, which
// generates a unique InputAttemptIdentifier(uniqueMapId++, 0).
InputAttemptIdentifier uniqueInputAttemptIdentifier = getNextUniqueInputAttemptIdentifier();
MapOutput mapOutput = null;
try {
issuedCnt.incrementAndGet();
LOG.info(
"IssueMapOutputMerge, uncompressedData length:{}, issueCnt:{}, totalBlockCount:{}",
uncompressedData.length,
issuedCnt.get(),
totalBlockCount);
mapOutput = merger.reserve(uniqueInputAttemptIdentifier, uncompressedData.length, 0, 1);
} catch (IOException ioe) {
// rethrow so this reduce attempt fails
ioErrs.increment(1);
throw ioe;
}
// Check if we can shuffle *now* ...
if (mapOutput == null || mapOutput.getType() == MapOutput.Type.WAIT) {
LOG.info("RssMRFetcher - MergeManager returned status WAIT ...");
// Not an error but wait to process data.
// Use a retry flag to avoid re-fetch and re-uncompress.
hasPendingData = true;
waitCount++;
return false;
}
// write data to mapOutput
try {
RssTezBypassWriter.write(mapOutput, uncompressedData);
// let the merger know this block is ready for merging
mapOutput.commit();
} catch (Throwable t) {
ioErrs.increment(1);
mapOutput.abort();
throw new RssException(
"Reduce: "
+ inputAttemptIdentifier
+ " cannot write block to "
+ mapOutput.getClass().getSimpleName()
+ " due to: "
+ t.getClass().getName(),
t);
}
return true;
}
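// Note: uniqueMapId is a static, unsynchronized counter; ids are only guaranteed unique
// when fetchers do not call this concurrently in the same JVM.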
private InputAttemptIdentifier getNextUniqueInputAttemptIdentifier() {
return new InputAttemptIdentifier(uniqueMapId++, 0);
}
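// Placeholder for status/progress updates; currently a no-op.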
private void updateStatus() {}
@VisibleForTesting
public int getRetryCount() {
return waitCount;
}
private void stopFetch() {
LOG.info("RssTezShuffleDataFetcher stop fetch");
stopped = true;
}
public void shutDown() {
stopFetch();
LOG.info("RssTezShuffleDataFetcher shutdown");
}
}