/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.uniffle.common.segment;

import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.Lists;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;

/**
 * {@link LocalOrderSegmentSplitter} is initialized only when the {@code
 * ShuffleDataDistributionType} is LOCAL_ORDER, which means the index file can be split into
 * several segments according to its locally ordered property. Some blocks may be skipped, but the
 * blocks that remain in a segment are always continuous.
 *
 * <p>This strategy is useful for Spark AQE skew optimization: it splits a single skewed partition
 * across multiple shuffle readers, and each reader fetches only the part of the partition data
 * that falls in the range [startMapIndex, endMapIndex). A reader using this splitter can
 * therefore skip a large number of unnecessary blocks.
 *
 * <p>Last but not least, this split strategy depends on the LOCAL_ORDER of the index file, which
 * must be guaranteed by the shuffle server.
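 *
 * <p>A minimal usage sketch; the task-attempt ids and the read buffer size below are illustrative
 * assumptions, and {@code shuffleIndexResult} stands for an index result previously fetched from
 * the shuffle server:
 *
 * <pre>{@code
 * Roaring64NavigableMap expectTaskIds = Roaring64NavigableMap.bitmapOf(2L, 5L, 7L);
 * SegmentSplitter splitter = new LocalOrderSegmentSplitter(expectTaskIds, 14 * 1024 * 1024);
 * List<ShuffleDataSegment> segments = splitter.split(shuffleIndexResult);
 * }</pre>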
 */
public class LocalOrderSegmentSplitter implements SegmentSplitter {
  private static final Logger LOGGER = LoggerFactory.getLogger(LocalOrderSegmentSplitter.class);

  private final Roaring64NavigableMap expectTaskIds;
  private final int readBufferSize;

  public LocalOrderSegmentSplitter(Roaring64NavigableMap expectTaskIds, int readBufferSize) {
    this.expectTaskIds = expectTaskIds;
    this.readBufferSize = readBufferSize;
  }

  @Override
  public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult) {
    if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
      return Lists.newArrayList();
    }

    ByteBuffer indexData = shuffleIndexResult.getIndexData();
    long dataFileLen = shuffleIndexResult.getDataFileLen();
    List<BufferSegment> bufferSegments = Lists.newArrayList();
    List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList();
    int bufferOffset = 0;
    long fileOffset = -1;
    long totalLen = 0;
    long lastExpectedBlockIndex = -1;
    List<Long> indexTaskIds = new ArrayList<>();
    /*
     * One ShuffleDataSegment should meet the following requirements:
     *
     * 1. Its taskAttemptId is in the task-ids bitmap built from [startMapIndex, endMapIndex).
     *    Attention: the indexes in that range are not the map task ids, which means the
     *    required task ids are not necessarily continuous.
     * 2. Its size is limited by readBufferSize: a segment is sealed once its accumulated
     *    length reaches readBufferSize.
     * 3. Its blocks should be continuous.
     */
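    // Each index record is a fixed-width entry laid out as:
    // offset(long) | length(int) | uncompressLength(int) | crc(long) | blockId(long) | taskAttemptId(long)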
    int index = 0;
    while (indexData.hasRemaining()) {
      try {
        long offset = indexData.getLong();
        int length = indexData.getInt();
        int uncompressLength = indexData.getInt();
        long crc = indexData.getLong();
        long blockId = indexData.getLong();
        long taskAttemptId = indexData.getLong();
        totalLen += length;
        indexTaskIds.add(taskAttemptId);

        // If the shuffle server is flushing the data file at this moment, the length recorded
        // in the index file may be greater than the length of the actual data file. Return the
        // segments collected so far to avoid an EOFException.
        if (dataFileLen != -1 && totalLen > dataFileLen) {
          LOGGER.info(
              "Abort inconsistent data: the data length {} (bytes) recorded in the index file is "
                  + "greater than the real data file length {} (bytes). Block id: {}. This may "
                  + "happen while the data is being flushed, in which case it can be ignored.",
              totalLen,
              dataFileLen,
              blockId);
          break;
        }
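        // Seal the current segment when the next expected block is not adjacent to the last
        // accepted one (so the blocks inside a segment stay continuous), or when the
        // accumulated bytes have already reached the read buffer size.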
        boolean conditionOfDiscontinuousBlocks =
            lastExpectedBlockIndex != -1
                && bufferSegments.size() > 0
                && expectTaskIds.contains(taskAttemptId)
                && index - lastExpectedBlockIndex != 1;

        boolean conditionOfLimitedBufferSize = bufferOffset >= readBufferSize;

        if (conditionOfDiscontinuousBlocks || conditionOfLimitedBufferSize) {
          ShuffleDataSegment sds =
              new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
          dataFileSegments.add(sds);
          bufferSegments = Lists.newArrayList();
          bufferOffset = 0;
          fileOffset = -1;
        }
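        // Only blocks emitted by expected tasks are appended to the current segment; the
        // first accepted block pins the segment's starting offset in the data file.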
        if (expectTaskIds.contains(taskAttemptId)) {
          if (fileOffset == -1) {
            fileOffset = offset;
          }
          bufferSegments.add(
              new BufferSegment(
                  blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
          bufferOffset += length;
          lastExpectedBlockIndex = index;
        }
        index++;
      } catch (BufferUnderflowException ue) {
        throw new RssException("Read index data underflow", ue);
      }
    }
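    // Flush the last partially filled segment, if any.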
    if (bufferOffset > 0) {
      ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments);
      dataFileSegments.add(sds);
    }

    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug(
          "Index file task-ids sequence: {}, expected task-ids: {}",
          indexTaskIds,
          getExpectedTaskIds(expectTaskIds));
    }
    return dataFileSegments;
  }

  private List<Long> getExpectedTaskIds(Roaring64NavigableMap expectTaskIds) {
    List<Long> taskIds = new ArrayList<>();
    expectTaskIds.forEach(taskIds::add);
    return taskIds;
  }
}