blob: c413f83b20336ab697e360c171e0479916d59a9e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FileStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests basic functionality for AliyunOSSInputStream, including seeking and
* reading files.
*/
public class TestAliyunOSSInputStream {
private FileSystem fs;
private static final Logger LOG =
LoggerFactory.getLogger(TestAliyunOSSInputStream.class);
private static String testRootPath =
AliyunOSSTestUtils.generateUniqueTestPath();
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
}
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.delete(new Path(testRootPath), true);
}
}
private Path setPath(String path) {
if (path.startsWith("/")) {
return new Path(testRootPath + path);
} else {
return new Path(testRootPath + "/" + path);
}
}
@Test
public void testSeekFile() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFile.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
LOG.info("5MB file created: smallSeekFile.txt");
FSDataInputStream instream = this.fs.open(smallSeekFile);
int seekTimes = 5;
LOG.info("multiple fold position seeking test...:");
for (int i = 0; i < seekTimes; i++) {
long pos = size / (seekTimes - i) - 1;
LOG.info("begin seeking for pos: " + pos);
instream.seek(pos);
assertTrue("expected position at:" + pos + ", but got:"
+ instream.getPos(), instream.getPos() == pos);
LOG.info("completed seeking at pos: " + instream.getPos());
}
LOG.info("random position seeking test...:");
Random rand = new Random();
for (int i = 0; i < seekTimes; i++) {
long pos = Math.abs(rand.nextLong()) % size;
LOG.info("begin seeking for pos: " + pos);
instream.seek(pos);
assertTrue("expected position at:" + pos + ", but got:"
+ instream.getPos(), instream.getPos() == pos);
LOG.info("completed seeking at pos: " + instream.getPos());
}
IOUtils.closeStream(instream);
}
@Test
public void testSequentialAndRandomRead() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFile.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
LOG.info("5MB file created: smallSeekFile.txt");
FSDataInputStream fsDataInputStream = this.fs.open(smallSeekFile);
AliyunOSSInputStream in =
(AliyunOSSInputStream)fsDataInputStream.getWrappedStream();
assertTrue("expected position at:" + 0 + ", but got:"
+ fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
assertTrue("expected position at:"
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ in.getExpectNextPos(),
in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
fsDataInputStream.seek(4 * 1024 * 1024);
assertTrue("expected position at:" + 4 * 1024 * 1024
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ in.getExpectNextPos(),
in.getExpectNextPos() == 4 * 1024 * 1024
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
IOUtils.closeStream(fsDataInputStream);
}
@Test
public void testOSSFileReaderTask() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFileOSSFileReader.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
LOG.info("5MB file created: smallSeekFileOSSFileReader.txt");
ReadBuffer readBuffer = new ReadBuffer(12, 24);
AliyunOSSFileReaderTask task = new AliyunOSSFileReaderTask("1",
((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
//NullPointerException, fail
task.run();
assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.ERROR);
//OK
task = new AliyunOSSFileReaderTask(
"test/test/smallSeekFileOSSFileReader.txt",
((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
task.run();
assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.SUCCESS);
}
@Test
public void testReadFile() throws Exception {
final int bufLen = 256;
final int sizeFlag = 5;
String filename = "readTestFile_" + sizeFlag + ".txt";
Path readTestFile = setPath("/test/" + filename);
long size = sizeFlag * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, readTestFile, size, 256, 255);
LOG.info(sizeFlag + "MB file created: /test/" + filename);
FSDataInputStream instream = this.fs.open(readTestFile);
byte[] buf = new byte[bufLen];
long bytesRead = 0;
while (bytesRead < size) {
int bytes;
if (size - bytesRead < bufLen) {
int remaining = (int)(size - bytesRead);
bytes = instream.read(buf, 0, remaining);
} else {
bytes = instream.read(buf, 0, bufLen);
}
bytesRead += bytes;
if (bytesRead % (1024 * 1024) == 0) {
int available = instream.available();
int remaining = (int)(size - bytesRead);
assertTrue("expected remaining:" + remaining + ", but got:" + available,
remaining == available);
LOG.info("Bytes read: " + Math.round((double)bytesRead / (1024 * 1024))
+ " MB");
}
}
assertTrue(instream.available() == 0);
IOUtils.closeStream(instream);
}
@Test
public void testDirectoryModifiedTime() throws Exception {
Path emptyDirPath = setPath("/test/emptyDirectory");
fs.mkdirs(emptyDirPath);
FileStatus dirFileStatus = fs.getFileStatus(emptyDirPath);
assertTrue("expected the empty dir is new",
dirFileStatus.getModificationTime() > 0L);
}
}