| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3a.scale; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.IntFunction; |
| |
| import com.amazonaws.event.ProgressEvent; |
| import com.amazonaws.event.ProgressEventType; |
| import com.amazonaws.event.ProgressListener; |
| import org.assertj.core.api.Assertions; |
| import org.junit.FixMethodOrder; |
| import org.junit.Test; |
| import org.junit.runners.MethodSorters; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileRange; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.contract.ContractTestUtils; |
| import org.apache.hadoop.fs.s3a.Constants; |
| import org.apache.hadoop.fs.s3a.S3AFileSystem; |
| import org.apache.hadoop.fs.s3a.S3ATestUtils; |
| import org.apache.hadoop.fs.s3a.Statistic; |
| import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; |
| import org.apache.hadoop.fs.statistics.IOStatistics; |
| import org.apache.hadoop.util.Progressable; |
| |
| import static org.apache.hadoop.fs.contract.ContractTestUtils.*; |
| import static org.apache.hadoop.fs.s3a.Constants.*; |
| import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; |
| import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING; |
| import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic; |
| import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; |
| import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; |
| import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; |
| |
| /** |
| * Scale test which creates a huge file. |
| * |
| * <b>Important:</b> the order in which these tests execute is fixed to |
| * alphabetical order. Test cases are numbered {@code test_123_} to impose |
| * an ordering based on the numbers. |
| * |
 * Having this ordering allows the tests to assume that the huge file
 * exists. Even so, they should all have an {@link #assumeHugeFileExists()}
 * check at the start, in case an individual test is executed.
| */ |
| @FixMethodOrder(MethodSorters.NAME_ASCENDING) |
| public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { |
| private static final Logger LOG = LoggerFactory.getLogger( |
| AbstractSTestS3AHugeFiles.class); |
| public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB; |
| |
| private Path scaleTestDir; |
| private Path hugefile; |
| private Path hugefileRenamed; |
| |
| private int uploadBlockSize = DEFAULT_UPLOAD_BLOCKSIZE; |
| private int partitionSize; |
| private long filesize; |
| |
| @Override |
| public void setup() throws Exception { |
| super.setup(); |
| scaleTestDir = new Path(getTestPath(), getTestSuiteName()); |
| hugefile = new Path(scaleTestDir, "hugefile"); |
| hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed"); |
| filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE, |
| DEFAULT_HUGE_FILESIZE); |
| } |
| |
| /** |
| * Get the name of this test suite, which is used in path generation. |
| * Base implementation uses {@link #getBlockOutputBufferName()} for this. |
| * @return the name of the suite. |
| */ |
| public String getTestSuiteName() { |
| return getBlockOutputBufferName(); |
| } |
| |
| /** |
| * Note that this can get called before test setup. |
| * @return the configuration to use. |
| */ |
| @Override |
| protected Configuration createScaleConfiguration() { |
| Configuration conf = super.createScaleConfiguration(); |
| partitionSize = (int) getTestPropertyBytes(conf, |
| KEY_HUGE_PARTITION_SIZE, |
| DEFAULT_HUGE_PARTITION_SIZE); |
| assertTrue("Partition size too small: " + partitionSize, |
| partitionSize >= MULTIPART_MIN_SIZE); |
| conf.setLong(SOCKET_SEND_BUFFER, _1MB); |
| conf.setLong(SOCKET_RECV_BUFFER, _1MB); |
| conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize); |
| conf.setInt(MULTIPART_SIZE, partitionSize); |
| conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate"); |
| conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); |
| S3ATestUtils.disableFilesystemCaching(conf); |
| return conf; |
| } |
| |
| /** |
| * The name of the buffering mechanism to use. |
| * @return a buffering mechanism |
| */ |
| protected abstract String getBlockOutputBufferName(); |
| |
| @Test |
| public void test_010_CreateHugeFile() throws IOException { |
| |
| long filesizeMB = filesize / _1MB; |
| |
| // clean up from any previous attempts |
| deleteHugeFile(); |
| |
| Path fileToCreate = getPathOfFileToCreate(); |
| describe("Creating file %s of size %d MB" + |
| " with partition size %d buffered by %s", |
| fileToCreate, filesizeMB, partitionSize, getBlockOutputBufferName()); |
| |
    // now do a check of available upload time, with a pessimistic bandwidth
    // (that of remote upload tests). If the test times out then not only is
    // the test outcome lost; because the follow-on tests continue, they will
    // overlap with the still-ongoing upload, causing much confusion.
| int timeout = getTestTimeoutSeconds(); |
| // assume 1 MB/s upload bandwidth |
| int bandwidth = _1MB; |
| long uploadTime = filesize / bandwidth; |
| assertTrue(String.format("Timeout set in %s seconds is too low;" + |
| " estimating upload time of %d seconds at 1 MB/s." + |
| " Rerun tests with -D%s=%d", |
| timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2), |
| uploadTime < timeout); |
| assertEquals("File size set in " + KEY_HUGE_FILESIZE + " = " + filesize |
| + " is not a multiple of " + uploadBlockSize, |
| 0, filesize % uploadBlockSize); |
| |
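    // fill the upload block with a repeating 0-255 byte pattern,
    // so any corruption or misalignment is detectable on read.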
| byte[] data = new byte[uploadBlockSize]; |
| for (int i = 0; i < uploadBlockSize; i++) { |
| data[i] = (byte) (i % 256); |
| } |
| |
| long blocks = filesize / uploadBlockSize; |
| long blocksPerMB = _1MB / uploadBlockSize; |
| |
| // perform the upload. |
| // there's lots of logging here, so that a tail -f on the output log |
| // can give a view of what is happening. |
| S3AFileSystem fs = getFileSystem(); |
| IOStatistics iostats = fs.getIOStatistics(); |
| |
| String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol(); |
| String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol(); |
| Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE; |
| Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING; |
| |
| ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); |
| BlockOutputStreamStatistics streamStatistics; |
| long blocksPer10MB = blocksPerMB * 10; |
| ProgressCallback progress = new ProgressCallback(timer); |
| try (FSDataOutputStream out = fs.create(fileToCreate, |
| true, |
| uploadBlockSize, |
| progress)) { |
| try { |
| streamStatistics = getOutputStreamStatistics(out); |
| } catch (ClassCastException e) { |
| LOG.info("Wrapped output stream is not block stream: {}", |
| out.getWrappedStream()); |
| streamStatistics = null; |
| } |
| |
| for (long block = 1; block <= blocks; block++) { |
| out.write(data); |
| long written = block * uploadBlockSize; |
| // every 10 MB and on file upload @ 100%, print some stats |
| if (block % blocksPer10MB == 0 || written == filesize) { |
| long percentage = written * 100 / filesize; |
| double elapsedTime = timer.elapsedTime() / 1.0e9; |
| double writtenMB = 1.0 * written / _1MB; |
| LOG.info(String.format("[%02d%%] Buffered %.2f MB out of %d MB;" + |
| " PUT %d bytes (%d pending) in %d operations (%d active);" + |
| " elapsedTime=%.2fs; write to buffer bandwidth=%.2f MB/s", |
| percentage, |
| writtenMB, |
| filesizeMB, |
| iostats.counters().get(putBytes), |
| gaugeValue(putBytesPending), |
| iostats.counters().get(putRequests), |
| gaugeValue(putRequestsActive), |
| elapsedTime, |
| writtenMB / elapsedTime)); |
| } |
| } |
| // now close the file |
| LOG.info("Closing stream {}", out); |
| LOG.info("Statistics : {}", streamStatistics); |
| ContractTestUtils.NanoTimer closeTimer |
| = new ContractTestUtils.NanoTimer(); |
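      // close() is invoked explicitly so its duration can be timed;
      // the implicit close from try-with-resources is then a no-op.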
| out.close(); |
| closeTimer.end("time to close() output stream"); |
| } |
| |
| timer.end("time to write %d MB in blocks of %d", |
| filesizeMB, uploadBlockSize); |
| logFSState(); |
| bandwidth(timer, filesize); |
| LOG.info("Statistics after stream closed: {}", streamStatistics); |
| |
| LOG.info("IOStatistics after upload: {}", |
| demandStringifyIOStatistics(iostats)); |
| long putRequestCount = lookupCounterStatistic(iostats, putRequests); |
| long putByteCount = lookupCounterStatistic(iostats, putBytes); |
| Assertions.assertThat(putRequestCount) |
| .describedAs("Put request count from filesystem stats %s", |
| iostats) |
| .isGreaterThan(0); |
| Assertions.assertThat(putByteCount) |
| .describedAs("%s count from filesystem stats %s", |
| putBytes, iostats) |
| .isGreaterThan(0); |
| LOG.info("PUT {} bytes in {} operations; {} MB/operation", |
| putByteCount, putRequestCount, |
| putByteCount / (putRequestCount * _1MB)); |
| LOG.info("Time per PUT {} nS", |
| toHuman(timer.nanosPerOperation(putRequestCount))); |
| verifyStatisticGaugeValue(iostats, putRequestsActive.getSymbol(), 0); |
| verifyStatisticGaugeValue(iostats, |
| STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0); |
| progress.verifyNoFailures( |
| "Put file " + fileToCreate + " of size " + filesize); |
| if (streamStatistics != null) { |
| assertEquals("actively allocated blocks in " + streamStatistics, |
| 0, streamStatistics.getBlocksActivelyAllocated()); |
| } |
| } |
| |
| /** |
| * Get the path of the file which is to created. This is normally |
| * {@link #hugefile} |
| * @return the path to use when creating the file. |
| */ |
| protected Path getPathOfFileToCreate() { |
| return this.hugefile; |
| } |
| |
| protected Path getScaleTestDir() { |
| return scaleTestDir; |
| } |
| |
| protected Path getHugefile() { |
| return hugefile; |
| } |
| |
| public void setHugefile(Path hugefile) { |
| this.hugefile = hugefile; |
| } |
| |
| protected Path getHugefileRenamed() { |
| return hugefileRenamed; |
| } |
| |
| protected int getUploadBlockSize() { |
| return uploadBlockSize; |
| } |
| |
| protected int getPartitionSize() { |
| return partitionSize; |
| } |
| |
| /** |
| * Progress callback from AWS. Likely to come in on a different thread. |
| */ |
| private final class ProgressCallback implements Progressable, |
| ProgressListener { |
    private final AtomicLong bytesTransferred = new AtomicLong(0);
    private final AtomicInteger failures = new AtomicInteger(0);
| private final ContractTestUtils.NanoTimer timer; |
| |
    private ProgressCallback(ContractTestUtils.NanoTimer timer) {
| this.timer = timer; |
| } |
| |
| @Override |
| public void progress() { |
| } |
| |
| @Override |
| public void progressChanged(ProgressEvent progressEvent) { |
| ProgressEventType eventType = progressEvent.getEventType(); |
| if (eventType.isByteCountEvent()) { |
| bytesTransferred.addAndGet(progressEvent.getBytesTransferred()); |
| } |
| switch (eventType) { |
| case TRANSFER_PART_FAILED_EVENT: |
| // failure |
| failures.incrementAndGet(); |
| LOG.warn("Transfer failure"); |
| break; |
| case TRANSFER_PART_COMPLETED_EVENT: |
| // completion |
| long elapsedTime = timer.elapsedTime(); |
| double elapsedTimeS = elapsedTime / 1.0e9; |
| long written = bytesTransferred.get(); |
| long writtenMB = written / _1MB; |
| LOG.info(String.format( |
| "Event %s; total uploaded=%d MB in %.1fs;" + |
| " effective upload bandwidth = %.2f MB/s", |
| progressEvent, |
| writtenMB, elapsedTimeS, writtenMB / elapsedTimeS)); |
| break; |
| default: |
| if (eventType.isByteCountEvent()) { |
| LOG.debug("Event {}", progressEvent); |
| } else { |
| LOG.info("Event {}", progressEvent); |
| } |
| break; |
| } |
| } |
| |
| @Override |
| public String toString() { |
      return "ProgressCallback{"
          + "bytesTransferred=" + bytesTransferred
          + ", failures=" + failures
          + '}';
| } |
| |
| private void verifyNoFailures(String operation) { |
| assertEquals("Failures in " + operation + ": " + this, 0, failures.get()); |
| } |
| } |
| |
| /** |
| * Assume that the huge file exists; skip the test if it does not. |
| * @throws IOException IO failure |
| */ |
| void assumeHugeFileExists() throws IOException { |
| assumeFileExists(this.hugefile); |
| } |
| |
| /** |
| * Assume a specific file exists. |
| * @param file file to look for |
| * @throws IOException IO problem |
| */ |
| private void assumeFileExists(Path file) throws IOException { |
| S3AFileSystem fs = getFileSystem(); |
| ContractTestUtils.assertPathExists(fs, "huge file not created", |
| file); |
| FileStatus status = fs.getFileStatus(file); |
| ContractTestUtils.assertIsFile(file, status); |
| assertTrue("File " + file + " is empty", status.getLen() > 0); |
| } |
| |
| private void logFSState() { |
| LOG.info("File System state after operation:\n{}", getFileSystem()); |
| } |
| |
| /** |
| * This is the set of actions to perform when verifying the file actually |
| * was created. With the S3A committer, the file doesn't come into |
| * existence; a different set of assertions must be checked. |
| */ |
| @Test |
| public void test_030_postCreationAssertions() throws Throwable { |
| S3AFileSystem fs = getFileSystem(); |
| ContractTestUtils.assertPathExists(fs, "Huge file", hugefile); |
| FileStatus status = fs.getFileStatus(hugefile); |
| ContractTestUtils.assertIsFile(hugefile, status); |
| assertEquals("File size in " + status, filesize, status.getLen()); |
| } |
| |
| /** |
| * Read in the file using Positioned read(offset) calls. |
| * @throws Throwable failure |
| */ |
| @Test |
| public void test_040_PositionedReadHugeFile() throws Throwable { |
| assumeHugeFileExists(); |
| final String encryption = getConf().getTrimmed( |
| Constants.S3_ENCRYPTION_ALGORITHM); |
| boolean encrypted = encryption != null; |
| if (encrypted) { |
| LOG.info("File is encrypted with algorithm {}", encryption); |
| } |
| String filetype = encrypted ? "encrypted file" : "file"; |
| describe("Positioned reads of %s %s", filetype, hugefile); |
| S3AFileSystem fs = getFileSystem(); |
| FileStatus status = fs.getFileStatus(hugefile); |
| long size = status.getLen(); |
| int ops = 0; |
| final int bufferSize = 8192; |
| byte[] buffer = new byte[bufferSize]; |
| long eof = size - 1; |
| |
| ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); |
| ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF; |
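    // three positioned reads: the start of the file, near the EOF, then
    // the start again, to measure both forward and backward seek costs.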
| try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) { |
| readAtByte0 = new ContractTestUtils.NanoTimer(); |
| in.readFully(0, buffer); |
| readAtByte0.end("time to read data at start of file"); |
| ops++; |
| |
| readAtEOF = new ContractTestUtils.NanoTimer(); |
| in.readFully(eof - bufferSize, buffer); |
| readAtEOF.end("time to read data at end of file"); |
| ops++; |
| |
| readAtByte0Again = new ContractTestUtils.NanoTimer(); |
| in.readFully(0, buffer); |
| readAtByte0Again.end("time to read data at start of file again"); |
| ops++; |
| LOG.info("Final stream state: {}", in); |
| } |
| long mb = Math.max(size / _1MB, 1); |
| |
| logFSState(); |
| timer.end("time to perform positioned reads of %s of %d MB ", |
| filetype, mb); |
| LOG.info("Time per positioned read = {} nS", |
| toHuman(timer.nanosPerOperation(ops))); |
| } |
| |
| @Test |
| public void test_045_vectoredIOHugeFile() throws Throwable { |
| assumeHugeFileExists(); |
| List<FileRange> rangeList = new ArrayList<>(); |
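    // deliberately unordered ranges; the furthest extends to offset
    // 9,308,683, so the huge file must be at least that long.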
| rangeList.add(FileRange.createFileRange(5856368, 116770)); |
| rangeList.add(FileRange.createFileRange(3520861, 116770)); |
| rangeList.add(FileRange.createFileRange(8191913, 116770)); |
| rangeList.add(FileRange.createFileRange(1520861, 116770)); |
| rangeList.add(FileRange.createFileRange(2520861, 116770)); |
| rangeList.add(FileRange.createFileRange(9191913, 116770)); |
| rangeList.add(FileRange.createFileRange(2820861, 156770)); |
| IntFunction<ByteBuffer> allocate = ByteBuffer::allocate; |
| FileSystem fs = getFileSystem(); |
    CompletableFuture<FSDataInputStream> future =
        fs.openFile(hugefile).build();
    try (FSDataInputStream in = future.get()) {
| in.readVectored(rangeList, allocate); |
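      // read the whole file through the classic API for comparison;
      // note this buffers the entire file in memory, and the int cast
      // limits this test to file sizes under 2 GB.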
      byte[] readFullRes = new byte[(int) filesize];
| in.readFully(0, readFullRes); |
| // Comparing vectored read results with read fully. |
| validateVectoredReadResult(rangeList, readFullRes); |
| } |
| } |
| |
| /** |
| * Read in the entire file using read() calls. |
| * @throws Throwable failure |
| */ |
| @Test |
| public void test_050_readHugeFile() throws Throwable { |
| assumeHugeFileExists(); |
| describe("Reading %s", hugefile); |
| S3AFileSystem fs = getFileSystem(); |
| FileStatus status = fs.getFileStatus(hugefile); |
| long size = status.getLen(); |
| long blocks = size / uploadBlockSize; |
| byte[] data = new byte[uploadBlockSize]; |
| |
| ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); |
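    // stream the whole file back in block-size chunks; this measures
    // read bandwidth only, the data read is not verified.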
| try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) { |
| for (long block = 0; block < blocks; block++) { |
| in.readFully(data); |
| } |
| LOG.info("Final stream state: {}", in); |
| } |
| |
| long mb = Math.max(size / _1MB, 1); |
| timer.end("time to read file of %d MB ", mb); |
| LOG.info("Time per MB to read = {} nS", |
| toHuman(timer.nanosPerOperation(mb))); |
| bandwidth(timer, size); |
| logFSState(); |
| } |
| |
| /** |
| * Test to verify source file encryption key. |
| * @throws IOException |
| */ |
| @Test |
| public void test_090_verifyRenameSourceEncryption() throws IOException { |
    if (isEncrypted(getFileSystem())) {
| assertEncrypted(getHugefile()); |
| } |
| } |
| |
  /**
   * Assert that the given file is encrypted.
   * The base implementation is a no-op; subclasses testing encryption
   * override it.
   * @param hugeFile file to examine
   * @throws IOException IO failure
   */
  protected void assertEncrypted(Path hugeFile) throws IOException {
    // concrete subclasses provide the implementation.
  }
| |
| /** |
| * Checks if the encryption is enabled for the file system. |
| * @param fileSystem |
| * @return |
| */ |
| protected boolean isEncrypted(S3AFileSystem fileSystem) { |
| return false; |
| } |
| |
| @Test |
| public void test_100_renameHugeFile() throws Throwable { |
| assumeHugeFileExists(); |
| describe("renaming %s to %s", hugefile, hugefileRenamed); |
| S3AFileSystem fs = getFileSystem(); |
| FileStatus status = fs.getFileStatus(hugefile); |
| long size = status.getLen(); |
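    // clean up any destination file left over from a previous run,
    // so the rename target is free.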
| fs.delete(hugefileRenamed, false); |
| ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); |
| fs.rename(hugefile, hugefileRenamed); |
| long mb = Math.max(size / _1MB, 1); |
| timer.end("time to rename file of %d MB", mb); |
| LOG.info("Time per MB to rename = {} nS", |
| toHuman(timer.nanosPerOperation(mb))); |
| bandwidth(timer, size); |
| logFSState(); |
| FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed); |
| assertEquals(size, destFileStatus.getLen()); |
| |
| // rename back |
| ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer(); |
| fs.rename(hugefileRenamed, hugefile); |
| timer2.end("Renaming back"); |
| LOG.info("Time per MB to rename = {} nS", |
| toHuman(timer2.nanosPerOperation(mb))); |
| bandwidth(timer2, size); |
| } |
| |
| /** |
| * Test to verify target file encryption key. |
| * @throws IOException |
| */ |
| @Test |
| public void test_110_verifyRenameDestEncryption() throws IOException { |
    if (isEncrypted(getFileSystem())) {
      // check hugefile again, as hugefileRenamed has been renamed
      // back to hugefile by test_100_renameHugeFile.
      assertEncrypted(hugefile);
| } |
| } |
| /** |
| * Cleanup: delete the files. |
| */ |
| @Test |
| public void test_800_DeleteHugeFiles() throws IOException { |
| try { |
| deleteHugeFile(); |
| delete(hugefileRenamed, false); |
| } finally { |
| ContractTestUtils.rm(getFileSystem(), getTestPath(), true, false); |
| } |
| } |
| |
| /** |
| * After all the work, dump the statistics. |
| */ |
| @Test |
| public void test_900_dumpStats() { |
| LOG.info("Statistics\n{}", ioStatisticsSourceToString(getFileSystem())); |
| } |
| |
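  /**
   * Delete the huge file, if present.
   * @throws IOException IO failure
   */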
| protected void deleteHugeFile() throws IOException { |
| delete(hugefile, false); |
| } |
| |
| /** |
| * Delete any file, time how long it took. |
| * @param path path to delete |
| * @param recursive recursive flag |
| */ |
| protected void delete(Path path, boolean recursive) throws IOException { |
| describe("Deleting %s", path); |
| ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); |
| getFileSystem().delete(path, recursive); |
| timer.end("time to delete %s", path); |
| } |
| |
| } |