blob: 44a0d2fd665280dc298826e3e68a09856bc01657 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.ooc;
import com.google.common.collect.Maps;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.ooc.command.IOCommand.IOCommandType;
import org.apache.log4j.Logger;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/**
* Class to collect statistics regarding IO operations done in out-of-core
* mechanism.
*/
public class OutOfCoreIOStatistics {
/**
* An estimate of disk bandwidth. This number is only used just at the start
* of the computation, and will be updated/replaced later on once a few disk
* operations happen.
*/
public static final IntConfOption DISK_BANDWIDTH_ESTIMATE =
new IntConfOption("giraph.diskBandwidthEstimate", 125,
"An estimate of disk bandwidth (MB/s). This number is used just at " +
"the beginning of the computation, and it will be " +
"updated/replaced once a few disk operations happen.");
/**
* How many recent IO operations should we keep track of? Any report given out
* of this statistics collector is only based on most recent IO operations.
*/
public static final IntConfOption IO_COMMAND_HISTORY_SIZE =
new IntConfOption("giraph.ioCommandHistorySize", 50,
"Number of most recent IO operations to consider for reporting the" +
"statistics.");
/** Class logger */
private static final Logger LOG =
Logger.getLogger(OutOfCoreIOStatistics.class);
/** Estimate of disk bandwidth (bytes/second) */
private final AtomicLong diskBandwidthEstimate;
/** Cached value for IO_COMMAND_HISTORY_SIZE */
private final int maxHistorySize;
/**
* Coefficient/Weight of the most recent IO operation toward the disk
* bandwidth estimate. Basically if the disk bandwidth estimate if d, and the
* latest IO command happened at the rate of r, the new estimate of disk
* bandwidth is calculated as:
* d_new = updateCoefficient * r + (1 - updateCoefficient) * d
*/
private final double updateCoefficient;
/** Queue of all recent commands */
private final Queue<StatisticsEntry> commandHistory;
/**
* Command statistics for each type of IO command. This is the statistics of
* the recent commands in the history we keep track of (with 'maxHistorySize'
* command in the history).
*/
private final Map<IOCommandType, StatisticsEntry> aggregateStats;
/** How many IO command completed? */
private int numUpdates = 0;
/**
* Constructor
*
* @param conf configuration
* @param numIOThreads number of disks/IO threads
*/
public OutOfCoreIOStatistics(ImmutableClassesGiraphConfiguration conf,
int numIOThreads) {
this.diskBandwidthEstimate =
new AtomicLong(DISK_BANDWIDTH_ESTIMATE.get(conf) *
(long) GiraphConstants.ONE_MB);
this.maxHistorySize = IO_COMMAND_HISTORY_SIZE.get(conf);
this.updateCoefficient = 1.0 / maxHistorySize;
// Adding more entry to the capacity of the queue to have a wiggle room
// if all IO threads are adding/removing entries from the queue
this.commandHistory =
new ArrayBlockingQueue<>(maxHistorySize + numIOThreads);
this.aggregateStats = Maps.newConcurrentMap();
for (IOCommandType type : IOCommandType.values()) {
aggregateStats.put(type, new StatisticsEntry(type, 0, 0, 0));
}
}
/**
* Update statistics with the last IO command that is executed.
*
* @param type type of the IO command that is executed
* @param bytesTransferred number of bytes transferred in the last IO command
* @param duration duration it took for the last IO command to complete
* (milliseconds)
*/
public void update(IOCommandType type, long bytesTransferred,
long duration) {
StatisticsEntry entry = aggregateStats.get(type);
synchronized (entry) {
entry.setOccurrence(entry.getOccurrence() + 1);
entry.setDuration(duration + entry.getDuration());
entry.setBytesTransferred(bytesTransferred + entry.getBytesTransferred());
}
commandHistory.offer(
new StatisticsEntry(type, bytesTransferred, duration, 0));
if (type != IOCommandType.WAIT) {
// If the current estimate is 'd', the new rate is 'r', and the size of
// history is 'n', we can simply model all the past command's rate as:
// d, d, d, ..., d, r
// where 'd' happens for 'n-1' times. Hence the new estimate of the
// bandwidth would be:
// d_new = (d * (n-1) + r) / n = (1-1/n)*d + 1/n*r
// where updateCoefficient = 1/n
diskBandwidthEstimate.set((long)
(updateCoefficient * (bytesTransferred / duration * 1000) +
(1 - updateCoefficient) * diskBandwidthEstimate.get()));
}
if (commandHistory.size() > maxHistorySize) {
StatisticsEntry removedEntry = commandHistory.poll();
entry = aggregateStats.get(removedEntry.getType());
synchronized (entry) {
entry.setOccurrence(entry.getOccurrence() - 1);
entry.setDuration(entry.getDuration() - removedEntry.getDuration());
entry.setBytesTransferred(
entry.getBytesTransferred() - removedEntry.getBytesTransferred());
}
}
numUpdates++;
// Outputting log every so many commands
if (numUpdates % 10 == 0) {
if (LOG.isInfoEnabled()) {
LOG.info(this);
}
}
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
long waitTime = 0;
long loadTime = 0;
long bytesRead = 0;
long storeTime = 0;
long bytesWritten = 0;
for (Map.Entry<IOCommandType, StatisticsEntry> entry :
aggregateStats.entrySet()) {
synchronized (entry.getValue()) {
sb.append(entry.getKey() + ": " + entry.getValue() + ", ");
if (entry.getKey() == IOCommandType.WAIT) {
waitTime += entry.getValue().getDuration();
} else if (entry.getKey() == IOCommandType.LOAD_PARTITION) {
loadTime += entry.getValue().getDuration();
bytesRead += entry.getValue().getBytesTransferred();
} else {
storeTime += entry.getValue().getDuration();
bytesWritten += entry.getValue().getBytesTransferred();
}
}
}
sb.append(String.format("Average STORE: %.2f MB/s, ",
(double) bytesWritten / storeTime * 1000 / 1024 / 1024));
sb.append(String.format("DATA_INJECTION: %.2f MB/s, ",
(double) (bytesRead - bytesWritten) /
(waitTime + loadTime + storeTime) * 1000 / 1024 / 1024));
sb.append(String.format("DISK_BANDWIDTH: %.2f MB/s",
(double) diskBandwidthEstimate.get() / 1024 / 1024));
return sb.toString();
}
/**
* @return most recent estimate of the disk bandwidth
*/
public long getDiskBandwidth() {
return diskBandwidthEstimate.get();
}
/**
* Get aggregate statistics of a given command type in the command history
*
* @param type type of the command to get the statistics for
* @return aggregate statistics for the given command type
*/
public BytesDuration getCommandTypeStats(IOCommandType type) {
StatisticsEntry entry = aggregateStats.get(type);
synchronized (entry) {
return new BytesDuration(entry.getBytesTransferred(), entry.getDuration(),
entry.getOccurrence());
}
}
/**
* Helper class to return results of statistics collector for a certain
* command type
*/
public static class BytesDuration {
/** Number of bytes transferred in a few commands of the same type */
private long bytes;
/** Duration of it took to execute a few commands of the same type */
private long duration;
/** Number of commands executed of the same type */
private int occurrence;
/**
* Constructor
* @param bytes number of bytes transferred
* @param duration duration it took to execute commands
* @param occurrence number of commands
*/
BytesDuration(long bytes, long duration, int occurrence) {
this.bytes = bytes;
this.duration = duration;
this.occurrence = occurrence;
}
/**
* @return number of bytes transferred for the same command type
*/
public long getBytes() {
return bytes;
}
/**
* @return duration it took to execute a few commands of the same type
*/
public long getDuration() {
return duration;
}
/**
* @return number of commands that are executed of the same type
*/
public int getOccurrence() {
return occurrence;
}
}
/**
* Helper class to keep statistics for a certain command type
*/
private static class StatisticsEntry {
/** Type of the command */
private IOCommandType type;
/**
* Aggregate number of bytes transferred executing the particular command
* type in the history we keep
*/
private long bytesTransferred;
/**
* Aggregate duration it took executing the particular command type in the
* history we keep
*/
private long duration;
/**
* Number of occurrences of the particular command type in the history we
* keep
*/
private int occurrence;
/**
* Constructor
*
* @param type type of the command
* @param bytesTransferred aggregate number of bytes transferred
* @param duration aggregate execution time
* @param occurrence number of occurrences of the particular command type
*/
public StatisticsEntry(IOCommandType type, long bytesTransferred,
long duration, int occurrence) {
this.type = type;
this.bytesTransferred = bytesTransferred;
this.duration = duration;
this.occurrence = occurrence;
}
/**
* @return type of the command
*/
public IOCommandType getType() {
return type;
}
/**
* @return aggregate number of bytes transferred in the particular command
* type
*/
public long getBytesTransferred() {
return bytesTransferred;
}
/**
* Update the aggregate number of bytes transferred
*
* @param bytesTransferred aggregate number of bytes
*/
public void setBytesTransferred(long bytesTransferred) {
this.bytesTransferred = bytesTransferred;
}
/**
* @return aggregate duration it took to execute the particular command type
*/
public long getDuration() {
return duration;
}
/**
* Update the aggregate duration
*
* @param duration aggregate duration
*/
public void setDuration(long duration) {
this.duration = duration;
}
/**
* @return number of occurrences of the particular command type
*/
public int getOccurrence() {
return occurrence;
}
/**
* Update the number of occurrences of the particular command type
*
* @param occurrence number of occurrences
*/
public void setOccurrence(int occurrence) {
this.occurrence = occurrence;
}
@Override
public String toString() {
if (type == IOCommandType.WAIT) {
return String.format("%.2f sec", duration / 1000.0);
} else {
return String.format("%.2f MB/s",
(double) bytesTransferred / duration * 1000 / 1024 / 2014);
}
}
}
}