blob: 2a55ae4de279c2638ce4a4e5840b5816c6ac6aba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.storage.handler.impl;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import io.netty.buffer.Unpooled;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
import org.apache.uniffle.client.response.RssGetShuffleDataResponse;
import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Exercises {@code LocalFileClientReadHandler} against a mocked {@code ShuffleServerClient} whose
 * index response lists more blocks than the length it reports alongside that index.
 */
public class LocalFileServerReadHandlerTest {

  /**
   * Builds an index with four entries, reports a length that only covers three blocks, stubs the
   * two data requests needed to fetch those three blocks, and verifies that exactly three buffer
   * segments are returned by the handler over all reads.
   *
   * @throws Exception if preparing the test data or reading through the handler fails
   */
  @Test
  public void testDataInconsistent() throws Exception {
    final int expectTotalBlockNum = 4;
    final int blockSize = 7;
    // One fewer block is "written" to the data side than is listed in the index.
    final int actualWriteDataBlock = expectTotalBlockNum - 1;
    final int actualFileLen = blockSize * actualWriteDataBlock;

    // Index entries are appended here by the callback handed to writeTestData.
    // NOTE(review): 40 looks like the per-entry index size in bytes -- confirm against writeIndex.
    final ByteBuffer indexBuffer = ByteBuffer.allocate(expectTotalBlockNum * 40);
    final Map<Long, byte[]> expectedData = Maps.newHashMap();

    List<ShufflePartitionedBlock> blocks =
        LocalFileHandlerTestBase.generateBlocks(expectTotalBlockNum, blockSize);
    LocalFileHandlerTestBase.writeTestData(
        blocks,
        shuffleBlocks -> {
          // Lay the blocks out back to back: each entry starts where the previous one ended.
          int offset = 0;
          for (ShufflePartitionedBlock current : shuffleBlocks) {
            LocalFileHandlerTestBase.writeIndex(
                indexBuffer,
                new FileBasedShuffleSegment(
                    current.getBlockId(),
                    offset,
                    current.getLength(),
                    current.getUncompressLength(),
                    current.getCrc(),
                    current.getTaskAttemptId()));
            offset += current.getLength();
          }
        },
        expectedData,
        new HashSet<>());
    indexBuffer.rewind();

    // Every generated block id is expected, including the one that has no data behind it.
    Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
    for (ShufflePartitionedBlock block : blocks) {
      expectedBlockIds.addLong(block.getBlockId());
    }

    // The index response carries all four entries together with the shorter length.
    ShuffleServerClient client = Mockito.mock(ShuffleServerClient.class);
    RssGetShuffleIndexResponse indexResponse =
        new RssGetShuffleIndexResponse(
            StatusCode.SUCCESS,
            new NettyManagedBuffer(Unpooled.wrappedBuffer(indexBuffer)),
            actualFileLen);
    Mockito.doReturn(indexResponse).when(client).getShuffleIndex(Mockito.any());

    // With a 13-byte read buffer and 7-byte blocks this evaluates to 14, i.e. two whole blocks.
    final int readBufferSize = 13;
    final int bytesPerSegment = ((readBufferSize / blockSize) + 1) * blockSize;

    List<Long> writtenBlockIds =
        blocks.stream()
            .limit(actualWriteDataBlock)
            .map(ShufflePartitionedBlock::getBlockId)
            .collect(Collectors.toList());
    List<byte[]> segmentBytes =
        LocalFileHandlerTestBase.calcSegmentBytes(expectedData, bytesPerSegment, writtenBlockIds);

    // The first request starts at offset 0 and spans two blocks.
    ArgumentMatcher<RssGetShuffleDataRequest> firstSegmentRequest =
        req -> req.getOffset() == 0 && req.getLength() == bytesPerSegment;
    // The second request starts right after the first and spans the one remaining block.
    ArgumentMatcher<RssGetShuffleDataRequest> secondSegmentRequest =
        req -> req.getOffset() == bytesPerSegment && req.getLength() == blockSize;

    Mockito.doReturn(
            new RssGetShuffleDataResponse(
                StatusCode.SUCCESS, ByteBuffer.wrap(segmentBytes.get(0))))
        .when(client)
        .getShuffleData(Mockito.argThat(firstSegmentRequest));
    Mockito.doReturn(
            new RssGetShuffleDataResponse(
                StatusCode.SUCCESS, ByteBuffer.wrap(segmentBytes.get(1))))
        .when(client)
        .getShuffleData(Mockito.argThat(secondSegmentRequest));

    String appId = "app1";
    int shuffleId = 1;
    int partitionId = 1;
    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
    LocalFileClientReadHandler handler =
        new LocalFileClientReadHandler(
            appId,
            partitionId,
            shuffleId,
            -1,
            1,
            1,
            readBufferSize,
            expectedBlockIds,
            processBlockIds,
            client);

    // 21 bytes read in 14-byte segments takes two reads.
    int totalSegment = (actualFileLen / bytesPerSegment) + 1;
    int readBlocks = 0;
    int segmentIndex = 0;
    while (segmentIndex < totalSegment) {
      ShuffleDataResult result = handler.readShuffleData();
      LocalFileHandlerTestBase.checkData(result, expectedData);
      readBlocks += result.getBufferSegments().size();
      segmentIndex++;
    }

    // Only as many buffer segments as written blocks should have been returned.
    assertEquals(actualWriteDataBlock, readBlocks);
  }
}