blob: fc051c82fc44f4d85fdb7e5a610d806fbe71b820 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.client.rpc.read;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests {@link ChunkInputStream}.
*/
public class TestChunkInputStream extends TestInputStreamBase {

  public TestChunkInputStream(ChunkLayOutVersion layout) {
    super(layout);
  }

  /**
   * Test to verify that data read from chunks is stored in a list of buffers
   * with max capacity equal to the bytes per checksum.
   */
  @Test
  public void testChunkReadBuffers() throws Exception {
    String keyName = getNewKeyName();
    int dataLength = (2 * BLOCK_SIZE) + CHUNK_SIZE;
    byte[] inputData = writeRandomBytes(keyName, dataLength);

    // Use try-with-resources so the KeyInputStream is always closed, matching
    // testBufferRelease (previously this test leaked the stream).
    try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
      BlockInputStream block0Stream = keyInputStream.getBlockStreams().get(0);
      block0Stream.initialize();

      ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);

      // To read 1 byte of chunk data, ChunkInputStream should get one full
      // checksum boundary worth of data from the Container and store it in
      // its buffers.
      chunk0Stream.read(new byte[1]);
      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
          BYTES_PER_CHECKSUM);

      // Read > checksum boundary of data from chunk0.
      int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
      byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
      validateData(inputData, 0, readData);

      // The first checksum boundary size of data was already present in the
      // ChunkStream buffers. Once that data is read, the next checksum
      // boundary size of data will be fetched again to read the remaining
      // data. Hence there should be 1 checksum boundary size of data stored
      // in the ChunkStream's buffers at the end of the read.
      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
          BYTES_PER_CHECKSUM);

      // Seek to a position in the third checksum boundary (so that the
      // current buffers do not have the seeked position) and read
      // > BYTES_PER_CHECKSUM bytes of data. This should result in
      // 2 * BYTES_PER_CHECKSUM amount of data being read into the buffers.
      // There should be 2 buffers in the stream but the first buffer should
      // be released after it is read and the second buffer should have
      // BYTES_PER_CHECKSUM capacity.
      readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
      int offset = 2 * BYTES_PER_CHECKSUM + 1;
      readData = readDataFromChunk(chunk0Stream, offset, readDataLen);
      validateData(inputData, offset, readData);
      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2, 1,
          BYTES_PER_CHECKSUM);

      // Read the full chunk data - 1 and verify that all chunk data is read
      // into buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers
      // will be released once all chunk data is read.
      readData = readDataFromChunk(chunk0Stream, 0, CHUNK_SIZE - 1);
      validateData(inputData, 0, readData);
      int expectedNumBuffers = CHUNK_SIZE / BYTES_PER_CHECKSUM;
      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
          expectedNumBuffers, expectedNumBuffers - 1, BYTES_PER_CHECKSUM);

      // Read the last byte of the chunk and verify that the buffers are
      // released.
      chunk0Stream.read(new byte[1]);
      Assert.assertNull("ChunkInputStream did not release buffers after " +
          "reaching EOF.", chunk0Stream.getCachedBuffers());
    }
  }

  /**
   * Test that ChunkInputStream buffers are released as soon as the last byte
   * of the buffer is read.
   */
  @Test
  public void testBufferRelease() throws Exception {
    String keyName = getNewKeyName();
    int dataLength = CHUNK_SIZE;
    byte[] inputData = writeRandomBytes(keyName, dataLength);

    try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
      BlockInputStream block0Stream = keyInputStream.getBlockStreams().get(0);
      block0Stream.initialize();

      ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);

      // Read checksum boundary - 1 bytes of data.
      int readDataLen = BYTES_PER_CHECKSUM - 1;
      byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
      validateData(inputData, 0, readData);

      // There should be 1 byte of data remaining in the buffer which is not
      // yet read. Hence, the buffer should not be released.
      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
          1, 0, BYTES_PER_CHECKSUM);
      Assert.assertEquals(1, chunk0Stream.getCachedBuffers()[0].remaining());

      // Reading the last byte in the buffer should result in all the buffers
      // being released.
      readData = readDataFromChunk(chunk0Stream, 1);
      validateData(inputData, readDataLen, readData);
      Assert
          .assertNull("Chunk stream buffers not released after last byte is " +
              "read", chunk0Stream.getCachedBuffers());

      // Read more data to get the data till the next checksum boundary.
      readDataLen = BYTES_PER_CHECKSUM / 2;
      readData = readDataFromChunk(chunk0Stream, readDataLen);
      // There should be one buffer and the buffer should not be released as
      // there is data pending to be read from the buffer.
      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
          BYTES_PER_CHECKSUM);
      ByteBuffer lastCachedBuffer = chunk0Stream.getCachedBuffers()[0];
      Assert.assertEquals(BYTES_PER_CHECKSUM - readDataLen,
          lastCachedBuffer.remaining());

      // Read more than the remaining data in buffer (but less than the next
      // checksum boundary).
      int position = (int) chunk0Stream.getPos();
      readDataLen = lastCachedBuffer.remaining() + BYTES_PER_CHECKSUM / 2;
      readData = readDataFromChunk(chunk0Stream, readDataLen);
      validateData(inputData, position, readData);

      // After reading the remaining data in the buffer, the buffer should be
      // released and the next checksum size of data must be read into the
      // buffers.
      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
          BYTES_PER_CHECKSUM);
      // Verify that the previously cached buffer is released by comparing it
      // with the current cached buffer.
      Assert.assertNotEquals(lastCachedBuffer,
          chunk0Stream.getCachedBuffers()[0]);
    }
  }

  /**
   * Seek to the given offset in the chunk and read the requested number of
   * bytes from there.
   *
   * @param chunkInputStream stream to read from
   * @param offset chunk-relative position to seek to before reading
   * @param readDataLength number of bytes to read
   * @return the bytes read
   */
  private byte[] readDataFromChunk(ChunkInputStream chunkInputStream,
      int offset, int readDataLength) throws IOException {
    byte[] readData = new byte[readDataLength];
    chunkInputStream.seek(offset);
    chunkInputStream.read(readData, 0, readDataLength);
    return readData;
  }

  /**
   * Read the requested number of bytes from the stream's current position.
   *
   * @param chunkInputStream stream to read from
   * @param readDataLength number of bytes to read
   * @return the bytes read
   */
  private byte[] readDataFromChunk(ChunkInputStream chunkInputStream,
      int readDataLength) throws IOException {
    byte[] readData = new byte[readDataLength];
    chunkInputStream.read(readData, 0, readDataLength);
    return readData;
  }

  /**
   * Verify number of buffers and their capacities.
   *
   * @param buffers chunk stream buffers
   * @param expectedNumBuffers expected number of buffers
   * @param numReleasedBuffers first numReleasedBuffers are expected to
   *                           be released and hence null
   * @param expectedBufferCapacity expected buffer capacity of unreleased
   *                               buffers
   */
  private void checkBufferSizeAndCapacity(ByteBuffer[] buffers,
      int expectedNumBuffers, int numReleasedBuffers,
      long expectedBufferCapacity) {
    Assert.assertEquals("ChunkInputStream does not have expected number of " +
        "ByteBuffers", expectedNumBuffers, buffers.length);
    for (int i = 0; i < buffers.length; i++) {
      if (i < numReleasedBuffers) {
        // This buffer should have been released and hence null.
        Assert.assertNull("ChunkInputStream Buffer not released after being " +
            "read", buffers[i]);
      } else {
        Assert.assertEquals("ChunkInputStream ByteBuffer capacity is wrong",
            expectedBufferCapacity, buffers[i].capacity());
      }
    }
  }
}