| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark |
| |
| import java.io.Serializable |
| |
| import scala.collection.generic.Growable |
| import scala.reflect.ClassTag |
| |
| import org.apache.spark.scheduler.AccumulableInfo |
| import org.apache.spark.serializer.JavaSerializer |
| import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LegacyAccumulatorWrapper} |
| |
| |
| /** |
| * A data type that can be accumulated, i.e. has a commutative and associative "add" operation, |
| * but where the result type, `R`, may be different from the element type being added, `T`. |
| * |
| * You must define how to add data, and how to merge two of these together. For some data types, |
| * such as a counter, these might be the same operation. In that case, you can use the simpler |
| * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are |
| * accumulating a set. You will add items to the set, and you will union two sets together. |
| * |
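| * For example, a minimal driver-side sketch (assuming a `SparkContext` named `sc` and a |
| * hypothetical `AccumulableParam[mutable.Set[Int], Int]` named `setParam`, like the one |
| * sketched on [[org.apache.spark.AccumulableParam]]): |
| * {{{ |
| *   val setAcc = new Accumulable[mutable.Set[Int], Int](mutable.Set.empty[Int], setParam) |
| *   sc.parallelize(1 to 10).foreach(x => setAcc += x)  // tasks add single elements |
| *   println(setAcc.value)                              // the driver reads the merged set |
| * }}} |
| * |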
| * Operations are not thread-safe. |
| * |
| * @param id ID of this accumulator; for internal use only. |
| * @param initialValue initial value of the accumulator |
| * @param param helper object defining how to add elements of type `R` and `T` |
| * @param name human-readable name for use in Spark's web UI |
| * @param countFailedValues whether to accumulate values from failed tasks. This is set to true |
| * for system and time metrics like serialization time or bytes spilled, |
| * and false for things with absolute values like number of input rows. |
| * This should be used for internal metrics only. |
| * @tparam R the full accumulated data (result type) |
| * @tparam T partial data that can be added in |
| */ |
| @deprecated("use AccumulatorV2", "2.0.0") |
| class Accumulable[R, T] private ( |
| val id: Long, |
| // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile |
| @transient private val initialValue: R, |
| param: AccumulableParam[R, T], |
| val name: Option[String], |
| private[spark] val countFailedValues: Boolean) |
| extends Serializable { |
| |
| private[spark] def this( |
| initialValue: R, |
| param: AccumulableParam[R, T], |
| name: Option[String], |
| countFailedValues: Boolean) = { |
| this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues) |
| } |
| |
| private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = { |
| this(initialValue, param, name, false /* countFailedValues */) |
| } |
| |
| def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None) |
| |
| val zero = param.zero(initialValue) |
| private[spark] val newAcc = new LegacyAccumulatorWrapper(initialValue, param) |
| newAcc.metadata = AccumulatorMetadata(id, name, countFailedValues) |
| // Register the new accumulator in the constructor, to preserve the previous behavior. |
| AccumulatorContext.register(newAcc) |
| |
| /** |
| * Add more data to this accumulator / accumulable |
| * @param term the data to add |
| */ |
| def += (term: T): Unit = { newAcc.add(term) } |
| |
| /** |
| * Add more data to this accumulator / accumulable |
| * @param term the data to add |
| */ |
| def add(term: T): Unit = { newAcc.add(term) } |
| |
| /** |
| * Merge two accumulable objects together |
| * |
| * Normally, a user will not want to use this version, but will instead call `+=`. |
| * @param term the other `R` that will get merged with this |
| */ |
| def ++= (term: R): Unit = { newAcc._value = param.addInPlace(newAcc._value, term) } |
| |
| /** |
| * Merge two accumulable objects together |
| * |
| * Normally, a user will not want to use this version, but will instead call `add`. |
| * @param term the other `R` that will get merged with this |
| */ |
| def merge(term: R): Unit = { newAcc._value = param.addInPlace(newAcc._value, term) } |
| |
| /** |
| * Access the accumulator's current value; only allowed on driver. |
| */ |
| def value: R = { |
| if (newAcc.isAtDriverSide) { |
| newAcc.value |
| } else { |
| throw new UnsupportedOperationException("Can't read accumulator value in task") |
| } |
| } |
| |
| /** |
| * Get the current value of this accumulator from within a task. |
| * |
| * This is NOT the global value of the accumulator. To get the global value after a |
| * completed operation on the dataset, call `value`. |
| * |
| * The typical use of this method is to directly mutate the local value, e.g., to add |
| * an element to a Set. |
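| * |
| * A brief sketch (assuming `acc` is an `Accumulable[mutable.Set[Int], Int]` captured in |
| * the task closure, and `rdd` is an `RDD[Int]`): |
| * {{{ |
| *   rdd.foreach { x => |
| *     acc.localValue += x  // mutate the task-local partial value directly |
| *   } |
| *   acc.value              // on the driver: the merged, global result |
| * }}} |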
| */ |
| def localValue: R = newAcc.value |
| |
| /** |
| * Set the accumulator's value; only allowed on driver. |
| */ |
| def value_= (newValue: R): Unit = { |
| if (newAcc.isAtDriverSide) { |
| newAcc._value = newValue |
| } else { |
| throw new UnsupportedOperationException("Can't assign accumulator value in task") |
| } |
| } |
| |
| /** |
| * Set the accumulator's value. For internal use only. |
| */ |
| def setValue(newValue: R): Unit = { newAcc._value = newValue } |
| |
| /** |
| * Set the accumulator's value. For internal use only. |
| */ |
| private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) } |
| |
| /** |
| * Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values. |
| */ |
| private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { |
| val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) |
| new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) |
| } |
| |
| override def toString: String = if (newAcc._value == null) "null" else newAcc._value.toString |
| } |
| |
| |
| /** |
| * Helper object defining how to accumulate values of a particular type. An implicit |
| * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type. |
| * |
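| * A minimal sketch of a set-union param (the name `SetParam` is illustrative, not part |
| * of Spark's API): |
| * {{{ |
| *   import scala.collection.mutable |
| *   implicit object SetParam extends AccumulableParam[mutable.Set[Int], Int] { |
| *     def addAccumulator(r: mutable.Set[Int], t: Int): mutable.Set[Int] = { r += t; r } |
| *     def addInPlace(r1: mutable.Set[Int], r2: mutable.Set[Int]): mutable.Set[Int] = { |
| *       r1 ++= r2; r1 |
| *     } |
| *     def zero(initial: mutable.Set[Int]): mutable.Set[Int] = mutable.Set.empty[Int] |
| *   } |
| * }}} |
| * |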
| * @tparam R the full accumulated data (result type) |
| * @tparam T partial data that can be added in |
| */ |
| @deprecated("use AccumulatorV2", "2.0.0") |
| trait AccumulableParam[R, T] extends Serializable { |
| /** |
| * Add additional data to the accumulator value. Is allowed to modify and return `r` |
| * for efficiency (to avoid allocating objects). |
| * |
| * @param r the current value of the accumulator |
| * @param t the data to be added to the accumulator |
| * @return the new value of the accumulator |
| */ |
| def addAccumulator(r: R, t: T): R |
| |
| /** |
| * Merge two accumulated values together. Is allowed to modify and return the first value |
| * for efficiency (to avoid allocating objects). |
| * |
| * @param r1 one set of accumulated data |
| * @param r2 another set of accumulated data |
| * @return both data sets merged together |
| */ |
| def addInPlace(r1: R, r2: R): R |
| |
| /** |
| * Return the "zero" (identity) value for an accumulator type, given its initial value. For |
| * example, if R was a vector of N dimensions, this would return a vector of N zeroes. |
| */ |
| def zero(initialValue: R): R |
| } |
| |
| |
| @deprecated("use AccumulatorV2", "2.0.0") |
| private[spark] class GrowableAccumulableParam[R : ClassTag, T] |
| (implicit rg: R => Growable[T] with TraversableOnce[T] with Serializable) |
| extends AccumulableParam[R, T] { |
| |
| def addAccumulator(growable: R, elem: T): R = { |
| growable += elem |
| growable |
| } |
| |
| def addInPlace(r1: R, r2: R): R = { |
| r1 ++= r2 |
| r1 |
| } |
| |
| def zero(initialValue: R): R = { |
| // We need to clone initialValue, but it's hard to specify that R should also be Cloneable. |
| // Instead we'll serialize it to a buffer and load it back. |
| val ser = new JavaSerializer(new SparkConf(false)).newInstance() |
| val copy = ser.deserialize[R](ser.serialize(initialValue)) |
| copy.clear() // Drop any elements copied from initialValue, so the result is a true zero |
| copy |
| } |
| } |