/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.runtime.library.common.shuffle.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssTezBypassWriter;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
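
/**
 * Fetches all shuffle blocks belonging to one reduce partition from the
 * remote shuffle servers through Uniffle's {@link ShuffleReadClient},
 * decompresses each block, and hands it to the Tez merge machinery via
 * {@link FetcherCallback#fetchSucceeded}.
 */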
public class RssTezFetcher {

  private static final Logger LOG = LoggerFactory.getLogger(RssTezFetcher.class);

  private final FetcherCallback fetcherCallback;
  private final FetchedInputAllocator inputManager;

  private long copyBlockCount = 0;
  private volatile boolean stopped = false;

  private final ShuffleReadClient shuffleReadClient;
  Map<Integer, Roaring64NavigableMap> rssSuccessBlockIdBitmapMap;
  private int partitionId; // partition id to fetch for this task

  private long readTime = 0;
  private long decompressTime = 0;
  private long serializeTime = 0;
  private long waitTime = 0;
  private long copyTime = 0; // the sum of readTime + decompressTime + serializeTime + waitTime
  private long unCompressionLength = 0;

  private int uniqueMapId = 0;
  private boolean hasPendingData = false;
  private long startWait;
  private int waitCount = 0;
  private byte[] uncompressedData = null;
  private Codec codec;

  RssTezFetcher(FetcherCallback fetcherCallback,
      FetchedInputAllocator inputManager,
      ShuffleReadClient shuffleReadClient,
      Map<Integer, Roaring64NavigableMap> rssSuccessBlockIdBitmapMap,
      int partitionId, RssConf rssConf) {
    this.fetcherCallback = fetcherCallback;
    this.inputManager = inputManager;
    this.shuffleReadClient = shuffleReadClient;
    this.rssSuccessBlockIdBitmapMap = rssSuccessBlockIdBitmapMap;
    this.partitionId = partitionId;
    this.codec = Codec.newInstance(rssConf);
  }
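
  /**
   * Keeps pulling blocks until {@link #stopFetch()} flips {@code stopped},
   * which happens once the read client reports that no more data is left.
   */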
  public void fetchAllRssBlocks() throws IOException {
    while (!stopped) {
      try {
        copyFromRssServer();
      } catch (Exception e) {
        LOG.error("Failed to fetchAllRssBlocks.", e);
        throw e;
      }
    }
  }
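
  /**
   * Fetches one block, decompresses it and issues it to the merger. When the
   * read client returns no more blocks, closes the client, verifies that all
   * expected block ids were processed and stops the fetch loop.
   */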
  @VisibleForTesting
  public void copyFromRssServer() throws IOException {
    CompressedShuffleBlock compressedBlock = null;
    ByteBuffer compressedData = null;
    long blockStartFetch = 0;
    // fetch a block
    if (!hasPendingData) {
      final long startFetch = System.currentTimeMillis();
      blockStartFetch = startFetch;
      compressedBlock = shuffleReadClient.readShuffleBlockData();
      if (compressedBlock != null) {
        compressedData = compressedBlock.getByteBuffer();
      }
      long fetchDuration = System.currentTimeMillis() - startFetch;
      readTime += fetchDuration;
    }
    // uncompress the block
    if (!hasPendingData && compressedData != null) {
      final long startDecompress = System.currentTimeMillis();
      int uncompressedLen = compressedBlock.getUncompressLength();
      ByteBuffer decompressedBuffer = ByteBuffer.allocate(uncompressedLen);
      codec.decompress(compressedData, uncompressedLen, decompressedBuffer, 0);
      uncompressedData = decompressedBuffer.array();
      unCompressionLength += uncompressedLen;
      long decompressDuration = System.currentTimeMillis() - startDecompress;
      decompressTime += decompressDuration;
    }
    if (uncompressedData != null) {
      // start to merge
      final long startSerialization = System.currentTimeMillis();
      int compressedDataLength = compressedData != null ? compressedData.capacity() : 0;
      if (issueMapOutputMerge(compressedDataLength, blockStartFetch)) {
        long serializationDuration = System.currentTimeMillis() - startSerialization;
        serializeTime += serializationDuration;
        // if reserve succeeds, reset status for the next fetch
        if (hasPendingData) {
          waitTime += System.currentTimeMillis() - startWait;
        }
        hasPendingData = false;
        uncompressedData = null;
      } else {
        LOG.info("Failed to reserve space for the merge, will wait and retry");
        // if reserve fails, return and wait
        waitCount++;
        startWait = System.currentTimeMillis();
        return;
      }
      // update counters
      copyBlockCount++;
      copyTime = readTime + decompressTime + serializeTime + waitTime;
    } else {
      // no block left: close the related reader and check data consistency
      LOG.info("uncompressedData is null, no more blocks to fetch");
      shuffleReadClient.close();
      shuffleReadClient.checkProcessedBlockIds();
      shuffleReadClient.logStatics();
      LOG.info("Reduce task partition: " + partitionId + " read block cnt: " + copyBlockCount
          + ", cost " + readTime + " ms to fetch and " + decompressTime + " ms to decompress with"
          + " unCompressionLength[" + unCompressionLength + "] and " + serializeTime
          + " ms to serialize and " + waitTime + " ms to wait resource, total copy time: " + copyTime);
      LOG.info("Stop fetcher");
      stopFetch();
    }
  }

  /**
   * Allocates a MapOutput piece (either in-memory or on-disk) to hold the
   * uncompressed block and writes the data into it. In RSS mode a map output
   * arrives as multiple blocks, so the reducer treats each block as a faked
   * "map output". To avoid name conflicts, getNextUniqueInputAttemptIdentifier
   * generates a unique InputAttemptIdentifier(increased_seq++, 0) per block.
   */
  private boolean issueMapOutputMerge(int compressedLength, long blockStartFetch)
      throws IOException {
    InputAttemptIdentifier uniqueInputAttemptIdentifier = getNextUniqueInputAttemptIdentifier();
    FetchedInput fetchedInput = null;
    try {
      fetchedInput = inputManager.allocate(uncompressedData.length,
          compressedLength, uniqueInputAttemptIdentifier);
    } catch (IOException ioe) {
      // kill this reduce attempt
      throw ioe;
    }
    // space has been allocated, now write the data into the fetched input
    try {
      RssTezBypassWriter.write(fetchedInput, uncompressedData);
      // let the merger know this block is ready for merging
      fetcherCallback.fetchSucceeded(null, uniqueInputAttemptIdentifier, fetchedInput,
          compressedLength, unCompressionLength, System.currentTimeMillis() - blockStartFetch);
    } catch (Throwable t) {
      LOG.error("Failed to write fetchedInput.", t);
      throw new RssException("Partition: " + partitionId + " cannot write block to "
          + fetchedInput.getClass().getSimpleName() + " due to: " + t.getClass().getName());
    }
    return true;
  }

  private InputAttemptIdentifier getNextUniqueInputAttemptIdentifier() {
    return new InputAttemptIdentifier(uniqueMapId++, 0);
  }

  private void stopFetch() {
    stopped = true;
  }
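
  // waitCount tracks how many times a merge reservation failed and the
  // fetcher had to back off; it is exposed to tests as the "retry count".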
  @VisibleForTesting
  public int getRetryCount() {
    return waitCount;
  }
}