blob: 4be395c8358b2193bf6d2ca240923e6ca3ac014d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.LongAccumulator
/**
* :: DeveloperApi ::
* A collection of accumulators that represent metrics about reading shuffle data.
* Operations are not thread-safe.
*/
@DeveloperApi
class ShuffleReadMetrics private[spark] () extends Serializable {
private[executor] val _remoteBlocksFetched = new LongAccumulator
private[executor] val _localBlocksFetched = new LongAccumulator
private[executor] val _remoteBytesRead = new LongAccumulator
private[executor] val _remoteBytesReadToDisk = new LongAccumulator
private[executor] val _localBytesRead = new LongAccumulator
private[executor] val _fetchWaitTime = new LongAccumulator
private[executor] val _recordsRead = new LongAccumulator
/**
* Number of remote blocks fetched in this shuffle by this task.
*/
def remoteBlocksFetched: Long = _remoteBlocksFetched.sum
/**
* Number of local blocks fetched in this shuffle by this task.
*/
def localBlocksFetched: Long = _localBlocksFetched.sum
/**
* Total number of remote bytes read from the shuffle by this task.
*/
def remoteBytesRead: Long = _remoteBytesRead.sum
/**
* Total number of remotes bytes read to disk from the shuffle by this task.
*/
def remoteBytesReadToDisk: Long = _remoteBytesReadToDisk.sum
/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
def localBytesRead: Long = _localBytesRead.sum
/**
* Time the task spent waiting for remote shuffle blocks. This only includes the time
* blocking on shuffle input data. For instance if block B is being fetched while the task is
* still not finished processing block A, it is not considered to be blocking on block B.
*/
def fetchWaitTime: Long = _fetchWaitTime.sum
/**
* Total number of records read from the shuffle by this task.
*/
def recordsRead: Long = _recordsRead.sum
/**
* Total bytes fetched in the shuffle by this task (both remote and local).
*/
def totalBytesRead: Long = remoteBytesRead + localBytesRead
/**
* Number of blocks fetched in this shuffle by this task (remote or local).
*/
def totalBlocksFetched: Long = remoteBlocksFetched + localBlocksFetched
private[spark] def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched.add(v)
private[spark] def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched.add(v)
private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
private[spark] def incRemoteBytesReadToDisk(v: Long): Unit = _remoteBytesReadToDisk.add(v)
private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v)
private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.setValue(v)
private[spark] def setLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.setValue(v)
private[spark] def setRemoteBytesRead(v: Long): Unit = _remoteBytesRead.setValue(v)
private[spark] def setRemoteBytesReadToDisk(v: Long): Unit = _remoteBytesReadToDisk.setValue(v)
private[spark] def setLocalBytesRead(v: Long): Unit = _localBytesRead.setValue(v)
private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
/**
* Resets the value of the current metrics (`this`) and merges all the independent
* [[TempShuffleReadMetrics]] into `this`.
*/
private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = {
_remoteBlocksFetched.setValue(0)
_localBlocksFetched.setValue(0)
_remoteBytesRead.setValue(0)
_remoteBytesReadToDisk.setValue(0)
_localBytesRead.setValue(0)
_fetchWaitTime.setValue(0)
_recordsRead.setValue(0)
metrics.foreach { metric =>
_remoteBlocksFetched.add(metric.remoteBlocksFetched)
_localBlocksFetched.add(metric.localBlocksFetched)
_remoteBytesRead.add(metric.remoteBytesRead)
_remoteBytesReadToDisk.add(metric.remoteBytesReadToDisk)
_localBytesRead.add(metric.localBytesRead)
_fetchWaitTime.add(metric.fetchWaitTime)
_recordsRead.add(metric.recordsRead)
}
}
}
/**
* A temporary shuffle read metrics holder that is used to collect shuffle read metrics for each
* shuffle dependency, and all temporary metrics will be merged into the [[ShuffleReadMetrics]] at
* last.
*/
private[spark] class TempShuffleReadMetrics {
private[this] var _remoteBlocksFetched = 0L
private[this] var _localBlocksFetched = 0L
private[this] var _remoteBytesRead = 0L
private[this] var _remoteBytesReadToDisk = 0L
private[this] var _localBytesRead = 0L
private[this] var _fetchWaitTime = 0L
private[this] var _recordsRead = 0L
def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched += v
def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched += v
def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
def incRemoteBytesReadToDisk(v: Long): Unit = _remoteBytesReadToDisk += v
def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
def incRecordsRead(v: Long): Unit = _recordsRead += v
def remoteBlocksFetched: Long = _remoteBlocksFetched
def localBlocksFetched: Long = _localBlocksFetched
def remoteBytesRead: Long = _remoteBytesRead
def remoteBytesReadToDisk: Long = _remoteBytesReadToDisk
def localBytesRead: Long = _localBytesRead
def fetchWaitTime: Long = _fetchWaitTime
def recordsRead: Long = _recordsRead
}