blob: 700ef5ced3d8a7024aafe4377b5047e50cb43234 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
/**
* Unit tests for {@link S3ADataBlocks}.
*/
public class TestDataBlocks extends Assert {
@Rule
public Timeout testTimeout = new Timeout(30 * 1000);
@Before
public void nameThread() {
Thread.currentThread().setName("JUnit");
}
/**
* Test the {@link S3ADataBlocks.ByteBufferBlockFactory}.
* That code implements an input stream over a ByteBuffer, and has to
* return the buffer to the pool after the read complete.
*
* This test verifies the basic contract of the process.
*/
@Test
public void testByteBufferIO() throws Throwable {
try (S3ADataBlocks.ByteBufferBlockFactory factory =
new S3ADataBlocks.ByteBufferBlockFactory(null)) {
int limit = 128;
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
= factory.create(1, limit, null);
assertOutstandingBuffers(factory, 1);
byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
int bufferLen = buffer.length;
block.write(buffer, 0, bufferLen);
assertEquals(bufferLen, block.dataSize());
assertEquals("capacity in " + block,
limit - bufferLen, block.remainingCapacity());
assertTrue("hasCapacity(64) in " + block, block.hasCapacity(64));
assertTrue("No capacity in " + block,
block.hasCapacity(limit - bufferLen));
// now start the write
S3ADataBlocks.BlockUploadData blockUploadData = block.startUpload();
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream
stream =
(S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream)
blockUploadData.getUploadStream();
assertTrue("Mark not supported in " + stream, stream.markSupported());
assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());
int expected = bufferLen;
assertEquals("wrong available() in " + stream,
expected, stream.available());
assertEquals('t', stream.read());
stream.mark(limit);
expected--;
assertEquals("wrong available() in " + stream,
expected, stream.available());
// read into a byte array with an offset
int offset = 5;
byte[] in = new byte[limit];
assertEquals(2, stream.read(in, offset, 2));
assertEquals('e', in[offset]);
assertEquals('s', in[offset + 1]);
expected -= 2;
assertEquals("wrong available() in " + stream,
expected, stream.available());
// read to end
byte[] remainder = new byte[limit];
int c;
int index = 0;
while ((c = stream.read()) >= 0) {
remainder[index++] = (byte) c;
}
assertEquals(expected, index);
assertEquals('a', remainder[--index]);
assertEquals("wrong available() in " + stream,
0, stream.available());
assertTrue("hasRemaining() in " + stream, !stream.hasRemaining());
// go the mark point
stream.reset();
assertEquals('e', stream.read());
// when the stream is closed, the data should be returned
stream.close();
assertOutstandingBuffers(factory, 1);
block.close();
assertOutstandingBuffers(factory, 0);
stream.close();
assertOutstandingBuffers(factory, 0);
}
}
/**
* Assert the number of buffers active for a block factory.
* @param factory factory
* @param expectedCount expected count.
*/
private static void assertOutstandingBuffers(
S3ADataBlocks.ByteBufferBlockFactory factory,
int expectedCount) {
assertEquals("outstanding buffers in " + factory,
expectedCount, factory.getOutstandingBufferCount());
}
}