blob: a5fe26b5619ab3d9845465de2ece7815810f328f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.EOFException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
* Tests for {@link ChunkInputStream}'s functionality.
*/
public class TestChunkInputStream {
private static final int CHUNK_SIZE = 100;
private static final int BYTES_PER_CHECKSUM = 20;
private static final String CHUNK_NAME = "dummyChunk";
private static final Random RANDOM = new Random();
private static Checksum checksum;
private DummyChunkInputStream chunkStream;
private ChunkInfo chunkInfo;
private byte[] chunkData;
@Before
public void setup() throws Exception {
checksum = new Checksum(ChecksumType.valueOf(
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT),
BYTES_PER_CHECKSUM);
chunkData = generateRandomData(CHUNK_SIZE);
chunkInfo = ChunkInfo.newBuilder()
.setChunkName(CHUNK_NAME)
.setOffset(0)
.setLen(CHUNK_SIZE)
.setChecksumData(checksum.computeChecksum(
chunkData, 0, CHUNK_SIZE).getProtoBufMessage())
.build();
chunkStream = new DummyChunkInputStream(chunkInfo, null, null, true);
}
static byte[] generateRandomData(int length) {
byte[] bytes = new byte[length];
RANDOM.nextBytes(bytes);
return bytes;
}
/**
* A dummy ChunkInputStream to mock read chunk calls to DN.
*/
public class DummyChunkInputStream extends ChunkInputStream {
// Stores the read chunk data in each readChunk call
private List<ByteString> readByteBuffers = new ArrayList<>();
DummyChunkInputStream(ChunkInfo chunkInfo,
BlockID blockId,
XceiverClientSpi xceiverClient,
boolean verifyChecksum) {
super(chunkInfo, blockId, xceiverClient, verifyChecksum);
}
public DummyChunkInputStream(ChunkInfo chunkInfo,
BlockID blockId,
XceiverClientSpi xceiverClient,
boolean verifyChecksum,
byte[] data) {
super(chunkInfo, blockId, xceiverClient, verifyChecksum);
chunkData = data;
}
@Override
protected ByteString readChunk(ChunkInfo readChunkInfo) {
ByteString byteString = ByteString.copyFrom(chunkData,
(int) readChunkInfo.getOffset(),
(int) readChunkInfo.getLen());
readByteBuffers.add(byteString);
return byteString;
}
@Override
protected void checkOpen() {
// No action needed
}
}
/**
* Match readData with the chunkData byte-wise.
* @param readData Data read through ChunkInputStream
* @param inputDataStartIndex first index (inclusive) in chunkData to compare
* with read data
* @param length the number of bytes of data to match starting from
* inputDataStartIndex
*/
private void matchWithInputData(byte[] readData, int inputDataStartIndex,
int length) {
for (int i = inputDataStartIndex; i < inputDataStartIndex + length; i++) {
Assert.assertEquals(chunkData[i], readData[i - inputDataStartIndex]);
}
}
/**
* Seek to a position and verify through getPos().
*/
private void seekAndVerify(int pos) throws Exception {
chunkStream.seek(pos);
Assert.assertEquals("Current position of buffer does not match with the " +
"seeked position", pos, chunkStream.getPos());
}
@Test
public void testFullChunkRead() throws Exception {
byte[] b = new byte[CHUNK_SIZE];
chunkStream.read(b, 0, CHUNK_SIZE);
matchWithInputData(b, 0, CHUNK_SIZE);
}
@Test
public void testPartialChunkRead() throws Exception {
int len = CHUNK_SIZE / 2;
byte[] b = new byte[len];
chunkStream.read(b, 0, len);
matchWithInputData(b, 0, len);
// To read chunk data from index 0 to 49 (len = 50), we need to read
// chunk from offset 0 to 60 as the checksum boundary is at every 20
// bytes. Verify that 60 bytes of chunk data are read and stored in the
// buffers.
matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
0, 60);
}
@Test
public void testSeek() throws Exception {
seekAndVerify(0);
try {
seekAndVerify(CHUNK_SIZE);
Assert.fail("Seeking to Chunk Length should fail.");
} catch (EOFException e) {
GenericTestUtils.assertExceptionContains("EOF encountered at pos: "
+ CHUNK_SIZE + " for chunk: " + CHUNK_NAME, e);
}
// Seek before read should update the ChunkInputStream#chunkPosition
seekAndVerify(25);
Assert.assertEquals(25, chunkStream.getChunkPosition());
// Read from the seeked position.
// Reading from index 25 to 54 should result in the ChunkInputStream
// copying chunk data from index 20 to 59 into the buffers (checksum
// boundaries).
byte[] b = new byte[30];
chunkStream.read(b, 0, 30);
matchWithInputData(b, 25, 30);
matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
20, 40);
// After read, the position of the chunkStream is evaluated from the
// buffers and the chunkPosition should be reset to -1.
Assert.assertEquals(-1, chunkStream.getChunkPosition());
// Seek to a position within the current buffers. Current buffers contain
// data from index 20 to 59. ChunkPosition should still not be used to
// set the position.
seekAndVerify(35);
Assert.assertEquals(-1, chunkStream.getChunkPosition());
// Seek to a position outside the current buffers. In this case, the
// chunkPosition should be updated to the seeked position.
seekAndVerify(75);
Assert.assertEquals(75, chunkStream.getChunkPosition());
}
@Test
public void testSeekAndRead() throws Exception {
// Seek to a position and read data
seekAndVerify(50);
byte[] b1 = new byte[20];
chunkStream.read(b1, 0, 20);
matchWithInputData(b1, 50, 20);
// Next read should start from the position of the last read + 1 i.e. 70
byte[] b2 = new byte[20];
chunkStream.read(b2, 0, 20);
matchWithInputData(b2, 70, 20);
}
}