| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3a; |
| |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| |
| import org.assertj.core.api.Assertions; |
| import org.junit.Test; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.StreamCapabilities; |
| import org.apache.hadoop.fs.statistics.IOStatistics; |
| import org.apache.hadoop.fs.statistics.StoreStatisticNames; |
| import org.apache.hadoop.fs.statistics.IOStatisticsContext; |
| import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextImpl; |
| import org.apache.hadoop.util.concurrent.HadoopExecutors; |
| import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter; |
| import org.apache.hadoop.util.functional.TaskPool; |
| |
| import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; |
| import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; |
| import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; |
| import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; |
| import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; |
| import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; |
| import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES; |
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.enableIOStatisticsContext; |
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getCurrentIOStatisticsContext; |
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getThreadSpecificIOStatisticsContext; |
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.setThreadIOStatisticsContext; |
| import static org.apache.hadoop.util.functional.RemoteIterators.foreach; |
| |
| /** |
| * Tests to verify the Thread-level IOStatistics. |
| */ |
| public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase { |
| |
| private static final int SMALL_THREADS = 2; |
| private static final int BYTES_BIG = 100; |
| private static final int BYTES_SMALL = 50; |
| private static final String[] IOSTATISTICS_CONTEXT_CAPABILITY = |
| new String[] {StreamCapabilities.IOSTATISTICS_CONTEXT}; |
| private ExecutorService executor; |
| |
| @Override |
| protected Configuration createConfiguration() { |
| Configuration configuration = super.createConfiguration(); |
| enableIOStatisticsContext(); |
| return configuration; |
| } |
| |
| @Override |
| public void setup() throws Exception { |
| super.setup(); |
| executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); |
| } |
| |
| @Override |
| public void teardown() throws Exception { |
| if (executor != null) { |
| executor.shutdown(); |
| } |
| super.teardown(); |
| } |
| |
| /** |
| * Verify that S3AInputStream aggregates per thread IOStats collection |
| * correctly. |
| */ |
| @Test |
| public void testS3AInputStreamIOStatisticsContext() |
| throws Exception { |
| S3AFileSystem fs = getFileSystem(); |
| Path path = path(getMethodName()); |
| byte[] data = dataset(256, 'a', 'z'); |
| byte[] readDataFirst = new byte[BYTES_BIG]; |
| byte[] readDataSecond = new byte[BYTES_SMALL]; |
| writeDataset(fs, path, data, data.length, 1024, true); |
| |
| CountDownLatch latch = new CountDownLatch(SMALL_THREADS); |
| |
| try { |
| |
| for (int i = 0; i < SMALL_THREADS; i++) { |
| executor.submit(() -> { |
| try { |
| // get the thread context and reset |
| IOStatisticsContext context = |
| getAndResetThreadStatisticsContext(); |
| try (FSDataInputStream in = fs.open(path)) { |
| // Assert the InputStream's stream capability to support |
| // IOStatisticsContext. |
| assertCapabilities(in, IOSTATISTICS_CONTEXT_CAPABILITY, null); |
| in.seek(50); |
| in.read(readDataFirst); |
| } |
| assertContextBytesRead(context, BYTES_BIG); |
| // Stream is closed for a thread. Re-open and do more operations. |
| try (FSDataInputStream in = fs.open(path)) { |
| in.seek(100); |
| in.read(readDataSecond); |
| } |
| assertContextBytesRead(context, BYTES_BIG + BYTES_SMALL); |
| |
| latch.countDown(); |
| } catch (Exception e) { |
| latch.countDown(); |
| setFutureException(e); |
| LOG.error("An error occurred while doing a task in the thread", e); |
| } catch (AssertionError ase) { |
| latch.countDown(); |
| setFutureAse(ase); |
| throw ase; |
| } |
| }); |
| } |
| // wait for tasks to finish. |
| latch.await(); |
| } finally { |
| executor.shutdown(); |
| } |
| |
| // Check if an Exception or ASE was caught while the test threads were running. |
| maybeReThrowFutureException(); |
| maybeReThrowFutureASE(); |
| |
| } |
| |
| /** |
| * get the thread context and reset. |
| * @return thread context |
| */ |
| private static IOStatisticsContext getAndResetThreadStatisticsContext() { |
| IOStatisticsContext context = |
| IOStatisticsContext.getCurrentIOStatisticsContext(); |
| context.reset(); |
| return context; |
| } |
| |
| /** |
| * Verify that S3ABlockOutputStream aggregates per thread IOStats collection |
| * correctly. |
| */ |
| @Test |
| public void testS3ABlockOutputStreamIOStatisticsContext() |
| throws Exception { |
| S3AFileSystem fs = getFileSystem(); |
| Path path = path(getMethodName()); |
| byte[] writeDataFirst = new byte[BYTES_BIG]; |
| byte[] writeDataSecond = new byte[BYTES_SMALL]; |
| |
| final ExecutorService executorService = |
| HadoopExecutors.newFixedThreadPool(SMALL_THREADS); |
| CountDownLatch latch = new CountDownLatch(SMALL_THREADS); |
| |
| try { |
| for (int i = 0; i < SMALL_THREADS; i++) { |
| executorService.submit(() -> { |
| try { |
| // get the thread context and reset |
| IOStatisticsContext context = |
| getAndResetThreadStatisticsContext(); |
| try (FSDataOutputStream out = fs.create(path)) { |
| // Assert the OutputStream's stream capability to support |
| // IOStatisticsContext. |
| assertCapabilities(out, IOSTATISTICS_CONTEXT_CAPABILITY, null); |
| out.write(writeDataFirst); |
| } |
| assertContextBytesWrite(context, BYTES_BIG); |
| |
| // Stream is closed for a thread. Re-open and do more operations. |
| try (FSDataOutputStream out = fs.create(path)) { |
| out.write(writeDataSecond); |
| } |
| assertContextBytesWrite(context, BYTES_BIG + BYTES_SMALL); |
| latch.countDown(); |
| } catch (Exception e) { |
| latch.countDown(); |
| setFutureException(e); |
| LOG.error("An error occurred while doing a task in the thread", e); |
| } catch (AssertionError ase) { |
| latch.countDown(); |
| setFutureAse(ase); |
| throw ase; |
| } |
| }); |
| } |
| // wait for tasks to finish. |
| latch.await(); |
| } finally { |
| executorService.shutdown(); |
| } |
| |
| // Check if an Excp or ASE was caught while the test threads were running. |
| maybeReThrowFutureException(); |
| maybeReThrowFutureASE(); |
| } |
| |
| /** |
| * Verify stats collection and aggregation for constructor thread, Junit |
| * thread and a worker thread. |
| */ |
| @Test |
| public void testThreadIOStatisticsForDifferentThreads() |
| throws IOException, InterruptedException { |
| S3AFileSystem fs = getFileSystem(); |
| Path path = path(getMethodName()); |
| byte[] data = new byte[BYTES_BIG]; |
| long threadIdForTest = Thread.currentThread().getId(); |
| IOStatisticsContext context = |
| getAndResetThreadStatisticsContext(); |
| Assertions.assertThat(((IOStatisticsContextImpl)context).getThreadID()) |
| .describedAs("Thread ID of %s", context) |
| .isEqualTo(threadIdForTest); |
| Assertions.assertThat(((IOStatisticsContextImpl)context).getID()) |
| .describedAs("ID of %s", context) |
| .isGreaterThan(0); |
| |
| // Write in the Junit thread. |
| try (FSDataOutputStream out = fs.create(path)) { |
| out.write(data); |
| } |
| |
| // Read in the Junit thread. |
| try (FSDataInputStream in = fs.open(path)) { |
| in.read(data); |
| } |
| |
| // Worker thread work and wait for it to finish. |
| TestWorkerThread workerThread = new TestWorkerThread(path, null); |
| long workerThreadID = workerThread.getId(); |
| workerThread.start(); |
| workerThread.join(); |
| |
| assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG); |
| assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL); |
| } |
| |
| /** |
| * Verify stats collection and aggregation for constructor thread, Junit |
| * thread and a worker thread. |
| */ |
| @Test |
| public void testThreadSharingIOStatistics() |
| throws IOException, InterruptedException { |
| S3AFileSystem fs = getFileSystem(); |
| Path path = path(getMethodName()); |
| byte[] data = new byte[BYTES_BIG]; |
| long threadIdForTest = Thread.currentThread().getId(); |
| IOStatisticsContext context = |
| getAndResetThreadStatisticsContext(); |
| |
| |
| // Write in the Junit thread. |
| try (FSDataOutputStream out = fs.create(path)) { |
| out.write(data); |
| } |
| |
| // Read in the Junit thread. |
| try (FSDataInputStream in = fs.open(path)) { |
| in.read(data); |
| } |
| |
| // Worker thread will share the same context. |
| TestWorkerThread workerThread = new TestWorkerThread(path, context); |
| long workerThreadID = workerThread.getId(); |
| workerThread.start(); |
| workerThread.join(); |
| |
| assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG + BYTES_SMALL); |
| |
| } |
| |
| /** |
| * Test to verify if setting the current IOStatisticsContext removes the |
| * current context and creates a new instance of it. |
| */ |
| @Test |
| public void testSettingNullIOStatisticsContext() { |
| IOStatisticsContext ioStatisticsContextBefore = |
| getCurrentIOStatisticsContext(); |
| // Set the current IOStatisticsContext to null, which should remove the |
| // context and set a new one. |
| setThreadIOStatisticsContext(null); |
| // Get the context again after setting. |
| IOStatisticsContext ioStatisticsContextAfter = |
| getCurrentIOStatisticsContext(); |
| //Verify the context ID after setting to null is different than the previous |
| // one. |
| Assertions.assertThat(ioStatisticsContextBefore.getID()) |
| .describedAs("A new IOStaticsContext should be set after setting the " |
| + "current to null") |
| .isNotEqualTo(ioStatisticsContextAfter.getID()); |
| } |
| |
| /** |
| * Assert bytes written by the statistics context. |
| * |
| * @param context statistics context. |
| * @param bytes expected bytes. |
| */ |
| private void assertContextBytesWrite(IOStatisticsContext context, |
| int bytes) { |
| verifyStatisticCounterValue( |
| context.getIOStatistics(), |
| STREAM_WRITE_BYTES, |
| bytes); |
| } |
| |
| /** |
| * Assert bytes read by the statistics context. |
| * |
| * @param context statistics context. |
| * @param readBytes expected bytes. |
| */ |
| private void assertContextBytesRead(IOStatisticsContext context, |
| int readBytes) { |
| verifyStatisticCounterValue( |
| context.getIOStatistics(), |
| STREAM_READ_BYTES, |
| readBytes); |
| } |
| |
| /** |
| * Assert fixed bytes wrote and read for a particular thread ID. |
| * |
| * @param testThreadId thread ID. |
| * @param expectedBytesWrittenAndRead expected bytes. |
| */ |
| private void assertThreadStatisticsForThread(long testThreadId, |
| int expectedBytesWrittenAndRead) { |
| LOG.info("Thread ID to be asserted: {}", testThreadId); |
| IOStatisticsContext ioStatisticsContext = |
| getThreadSpecificIOStatisticsContext(testThreadId); |
| Assertions.assertThat(ioStatisticsContext) |
| .describedAs("IOStatisticsContext for %d", testThreadId) |
| .isNotNull(); |
| |
| |
| IOStatistics ioStatistics = ioStatisticsContext.snapshot(); |
| |
| |
| assertThatStatisticCounter(ioStatistics, |
| STREAM_WRITE_BYTES) |
| .describedAs("Bytes written are not as expected for thread : %s", |
| testThreadId) |
| .isEqualTo(expectedBytesWrittenAndRead); |
| |
| assertThatStatisticCounter(ioStatistics, |
| STREAM_READ_BYTES) |
| .describedAs("Bytes read are not as expected for thread : %s", |
| testThreadId) |
| .isEqualTo(expectedBytesWrittenAndRead); |
| } |
| |
| @Test |
| public void testListingStatisticsContext() throws Throwable { |
| describe("verify the list operations update on close()"); |
| |
| S3AFileSystem fs = getFileSystem(); |
| Path path = methodPath(); |
| fs.mkdirs(methodPath()); |
| |
| // after all setup, get the reset context |
| IOStatisticsContext context = |
| getAndResetThreadStatisticsContext(); |
| IOStatistics ioStatistics = context.getIOStatistics(); |
| |
| fs.listStatus(path); |
| verifyStatisticCounterValue(ioStatistics, |
| StoreStatisticNames.OBJECT_LIST_REQUEST, |
| 1); |
| |
| context.reset(); |
| foreach(fs.listStatusIterator(path), i -> {}); |
| verifyStatisticCounterValue(ioStatistics, |
| StoreStatisticNames.OBJECT_LIST_REQUEST, |
| 1); |
| |
| context.reset(); |
| foreach(fs.listLocatedStatus(path), i -> {}); |
| verifyStatisticCounterValue(ioStatistics, |
| StoreStatisticNames.OBJECT_LIST_REQUEST, |
| 1); |
| |
| context.reset(); |
| foreach(fs.listFiles(path, true), i -> {}); |
| verifyStatisticCounterValue(ioStatistics, |
| StoreStatisticNames.OBJECT_LIST_REQUEST, |
| 1); |
| } |
| |
| @Test |
| public void testListingThroughTaskPool() throws Throwable { |
| describe("verify the list operations are updated through taskpool"); |
| |
| S3AFileSystem fs = getFileSystem(); |
| Path path = methodPath(); |
| fs.mkdirs(methodPath()); |
| |
| // after all setup, get the reset context |
| IOStatisticsContext context = |
| getAndResetThreadStatisticsContext(); |
| IOStatistics ioStatistics = context.getIOStatistics(); |
| |
| CloseableTaskPoolSubmitter submitter = |
| new CloseableTaskPoolSubmitter(executor); |
| TaskPool.foreach(fs.listStatusIterator(path)) |
| .executeWith(submitter) |
| .run(i -> {}); |
| |
| verifyStatisticCounterValue(ioStatistics, |
| StoreStatisticNames.OBJECT_LIST_REQUEST, |
| 1); |
| |
| } |
| |
| /** |
| * Simulating doing some work in a separate thread. |
| * If constructed with an IOStatisticsContext then |
| * that context is switched to before performing the IO. |
| */ |
| private class TestWorkerThread extends Thread implements Runnable { |
| private final Path workerThreadPath; |
| |
| private final IOStatisticsContext ioStatisticsContext; |
| |
| /** |
| * create. |
| * @param workerThreadPath thread path. |
| * @param ioStatisticsContext optional statistics context * |
| */ |
| TestWorkerThread( |
| final Path workerThreadPath, |
| final IOStatisticsContext ioStatisticsContext) { |
| this.workerThreadPath = workerThreadPath; |
| this.ioStatisticsContext = ioStatisticsContext; |
| } |
| |
| @Override |
| public void run() { |
| S3AFileSystem fs = getFileSystem(); |
| byte[] data = new byte[BYTES_SMALL]; |
| |
| // maybe switch context |
| if (ioStatisticsContext != null) { |
| IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext); |
| } |
| |
| // Write in the worker thread. |
| try (FSDataOutputStream out = fs.create(workerThreadPath)) { |
| out.write(data); |
| } catch (IOException e) { |
| throw new UncheckedIOException("Failure while writing", e); |
| } |
| |
| //Read in the worker thread. |
| try (FSDataInputStream in = fs.open(workerThreadPath)) { |
| in.read(data); |
| } catch (IOException e) { |
| throw new UncheckedIOException("Failure while reading", e); |
| } |
| } |
| } |
| } |