| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3a; |
| |
| import javax.annotation.Nullable; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; |
| import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics; |
| import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; |
| import org.apache.hadoop.fs.s3a.statistics.CountersAndGauges; |
| import org.apache.hadoop.fs.s3a.statistics.DelegationTokenStatistics; |
| import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; |
| import org.apache.hadoop.fs.s3a.statistics.StatisticTypeEnum; |
| import org.apache.hadoop.fs.s3a.statistics.impl.AbstractS3AStatisticsSource; |
| import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker; |
| import org.apache.hadoop.fs.s3a.statistics.impl.ForwardingIOStatisticsStore; |
| import org.apache.hadoop.fs.statistics.DurationTrackerFactory; |
| import org.apache.hadoop.fs.statistics.IOStatisticsLogging; |
| import org.apache.hadoop.fs.statistics.IOStatisticsSource; |
| import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; |
| import org.apache.hadoop.fs.statistics.StoreStatisticNames; |
| import org.apache.hadoop.fs.statistics.StreamStatisticNames; |
| import org.apache.hadoop.fs.statistics.DurationTracker; |
| import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; |
| import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; |
| import org.apache.hadoop.fs.statistics.impl.IOStatisticsStoreBuilder; |
| import org.apache.hadoop.metrics2.AbstractMetric; |
| import org.apache.hadoop.metrics2.MetricStringBuilder; |
| import org.apache.hadoop.metrics2.MetricsCollector; |
| import org.apache.hadoop.metrics2.MetricsInfo; |
| import org.apache.hadoop.metrics2.MetricsRecordBuilder; |
| import org.apache.hadoop.metrics2.MetricsSource; |
| import org.apache.hadoop.metrics2.MetricsSystem; |
| import org.apache.hadoop.metrics2.MetricsTag; |
| import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; |
| import org.apache.hadoop.metrics2.lib.MetricsRegistry; |
| import org.apache.hadoop.metrics2.lib.MutableCounterLong; |
| import org.apache.hadoop.metrics2.lib.MutableGaugeLong; |
| import org.apache.hadoop.metrics2.lib.MutableMetric; |
| import org.apache.hadoop.metrics2.lib.MutableQuantiles; |
| |
| import java.io.Closeable; |
| import java.net.URI; |
| import java.time.Duration; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import static org.apache.hadoop.fs.s3a.Constants.STREAM_READ_GAUGE_INPUT_POLICY; |
| import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; |
| import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; |
| import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED; |
| import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; |
| import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES; |
| import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED; |
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; |
| import static org.apache.hadoop.fs.s3a.Statistic.*; |
| |
| /** |
| * Instrumentation of S3A. |
| * <p> |
| * History |
| * </p> |
| * <ol> |
| * <li> |
| * HADOOP-13028. Initial implementation. |
| * Derived from the {@code AzureFileSystemInstrumentation}. |
| * </li> |
| * <li> |
| * Broadly (and directly) used in S3A. |
| * The use of direct references causes "problems" in mocking tests. |
| * </li> |
| * <li> |
| * HADOOP-16830. IOStatistics. Move to an interface and implementation |
| * design for the different inner classes. |
| * </li> |
| * </ol> |
| * <p> |
| * Counters and metrics are generally addressed in code by their name or |
| * {@link Statistic} key. There <i>may</i> be some Statistics which do |
| * not have an entry here. To avoid attempts to access such counters failing, |
| * the operations to increment/query metric values are designed to handle |
| * lookup failures. |
| * </p> |
| * <p> |
| * S3AFileSystem StorageStatistics are dynamically derived from |
| * the IOStatistics. |
| * </p> |
| * <p> |
| * The toString() operation includes the entire IOStatistics when this |
| * class's log is set to DEBUG. This keeps the logs somewhat manageable |
| * on normal runs, but allows for more reporting. |
| * </p> |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Evolving |
| public class S3AInstrumentation implements Closeable, MetricsSource, |
| CountersAndGauges, IOStatisticsSource { |
| private static final Logger LOG = LoggerFactory.getLogger( |
| S3AInstrumentation.class); |
| |
| private static final String METRICS_SOURCE_BASENAME = "S3AMetrics"; |
| |
| /** |
| * {@value} The name of the s3a-specific metrics |
| * system instance used for s3a metrics. |
| */ |
| public static final String METRICS_SYSTEM_NAME = "s3a-file-system"; |
| |
| /** |
| * {@value} Currently all s3a metrics are placed in a single |
| * "context". Distinct contexts may be used in the future. |
| */ |
| public static final String CONTEXT = "s3aFileSystem"; |
| |
| /** |
| * {@value} The name of a field added to metrics |
| * records that uniquely identifies a specific FileSystem instance. |
| */ |
| public static final String METRIC_TAG_FILESYSTEM_ID = "s3aFileSystemId"; |
| |
| /** |
| * {@value} The name of a field added to metrics records |
| * that indicates the hostname portion of the FS URL. |
| */ |
| public static final String METRIC_TAG_BUCKET = "bucket"; |
| |
| // metricsSystemLock must be used to synchronize modifications to |
| // metricsSystem and the following counters. |
| private static final Object METRICS_SYSTEM_LOCK = new Object(); |
| private static MetricsSystem metricsSystem = null; |
| private static int metricsSourceNameCounter = 0; |
| private static int metricsSourceActiveCounter = 0; |
| |
| private final DurationTrackerFactory durationTrackerFactory; |
| |
| private String metricsSourceName; |
| |
| private final MetricsRegistry registry = |
| new MetricsRegistry("s3aFileSystem").setContext(CONTEXT); |
| private final MutableQuantiles throttleRateQuantile; |
| |
| /** |
| * This is the IOStatistics store for the S3AFileSystem |
| * instance. |
| * It is not kept in sync with the rest of the S3A instrumentation. |
| * Most inner statistics implementation classes only update this |
| * store when it is pushed back, such as as in close(). |
| */ |
| private final IOStatisticsStore instanceIOStatistics; |
| |
| /** |
| * Construct the instrumentation for a filesystem. |
| * @param name URI of filesystem. |
| */ |
| public S3AInstrumentation(URI name) { |
| UUID fileSystemInstanceId = UUID.randomUUID(); |
| registry.tag(METRIC_TAG_FILESYSTEM_ID, |
| "A unique identifier for the instance", |
| fileSystemInstanceId.toString()); |
| registry.tag(METRIC_TAG_BUCKET, "Hostname from the FS URL", name.getHost()); |
| |
| // now set up the instance IOStatistics. |
| // create the builder |
| IOStatisticsStoreBuilder storeBuilder = iostatisticsStore(); |
| |
| // declare all counter statistics |
| EnumSet.allOf(Statistic.class).stream() |
| .filter(statistic -> |
| statistic.getType() == StatisticTypeEnum.TYPE_COUNTER) |
| .forEach(stat -> { |
| counter(stat); |
| storeBuilder.withCounters(stat.getSymbol()); |
| }); |
| // declare all gauge statistics |
| EnumSet.allOf(Statistic.class).stream() |
| .filter(statistic -> |
| statistic.getType() == StatisticTypeEnum.TYPE_GAUGE) |
| .forEach(stat -> { |
| gauge(stat); |
| storeBuilder.withGauges(stat.getSymbol()); |
| }); |
| |
| // and durations |
| EnumSet.allOf(Statistic.class).stream() |
| .filter(statistic -> |
| statistic.getType() == StatisticTypeEnum.TYPE_DURATION) |
| .forEach(stat -> { |
| duration(stat); |
| storeBuilder.withDurationTracking(stat.getSymbol()); |
| }); |
| |
| //todo need a config for the quantiles interval? |
| int interval = 1; |
| throttleRateQuantile = quantiles(STORE_IO_THROTTLE_RATE, |
| "events", "frequency (Hz)", interval); |
| |
| // register with Hadoop metrics |
| registerAsMetricsSource(name); |
| |
| // and build the IO Statistics |
| instanceIOStatistics = storeBuilder.build(); |
| |
| // duration track metrics (Success/failure) and IOStatistics. |
| durationTrackerFactory = IOStatisticsBinding.pairedTrackerFactory( |
| instanceIOStatistics, |
| new MetricDurationTrackerFactory()); |
| } |
| |
| @VisibleForTesting |
| public MetricsSystem getMetricsSystem() { |
| synchronized (METRICS_SYSTEM_LOCK) { |
| if (metricsSystem == null) { |
| metricsSystem = new MetricsSystemImpl(); |
| metricsSystem.init(METRICS_SYSTEM_NAME); |
| } |
| } |
| return metricsSystem; |
| } |
| |
| /** |
| * Register this instance as a metrics source. |
| * @param name s3a:// URI for the associated FileSystem instance |
| */ |
| private void registerAsMetricsSource(URI name) { |
| int number; |
| synchronized(METRICS_SYSTEM_LOCK) { |
| getMetricsSystem(); |
| |
| metricsSourceActiveCounter++; |
| number = ++metricsSourceNameCounter; |
| } |
| String msName = METRICS_SOURCE_BASENAME + number; |
| metricsSourceName = msName + "-" + name.getHost(); |
| metricsSystem.register(metricsSourceName, "", this); |
| } |
| |
| /** |
| * Create a counter in the registry. |
| * @param name counter name |
| * @param desc counter description |
| * @return a new counter |
| */ |
| protected final MutableCounterLong counter(String name, String desc) { |
| return registry.newCounter(name, desc, 0L); |
| } |
| |
| /** |
| * Create a counter in the registry. |
| * @param op statistic to count |
| * @return a new counter |
| */ |
| protected final MutableCounterLong counter(Statistic op) { |
| return counter(op.getSymbol(), op.getDescription()); |
| } |
| |
| /** |
| * Registering a duration adds the success and failure counters. |
| * @param op statistic to track |
| */ |
| protected final void duration(Statistic op) { |
| counter(op.getSymbol(), op.getDescription()); |
| counter(op.getSymbol() + SUFFIX_FAILURES, op.getDescription()); |
| } |
| |
| /** |
| * Create a gauge in the registry. |
| * @param name name gauge name |
| * @param desc description |
| * @return the gauge |
| */ |
| protected final MutableGaugeLong gauge(String name, String desc) { |
| return registry.newGauge(name, desc, 0L); |
| } |
| |
| /** |
| * Create a gauge in the registry. |
| * @param op statistic to count |
| * @return the gauge |
| */ |
| protected final MutableGaugeLong gauge(Statistic op) { |
| return gauge(op.getSymbol(), op.getDescription()); |
| } |
| |
| /** |
| * Create a quantiles in the registry. |
| * @param op statistic to collect |
| * @param sampleName sample name of the quantiles |
| * @param valueName value name of the quantiles |
| * @param interval interval of the quantiles in seconds |
| * @return the created quantiles metric |
| */ |
| protected final MutableQuantiles quantiles(Statistic op, |
| String sampleName, |
| String valueName, |
| int interval) { |
| return registry.newQuantiles(op.getSymbol(), op.getDescription(), |
| sampleName, valueName, interval); |
| } |
| |
| /** |
| * Get the metrics registry. |
| * @return the registry |
| */ |
| public MetricsRegistry getRegistry() { |
| return registry; |
| } |
| |
| /** |
| * Dump all the metrics to a string. |
| * @param prefix prefix before every entry |
| * @param separator separator between name and value |
| * @param suffix suffix |
| * @param all get all the metrics even if the values are not changed. |
| * @return a string dump of the metrics |
| */ |
| public String dump(String prefix, |
| String separator, |
| String suffix, |
| boolean all) { |
| MetricStringBuilder metricBuilder = new MetricStringBuilder(null, |
| prefix, |
| separator, suffix); |
| registry.snapshot(metricBuilder, all); |
| return metricBuilder.toString(); |
| } |
| |
| /** |
| * Get the value of a counter. |
| * @param statistic the operation |
| * @return its value, or 0 if not found. |
| */ |
| public long getCounterValue(Statistic statistic) { |
| return getCounterValue(statistic.getSymbol()); |
| } |
| |
| /** |
| * Get the value of a counter. |
| * If the counter is null, return 0. |
| * @param name the name of the counter |
| * @return its value. |
| */ |
| public long getCounterValue(String name) { |
| MutableCounterLong counter = lookupCounter(name); |
| return counter == null ? 0 : counter.value(); |
| } |
| |
| /** |
| * Lookup a counter by name. Return null if it is not known. |
| * @param name counter name |
| * @return the counter |
| * @throws IllegalStateException if the metric is not a counter |
| */ |
| private MutableCounterLong lookupCounter(String name) { |
| MutableMetric metric = lookupMetric(name); |
| if (metric == null) { |
| return null; |
| } |
| if (!(metric instanceof MutableCounterLong)) { |
| throw new IllegalStateException("Metric " + name |
| + " is not a MutableCounterLong: " + metric |
| + " (type: " + metric.getClass() +")"); |
| } |
| return (MutableCounterLong) metric; |
| } |
| |
| /** |
| * Look up a gauge. |
| * @param name gauge name |
| * @return the gauge or null |
| * @throws ClassCastException if the metric is not a Gauge. |
| */ |
| public MutableGaugeLong lookupGauge(String name) { |
| MutableMetric metric = lookupMetric(name); |
| if (metric == null) { |
| LOG.debug("No gauge {}", name); |
| } |
| return (MutableGaugeLong) metric; |
| } |
| |
| /** |
| * Look up a quantiles. |
| * @param name quantiles name |
| * @return the quantiles or null |
| * @throws ClassCastException if the metric is not a Quantiles. |
| */ |
| public MutableQuantiles lookupQuantiles(String name) { |
| MutableMetric metric = lookupMetric(name); |
| if (metric == null) { |
| LOG.debug("No quantiles {}", name); |
| } |
| return (MutableQuantiles) metric; |
| } |
| |
| /** |
| * Look up a metric from both the registered set and the lighter weight |
| * stream entries. |
| * @param name metric name |
| * @return the metric or null |
| */ |
| public MutableMetric lookupMetric(String name) { |
| MutableMetric metric = getRegistry().get(name); |
| return metric; |
| } |
| |
| /** |
| * Get the instance IO Statistics. |
| * @return statistics. |
| */ |
| @Override |
| public IOStatisticsStore getIOStatistics() { |
| return instanceIOStatistics; |
| } |
| |
| /** |
| * Get the duration tracker factory. |
| * @return duration tracking for the instrumentation. |
| */ |
| public DurationTrackerFactory getDurationTrackerFactory() { |
| return durationTrackerFactory; |
| } |
| |
| /** |
| * The duration tracker updates the metrics with the count |
| * and IOStatistics will full duration information. |
| * @param key statistic key prefix |
| * @param count #of times to increment the matching counter in this |
| * operation. |
| * @return a duration tracker. |
| */ |
| @Override |
| public DurationTracker trackDuration(final String key, final long count) { |
| return durationTrackerFactory.trackDuration(key, count); |
| } |
| |
| /** |
| * Create an IOStatistics store which updates FS metrics |
| * as well as IOStatistics. |
| * @return instance of the store. |
| */ |
| public IOStatisticsStore createMetricsUpdatingStore() { |
| return new MetricsUpdatingIOStatisticsStore(); |
| } |
| |
| /** |
| * String representation. Includes the IOStatistics |
| * when logging is at DEBUG. |
| * @return a string form. |
| */ |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder( |
| "S3AInstrumentation{"); |
| if (LOG.isDebugEnabled()) { |
| sb.append("instanceIOStatistics=").append(instanceIOStatistics); |
| } |
| sb.append('}'); |
| return sb.toString(); |
| } |
| |
| /** |
| * Indicate that S3A created a file. |
| */ |
| public void fileCreated() { |
| incrementCounter(FILES_CREATED, 1); |
| } |
| |
| /** |
| * Indicate that S3A deleted one or more files. |
| * @param count number of files. |
| */ |
| public void fileDeleted(int count) { |
| incrementCounter(FILES_DELETED, count); |
| } |
| |
| /** |
| * Indicate that fake directory request was made. |
| * @param count number of directory entries included in the delete request. |
| */ |
| public void fakeDirsDeleted(int count) { |
| incrementCounter(FAKE_DIRECTORIES_DELETED, count); |
| } |
| |
| /** |
| * Indicate that S3A created a directory. |
| */ |
| public void directoryCreated() { |
| incrementCounter(DIRECTORIES_CREATED, 1); |
| } |
| |
| /** |
| * Indicate that S3A just deleted a directory. |
| */ |
| public void directoryDeleted() { |
| incrementCounter(DIRECTORIES_DELETED, 1); |
| } |
| |
| /** |
| * Indicate that S3A copied some files within the store. |
| * |
| * @param files number of files |
| * @param size total size in bytes |
| */ |
| public void filesCopied(int files, long size) { |
| incrementCounter(FILES_COPIED, files); |
| incrementCounter(FILES_COPIED_BYTES, size); |
| } |
| |
| /** |
| * Note that an error was ignored. |
| */ |
| public void errorIgnored() { |
| incrementCounter(IGNORED_ERRORS, 1); |
| } |
| |
| /** |
| * Increments a mutable counter and the matching |
| * instance IOStatistics counter. |
| * No-op if the counter is not defined, or the count == 0. |
| * @param op operation |
| * @param count increment value |
| */ |
| public void incrementCounter(Statistic op, long count) { |
| incrementNamedCounter(op.getSymbol(), count); |
| } |
| |
| /** |
| * Increments a mutable counter and the matching |
| * instance IOStatistics counter. |
| * No-op if the counter is not defined, or the count == 0. |
| * @param name counter name |
| * @param count increment value |
| * @return the updated value or, if the counter is unknown: 0 |
| */ |
| private long incrementNamedCounter(final String name, |
| final long count) { |
| if (count != 0) { |
| incrementMutableCounter(name, count); |
| return instanceIOStatistics.incrementCounter(name, count); |
| } else { |
| return 0; |
| } |
| } |
| |
| /** |
| * Increments a Mutable counter. |
| * No-op if not a positive integer. |
| * @param name counter name. |
| * @param count increment value |
| */ |
| private void incrementMutableCounter(final String name, final long count) { |
| if (count > 0) { |
| MutableCounterLong counter = lookupCounter(name); |
| if (counter != null) { |
| counter.incr(count); |
| } |
| } |
| } |
| |
| /** |
| * Add a value to a quantiles statistic. No-op if the quantile |
| * isn't found. |
| * @param op operation to look up. |
| * @param value value to add. |
| * @throws ClassCastException if the metric is not a Quantiles. |
| */ |
| public void addValueToQuantiles(Statistic op, long value) { |
| MutableQuantiles quantiles = lookupQuantiles(op.getSymbol()); |
| if (quantiles != null) { |
| quantiles.add(value); |
| } |
| } |
| |
| /** |
| * Increments a mutable counter and the matching |
| * instance IOStatistics counter with the value of |
| * the atomic long. |
| * No-op if the counter is not defined, or the count == 0. |
| * @param op operation |
| * @param count atomic long containing value |
| */ |
| public void incrementCounter(Statistic op, AtomicLong count) { |
| incrementCounter(op, count.get()); |
| } |
| |
| /** |
| * Increment a specific gauge. |
| * No-op if not defined. |
| * @param op operation |
| * @param count increment value |
| * @throws ClassCastException if the metric is of the wrong type |
| */ |
| public void incrementGauge(Statistic op, long count) { |
| MutableGaugeLong gauge = lookupGauge(op.getSymbol()); |
| if (gauge != null) { |
| gauge.incr(count); |
| } else { |
| LOG.debug("No Gauge: "+ op); |
| } |
| } |
| |
| /** |
| * Decrement a specific gauge. |
| * No-op if not defined. |
| * @param op operation |
| * @param count increment value |
| * @throws ClassCastException if the metric is of the wrong type |
| */ |
| public void decrementGauge(Statistic op, long count) { |
| MutableGaugeLong gauge = lookupGauge(op.getSymbol()); |
| if (gauge != null) { |
| gauge.decr(count); |
| } else { |
| LOG.debug("No Gauge: {}", op); |
| } |
| } |
| |
| /** |
| * Add the duration as a timed statistic, deriving |
| * statistic name from the operation symbol and the outcome. |
| * @param op operation |
| * @param success was the operation a success? |
| * @param duration how long did it take |
| */ |
| @Override |
| public void recordDuration(final Statistic op, |
| final boolean success, |
| final Duration duration) { |
| String name = op.getSymbol() |
| + (success ? "" : SUFFIX_FAILURES); |
| instanceIOStatistics.addTimedOperation(name, duration); |
| } |
| |
| /** |
| * Create a stream input statistics instance. |
| * @return the new instance |
| * @param filesystemStatistics FS Statistics to update in close(). |
| */ |
| public S3AInputStreamStatistics newInputStreamStatistics( |
| @Nullable final FileSystem.Statistics filesystemStatistics) { |
| return new InputStreamStatistics(filesystemStatistics); |
| } |
| |
| /** |
| * Create a new instance of the committer statistics. |
| * @return a new committer statistics instance |
| */ |
| public CommitterStatistics newCommitterStatistics() { |
| return new CommitterStatisticsImpl(); |
| } |
| |
| @Override |
| public void getMetrics(MetricsCollector collector, boolean all) { |
| registry.snapshot(collector.addRecord(registry.info().name()), true); |
| } |
| |
| public void close() { |
| synchronized (METRICS_SYSTEM_LOCK) { |
| // it is critical to close each quantile, as they start a scheduled |
| // task in a shared thread pool. |
| throttleRateQuantile.stop(); |
| metricsSystem.unregisterSource(metricsSourceName); |
| metricsSourceActiveCounter--; |
| int activeSources = metricsSourceActiveCounter; |
| if (activeSources == 0) { |
| LOG.debug("Shutting down metrics publisher"); |
| metricsSystem.publishMetricsNow(); |
| metricsSystem.shutdown(); |
| metricsSystem = null; |
| } |
| } |
| } |
| |
| /** |
| * A duration tracker which updates a mutable counter with a metric. |
| * The metric is updated with the count on start; after a failure |
| * the failures count is incremented by one. |
| */ |
| private final class MetricUpdatingDurationTracker |
| implements DurationTracker { |
| |
| private final String symbol; |
| |
| private boolean failed; |
| |
| private MetricUpdatingDurationTracker( |
| final String symbol, |
| final long count) { |
| this.symbol = symbol; |
| incrementMutableCounter(symbol, count); |
| } |
| |
| @Override |
| public void failed() { |
| failed = true; |
| } |
| |
| /** |
| * Close: on failure increment any mutable counter of |
| * failures. |
| */ |
| @Override |
| public void close() { |
| if (failed) { |
| incrementMutableCounter(symbol + SUFFIX_FAILURES, 1); |
| } |
| } |
| } |
| |
| /** |
| * Duration Tracker Factory for updating metrics. |
| */ |
| private final class MetricDurationTrackerFactory |
| implements DurationTrackerFactory { |
| |
| @Override |
| public DurationTracker trackDuration(final String key, final long count) { |
| return new MetricUpdatingDurationTracker(key, count); |
| } |
| |
| } |
| |
| /** |
| * Statistics updated by an S3AInputStream during its actual operation. |
| * <p> |
| * When {@code unbuffer()} is called, the changed numbers are propagated |
| * to the S3AFileSystem metrics. |
| * </p> |
| * <p> |
| * When {@code close()} is called, the final set of numbers are propagated |
| * to the S3AFileSystem metrics. |
| * </p> |
| * The {@link FileSystem.Statistics} statistics passed in are also |
| * updated. This ensures that whichever thread calls close() gets the |
| * total count of bytes read, even if any work is done in other |
| * threads. |
| * |
| */ |
| private final class InputStreamStatistics |
| extends AbstractS3AStatisticsSource |
| implements S3AInputStreamStatistics { |
| |
| /** |
| * Distance used when incrementing FS stats. |
| */ |
| private static final int DISTANCE = 5; |
| |
| /** |
| * FS statistics for the thread creating the stream. |
| */ |
| private final FileSystem.Statistics filesystemStatistics; |
| |
| /** |
| * The statistics from the last merge. |
| */ |
| private IOStatisticsSnapshot mergedStats; |
| |
| /* |
| The core counters are extracted to atomic longs for slightly |
| faster resolution on the critical paths, especially single byte |
| reads and the like. |
| */ |
| private final AtomicLong aborted; |
| private final AtomicLong backwardSeekOperations; |
| private final AtomicLong bytesBackwardsOnSeek; |
| private final AtomicLong bytesDiscardedInAbort; |
| /** Bytes read by the application. */ |
| private final AtomicLong bytesRead; |
| private final AtomicLong bytesDiscardedInClose; |
| private final AtomicLong bytesDiscardedOnSeek; |
| private final AtomicLong bytesSkippedOnSeek; |
| private final AtomicLong closed; |
| private final AtomicLong forwardSeekOperations; |
| private final AtomicLong openOperations; |
| private final AtomicLong readExceptions; |
| private final AtomicLong readsIncomplete; |
| private final AtomicLong readOperations; |
| private final AtomicLong readFullyOperations; |
| private final AtomicLong seekOperations; |
| private final AtomicLong readVectoredOperations; |
| private final AtomicLong bytesDiscardedInVectoredIO; |
| private final AtomicLong readVectoredIncomingRanges; |
| private final AtomicLong readVectoredCombinedRanges; |
| |
| /** Bytes read by the application and any when draining streams . */ |
| private final AtomicLong totalBytesRead; |
| |
| /** |
| * Instantiate. |
| * @param filesystemStatistics FS Statistics to update in close(). |
| */ |
| private InputStreamStatistics( |
| @Nullable FileSystem.Statistics filesystemStatistics) { |
| this.filesystemStatistics = filesystemStatistics; |
| IOStatisticsStore st = iostatisticsStore() |
| .withCounters( |
| StreamStatisticNames.STREAM_READ_ABORTED, |
| StreamStatisticNames.STREAM_READ_BYTES_DISCARDED_ABORT, |
| StreamStatisticNames.STREAM_READ_CLOSED, |
| StreamStatisticNames.STREAM_READ_BYTES_DISCARDED_CLOSE, |
| StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS, |
| StreamStatisticNames.STREAM_READ_OPENED, |
| StreamStatisticNames.STREAM_READ_BYTES, |
| StreamStatisticNames.STREAM_READ_EXCEPTIONS, |
| StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS, |
| StreamStatisticNames.STREAM_READ_OPERATIONS, |
| StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE, |
| StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS, |
| StreamStatisticNames.STREAM_READ_SEEK_POLICY_CHANGED, |
| StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS, |
| StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS, |
| StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS, |
| StreamStatisticNames.STREAM_READ_SEEK_BYTES_DISCARDED, |
| StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, |
| StreamStatisticNames.STREAM_READ_TOTAL_BYTES, |
| StreamStatisticNames.STREAM_READ_UNBUFFERED, |
| StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, |
| StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, |
| StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, |
| StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, |
| StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES) |
| .withGauges(STREAM_READ_GAUGE_INPUT_POLICY) |
| .withDurationTracking(ACTION_HTTP_GET_REQUEST, |
| StoreStatisticNames.ACTION_FILE_OPENED, |
| StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED, |
| StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED) |
| .build(); |
| setIOStatistics(st); |
| aborted = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_ABORTED); |
| backwardSeekOperations = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS); |
| bytesBackwardsOnSeek = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS); |
| bytesDiscardedInAbort = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_BYTES_DISCARDED_ABORT); |
| bytesRead = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_BYTES); |
| bytesDiscardedInClose = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_BYTES_DISCARDED_CLOSE); |
| bytesDiscardedOnSeek = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_SEEK_BYTES_DISCARDED); |
| bytesSkippedOnSeek = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED); |
| closed = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_CLOSED); |
| forwardSeekOperations = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS); |
| openOperations = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_OPENED); |
| readExceptions = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_EXCEPTIONS); |
| readsIncomplete = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE); |
| readOperations = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_OPERATIONS); |
| readVectoredOperations = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS); |
| bytesDiscardedInVectoredIO = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED); |
| readVectoredIncomingRanges = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES); |
| readVectoredCombinedRanges = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES); |
| readFullyOperations = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS); |
| seekOperations = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS); |
| totalBytesRead = st.getCounterReference( |
| StreamStatisticNames.STREAM_READ_TOTAL_BYTES); |
| setIOStatistics(st); |
| // create initial snapshot of merged statistics |
| mergedStats = snapshotIOStatistics(st); |
| } |
| |
| /** |
| * Increment a named counter by one. |
| * @param name counter name |
| * @return the new value |
| */ |
| private long increment(String name) { |
| return increment(name, 1); |
| } |
| |
| /** |
| * Increment a named counter by a given value. |
| * @param name counter name |
| * @param value value to increment by. |
| * @return the new value |
| */ |
| private long increment(String name, long value) { |
| return incCounter(name, value); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * Increments the number of seek operations, |
| * and backward seek operations. |
| * The offset is inverted and used as the increment |
| * of {@link #bytesBackwardsOnSeek}. |
| */ |
| @Override |
| public void seekBackwards(long negativeOffset) { |
| seekOperations.incrementAndGet(); |
| backwardSeekOperations.incrementAndGet(); |
| bytesBackwardsOnSeek.addAndGet(-negativeOffset); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * Increment the number of seek and forward seek |
| * operations, as well as counters of bytes skipped |
| * and bytes read in seek, where appropriate. |
| * Bytes read in seek are also added to the totalBytesRead |
| * counter. |
| */ |
| @Override |
| public void seekForwards(final long skipped, |
| long bytesReadInSeek) { |
| seekOperations.incrementAndGet(); |
| forwardSeekOperations.incrementAndGet(); |
| if (skipped > 0) { |
| bytesSkippedOnSeek.addAndGet(skipped); |
| } |
| if (bytesReadInSeek > 0) { |
| bytesDiscardedOnSeek.addAndGet(bytesReadInSeek); |
| totalBytesRead.addAndGet(bytesReadInSeek); |
| } |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * Use {@code getAnIncrement()} on {@link #openOperations} |
| * so that on invocation 1 it returns 0. |
| * The caller will know that this is the first invocation. |
| */ |
| @Override |
| public long streamOpened() { |
| return openOperations.getAndIncrement(); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * If the connection was aborted, increment {@link #aborted} |
| * and add the byte's remaining count to {@link #bytesDiscardedInAbort}. |
| * If not aborted, increment {@link #closed} and |
| * then {@link #bytesDiscardedInClose} and {@link #totalBytesRead} |
| * with the bytes remaining value. |
| */ |
| @Override |
| public void streamClose(boolean abortedConnection, |
| long remainingInCurrentRequest) { |
| if (abortedConnection) { |
| // the connection was aborted. |
| // update the counter of abort() calls and bytes discarded |
| aborted.incrementAndGet(); |
| bytesDiscardedInAbort.addAndGet(remainingInCurrentRequest); |
| } else { |
| // connection closed, possibly draining the stream of surplus |
| // bytes. |
| closed.incrementAndGet(); |
| bytesDiscardedInClose.addAndGet(remainingInCurrentRequest); |
| totalBytesRead.addAndGet(remainingInCurrentRequest); |
| filesystemStatistics.incrementBytesRead(remainingInCurrentRequest); |
| } |
| } |
| |
| /** |
| * {@inheritDoc}. |
| */ |
| @Override |
| public void readException() { |
| readExceptions.incrementAndGet(); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * If the byte counter is positive, increment bytesRead and totalBytesRead. |
| */ |
| @Override |
| public void bytesRead(long bytes) { |
| if (bytes > 0) { |
| bytesRead.addAndGet(bytes); |
| totalBytesRead.addAndGet(bytes); |
| } |
| } |
| |
| @Override |
| public void readOperationStarted(long pos, long len) { |
| readOperations.incrementAndGet(); |
| } |
| |
| @Override |
| public void readFullyOperationStarted(long pos, long len) { |
| readFullyOperations.incrementAndGet(); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * If more data was requested than was actually returned, this |
| * was an incomplete read. Increment {@link #readsIncomplete}. |
| */ |
| @Override |
| public void readOperationCompleted(int requested, int actual) { |
| if (requested > actual) { |
| readsIncomplete.incrementAndGet(); |
| } |
| } |
| |
| @Override |
| public void readVectoredOperationStarted(int numIncomingRanges, |
| int numCombinedRanges) { |
| readVectoredIncomingRanges.addAndGet(numIncomingRanges); |
| readVectoredCombinedRanges.addAndGet(numCombinedRanges); |
| readVectoredOperations.incrementAndGet(); |
| } |
| |
| @Override |
| public void readVectoredBytesDiscarded(int discarded) { |
| bytesDiscardedInVectoredIO.addAndGet(discarded); |
| } |
| |
| /** |
| * {@code close()} merges the stream statistics into the filesystem's |
| * instrumentation instance. |
| */ |
| @Override |
| public void close() { |
| increment(StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS); |
| merge(true); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * As well as incrementing the {@code STREAM_READ_SEEK_POLICY_CHANGED} |
| * counter, the |
| * {@code STREAM_READ_GAUGE_INPUT_POLICY} gauge is set to the new value. |
| * |
| */ |
| @Override |
| public void inputPolicySet(int updatedPolicy) { |
| increment(StreamStatisticNames.STREAM_READ_SEEK_POLICY_CHANGED); |
| localIOStatistics().setGauge(STREAM_READ_GAUGE_INPUT_POLICY, |
| updatedPolicy); |
| } |
| |
| /** |
| * Get the inner class's IO Statistics. This is |
| * needed to avoid findbugs warnings about ambiguity. |
| * @return the Input Stream's statistics. |
| */ |
| private IOStatisticsStore localIOStatistics() { |
| return InputStreamStatistics.super.getIOStatistics(); |
| } |
| |
| /** |
| * The change tracker increments {@code versionMismatches} on any |
| * mismatch. |
| * @return change tracking. |
| */ |
| @Override |
| public ChangeTrackerStatistics getChangeTrackerStatistics() { |
| return new CountingChangeTracker( |
| localIOStatistics().getCounterReference( |
| StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)); |
| } |
| |
| /** |
| * String operator describes all the current statistics. |
| * <b>Important: there are no guarantees as to the stability |
| * of this value.</b> |
| * @return the current values of the stream statistics. |
| */ |
| @Override |
| @InterfaceStability.Unstable |
| public String toString() { |
| final StringBuilder sb = new StringBuilder( |
| "StreamStatistics{"); |
| sb.append(IOStatisticsLogging.ioStatisticsToString( |
| localIOStatistics())); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Increment the counter {@code STREAM_READ_UNBUFFERED} |
| * and then merge the current set of statistics into the |
| * FileSystem's statistics through {@link #merge(boolean)}. |
| */ |
| @Override |
| public void unbuffered() { |
| increment(STREAM_READ_UNBUFFERED); |
| merge(false); |
| } |
| |
| /** |
| * Merge the statistics into the filesystem's instrumentation instance. |
| * <p> |
| * If the merge is invoked because the stream has been closed, |
| * then all statistics are merged, and the filesystem |
| * statistics of {@link #filesystemStatistics} updated |
| * with the bytes read values. |
| * </p> |
| * <p> |
| * Whichever thread close()d the stream will have its counters |
| * updated. |
| * </p> |
| * <p> |
| * If the merge is due to an unbuffer() call, the change in all |
| * counters since the last merge will be pushed to the Instrumentation's |
| * counters. |
| * </p> |
| * |
| * @param isClosed is this merge invoked because the stream is closed? |
| */ |
| private void merge(boolean isClosed) { |
| |
| IOStatisticsStore ioStatistics = localIOStatistics(); |
| LOG.debug("Merging statistics into FS statistics in {}: {}", |
| (isClosed ? "close()" : "unbuffer()"), |
| demandStringifyIOStatistics(ioStatistics)); |
| promoteInputStreamCountersToMetrics(); |
| mergedStats = snapshotIOStatistics(localIOStatistics()); |
| |
| if (isClosed) { |
| // stream is being closed. |
| // merge in all the IOStatistics |
| S3AInstrumentation.this.getIOStatistics().aggregate(ioStatistics); |
| |
| // increment the filesystem statistics for this thread. |
| if (filesystemStatistics != null) { |
| long t = getTotalBytesRead(); |
| filesystemStatistics.incrementBytesReadByDistance(DISTANCE, t); |
| } |
| } |
| } |
| |
| /** |
| * Propagate a counter from the instance-level statistics |
| * to the S3A instrumentation, subtracting the previous merged value. |
| * @param name statistic to promote |
| */ |
| void promoteIOCounter(String name) { |
| incrementMutableCounter(name, |
| lookupCounterValue(name) |
| - mergedStats.counters().get(name)); |
| } |
| |
| /** |
| * Merge in the statistics of a single input stream into |
| * the filesystem-wide metrics counters. |
| * This does not update the FS IOStatistics values. |
| */ |
| private void promoteInputStreamCountersToMetrics() { |
| // iterate through all the counters |
| localIOStatistics().counters() |
| .keySet().stream() |
| .forEach(e -> promoteIOCounter(e)); |
| } |
| |
| @Override |
| public long getCloseOperations() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS); |
| } |
| |
| @Override |
| public long getClosed() { |
| return lookupCounterValue(StreamStatisticNames.STREAM_READ_CLOSED); |
| } |
| |
| @Override |
| public long getAborted() { |
| return lookupCounterValue(StreamStatisticNames.STREAM_READ_ABORTED); |
| } |
| |
| @Override |
| public long getForwardSeekOperations() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS); |
| } |
| |
| @Override |
| public long getBackwardSeekOperations() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS); |
| } |
| |
| @Override |
| public long getBytesRead() { |
| return lookupCounterValue(StreamStatisticNames.STREAM_READ_BYTES); |
| } |
| |
| @Override |
| public long getTotalBytesRead() { |
| return lookupCounterValue(StreamStatisticNames.STREAM_READ_TOTAL_BYTES); |
| } |
| |
| @Override |
| public long getBytesSkippedOnSeek() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED); |
| } |
| |
| @Override |
| public long getBytesBackwardsOnSeek() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS); |
| } |
| |
| @Override |
| public long getBytesReadInClose() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_BYTES_DISCARDED_CLOSE); |
| } |
| |
| @Override |
| public long getBytesDiscardedInAbort() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_BYTES_DISCARDED_ABORT); |
| } |
| |
| @Override |
| public long getOpenOperations() { |
| return lookupCounterValue(StreamStatisticNames.STREAM_READ_OPENED); |
| } |
| |
| @Override |
| public long getSeekOperations() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS); |
| } |
| |
| @Override |
| public long getReadExceptions() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_EXCEPTIONS); |
| } |
| |
| @Override |
| public long getReadOperations() { |
| return lookupCounterValue(StreamStatisticNames.STREAM_READ_OPERATIONS); |
| } |
| |
| @Override |
| public long getReadFullyOperations() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS); |
| } |
| |
| @Override |
| public long getReadsIncomplete() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE); |
| } |
| |
| @Override |
| public long getPolicySetCount() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_SEEK_POLICY_CHANGED); |
| } |
| |
| @Override |
| public long getVersionMismatches() { |
| return lookupCounterValue( |
| StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES); |
| } |
| |
| @Override |
| public long getInputPolicy() { |
| return localIOStatistics().gauges() |
| .get(STREAM_READ_GAUGE_INPUT_POLICY); |
| } |
| |
| @Override |
| public DurationTracker initiateGetRequest() { |
| return trackDuration(ACTION_HTTP_GET_REQUEST); |
| } |
| |
| @Override |
| public DurationTracker initiateInnerStreamClose(final boolean abort) { |
| return trackDuration(abort |
| ? StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED |
| : StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED); |
| } |
| } |
| |
| /** |
| * Create a stream output statistics instance. |
| * @param filesystemStatistics thread-local FS statistics. |
| * @return the new instance |
| */ |
| public BlockOutputStreamStatistics newOutputStreamStatistics( |
| FileSystem.Statistics filesystemStatistics) { |
| return new OutputStreamStatistics(filesystemStatistics); |
| } |
| |
| /** |
| * Merge in the statistics of a single output stream into |
| * the filesystem-wide statistics. |
| * @param source stream statistics |
| */ |
| private void mergeOutputStreamStatistics( |
| OutputStreamStatistics source) { |
| incrementCounter(STREAM_WRITE_TOTAL_TIME, source.totalUploadDuration()); |
| incrementCounter(STREAM_WRITE_QUEUE_DURATION, source.queueDuration); |
| incrementCounter(STREAM_WRITE_TOTAL_DATA, source.bytesUploaded); |
| incrementCounter(STREAM_WRITE_BLOCK_UPLOADS, |
| source.blockUploadsCompleted); |
| incrementCounter(STREAM_WRITE_EXCEPTIONS, |
| source.lookupCounterValue( |
| StreamStatisticNames.STREAM_WRITE_EXCEPTIONS)); |
| |
| // merge in all the IOStatistics |
| final IOStatisticsStore sourceIOStatistics = source.getIOStatistics(); |
| this.getIOStatistics().aggregate(sourceIOStatistics); |
| |
| // propagate any extra values into the FS-level stats. |
| incrementMutableCounter(OBJECT_PUT_REQUESTS.getSymbol(), |
| sourceIOStatistics.counters().get(OBJECT_PUT_REQUESTS.getSymbol())); |
| incrementMutableCounter( |
| COMMITTER_MAGIC_MARKER_PUT.getSymbol(), |
| sourceIOStatistics.counters().get(COMMITTER_MAGIC_MARKER_PUT.getSymbol())); |
| |
| } |
| |
| /** |
| * Statistics updated by an output stream during its actual operation. |
| * <p> |
| * Some of these stats are propagated to any passed in |
| * {@link FileSystem.Statistics} instance; this is done |
| * in close() for better cross-thread accounting. |
| * </p> |
| * <p> |
| * Some of the collected statistics are not directly served via |
| * IOStatistics. |
| * They are added to the instrumentation IOStatistics and metric counters |
| * during the {@link #mergeOutputStreamStatistics(OutputStreamStatistics)} |
| * operation. |
| * </p> |
| */ |
| private final class OutputStreamStatistics |
| extends AbstractS3AStatisticsSource |
| implements BlockOutputStreamStatistics { |
| |
| private final AtomicLong blocksActive = new AtomicLong(0); |
| private final AtomicLong blockUploadsCompleted = new AtomicLong(0); |
| |
| private final AtomicLong bytesWritten; |
| private final AtomicLong bytesUploaded; |
| private final AtomicLong transferDuration = new AtomicLong(0); |
| private final AtomicLong queueDuration = new AtomicLong(0); |
| private final AtomicInteger blocksAllocated = new AtomicInteger(0); |
| private final AtomicInteger blocksReleased = new AtomicInteger(0); |
| |
| private final FileSystem.Statistics filesystemStatistics; |
| |
| /** |
| * Instantiate. |
| * @param filesystemStatistics FS Statistics to update in close(). |
| */ |
| private OutputStreamStatistics( |
| @Nullable FileSystem.Statistics filesystemStatistics) { |
| this.filesystemStatistics = filesystemStatistics; |
| IOStatisticsStore st = iostatisticsStore() |
| .withCounters( |
| STREAM_WRITE_BLOCK_UPLOADS.getSymbol(), |
| STREAM_WRITE_BYTES.getSymbol(), |
| STREAM_WRITE_EXCEPTIONS.getSymbol(), |
| STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(), |
| STREAM_WRITE_QUEUE_DURATION.getSymbol(), |
| STREAM_WRITE_TOTAL_DATA.getSymbol(), |
| STREAM_WRITE_TOTAL_TIME.getSymbol(), |
| INVOCATION_HFLUSH.getSymbol(), |
| INVOCATION_HSYNC.getSymbol()) |
| .withGauges( |
| STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(), |
| STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol()) |
| .withDurationTracking( |
| ACTION_EXECUTOR_ACQUIRED, |
| COMMITTER_MAGIC_MARKER_PUT.getSymbol(), |
| INVOCATION_ABORT.getSymbol(), |
| MULTIPART_UPLOAD_COMPLETED.getSymbol(), |
| OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), |
| OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(), |
| OBJECT_PUT_REQUESTS.getSymbol()) |
| .build(); |
| setIOStatistics(st); |
| // these are extracted to avoid lookups on heavily used counters. |
| bytesUploaded = st.getCounterReference( |
| STREAM_WRITE_TOTAL_DATA.getSymbol()); |
| bytesWritten = st.getCounterReference( |
| StreamStatisticNames.STREAM_WRITE_BYTES); |
| } |
| |
| /** |
| * Increment the Statistic gauge and the local IOStatistics |
| * equivalent. |
| * @param statistic statistic |
| * @param v value. |
| * @return local IOStatistic value |
| */ |
| private long incAllGauges(Statistic statistic, long v) { |
| incrementGauge(statistic, v); |
| return incGauge(statistic.getSymbol(), v); |
| } |
| |
| @Override |
| public void blockAllocated() { |
| blocksAllocated.incrementAndGet(); |
| } |
| |
| @Override |
| public void blockReleased() { |
| blocksReleased.incrementAndGet(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Increments the counter of block uplaods, and the gauges |
| * of block uploads pending (1) and the bytes pending (blockSize). |
| */ |
| @Override |
| public void blockUploadQueued(int blockSize) { |
| incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS); |
| incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1); |
| incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Update {@link #queueDuration} with queue duration, decrement |
| * {@code STREAM_WRITE_BLOCK_UPLOADS_PENDING} gauge and increment |
| * {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}. |
| */ |
| @Override |
| public void blockUploadStarted(Duration timeInQueue, int blockSize) { |
| // the local counter is used in toString reporting. |
| queueDuration.addAndGet(timeInQueue.toMillis()); |
| // update the duration fields in the IOStatistics. |
| localIOStatistics().addTimedOperation( |
| ACTION_EXECUTOR_ACQUIRED, |
| timeInQueue); |
| incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, -1); |
| incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, 1); |
| } |
| |
| /** |
| * Get the inner class's IO Statistics. This is |
| * needed to avoid findbugs warnings about ambiguity. |
| * @return the Input Stream's statistics. |
| */ |
| private IOStatisticsStore localIOStatistics() { |
| return OutputStreamStatistics.super.getIOStatistics(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * Increment the transfer duration; decrement the |
| * {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE} gauge. |
| */ |
| @Override |
| public void blockUploadCompleted( |
| Duration timeSinceUploadStarted, |
| int blockSize) { |
| transferDuration.addAndGet(timeSinceUploadStarted.toMillis()); |
| incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1); |
| blockUploadsCompleted.incrementAndGet(); |
| } |
| |
| /** |
| * A block upload has failed. |
| * A final transfer completed event is still expected, so this |
| * does not decrement any gauges. |
| */ |
| @Override |
| public void blockUploadFailed( |
| Duration timeSinceUploadStarted, |
| int blockSize) { |
| incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS); |
| } |
| |
| /** |
| * Intermediate report of bytes uploaded. |
| * Increment counters of bytes upload, reduce the counter and |
| * gauge of pending bytes.; |
| * @param byteCount bytes uploaded |
| */ |
| @Override |
| public void bytesTransferred(long byteCount) { |
| bytesUploaded.addAndGet(byteCount); |
| incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, -byteCount); |
| } |
| |
| @Override |
| public void exceptionInMultipartComplete(int count) { |
| if (count > 0) { |
| incCounter( |
| STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(), |
| count); |
| } |
| } |
| |
| @Override |
| public void exceptionInMultipartAbort() { |
| incCounter( |
| STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol()); |
| } |
| |
| @Override |
| public long getBytesPendingUpload() { |
| return lookupGaugeValue( |
| STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol()); |
| } |
| |
| @Override |
| public void commitUploaded(long size) { |
| incrementCounter(COMMITTER_BYTES_UPLOADED, size); |
| } |
| |
| @Override |
| public void hflushInvoked() { |
| incCounter(INVOCATION_HFLUSH.getSymbol(), 1); |
| } |
| |
| @Override |
| public void hsyncInvoked() { |
| incCounter(INVOCATION_HSYNC.getSymbol(), 1); |
| } |
| |
| @Override |
| public void close() { |
| if (getBytesPendingUpload() > 0) { |
| LOG.warn("Closing output stream statistics while data is still marked" + |
| " as pending upload in {}", this); |
| } |
| mergeOutputStreamStatistics(this); |
| // and patch the FS statistics. |
| // provided the stream is closed in the worker thread, this will |
| // ensure that the thread-specific worker stats are updated. |
| if (filesystemStatistics != null) { |
| filesystemStatistics.incrementBytesWritten(bytesUploaded.get()); |
| } |
| } |
| |
| /** |
| * What is the effective bandwidth of this stream's write. |
| * @return the bytes uploaded divided by the total duration. |
| */ |
| private double effectiveBandwidth() { |
| double duration = totalUploadDuration() / 1000.0; |
| return duration > 0 ? |
| (bytesUploaded.get() / duration) : 0; |
| } |
| |
| /** |
| * Total of time spend uploading bytes. |
| * @return the transfer duration plus queue duration. |
| */ |
| private long totalUploadDuration() { |
| return queueDuration.get() + transferDuration.get(); |
| } |
| |
| @Override |
| public int getBlocksAllocated() { |
| return blocksAllocated.get(); |
| } |
| |
| @Override |
| public int getBlocksReleased() { |
| return blocksReleased.get(); |
| } |
| |
| /** |
| * Get counters of blocks actively allocated; may be inaccurate |
| * if the numbers change during the (non-synchronized) calculation. |
| * @return the number of actively allocated blocks. |
| */ |
| @Override |
| public int getBlocksActivelyAllocated() { |
| return blocksAllocated.get() - blocksReleased.get(); |
| } |
| |
| /** |
| * Record bytes written. |
| * @param count number of bytes |
| */ |
| @Override |
| public void writeBytes(long count) { |
| bytesWritten.addAndGet(count); |
| } |
| |
| /** |
| * Get the current count of bytes written. |
| * @return the counter value. |
| */ |
| @Override |
| public long getBytesWritten() { |
| return bytesWritten.get(); |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder( |
| "OutputStreamStatistics{"); |
| sb.append(localIOStatistics().toString()); |
| sb.append(", blocksActive=").append(blocksActive); |
| sb.append(", blockUploadsCompleted=").append(blockUploadsCompleted); |
| sb.append(", blocksAllocated=").append(blocksAllocated); |
| sb.append(", blocksReleased=").append(blocksReleased); |
| sb.append(", blocksActivelyAllocated=") |
| .append(getBlocksActivelyAllocated()); |
| sb.append(", transferDuration=").append(transferDuration).append(" ms"); |
| sb.append(", totalUploadDuration=").append(totalUploadDuration()) |
| .append(" ms"); |
| sb.append(", effectiveBandwidth=").append(effectiveBandwidth()) |
| .append(" bytes/s"); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| } |
| |
| /** |
| * Instrumentation exported to S3A Committers. |
| * The S3AInstrumentation metrics and |
| * {@link #instanceIOStatistics} are updated continuously. |
| */ |
| private final class CommitterStatisticsImpl |
| extends AbstractS3AStatisticsSource |
| implements CommitterStatistics { |
| |
| private CommitterStatisticsImpl() { |
| IOStatisticsStore st = iostatisticsStore() |
| .withCounters( |
| COMMITTER_BYTES_COMMITTED.getSymbol(), |
| COMMITTER_BYTES_UPLOADED.getSymbol(), |
| COMMITTER_COMMITS_CREATED.getSymbol(), |
| COMMITTER_COMMITS_ABORTED.getSymbol(), |
| COMMITTER_COMMITS_COMPLETED.getSymbol(), |
| COMMITTER_COMMITS_FAILED.getSymbol(), |
| COMMITTER_COMMITS_REVERTED.getSymbol(), |
| COMMITTER_JOBS_FAILED.getSymbol(), |
| COMMITTER_JOBS_SUCCEEDED.getSymbol(), |
| COMMITTER_TASKS_FAILED.getSymbol(), |
| COMMITTER_TASKS_SUCCEEDED.getSymbol()) |
| .withDurationTracking( |
| COMMITTER_COMMIT_JOB.getSymbol(), |
| COMMITTER_LOAD_SINGLE_PENDING_FILE.getSymbol(), |
| COMMITTER_MATERIALIZE_FILE.getSymbol(), |
| COMMITTER_STAGE_FILE_UPLOAD.getSymbol()) |
| .build(); |
| setIOStatistics(st); |
| } |
| |
| /** |
| * Increment both the local counter and the S3AInstrumentation counters. |
| * @param stat statistic |
| * @param value value |
| * @return the new value |
| */ |
| private long increment(Statistic stat, long value) { |
| incrementCounter(stat, value); |
| return incCounter(stat.getSymbol(), value); |
| } |
| |
| /** A commit has been created. */ |
| @Override |
| public void commitCreated() { |
| increment(COMMITTER_COMMITS_CREATED, 1); |
| } |
| |
| @Override |
| public void commitUploaded(long size) { |
| increment(COMMITTER_BYTES_UPLOADED, size); |
| } |
| |
| @Override |
| public void commitCompleted(long size) { |
| increment(COMMITTER_COMMITS_COMPLETED, 1); |
| increment(COMMITTER_BYTES_COMMITTED, size); |
| } |
| |
| @Override |
| public void commitAborted() { |
| increment(COMMITTER_COMMITS_ABORTED, 1); |
| } |
| |
| @Override |
| public void commitReverted() { |
| increment(COMMITTER_COMMITS_REVERTED, 1); |
| } |
| |
| @Override |
| public void commitFailed() { |
| increment(COMMITTER_COMMITS_FAILED, 1); |
| } |
| |
| @Override |
| public void taskCompleted(boolean success) { |
| increment(success |
| ? COMMITTER_TASKS_SUCCEEDED |
| : COMMITTER_TASKS_FAILED, |
| 1); |
| } |
| |
| @Override |
| public void jobCompleted(boolean success) { |
| increment(success |
| ? COMMITTER_JOBS_SUCCEEDED |
| : COMMITTER_JOBS_FAILED, |
| 1); |
| } |
| |
| } |
| |
| /** |
| * Create a delegation token statistics instance. |
| * @return an instance of delegation token statistics |
| */ |
| public DelegationTokenStatistics newDelegationTokenStatistics() { |
| return new DelegationTokenStatisticsImpl(); |
| } |
| |
| /** |
| * Instrumentation exported to S3A Delegation Token support. |
| * The {@link #tokenIssued()} call is a no-op; |
| * This statistics class doesn't collect any local statistics. |
| * Instead it directly updates the S3A Instrumentation. |
| */ |
| private final class DelegationTokenStatisticsImpl implements |
| DelegationTokenStatistics { |
| |
| private DelegationTokenStatisticsImpl() { |
| } |
| |
| @Override |
| public void tokenIssued() { |
| } |
| |
| @Override |
| public DurationTracker trackDuration(final String key, final long count) { |
| return getDurationTrackerFactory() |
| .trackDuration(key, count); |
| } |
| } |
| |
| /** |
| * Copy all the metrics to a map of (name, long-value). |
| * @return a map of the metrics |
| */ |
| public Map<String, Long> toMap() { |
| MetricsToMap metricBuilder = new MetricsToMap(null); |
| registry.snapshot(metricBuilder, true); |
| return metricBuilder.getMap(); |
| } |
| |
| /** |
| * Convert all metrics to a map. |
| */ |
| private static class MetricsToMap extends MetricsRecordBuilder { |
| private final MetricsCollector parent; |
| private final Map<String, Long> map = |
| new HashMap<>(); |
| |
| MetricsToMap(MetricsCollector parent) { |
| this.parent = parent; |
| } |
| |
| @Override |
| public MetricsRecordBuilder tag(MetricsInfo info, String value) { |
| return this; |
| } |
| |
| @Override |
| public MetricsRecordBuilder add(MetricsTag tag) { |
| return this; |
| } |
| |
| @Override |
| public MetricsRecordBuilder add(AbstractMetric metric) { |
| return this; |
| } |
| |
| @Override |
| public MetricsRecordBuilder setContext(String value) { |
| return this; |
| } |
| |
| @Override |
| public MetricsRecordBuilder addCounter(MetricsInfo info, int value) { |
| return tuple(info, value); |
| } |
| |
| @Override |
| public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { |
| return tuple(info, value); |
| } |
| |
| @Override |
| public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { |
| return tuple(info, value); |
| } |
| |
| @Override |
| public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { |
| return tuple(info, value); |
| } |
| |
| public MetricsToMap tuple(MetricsInfo info, long value) { |
| return tuple(info.name(), value); |
| } |
| |
| public MetricsToMap tuple(String name, long value) { |
| map.put(name, value); |
| return this; |
| } |
| |
| @Override |
| public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { |
| return tuple(info, (long) value); |
| } |
| |
| @Override |
| public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { |
| return tuple(info, (long) value); |
| } |
| |
| @Override |
| public MetricsCollector parent() { |
| return parent; |
| } |
| |
| /** |
| * Get the map. |
| * @return the map of metrics |
| */ |
| public Map<String, Long> getMap() { |
| return map; |
| } |
| } |
| |
| /** |
| * An IOStatisticsStore which updates metrics on calls to |
| * {@link #incrementCounter(String, long)}. |
| * This helps keeps FS metrics and IOStats in sync. |
| * Duration tracking methods are forwarded to |
| * the S3A Instrumentation duration tracker, which will |
| * update the instance IOStatistics. |
| */ |
| private final class MetricsUpdatingIOStatisticsStore |
| extends ForwardingIOStatisticsStore { |
| |
| private MetricsUpdatingIOStatisticsStore() { |
| super(S3AInstrumentation.this.getIOStatistics()); |
| } |
| |
| /** |
| * Incrementing the counter also implements the metric alongside |
| * the IOStatistics value. |
| * @param key counter key |
| * @param value increment value. |
| * @return the value in the wrapped IOStatistics. |
| */ |
| @Override |
| public long incrementCounter(final String key, final long value) { |
| incrementMutableCounter(key, value); |
| return super.incrementCounter(key, value); |
| } |
| |
| @Override |
| public DurationTracker trackDuration(final String key, final long count) { |
| return S3AInstrumentation.this.trackDuration(key, count); |
| } |
| |
| @Override |
| public DurationTracker trackDuration(final String key) { |
| return S3AInstrumentation.this.trackDuration(key); |
| } |
| } |
| } |