blob: 3092074232d18067191e61e0ca39c0a9a7ac8d64 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.io.Serializable
import scala.collection.generic.Growable
import scala.reflect.ClassTag
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LegacyAccumulatorWrapper}
/**
* A data type that can be accumulated, i.e. has a commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
*
* You must define how to add data, and how to merge two of these together. For some data types,
* such as a counter, these might be the same operation. In that case, you can use the simpler
* [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
* accumulating a set. You will add items to the set, and you will union two sets together.
*
* Operations are not thread-safe.
*
* @param id ID of this accumulator; for internal use only.
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @param countFailedValues whether to accumulate values from failed tasks. This is set to true
* for system and time metrics like serialization time or bytes spilled,
* and false for things with absolute values like number of input rows.
* This should be used for internal metrics only.
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
@deprecated("use AccumulatorV2", "2.0.0")
class Accumulable[R, T] private (
val id: Long,
// SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@transient private val initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
private[spark] val countFailedValues: Boolean)
extends Serializable {
private[spark] def this(
initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
countFailedValues: Boolean) = {
this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues)
}
private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = {
this(initialValue, param, name, false /* countFailedValues */)
}
def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
val zero = param.zero(initialValue)
private[spark] val newAcc = new LegacyAccumulatorWrapper(initialValue, param)
newAcc.metadata = AccumulatorMetadata(id, name, countFailedValues)
// Register the new accumulator in ctor, to follow the previous behaviour.
AccumulatorContext.register(newAcc)
/**
* Add more data to this accumulator / accumulable
* @param term the data to add
*/
def += (term: T) { newAcc.add(term) }
/**
* Add more data to this accumulator / accumulable
* @param term the data to add
*/
def add(term: T) { newAcc.add(term) }
/**
* Merge two accumulable objects together
*
* Normally, a user will not want to use this version, but will instead call `+=`.
* @param term the other `R` that will get merged with this
*/
def ++= (term: R) { newAcc._value = param.addInPlace(newAcc._value, term) }
/**
* Merge two accumulable objects together
*
* Normally, a user will not want to use this version, but will instead call `add`.
* @param term the other `R` that will get merged with this
*/
def merge(term: R) { newAcc._value = param.addInPlace(newAcc._value, term) }
/**
* Access the accumulator's current value; only allowed on driver.
*/
def value: R = {
if (newAcc.isAtDriverSide) {
newAcc.value
} else {
throw new UnsupportedOperationException("Can't read accumulator value in task")
}
}
/**
* Get the current value of this accumulator from within a task.
*
* This is NOT the global value of the accumulator. To get the global value after a
* completed operation on the dataset, call `value`.
*
* The typical use of this method is to directly mutate the local value, eg., to add
* an element to a Set.
*/
def localValue: R = newAcc.value
/**
* Set the accumulator's value; only allowed on driver.
*/
def value_= (newValue: R) {
if (newAcc.isAtDriverSide) {
newAcc._value = newValue
} else {
throw new UnsupportedOperationException("Can't assign accumulator value in task")
}
}
/**
* Set the accumulator's value. For internal use only.
*/
def setValue(newValue: R): Unit = { newAcc._value = newValue }
/**
* Set the accumulator's value. For internal use only.
*/
private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }
/**
* Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values.
*/
private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
}
override def toString: String = if (newAcc._value == null) "null" else newAcc._value.toString
}
/**
* Helper object defining how to accumulate values of a particular type. An implicit
* AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
*
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
@deprecated("use AccumulatorV2", "2.0.0")
trait AccumulableParam[R, T] extends Serializable {
/**
* Add additional data to the accumulator value. Is allowed to modify and return `r`
* for efficiency (to avoid allocating objects).
*
* @param r the current value of the accumulator
* @param t the data to be added to the accumulator
* @return the new value of the accumulator
*/
def addAccumulator(r: R, t: T): R
/**
* Merge two accumulated values together. Is allowed to modify and return the first value
* for efficiency (to avoid allocating objects).
*
* @param r1 one set of accumulated data
* @param r2 another set of accumulated data
* @return both data sets merged together
*/
def addInPlace(r1: R, r2: R): R
/**
* Return the "zero" (identity) value for an accumulator type, given its initial value. For
* example, if R was a vector of N dimensions, this would return a vector of N zeroes.
*/
def zero(initialValue: R): R
}
@deprecated("use AccumulatorV2", "2.0.0")
private[spark] class
GrowableAccumulableParam[R : ClassTag, T]
(implicit rg: R => Growable[T] with TraversableOnce[T] with Serializable)
extends AccumulableParam[R, T] {
def addAccumulator(growable: R, elem: T): R = {
growable += elem
growable
}
def addInPlace(t1: R, t2: R): R = {
t1 ++= t2
t1
}
def zero(initialValue: R): R = {
// We need to clone initialValue, but it's hard to specify that R should also be Cloneable.
// Instead we'll serialize it to a buffer and load it back.
val ser = new JavaSerializer(new SparkConf(false)).newInstance()
val copy = ser.deserialize[R](ser.serialize(initialValue))
copy.clear() // In case it contained stuff
copy
}
}