blob: 042bfd941743e4283f217787f96153443658bff3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.primitives.Bytes;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import static org.apache.hadoop.hdds.scm.storage.TestChunkInputStream.generateRandomData;
/**
* Tests for {@link BlockInputStream}'s functionality.
*/
public class TestBlockInputStream {

  private static final int CHUNK_SIZE = 100;

  // Computes the checksum data embedded in each mock ChunkInfo.
  // Instance-scoped: it is (re)created in setup() for every test.
  private Checksum checksum;
  private BlockInputStream blockStream;
  // Concatenation of all chunk data, i.e. the complete block contents.
  private byte[] blockData;
  private int blockSize;
  private List<ChunkInfo> chunks;
  // Maps chunk name -> chunk data; backs DummyBlockInputStream reads.
  private Map<String, byte[]> chunkDataMap;

  @Before
  public void setup() throws Exception {
    BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
    checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
    createChunkList(5);

    blockStream = new DummyBlockInputStream(blockID, blockSize, null, null,
        false, null);
  }

  /**
   * Create a mock list of chunks. The first n-1 chunks of length CHUNK_SIZE
   * and the last chunk with length CHUNK_SIZE/2.
   *
   * Also populates {@code blockData}, {@code blockSize} and
   * {@code chunkDataMap} to match the generated chunk list.
   */
  private void createChunkList(int numChunks) throws Exception {
    chunks = new ArrayList<>(numChunks);
    chunkDataMap = new HashMap<>();
    blockData = new byte[0];

    for (int i = 0; i < numChunks; i++) {
      String chunkName = "chunk-" + i;
      // The last chunk is half-sized so tests cover a non-uniform tail chunk.
      int chunkLen = (i == numChunks - 1) ? CHUNK_SIZE / 2 : CHUNK_SIZE;
      byte[] byteData = generateRandomData(chunkLen);
      ChunkInfo chunkInfo = ChunkInfo.newBuilder()
          .setChunkName(chunkName)
          .setOffset(0)
          .setLen(chunkLen)
          .setChecksumData(checksum.computeChecksum(
              byteData, 0, chunkLen).getProtoBufMessage())
          .build();

      chunkDataMap.put(chunkName, byteData);
      chunks.add(chunkInfo);

      blockSize += chunkLen;
      blockData = Bytes.concat(blockData, byteData);
    }
  }

  /**
   * A dummy BlockInputStream to mock read block call to DN. Chunk data is
   * served from {@code chunkDataMap} instead of contacting a DataNode.
   */
  private class DummyBlockInputStream extends BlockInputStream {

    DummyBlockInputStream(BlockID blockId,
        long blockLen,
        Pipeline pipeline,
        Token<OzoneBlockTokenIdentifier> token,
        boolean verifyChecksum,
        XceiverClientManager xceiverClientManager) {
      super(blockId, blockLen, pipeline, token, verifyChecksum,
          xceiverClientManager);
    }

    @Override
    protected List<ChunkInfo> getChunkInfos() {
      // Return the pre-built mock chunk list instead of querying the DN.
      return chunks;
    }

    @Override
    protected void addStream(ChunkInfo chunkInfo) {
      TestChunkInputStream testChunkInputStream = new TestChunkInputStream();
      // Clone the chunk data so a stream cannot mutate the shared fixture.
      getChunkStreams().add(testChunkInputStream.new DummyChunkInputStream(
          chunkInfo, null, null, false,
          chunkDataMap.get(chunkInfo.getChunkName()).clone()));
    }

    @Override
    protected synchronized void checkOpen() throws IOException {
      // No action needed: the dummy stream is never really opened/closed.
    }
  }

  /**
   * Seek the block stream to {@code pos} and verify that getPos() reports
   * the same position.
   */
  private void seekAndVerify(int pos) throws Exception {
    blockStream.seek(pos);
    Assert.assertEquals("Current position of buffer does not match with the " +
        "seeked position", pos, blockStream.getPos());
  }

  /**
   * Match readData with the chunkData byte-wise.
   * @param readData Data read through ChunkInputStream
   * @param inputDataStartIndex first index (inclusive) in chunkData to compare
   *                            with read data
   * @param length the number of bytes of data to match starting from
   *               inputDataStartIndex
   */
  private void matchWithInputData(byte[] readData, int inputDataStartIndex,
      int length) {
    for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
      Assert.assertEquals(blockData[i], readData[i - inputDataStartIndex]);
    }
  }

  @Test
  public void testSeek() throws Exception {
    // Seek to position 0
    int pos = 0;
    seekAndVerify(pos);
    Assert.assertEquals("ChunkIndex is incorrect", 0,
        blockStream.getChunkIndex());

    // Before BlockInputStream is initialized (initialization happens during
    // read operation), seek should update the BlockInputStream#blockPosition
    pos = CHUNK_SIZE;
    seekAndVerify(pos);
    Assert.assertEquals("ChunkIndex is incorrect", 0,
        blockStream.getChunkIndex());
    Assert.assertEquals(pos, blockStream.getBlockPosition());

    // Initialize the BlockInputStream. After initialization, the chunkIndex
    // should be updated to correspond to the seeked position.
    blockStream.initialize();
    Assert.assertEquals("ChunkIndex is incorrect", 1,
        blockStream.getChunkIndex());

    pos = (CHUNK_SIZE * 4) + 5;
    seekAndVerify(pos);
    Assert.assertEquals("ChunkIndex is incorrect", 4,
        blockStream.getChunkIndex());

    try {
      // Try seeking beyond the blockSize.
      pos = blockSize + 10;
      seekAndVerify(pos);
      Assert.fail("Seek to position beyond block size should fail.");
    } catch (EOFException e) {
      // Expected: seeking past the end of the block must throw EOFException.
    }

    // Seek to random positions between 0 and the block size.
    Random random = new Random();
    for (int i = 0; i < 10; i++) {
      pos = random.nextInt(blockSize);
      seekAndVerify(pos);
    }
  }

  @Test
  public void testRead() throws Exception {
    // read 200 bytes of data starting from position 50. Chunk0 contains
    // indices 0 to 99, chunk1 from 100 to 199 and chunk2 from 200 to 299. So
    // the read should result in 3 ChunkInputStream reads
    seekAndVerify(50);
    byte[] b = new byte[200];
    // InputStream#read may return fewer bytes than requested; assert the
    // count instead of silently assuming a full read.
    int bytesRead = blockStream.read(b, 0, 200);
    Assert.assertEquals("Expected to read the full requested length",
        200, bytesRead);
    matchWithInputData(b, 50, 200);

    // The new position of the blockInputStream should be the last index read
    // + 1.
    Assert.assertEquals(250, blockStream.getPos());
    Assert.assertEquals(2, blockStream.getChunkIndex());
  }

  @Test
  public void testSeekAndRead() throws Exception {
    // Seek to a position and read data
    seekAndVerify(50);
    byte[] b1 = new byte[100];
    int bytesRead = blockStream.read(b1, 0, 100);
    Assert.assertEquals("Expected to read the full requested length",
        100, bytesRead);
    matchWithInputData(b1, 50, 100);

    // Next read should start from the position of the last read + 1 i.e. 100
    byte[] b2 = new byte[100];
    bytesRead = blockStream.read(b2, 0, 100);
    Assert.assertEquals("Expected to read the full requested length",
        100, bytesRead);
    matchWithInputData(b2, 150, 100);
  }
}