| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.uniffle.common.segment; |
| |
| import java.nio.BufferUnderflowException; |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| |
| import com.google.common.collect.Lists; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.uniffle.common.BufferSegment; |
| import org.apache.uniffle.common.ShuffleDataSegment; |
| import org.apache.uniffle.common.ShuffleIndexResult; |
| import org.apache.uniffle.common.exception.RssException; |
| |
| public class FixedSizeSegmentSplitter implements SegmentSplitter { |
| private static final Logger LOGGER = LoggerFactory.getLogger(FixedSizeSegmentSplitter.class); |
| |
| private int readBufferSize; |
| |
| public FixedSizeSegmentSplitter(int readBufferSize) { |
| this.readBufferSize = readBufferSize; |
| } |
| |
| @Override |
| public List<ShuffleDataSegment> split(ShuffleIndexResult shuffleIndexResult) { |
| if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) { |
| return Lists.newArrayList(); |
| } |
| |
| ByteBuffer indexData = shuffleIndexResult.getIndexData(); |
| long dataFileLen = shuffleIndexResult.getDataFileLen(); |
| return transIndexDataToSegments(indexData, readBufferSize, dataFileLen); |
| } |
| |
| private static List<ShuffleDataSegment> transIndexDataToSegments( |
| ByteBuffer indexData, int readBufferSize, long dataFileLen) { |
| List<BufferSegment> bufferSegments = Lists.newArrayList(); |
| List<ShuffleDataSegment> dataFileSegments = Lists.newArrayList(); |
| int bufferOffset = 0; |
| long fileOffset = -1; |
| long totalLength = 0; |
| |
| while (indexData.hasRemaining()) { |
| try { |
| final long offset = indexData.getLong(); |
| final int length = indexData.getInt(); |
| final int uncompressLength = indexData.getInt(); |
| final long crc = indexData.getLong(); |
| final long blockId = indexData.getLong(); |
| final long taskAttemptId = indexData.getLong(); |
| |
| // The index file is written, read and parsed sequentially, so these parsed index segments |
| // index a continuous shuffle data in the corresponding data file and the first segment's |
| // offset field is the offset of these shuffle data in the data file. |
| if (fileOffset == -1) { |
| fileOffset = offset; |
| } |
| |
| totalLength += length; |
| |
| // If ShuffleServer is flushing the file at this time, the length in the index file record |
| // may be greater |
| // than the length in the actual data file, and it needs to be returned at this time to |
| // avoid EOFException |
| if (dataFileLen != -1 && totalLength > dataFileLen) { |
| LOGGER.info( |
| "Abort inconsistent data, the data length: {}(bytes) recorded in index file is greater than " |
| + "the real data file length: {}(bytes). Block id: {}" |
| + "This may happen when the data is flushing, please ignore.", |
| totalLength, |
| dataFileLen, |
| blockId); |
| break; |
| } |
| |
| bufferSegments.add( |
| new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId)); |
| bufferOffset += length; |
| |
| if (bufferOffset >= readBufferSize) { |
| ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); |
| dataFileSegments.add(sds); |
| bufferSegments = Lists.newArrayList(); |
| bufferOffset = 0; |
| fileOffset = -1; |
| } |
| } catch (BufferUnderflowException ue) { |
| throw new RssException("Read index data under flow", ue); |
| } |
| } |
| |
| if (bufferOffset > 0) { |
| ShuffleDataSegment sds = new ShuffleDataSegment(fileOffset, bufferOffset, bufferSegments); |
| dataFileSegments.add(sds); |
| } |
| |
| return dataFileSegments; |
| } |
| } |