blob: 2ba3fd7a65cde7f96b907287fdacc9fe1396f461 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_SEEK_BYTES_READ;
/**
* Integration test for calling
* {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} on {@link S3AInputStream}.
* Validates that the object has been closed using the
* {@link S3AInputStream#isObjectStreamOpen()} method. Unlike the
* {@link org.apache.hadoop.fs.contract.s3a.ITestS3AContractUnbuffer} tests,
* these tests leverage the fact that isObjectStreamOpen exposes if the
* underlying stream has been closed or not.
*/
public class ITestS3AUnbuffer extends AbstractS3ATestBase {
private Path dest;
@Override
public void setup() throws Exception {
super.setup();
dest = path("ITestS3AUnbuffer");
describe("ITestS3AUnbuffer");
byte[] data = ContractTestUtils.dataset(16, 'a', 26);
ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
16, true);
}
@Test
public void testUnbuffer() throws IOException {
describe("testUnbuffer");
// Open file, read half the data, and then call unbuffer
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
readAndAssertBytesRead(inputStream, 8);
assertTrue(isObjectStreamOpen(inputStream));
inputStream.unbuffer();
// Check the the wrapped stream is closed
assertFalse(isObjectStreamOpen(inputStream));
}
}
/**
* Test that calling {@link S3AInputStream#unbuffer()} merges a stream's
* {@link org.apache.hadoop.fs.s3a.S3AInstrumentation.InputStreamStatistics}
* into the {@link S3AFileSystem}'s {@link S3AInstrumentation} instance.
*/
@Test
public void testUnbufferStreamStatistics() throws IOException {
describe("testUnbufferStreamStatistics");
// Validate bytesRead is updated correctly
S3ATestUtils.MetricDiff bytesRead = new S3ATestUtils.MetricDiff(
getFileSystem(), STREAM_SEEK_BYTES_READ);
// Open file, read half the data, and then call unbuffer
FSDataInputStream inputStream = null;
try {
inputStream = getFileSystem().open(dest);
readAndAssertBytesRead(inputStream, 8);
inputStream.unbuffer();
// Validate that calling unbuffer updates the input stream statistics
bytesRead.assertDiffEquals(8);
// Validate that calling unbuffer twice in a row updates the statistics
// correctly
readAndAssertBytesRead(inputStream, 4);
inputStream.unbuffer();
bytesRead.assertDiffEquals(12);
} finally {
IOUtils.closeStream(inputStream);
}
// Validate that closing the file does not further change the statistics
bytesRead.assertDiffEquals(12);
// Validate that the input stream stats are correct when the file is closed
assertEquals("S3AInputStream statistics were not updated properly", 12,
((S3AInputStream) inputStream.getWrappedStream())
.getS3AStreamStatistics().bytesRead);
}
private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
}
/**
* Read the specified number of bytes from the given
* {@link FSDataInputStream} and assert that
* {@link FSDataInputStream#read(byte[])} read the specified number of bytes.
*/
private static void readAndAssertBytesRead(FSDataInputStream inputStream,
int bytesToRead) throws IOException {
assertEquals("S3AInputStream#read did not read the correct number of " +
"bytes", bytesToRead,
inputStream.read(new byte[bytesToRead]));
}
}