blob: 87f676c3c8492d29ce382ac8298d36e7208a0594 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import static org.apache.hadoop.fs.s3a.Constants.*;
/**
* Tests small file upload functionality for
* {@link S3ABlockOutputStream} with the blocks buffered in byte arrays.
*
* File sizes are kept small to reduce test duration on slow connections;
* multipart tests are kept in scale tests.
*/
public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {

  /** Length of the shared dataset and capacity of the block in the mark/reset test. */
  private static final int BLOCK_SIZE = 256 * 1024;

  /** Shared test data; built once for the class by {@link #setupDataset()}. */
  private static byte[] dataset;

  /**
   * Build the shared dataset of {@link #BLOCK_SIZE} bytes.
   * The mark/reset test relies on its first two bytes being 0 and 1.
   */
  @BeforeClass
  public static void setupDataset() {
    dataset = ContractTestUtils.dataset(BLOCK_SIZE, 0, 256);
  }

  /**
   * Create the test configuration: filesystem caching disabled, the
   * multipart threshold and part size set to {@code MULTIPART_MIN_SIZE},
   * fast upload enabled and the buffer mechanism set to
   * {@link #getBlockOutputBufferName()}.
   * @return the configuration to use for the test filesystem
   */
  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    S3ATestUtils.disableFilesystemCaching(conf);
    conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
    conf.setBoolean(FAST_UPLOAD, true);
    conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
    return conf;
  }

  /**
   * Name of the buffer mechanism set in the test configuration.
   * Protected and non-final, so a subclass can select a different buffer.
   * @return the value to store under {@code FAST_UPLOAD_BUFFER}
   */
  protected String getBlockOutputBufferName() {
    return FAST_UPLOAD_BUFFER_ARRAY;
  }

  /**
   * Upload and verify an empty file.
   * @throws IOException on any IO failure
   */
  @Test
  public void testZeroByteUpload() throws IOException {
    verifyUpload("0", 0);
  }

  /**
   * Upload and verify a 1024 byte file.
   * @throws IOException on any IO failure
   */
  @Test
  public void testRegularUpload() throws IOException {
    verifyUpload("regular", 1024);
  }

  /**
   * Verify that writing to a stream after it has been closed raises an
   * {@link IOException}.
   *
   * Only the write issued after {@code close()} may fail: an exception
   * raised while creating the file, during the first write or during the
   * close is propagated as a test error rather than being mistaken for
   * the expected outcome.
   * @throws Throwable on any unexpected failure
   */
  @Test
  public void testWriteAfterStreamClose() throws Throwable {
    Path dest = path("testWriteAfterStreamClose");
    describe(" testWriteAfterStreamClose");
    FSDataOutputStream stream = getFileSystem().create(dest, true);
    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
    try {
      stream.write(data);
      stream.close();
      boolean rejected = false;
      try {
        stream.write(data);
      } catch (IOException expected) {
        // this is the outcome under test
        rejected = true;
      }
      assertTrue("write() after close() did not raise an IOException on "
          + dest, rejected);
    } finally {
      // no-op when the stream closed normally; cleans up after a failure
      IOUtils.closeStream(stream);
    }
  }

  /**
   * Write a small amount of data, close the stream, then check the stream
   * statistics: exactly one block must have been allocated in total and
   * none may still be actively allocated.
   * @throws Throwable on any failure
   */
  @Test
  public void testBlocksClosed() throws Throwable {
    Path dest = path("testBlocksClosed");
    describe(" testBlocksClosed");
    FSDataOutputStream stream = getFileSystem().create(dest, true);
    S3AInstrumentation.OutputStreamStatistics statistics
        = S3ATestUtils.getOutputStreamStatistics(stream);
    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
    stream.write(data);
    LOG.info("closing output stream");
    stream.close();
    assertEquals("total allocated blocks in " + statistics,
        1, statistics.blocksAllocated());
    assertEquals("actively allocated blocks in " + statistics,
        0, statistics.blocksActivelyAllocated());
    LOG.info("end of test case");
  }

  /**
   * Create a file of the given size under the test path and verify it.
   * @param name name of the file, relative to the test path
   * @param fileSize size of the file in bytes
   * @throws IOException on any IO failure
   */
  private void verifyUpload(String name, int fileSize) throws IOException {
    Path dest = path(name);
    describe(name + " upload to " + dest);
    ContractTestUtils.createAndVerifyFile(
        getFileSystem(),
        dest,
        fileSize);
  }

  /**
   * Create a factory for use in mark/reset tests.
   * @param fileSystem source FS
   * @return the factory
   */
  protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
    return new S3ADataBlocks.ArrayBlockFactory(fileSystem);
  }

  /**
   * Write the shared dataset into a block created by the factory, start
   * the upload and verify that the upload stream supports mark/reset:
   * after marking at offset 1 and draining the stream, a reset must
   * return the read position to offset 1.
   * @param factory factory used to create the block under test
   * @throws Exception on any failure
   */
  private void markAndResetDatablock(S3ADataBlocks.BlockFactory factory)
      throws Exception {
    S3AInstrumentation instrumentation =
        new S3AInstrumentation(new URI("s3a://example"));
    S3AInstrumentation.OutputStreamStatistics outstats
        = instrumentation.newOutputStreamStatistics(null);
    S3ADataBlocks.DataBlock block = factory.create(1, BLOCK_SIZE, outstats);
    block.write(dataset, 0, dataset.length);
    S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
    // NOTE(review): block and uploadData are not closed here, as in the
    // original; if those types are Closeable they should be released too.
    InputStream stream = uploadData.getUploadStream();
    try {
      assertNotNull(stream);
      assertTrue("Mark not supported in " + stream, stream.markSupported());
      // first byte of the dataset
      assertEquals(0, stream.read());
      stream.mark(BLOCK_SIZE);
      // drain the stream, counting what comes back after the mark
      long bytesAfterMark = 0;
      while (stream.read() != -1) {
        bytesAfterMark++;
      }
      // one byte was consumed before the mark; the rest must all be there
      assertEquals("bytes read after mark from " + stream,
          dataset.length - 1, bytesAfterMark);
      stream.reset();
      // back at the marked position: the second byte of the dataset
      assertEquals(1, stream.read());
    } finally {
      IOUtils.closeStream(stream);
    }
  }

  /**
   * Run the mark/reset check against the factory returned by
   * {@link #createFactory(S3AFileSystem)}.
   * @throws Throwable on any failure
   */
  @Test
  public void testMarkReset() throws Throwable {
    markAndResetDatablock(createFactory(getFileSystem()));
  }
}