/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.primitives.Bytes;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests for {@link BlockInputStream}'s functionality.
*/
@RunWith(MockitoJUnitRunner.class)
public class TestBlockInputStream {
private static final int CHUNK_SIZE = 100;
private Checksum checksum;
private BlockInputStream blockStream;
private byte[] blockData;
private int blockSize;
private List<ChunkInfo> chunks;
private Map<String, byte[]> chunkDataMap;
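// Mocked pipeline refresh callback; individual tests stub and verify it.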
@Mock
private Function<BlockID, Pipeline> refreshPipeline;
@Before
public void setup() throws Exception {
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
createChunkList(5);
blockStream = new DummyBlockInputStream(blockID, blockSize, null, null,
false, null, refreshPipeline, chunks, chunkDataMap);
}
/**
* Create a mock list of chunks. The first numChunks - 1 chunks are of
* length CHUNK_SIZE and the last chunk is of length CHUNK_SIZE / 2.
*/
private void createChunkList(int numChunks)
throws Exception {
chunks = new ArrayList<>(numChunks);
chunkDataMap = new HashMap<>();
blockData = new byte[0];
blockSize = 0;
for (int i = 0; i < numChunks; i++) {
String chunkName = "chunk-" + i;
int chunkLen = (i == numChunks - 1) ? CHUNK_SIZE / 2 : CHUNK_SIZE;
byte[] byteData = generateRandomData(chunkLen);
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(chunkName)
.setOffset(0)
.setLen(chunkLen)
.setChecksumData(checksum.computeChecksum(
byteData, 0, chunkLen).getProtoBufMessage())
.build();
chunkDataMap.put(chunkName, byteData);
chunks.add(chunkInfo);
blockSize += chunkLen;
blockData = Bytes.concat(blockData, byteData);
}
}
private void seekAndVerify(int pos) throws Exception {
blockStream.seek(pos);
Assert.assertEquals("Current position of buffer does not match with the " +
"seeked position", pos, blockStream.getPos());
}
/**
* Match readData against blockData byte-wise.
* @param readData data read through the BlockInputStream
* @param inputDataStartIndex first index (inclusive) in blockData to compare
* with the read data
* @param length the number of bytes of data to match starting from
* inputDataStartIndex
*/
private void matchWithInputData(byte[] readData, int inputDataStartIndex,
int length) {
for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
Assert.assertEquals(blockData[i], readData[i - inputDataStartIndex]);
}
}
@Test
public void testSeek() throws Exception {
// Seek to position 0
int pos = 0;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 0,
blockStream.getChunkIndex());
// Before BlockInputStream is initialized (initialization happens during
// read operation), seek should update the BlockInputStream#blockPosition
pos = CHUNK_SIZE;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 0,
blockStream.getChunkIndex());
Assert.assertEquals(pos, blockStream.getBlockPosition());
// Initialize the BlockInputStream. After initialization, the chunkIndex
// should be updated to correspond to the seek position.
blockStream.initialize();
Assert.assertEquals("ChunkIndex is incorrect", 1,
blockStream.getChunkIndex());
pos = (CHUNK_SIZE * 4) + 5;
seekAndVerify(pos);
Assert.assertEquals("ChunkIndex is incorrect", 4,
blockStream.getChunkIndex());
try {
// Try seeking beyond the blockSize.
pos = blockSize + 10;
seekAndVerify(pos);
Assert.fail("Seek to position beyond block size should fail.");
} catch (EOFException e) {
// Expected: seeking beyond the block size is not allowed.
}
// Seek to random positions between 0 and the block size.
Random random = new Random();
for (int i = 0; i < 10; i++) {
pos = random.nextInt(blockSize);
seekAndVerify(pos);
}
}
@Test
public void testRead() throws Exception {
// Read 200 bytes of data starting from position 50. Chunk0 contains
// indices 0 to 99, chunk1 from 100 to 199 and chunk2 from 200 to 299. So
// the read should result in 3 ChunkInputStream reads.
seekAndVerify(50);
byte[] b = new byte[200];
blockStream.read(b, 0, 200);
matchWithInputData(b, 50, 200);
// The new position of the blockInputStream should be the last index read
// + 1.
Assert.assertEquals(250, blockStream.getPos());
Assert.assertEquals(2, blockStream.getChunkIndex());
}
@Test
public void testSeekAndRead() throws Exception {
// Seek to a position and read data
seekAndVerify(50);
byte[] b1 = new byte[100];
blockStream.read(b1, 0, 100);
matchWithInputData(b1, 50, 100);
// Next read should start from the position of the last read + 1, i.e. 150
byte[] b2 = new byte[100];
blockStream.read(b2, 0, 100);
matchWithInputData(b2, 150, 100);
}
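/**
* Verify that the pipeline refresh callback is invoked on read failure.
* DummyBlockInputStreamWithRetry (a helper in this package) is expected to
* fail the first read and trigger a refresh, which sets isRefreshed.
*/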
@Test
public void testRefreshPipelineFunction() throws Exception {
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
AtomicBoolean isRefreshed = new AtomicBoolean();
createChunkList(5);
BlockInputStream blockInputStreamWithRetry =
new DummyBlockInputStreamWithRetry(blockID, blockSize,
MockPipeline.createSingleNodePipeline(), null,
false, null, chunks, chunkDataMap, isRefreshed);
Assert.assertFalse(isRefreshed.get());
blockInputStreamWithRetry.seek(50);
byte[] b = new byte[200];
blockInputStreamWithRetry.read(b, 0, 200);
Assert.assertTrue(isRefreshed.get());
}
@Test
public void testRefreshOnReadFailure() throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
Pipeline pipeline = MockPipeline.createSingleNodePipeline();
Pipeline newPipeline = MockPipeline.createSingleNodePipeline();
final int len = 200;
final ChunkInputStream stream = mock(ChunkInputStream.class);
when(stream.read(any(), anyInt(), anyInt()))
.thenThrow(new StorageContainerException("test", CONTAINER_NOT_FOUND))
.thenReturn(len);
when(stream.getRemaining())
.thenReturn((long) len);
when(refreshPipeline.apply(blockID))
.thenReturn(newPipeline);
BlockInputStream subject = new DummyBlockInputStream(blockID, blockSize,
pipeline, null, false, null, refreshPipeline, chunks, null) {
@Override
protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return stream;
}
};
subject.initialize();
// WHEN
byte[] b = new byte[len];
int bytesRead = subject.read(b, 0, len);
// THEN
Assert.assertEquals(len, bytesRead);
verify(refreshPipeline).apply(blockID);
}
@Test
public void testRefreshExitsIfPipelineHasSameNodes() throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
Pipeline pipeline = MockPipeline.createSingleNodePipeline();
final int len = 200;
final ChunkInputStream stream = mock(ChunkInputStream.class);
when(stream.read(any(), anyInt(), anyInt()))
.thenThrow(new StorageContainerException("test", CONTAINER_UNHEALTHY));
when(stream.getRemaining())
.thenReturn((long) len);
when(refreshPipeline.apply(blockID))
.thenAnswer(invocation -> samePipelineWithNewId(pipeline));
BlockInputStream subject = new DummyBlockInputStream(blockID, blockSize,
pipeline, null, false, null, refreshPipeline, chunks, null) {
@Override
protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return stream;
}
};
subject.initialize();
// WHEN
byte[] b = new byte[len];
LambdaTestUtils.intercept(StorageContainerException.class,
() -> subject.read(b, 0, len));
// THEN
verify(refreshPipeline).apply(blockID);
}
@Test
public void testReadNotRetriedOnOtherException() throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
Pipeline pipeline = MockPipeline.createSingleNodePipeline();
final int len = 200;
final ChunkInputStream stream = mock(ChunkInputStream.class);
when(stream.read(any(), anyInt(), anyInt()))
.thenThrow(new OzoneChecksumException("checksum missing"));
when(stream.getRemaining())
.thenReturn((long) len);
BlockInputStream subject = new DummyBlockInputStream(blockID, blockSize,
pipeline, null, false, null, refreshPipeline, chunks, null) {
@Override
protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return stream;
}
};
subject.initialize();
// WHEN
byte[] b = new byte[len];
LambdaTestUtils.intercept(OzoneChecksumException.class,
() -> subject.read(b, 0, len));
// THEN
verify(refreshPipeline, never()).apply(blockID);
}
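/**
* Build a pipeline containing the same datanodes as the given pipeline,
* listed in reverse order to show that node order is irrelevant. MockPipeline
* is assumed to assign a fresh pipeline ID, matching the method name.
*/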
private Pipeline samePipelineWithNewId(Pipeline pipeline) {
List<DatanodeDetails> reverseOrder = new ArrayList<>(pipeline.getNodes());
Collections.reverse(reverseOrder);
return MockPipeline.createPipeline(reverseOrder);
}
@Test
public void testRefreshOnReadFailureAfterUnbuffer() throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
Pipeline pipeline = MockPipeline.createSingleNodePipeline();
Pipeline newPipeline = MockPipeline.createSingleNodePipeline();
XceiverClientFactory clientFactory = mock(XceiverClientFactory.class);
XceiverClientSpi client = mock(XceiverClientSpi.class);
when(clientFactory.acquireClientForReadData(pipeline))
.thenReturn(client);
final int len = 200;
final ChunkInputStream stream = mock(ChunkInputStream.class);
when(stream.read(any(), anyInt(), anyInt()))
.thenThrow(new StorageContainerException("test", CONTAINER_NOT_FOUND))
.thenReturn(len);
when(stream.getRemaining())
.thenReturn((long) len);
when(refreshPipeline.apply(blockID))
.thenReturn(newPipeline);
BlockInputStream subject = new BlockInputStream(blockID, blockSize,
pipeline, null, false, clientFactory, refreshPipeline) {
@Override
protected List<ChunkInfo> getChunkInfos() throws IOException {
acquireClient();
return chunks;
}
@Override
protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return stream;
}
};
subject.initialize();
subject.unbuffer();
// WHEN
byte[] b = new byte[len];
int bytesRead = subject.read(b, 0, len);
// THEN
Assert.assertEquals(len, bytesRead);
verify(refreshPipeline).apply(blockID);
verify(clientFactory).acquireClientForReadData(pipeline);
verify(clientFactory).releaseClient(client, false);
}
}