blob: da1fc5a807c9b7e65f885011ae8a19cc319f9c47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.metrics2.MetricStringBuilder;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableMetric;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import java.io.Closeable;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.fs.s3a.Statistic.*;
/**
* Instrumentation of S3a.
* Derived from the {@code AzureFileSystemInstrumentation}.
*
* Counters and metrics are generally addressed in code by their name or
* {@link Statistic} key. There <i>may</i> be some Statistics which do
* not have an entry here. To avoid attempts to access such counters failing,
* the operations to increment/query metric values are designed to handle
* lookup failures.
*/
@Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInstrumentation {
private static final Logger LOG = LoggerFactory.getLogger(
S3AInstrumentation.class);
public static final String CONTEXT = "S3AFileSystem";
private final MetricsRegistry registry =
new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
private final MutableCounterLong streamOpenOperations;
private final MutableCounterLong streamCloseOperations;
private final MutableCounterLong streamClosed;
private final MutableCounterLong streamAborted;
private final MutableCounterLong streamSeekOperations;
private final MutableCounterLong streamReadExceptions;
private final MutableCounterLong streamForwardSeekOperations;
private final MutableCounterLong streamBackwardSeekOperations;
private final MutableCounterLong streamBytesSkippedOnSeek;
private final MutableCounterLong streamBytesBackwardsOnSeek;
private final MutableCounterLong streamBytesRead;
private final MutableCounterLong streamReadOperations;
private final MutableCounterLong streamReadFullyOperations;
private final MutableCounterLong streamReadsIncomplete;
private final MutableCounterLong streamBytesReadInClose;
private final MutableCounterLong streamBytesDiscardedInAbort;
private final MutableCounterLong ignoredErrors;
private final MutableCounterLong numberOfFilesCreated;
private final MutableCounterLong numberOfFilesCopied;
private final MutableCounterLong bytesOfFilesCopied;
private final MutableCounterLong numberOfFilesDeleted;
private final MutableCounterLong numberOfFakeDirectoryDeletes;
private final MutableCounterLong numberOfDirectoriesCreated;
private final MutableCounterLong numberOfDirectoriesDeleted;
private final Map<String, MutableCounterLong> streamMetrics =
new HashMap<>(30);
/** Instantiate this without caring whether or not S3Guard is enabled. */
private final S3GuardInstrumentation s3GuardInstrumentation
= new S3GuardInstrumentation();
private static final Statistic[] COUNTERS_TO_CREATE = {
INVOCATION_COPY_FROM_LOCAL_FILE,
INVOCATION_EXISTS,
INVOCATION_GET_FILE_STATUS,
INVOCATION_GLOB_STATUS,
INVOCATION_IS_DIRECTORY,
INVOCATION_IS_FILE,
INVOCATION_LIST_FILES,
INVOCATION_LIST_LOCATED_STATUS,
INVOCATION_LIST_STATUS,
INVOCATION_MKDIRS,
INVOCATION_RENAME,
OBJECT_COPY_REQUESTS,
OBJECT_DELETE_REQUESTS,
OBJECT_LIST_REQUESTS,
OBJECT_CONTINUE_LIST_REQUESTS,
OBJECT_METADATA_REQUESTS,
OBJECT_MULTIPART_UPLOAD_ABORTED,
OBJECT_PUT_BYTES,
OBJECT_PUT_REQUESTS,
OBJECT_PUT_REQUESTS_COMPLETED,
STREAM_WRITE_FAILURES,
STREAM_WRITE_BLOCK_UPLOADS,
STREAM_WRITE_BLOCK_UPLOADS_COMMITTED,
STREAM_WRITE_BLOCK_UPLOADS_ABORTED,
STREAM_WRITE_TOTAL_TIME,
STREAM_WRITE_TOTAL_DATA,
S3GUARD_METADATASTORE_PUT_PATH_REQUEST,
S3GUARD_METADATASTORE_INITIALIZATION
};
private static final Statistic[] GAUGES_TO_CREATE = {
OBJECT_PUT_REQUESTS_ACTIVE,
OBJECT_PUT_BYTES_PENDING,
STREAM_WRITE_BLOCK_UPLOADS_ACTIVE,
STREAM_WRITE_BLOCK_UPLOADS_PENDING,
STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING,
};
public S3AInstrumentation(URI name) {
UUID fileSystemInstanceId = UUID.randomUUID();
registry.tag("FileSystemId",
"A unique identifier for the FS ",
fileSystemInstanceId.toString() + "-" + name.getHost());
registry.tag("fsURI",
"URI of this filesystem",
name.toString());
streamOpenOperations = streamCounter(STREAM_OPENED);
streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS);
streamClosed = streamCounter(STREAM_CLOSED);
streamAborted = streamCounter(STREAM_ABORTED);
streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS);
streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS);
streamForwardSeekOperations =
streamCounter(STREAM_FORWARD_SEEK_OPERATIONS);
streamBackwardSeekOperations =
streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS);
streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED);
streamBytesBackwardsOnSeek =
streamCounter(STREAM_SEEK_BYTES_BACKWARDS);
streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ);
streamReadOperations = streamCounter(STREAM_READ_OPERATIONS);
streamReadFullyOperations =
streamCounter(STREAM_READ_FULLY_OPERATIONS);
streamReadsIncomplete =
streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
streamBytesReadInClose = streamCounter(STREAM_CLOSE_BYTES_READ);
streamBytesDiscardedInAbort = streamCounter(STREAM_ABORT_BYTES_DISCARDED);
numberOfFilesCreated = counter(FILES_CREATED);
numberOfFilesCopied = counter(FILES_COPIED);
bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
numberOfFilesDeleted = counter(FILES_DELETED);
numberOfFakeDirectoryDeletes = counter(FAKE_DIRECTORIES_DELETED);
numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED);
numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED);
ignoredErrors = counter(IGNORED_ERRORS);
for (Statistic statistic : COUNTERS_TO_CREATE) {
counter(statistic);
}
for (Statistic statistic : GAUGES_TO_CREATE) {
gauge(statistic.getSymbol(), statistic.getDescription());
}
//todo need a config for the quantiles interval?
quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
"ops", "latency", 1);
}
/**
* Create a counter in the registry.
* @param name counter name
* @param desc counter description
* @return a new counter
*/
protected final MutableCounterLong counter(String name, String desc) {
return registry.newCounter(name, desc, 0L);
}
/**
* Create a counter in the stream map: these are unregistered in the public
* metrics.
* @param name counter name
* @param desc counter description
* @return a new counter
*/
protected final MutableCounterLong streamCounter(String name, String desc) {
MutableCounterLong counter = new MutableCounterLong(
Interns.info(name, desc), 0L);
streamMetrics.put(name, counter);
return counter;
}
/**
* Create a counter in the registry.
* @param op statistic to count
* @return a new counter
*/
protected final MutableCounterLong counter(Statistic op) {
return counter(op.getSymbol(), op.getDescription());
}
/**
* Create a counter in the stream map: these are unregistered in the public
* metrics.
* @param op statistic to count
* @return a new counter
*/
protected final MutableCounterLong streamCounter(Statistic op) {
return streamCounter(op.getSymbol(), op.getDescription());
}
/**
* Create a gauge in the registry.
* @param name name gauge name
* @param desc description
* @return the gauge
*/
protected final MutableGaugeLong gauge(String name, String desc) {
return registry.newGauge(name, desc, 0L);
}
/**
* Create a quantiles in the registry.
* @param op statistic to collect
* @param sampleName sample name of the quantiles
* @param valueName value name of the quantiles
* @param interval interval of the quantiles in seconds
* @return the created quantiles metric
*/
protected final MutableQuantiles quantiles(Statistic op,
String sampleName,
String valueName,
int interval) {
return registry.newQuantiles(op.getSymbol(), op.getDescription(),
sampleName, valueName, interval);
}
/**
* Get the metrics registry.
* @return the registry
*/
public MetricsRegistry getRegistry() {
return registry;
}
/**
* Dump all the metrics to a string.
* @param prefix prefix before every entry
* @param separator separator between name and value
* @param suffix suffix
* @param all get all the metrics even if the values are not changed.
* @return a string dump of the metrics
*/
public String dump(String prefix,
String separator,
String suffix,
boolean all) {
MetricStringBuilder metricBuilder = new MetricStringBuilder(null,
prefix,
separator, suffix);
registry.snapshot(metricBuilder, all);
for (Map.Entry<String, MutableCounterLong> entry:
streamMetrics.entrySet()) {
metricBuilder.tuple(entry.getKey(),
Long.toString(entry.getValue().value()));
}
return metricBuilder.toString();
}
/**
* Get the value of a counter.
* @param statistic the operation
* @return its value, or 0 if not found.
*/
public long getCounterValue(Statistic statistic) {
return getCounterValue(statistic.getSymbol());
}
/**
* Get the value of a counter.
* If the counter is null, return 0.
* @param name the name of the counter
* @return its value.
*/
public long getCounterValue(String name) {
MutableCounterLong counter = lookupCounter(name);
return counter == null ? 0 : counter.value();
}
/**
* Lookup a counter by name. Return null if it is not known.
* @param name counter name
* @return the counter
* @throws IllegalStateException if the metric is not a counter
*/
private MutableCounterLong lookupCounter(String name) {
MutableMetric metric = lookupMetric(name);
if (metric == null) {
return null;
}
if (!(metric instanceof MutableCounterLong)) {
throw new IllegalStateException("Metric " + name
+ " is not a MutableCounterLong: " + metric);
}
return (MutableCounterLong) metric;
}
/**
* Look up a gauge.
* @param name gauge name
* @return the gauge or null
* @throws ClassCastException if the metric is not a Gauge.
*/
public MutableGaugeLong lookupGauge(String name) {
MutableMetric metric = lookupMetric(name);
if (metric == null) {
LOG.debug("No gauge {}", name);
}
return (MutableGaugeLong) metric;
}
/**
* Look up a quantiles.
* @param name quantiles name
* @return the quantiles or null
* @throws ClassCastException if the metric is not a Quantiles.
*/
public MutableQuantiles lookupQuantiles(String name) {
MutableMetric metric = lookupMetric(name);
if (metric == null) {
LOG.debug("No quantiles {}", name);
}
return (MutableQuantiles) metric;
}
/**
* Look up a metric from both the registered set and the lighter weight
* stream entries.
* @param name metric name
* @return the metric or null
*/
public MutableMetric lookupMetric(String name) {
MutableMetric metric = getRegistry().get(name);
if (metric == null) {
metric = streamMetrics.get(name);
}
return metric;
}
/**
* Indicate that S3A created a file.
*/
public void fileCreated() {
numberOfFilesCreated.incr();
}
/**
* Indicate that S3A deleted one or more file.s
* @param count number of files.
*/
public void fileDeleted(int count) {
numberOfFilesDeleted.incr(count);
}
/**
* Indicate that fake directory request was made.
* @param count number of directory entries included in the delete request.
*/
public void fakeDirsDeleted(int count) {
numberOfFakeDirectoryDeletes.incr(count);
}
/**
* Indicate that S3A created a directory.
*/
public void directoryCreated() {
numberOfDirectoriesCreated.incr();
}
/**
* Indicate that S3A just deleted a directory.
*/
public void directoryDeleted() {
numberOfDirectoriesDeleted.incr();
}
/**
* Indicate that S3A copied some files within the store.
*
* @param files number of files
* @param size total size in bytes
*/
public void filesCopied(int files, long size) {
numberOfFilesCopied.incr(files);
bytesOfFilesCopied.incr(size);
}
/**
* Note that an error was ignored.
*/
public void errorIgnored() {
ignoredErrors.incr();
}
/**
* Increment a specific counter.
* No-op if not defined.
* @param op operation
* @param count increment value
*/
public void incrementCounter(Statistic op, long count) {
MutableCounterLong counter = lookupCounter(op.getSymbol());
if (counter != null) {
counter.incr(count);
}
}
/**
* Add a value to a quantiles statistic. No-op if the quantile
* isn't found.
* @param op operation to look up.
* @param value value to add.
* @throws ClassCastException if the metric is not a Quantiles.
*/
public void addValueToQuantiles(Statistic op, long value) {
MutableQuantiles quantiles = lookupQuantiles(op.getSymbol());
if (quantiles != null) {
quantiles.add(value);
}
}
/**
* Increment a specific counter.
* No-op if not defined.
* @param op operation
* @param count atomic long containing value
*/
public void incrementCounter(Statistic op, AtomicLong count) {
incrementCounter(op, count.get());
}
/**
* Increment a specific gauge.
* No-op if not defined.
* @param op operation
* @param count increment value
* @throws ClassCastException if the metric is of the wrong type
*/
public void incrementGauge(Statistic op, long count) {
MutableGaugeLong gauge = lookupGauge(op.getSymbol());
if (gauge != null) {
gauge.incr(count);
} else {
LOG.debug("No Gauge: "+ op);
}
}
/**
* Decrement a specific gauge.
* No-op if not defined.
* @param op operation
* @param count increment value
* @throws ClassCastException if the metric is of the wrong type
*/
public void decrementGauge(Statistic op, long count) {
MutableGaugeLong gauge = lookupGauge(op.getSymbol());
if (gauge != null) {
gauge.decr(count);
} else {
LOG.debug("No Gauge: {}", op);
}
}
/**
* Create a stream input statistics instance.
* @return the new instance
*/
InputStreamStatistics newInputStreamStatistics() {
return new InputStreamStatistics();
}
/**
* Create a S3Guard instrumentation instance.
* There's likely to be at most one instance of this per FS instance.
* @return the S3Guard instrumentation point.
*/
public S3GuardInstrumentation getS3GuardInstrumentation() {
return s3GuardInstrumentation;
}
/**
* Merge in the statistics of a single input stream into
* the filesystem-wide statistics.
* @param statistics stream statistics
*/
private void mergeInputStreamStatistics(InputStreamStatistics statistics) {
streamOpenOperations.incr(statistics.openOperations);
streamCloseOperations.incr(statistics.closeOperations);
streamClosed.incr(statistics.closed);
streamAborted.incr(statistics.aborted);
streamSeekOperations.incr(statistics.seekOperations);
streamReadExceptions.incr(statistics.readExceptions);
streamForwardSeekOperations.incr(statistics.forwardSeekOperations);
streamBytesSkippedOnSeek.incr(statistics.bytesSkippedOnSeek);
streamBackwardSeekOperations.incr(statistics.backwardSeekOperations);
streamBytesBackwardsOnSeek.incr(statistics.bytesBackwardsOnSeek);
streamBytesRead.incr(statistics.bytesRead);
streamReadOperations.incr(statistics.readOperations);
streamReadFullyOperations.incr(statistics.readFullyOperations);
streamReadsIncomplete.incr(statistics.readsIncomplete);
streamBytesReadInClose.incr(statistics.bytesReadInClose);
streamBytesDiscardedInAbort.incr(statistics.bytesDiscardedInAbort);
}
/**
* Statistics updated by an input stream during its actual operation.
* These counters not thread-safe and are for use in a single instance
* of a stream.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class InputStreamStatistics implements AutoCloseable {
public long openOperations;
public long closeOperations;
public long closed;
public long aborted;
public long seekOperations;
public long readExceptions;
public long forwardSeekOperations;
public long backwardSeekOperations;
public long bytesRead;
public long bytesSkippedOnSeek;
public long bytesBackwardsOnSeek;
public long readOperations;
public long readFullyOperations;
public long readsIncomplete;
public long bytesReadInClose;
public long bytesDiscardedInAbort;
private InputStreamStatistics() {
}
/**
* Seek backwards, incrementing the seek and backward seek counters.
* @param negativeOffset how far was the seek?
* This is expected to be negative.
*/
public void seekBackwards(long negativeOffset) {
seekOperations++;
backwardSeekOperations++;
bytesBackwardsOnSeek -= negativeOffset;
}
/**
* Record a forward seek, adding a seek operation, a forward
* seek operation, and any bytes skipped.
* @param skipped number of bytes skipped by reading from the stream.
* If the seek was implemented by a close + reopen, set this to zero.
*/
public void seekForwards(long skipped) {
seekOperations++;
forwardSeekOperations++;
if (skipped > 0) {
bytesSkippedOnSeek += skipped;
}
}
/**
* The inner stream was opened.
*/
public void streamOpened() {
openOperations++;
}
/**
* The inner stream was closed.
* @param abortedConnection flag to indicate the stream was aborted,
* rather than closed cleanly
* @param remainingInCurrentRequest the number of bytes remaining in
* the current request.
*/
public void streamClose(boolean abortedConnection,
long remainingInCurrentRequest) {
closeOperations++;
if (abortedConnection) {
this.aborted++;
bytesDiscardedInAbort += remainingInCurrentRequest;
} else {
closed++;
bytesReadInClose += remainingInCurrentRequest;
}
}
/**
* An ignored stream read exception was received.
*/
public void readException() {
readExceptions++;
}
/**
* Increment the bytes read counter by the number of bytes;
* no-op if the argument is negative.
* @param bytes number of bytes read
*/
public void bytesRead(long bytes) {
if (bytes > 0) {
bytesRead += bytes;
}
}
/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
* @param pos starting position of the read
* @param len length of bytes to read
*/
public void readOperationStarted(long pos, long len) {
readOperations++;
}
/**
* A {@code PositionedRead.read(position, buffer, offset, length)}
* operation has just started.
* @param pos starting position of the read
* @param len length of bytes to read
*/
public void readFullyOperationStarted(long pos, long len) {
readFullyOperations++;
}
/**
* A read operation has completed.
* @param requested number of requested bytes
* @param actual the actual number of bytes
*/
public void readOperationCompleted(int requested, int actual) {
if (requested > actual) {
readsIncomplete++;
}
}
/**
* Close triggers the merge of statistics into the filesystem's
* instrumentation instance.
*/
@Override
public void close() {
mergeInputStreamStatistics(this);
}
/**
* String operator describes all the current statistics.
* <b>Important: there are no guarantees as to the stability
* of this value.</b>
* @return the current values of the stream statistics.
*/
@Override
@InterfaceStability.Unstable
public String toString() {
final StringBuilder sb = new StringBuilder(
"StreamStatistics{");
sb.append("OpenOperations=").append(openOperations);
sb.append(", CloseOperations=").append(closeOperations);
sb.append(", Closed=").append(closed);
sb.append(", Aborted=").append(aborted);
sb.append(", SeekOperations=").append(seekOperations);
sb.append(", ReadExceptions=").append(readExceptions);
sb.append(", ForwardSeekOperations=")
.append(forwardSeekOperations);
sb.append(", BackwardSeekOperations=")
.append(backwardSeekOperations);
sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
sb.append(", BytesRead=").append(bytesRead);
sb.append(", BytesRead excluding skipped=")
.append(bytesRead - bytesSkippedOnSeek);
sb.append(", ReadOperations=").append(readOperations);
sb.append(", ReadFullyOperations=").append(readFullyOperations);
sb.append(", ReadsIncomplete=").append(readsIncomplete);
sb.append(", BytesReadInClose=").append(bytesReadInClose);
sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
sb.append('}');
return sb.toString();
}
}
/**
* Create a stream output statistics instance.
* @return the new instance
*/
OutputStreamStatistics newOutputStreamStatistics(Statistics statistics) {
return new OutputStreamStatistics(statistics);
}
/**
* Merge in the statistics of a single output stream into
* the filesystem-wide statistics.
* @param statistics stream statistics
*/
private void mergeOutputStreamStatistics(OutputStreamStatistics statistics) {
incrementCounter(STREAM_WRITE_TOTAL_TIME, statistics.totalUploadDuration());
incrementCounter(STREAM_WRITE_QUEUE_DURATION, statistics.queueDuration);
incrementCounter(STREAM_WRITE_TOTAL_DATA, statistics.bytesUploaded);
incrementCounter(STREAM_WRITE_BLOCK_UPLOADS,
statistics.blockUploadsCompleted);
}
/**
* Statistics updated by an output stream during its actual operation.
* Some of these stats may be relayed. However, as block upload is
* spans multiple
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class OutputStreamStatistics implements Closeable {
private final AtomicLong blocksSubmitted = new AtomicLong(0);
private final AtomicLong blocksInQueue = new AtomicLong(0);
private final AtomicLong blocksActive = new AtomicLong(0);
private final AtomicLong blockUploadsCompleted = new AtomicLong(0);
private final AtomicLong blockUploadsFailed = new AtomicLong(0);
private final AtomicLong bytesPendingUpload = new AtomicLong(0);
private final AtomicLong bytesUploaded = new AtomicLong(0);
private final AtomicLong transferDuration = new AtomicLong(0);
private final AtomicLong queueDuration = new AtomicLong(0);
private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0);
private final AtomicInteger blocksAllocated = new AtomicInteger(0);
private final AtomicInteger blocksReleased = new AtomicInteger(0);
private Statistics statistics;
public OutputStreamStatistics(Statistics statistics){
this.statistics = statistics;
}
/**
* A block has been allocated.
*/
void blockAllocated() {
blocksAllocated.incrementAndGet();
}
/**
* A block has been released.
*/
void blockReleased() {
blocksReleased.incrementAndGet();
}
/**
* Block is queued for upload.
*/
void blockUploadQueued(int blockSize) {
blocksSubmitted.incrementAndGet();
blocksInQueue.incrementAndGet();
bytesPendingUpload.addAndGet(blockSize);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, blockSize);
}
/** Queued block has been scheduled for upload. */
void blockUploadStarted(long duration, int blockSize) {
queueDuration.addAndGet(duration);
blocksInQueue.decrementAndGet();
blocksActive.incrementAndGet();
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, -1);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, 1);
}
/** A block upload has completed. */
void blockUploadCompleted(long duration, int blockSize) {
this.transferDuration.addAndGet(duration);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1);
blocksActive.decrementAndGet();
blockUploadsCompleted.incrementAndGet();
}
/**
* A block upload has failed.
* A final transfer completed event is still expected, so this
* does not decrement the active block counter.
*/
void blockUploadFailed(long duration, int blockSize) {
blockUploadsFailed.incrementAndGet();
}
/** Intermediate report of bytes uploaded. */
void bytesTransferred(long byteCount) {
bytesUploaded.addAndGet(byteCount);
statistics.incrementBytesWritten(byteCount);
bytesPendingUpload.addAndGet(-byteCount);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
}
/**
* Note an exception in a multipart complete.
*/
void exceptionInMultipartComplete() {
exceptionsInMultipartFinalize.incrementAndGet();
}
/**
* Note an exception in a multipart abort.
*/
void exceptionInMultipartAbort() {
exceptionsInMultipartFinalize.incrementAndGet();
}
/**
* Get the number of bytes pending upload.
* @return the number of bytes in the pending upload state.
*/
public long getBytesPendingUpload() {
return bytesPendingUpload.get();
}
/**
* Output stream has closed.
* Trigger merge in of all statistics not updated during operation.
*/
@Override
public void close() {
if (bytesPendingUpload.get() > 0) {
LOG.warn("Closing output stream statistics while data is still marked" +
" as pending upload in {}", this);
}
mergeOutputStreamStatistics(this);
}
long averageQueueTime() {
return blocksSubmitted.get() > 0 ?
(queueDuration.get() / blocksSubmitted.get()) : 0;
}
double effectiveBandwidth() {
double duration = totalUploadDuration() / 1000.0;
return duration > 0 ?
(bytesUploaded.get() / duration) : 0;
}
long totalUploadDuration() {
return queueDuration.get() + transferDuration.get();
}
public int blocksAllocated() {
return blocksAllocated.get();
}
public int blocksReleased() {
return blocksReleased.get();
}
/**
* Get counters of blocks actively allocated; my be inaccurate
* if the numbers change during the (non-synchronized) calculation.
* @return the number of actively allocated blocks.
*/
public int blocksActivelyAllocated() {
return blocksAllocated.get() - blocksReleased.get();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"OutputStreamStatistics{");
sb.append("blocksSubmitted=").append(blocksSubmitted);
sb.append(", blocksInQueue=").append(blocksInQueue);
sb.append(", blocksActive=").append(blocksActive);
sb.append(", blockUploadsCompleted=").append(blockUploadsCompleted);
sb.append(", blockUploadsFailed=").append(blockUploadsFailed);
sb.append(", bytesPendingUpload=").append(bytesPendingUpload);
sb.append(", bytesUploaded=").append(bytesUploaded);
sb.append(", blocksAllocated=").append(blocksAllocated);
sb.append(", blocksReleased=").append(blocksReleased);
sb.append(", blocksActivelyAllocated=").append(blocksActivelyAllocated());
sb.append(", exceptionsInMultipartFinalize=").append(
exceptionsInMultipartFinalize);
sb.append(", transferDuration=").append(transferDuration).append(" ms");
sb.append(", queueDuration=").append(queueDuration).append(" ms");
sb.append(", averageQueueTime=").append(averageQueueTime()).append(" ms");
sb.append(", totalUploadDuration=").append(totalUploadDuration())
.append(" ms");
sb.append(", effectiveBandwidth=").append(effectiveBandwidth())
.append(" bytes/s");
sb.append('}');
return sb.toString();
}
}
/**
* Instrumentation exported to S3Guard.
*/
public final class S3GuardInstrumentation {
/** Initialized event. */
public void initialized() {
incrementCounter(S3GUARD_METADATASTORE_INITIALIZATION, 1);
}
public void storeClosed() {
}
}
}