/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.compiler.frontend.spark.core.rdd
import java.util
import java.util.UUID
import org.apache.nemo.client.JobLauncher
import org.apache.nemo.common.dag.{DAG, DAGBuilder}
import org.apache.nemo.common.ir.edge.IREdge
import org.apache.nemo.common.ir.edge.executionproperty._
import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty
import org.apache.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty
import org.apache.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
import org.apache.nemo.common.test.EmptyComponents.EmptyTransform
import org.apache.nemo.compiler.frontend.spark.{SparkBroadcastVariables, SparkKeyExtractor}
import org.apache.nemo.compiler.frontend.spark.coder.{SparkDecoderFactory, SparkEncoderFactory}
import org.apache.nemo.compiler.frontend.spark.core.SparkFrontendUtils
import org.apache.nemo.compiler.frontend.spark.transform._
import org.apache.hadoop.io.WritableFactory
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.nemo.common.ir.IRDAG
import org.apache.spark.api.java.function.{FlatMapFunction, Function, Function2}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{AsyncRDDActions, DoubleRDDFunctions, OrderedRDDFunctions, PartitionCoalescer, SequenceFileRDDFunctions}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Dependency, Partition, Partitioner, SparkContext, SparkEnv, TaskContext}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
/**
* RDD for Nemo.
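*
* Instead of computing partitions directly, each transformation extends a Nemo IR DAG of
* [[IRVertex]]/[[IREdge]], and actions hand that DAG to the Nemo runtime.
* A minimal usage sketch (assuming `rdd` is an `RDD[Int]` obtained through the Nemo Spark frontend):
* {{{
*   val doubled = rdd.map(_ * 2)       // appends a MapTransform vertex to the IR DAG
*   val total = doubled.reduce(_ + _)  // collects the elements and reduces them on the driver
* }}}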
*/
final class RDD[T: ClassTag] protected[rdd] (
protected[rdd] val _sc: SparkContext,
private val deps: Seq[Dependency[_]],
protected[rdd] var dag: DAG[IRVertex, IREdge],
protected[rdd] val lastVertex: IRVertex,
private val sourceRDD: Option[org.apache.spark.rdd.RDD[T]]) extends org.apache.spark.rdd.RDD[T](_sc, deps) {
private val LOG = LoggerFactory.getLogger(classOf[RDD[T]].getName)
protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc)
private val loopVertexStack = new util.Stack[LoopVertex]
private val encoderProperty: EdgeExecutionProperty[_ <: Serializable] =
EncoderProperty.of(new SparkEncoderFactory[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
private val decoderProperty: EdgeExecutionProperty[_ <: Serializable] =
DecoderProperty.of(new SparkDecoderFactory[T](serializer)).asInstanceOf[EdgeExecutionProperty[_ <: Serializable]]
private val keyExtractorProperty: KeyExtractorProperty = KeyExtractorProperty.of(new SparkKeyExtractor)
private var persistedMarkerVertex: Option[IRVertex] = Option.empty
/**
* Constructor without dependencies (not needed in Nemo RDD).
*
* @param sparkContext the Spark context.
* @param dagFrom the IR DAG accumulated so far.
* @param lastVertexFrom the last vertex of the accumulated DAG.
* @param sourceRDDFrom the optional source Spark RDD backing this RDD.
*/
protected[rdd] def this(sparkContext: SparkContext,
dagFrom: DAG[IRVertex, IREdge],
lastVertexFrom: IRVertex,
sourceRDDFrom: Option[org.apache.spark.rdd.RDD[T]]) = {
this(sparkContext, Nil, dagFrom, lastVertexFrom, sourceRDDFrom)
}
/**
* @return converted JavaRDD.
*/
override def toJavaRDD() : JavaRDD[T] = {
new JavaRDD[T](this)
}
/**
* Not supported yet.
*/
override def getPartitions: Array[Partition] = {
throw new UnsupportedOperationException("Operation unsupported.")
}
/**
* Not supported yet.
*/
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
throw new UnsupportedOperationException("Operation unsupported.")
}
/////////////// WRAPPER FUNCTIONS /////////////
/**
* A Scala wrapper for the map transformation.
*/
override def map[U](f: (T) => U)(implicit evidence$3: ClassManifest[U]): RDD[U] = {
val javaFunc = SparkFrontendUtils.toJavaFunction(f)
map(javaFunc)
}
/**
* A Scala wrapper for the flatMap transformation.
*/
override def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
val javaFunc = SparkFrontendUtils.toJavaFlatMapFunction(f)
flatMap(javaFunc)
}
/**
* A Scala wrapper for the reduce action.
*/
override def reduce(f: (T, T) => T): T = {
val javaFunc = SparkFrontendUtils.toJavaFunction(f)
reduce(javaFunc)
}
/**
* A Scala wrapper for the collect action.
*
* @return the array of collected elements.
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
override def collect(): Array[T] =
collectAsList().asScala.toArray
/////////////// TRANSFORMATIONS ///////////////
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
protected[rdd] def map[U: ClassTag](javaFunc: Function[T, U]): RDD[U] = {
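// Extend the current IR DAG: wrap the user function in a MapTransform vertex, connect it to
// this RDD's last vertex, and return a new RDD whose last vertex is the new map vertex.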
val builder: DAGBuilder[IRVertex, IREdge] = new DAGBuilder[IRVertex, IREdge](dag)
val mapVertex: IRVertex = new OperatorVertex(new MapTransform[T, U](javaFunc))
builder.addVertex(mapVertex, loopVertexStack)
val newEdge: IREdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, mapVertex),
lastVertex, mapVertex)
newEdge.setProperty(encoderProperty)
newEdge.setProperty(decoderProperty)
newEdge.setProperty(keyExtractorProperty)
builder.connectVertices(newEdge)
new RDD[U](_sc, builder.buildWithoutSourceSinkCheck, mapVertex, Option.empty)
}
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
protected[rdd] def flatMap[U: ClassTag](javaFunc: FlatMapFunction[T, U]): RDD[U] = {
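// Same DAG-extension pattern as map, but wrapping the function in a FlatMapTransform vertex.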
val builder = new DAGBuilder[IRVertex, IREdge](dag)
val flatMapVertex = new OperatorVertex(new FlatMapTransform[T, U](javaFunc))
builder.addVertex(flatMapVertex, loopVertexStack)
val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapVertex),
lastVertex, flatMapVertex)
newEdge.setProperty(encoderProperty)
newEdge.setProperty(decoderProperty)
newEdge.setProperty(keyExtractorProperty)
builder.connectVertices(newEdge)
new RDD[U](_sc, builder.buildWithoutSourceSinkCheck, flatMapVertex, Option.empty)
}
/////////////// ACTIONS ///////////////
/**
* Return a list that contains all of the elements in this RDD.
*
* @note This method should only be used if the resulting list is expected to be small, as
* all the data is loaded into the driver's memory.
*/
protected[rdd] def collectAsList(): util.List[T] =
SparkFrontendUtils.collect(dag, loopVertexStack, lastVertex, serializer)
/**
* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
protected[rdd] def reduce(javaFunc: Function2[T, T, T]): T = {
val builder = new DAGBuilder[IRVertex, IREdge](dag)
val reduceVertex = new OperatorVertex(new ReduceTransform[T](javaFunc))
builder.addVertex(reduceVertex, loopVertexStack)
val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, reduceVertex),
lastVertex, reduceVertex)
newEdge.setProperty(encoderProperty)
newEdge.setProperty(decoderProperty)
newEdge.setProperty(keyExtractorProperty)
builder.connectVertices(newEdge)
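// Collect the elements of this RDD to the driver and perform the reduction locally.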
ReduceTransform.reduceIterator(
SparkFrontendUtils.collect(dag, loopVertexStack, lastVertex, serializer).iterator(), javaFunc)
}
/**
* Save this RDD as a text file, using string representations of elements.
*/
override def saveAsTextFile(path: String): Unit = {
// Check whether the given path is on a Hadoop-compatible file system (hdfs, s3a, or file scheme).
val isHadoopFilePath = path.startsWith("hdfs://") || path.startsWith("s3a://") || path.startsWith("file://")
val textFileTransform =
if (isHadoopFilePath) new HDFSTextFileTransform[T](path)
else new LocalTextFileTransform[T](path)
val builder = new DAGBuilder[IRVertex, IREdge](dag)
val textFileVertex = new OperatorVertex(textFileTransform)
builder.addVertex(textFileVertex, loopVertexStack)
val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, textFileVertex),
lastVertex, textFileVertex)
newEdge.setProperty(encoderProperty)
newEdge.setProperty(decoderProperty)
newEdge.setProperty(keyExtractorProperty)
builder.connectVertices(newEdge)
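// saveAsTextFile is an action: launch the completed DAG immediately so the files are written.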
JobLauncher.launchDAG(new IRDAG(builder.build), SparkBroadcastVariables.getAll, "")
}
/////////////// CACHING ///////////////
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet. Local checkpointing is an exception.
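*
* For example (a sketch; `StorageLevel` is Spark's storage level):
* {{{
*   rdd.persist(StorageLevel.DISK_ONLY)  // marks the cache edge with LOCAL_FILE_STORE and KEEP persistence
* }}}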
*/
override def persist(newLevel: StorageLevel): RDD.this.type = {
var actualUseDisk = false
var actualUseMemory = false
var actualDeserialized = false
if (!newLevel.isValid) {
throw new RuntimeException("Non-valid StorageLevel: " + newLevel.toString())
}
// Adjust unsupported storage options to the closest supported alternatives.
if (newLevel.useDisk && newLevel.useMemory) {
actualUseDisk = true
actualUseMemory = false
// TODO #187: Implement disk and memory persistence (Spill)
LOG.warn("Cannot persist data in disk and memory at the same time. The data will be persisted in disk only.")
} else {
actualUseDisk = newLevel.useDisk
actualUseMemory = newLevel.useMemory
}
if (newLevel.useOffHeap) {
// TODO #188: Implement off-heap memory persistence
LOG.warn("Cannot persist data using off-heap area. The data will be persisted in heap instead of off-heap.")
}
if (newLevel.deserialized && actualUseDisk) {
LOG.warn(
"Cannot persist data as deserialized form in disk. The data will be persisted in serialized form instead.")
actualDeserialized = false
} else {
actualDeserialized = newLevel.deserialized
}
if (newLevel.replication > 1) {
// TODO #189: Implement replication for persisted data
LOG.warn("Cannot persist data with replication. The data will not be replicated.")
}
// TODO #190: Disable changing persistence strategy after a RDD is calculated
val builder = new DAGBuilder[IRVertex, IREdge](dag)
if (persistedMarkerVertex.isDefined) {
builder.removeVertex(persistedMarkerVertex.get)
}
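// Append a no-op "cache marker" vertex; the edge into it carries the cache-related execution
// properties (data store, KEEP persistence, cache ID, duplicate-edge group) that identify the
// persisted data.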
val cacheID = UUID.randomUUID()
val ghostVertex = new OperatorVertex(new EmptyTransform[T, T]("CacheMarkerTransform-" + cacheID.toString))
ghostVertex.setProperty(IgnoreSchedulingTempDataReceiverProperty.of())
builder.addVertex(ghostVertex, loopVertexStack)
val newEdge = new IREdge(CommunicationPatternProperty.Value.ONE_TO_ONE, lastVertex, ghostVertex)
// Setup default properties
newEdge.setProperty(encoderProperty)
newEdge.setProperty(decoderProperty)
newEdge.setProperty(keyExtractorProperty)
// Setup cache-related properties
if (actualUseDisk) {
newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LOCAL_FILE_STORE))
} else if (actualDeserialized) {
newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MEMORY_STORE))
} else {
newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.SERIALIZED_MEMORY_STORE))
}
newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.KEEP))
newEdge.setProperty(CacheIDProperty.of(cacheID))
val dupEdgeVal = new DuplicateEdgeGroupPropertyValue("CacheGroup-" + cacheID)
dupEdgeVal.setRepresentativeEdgeId(newEdge.getId)
newEdge.setProperty(DuplicateEdgeGroupProperty.of(dupEdgeVal))
builder.connectVertices(newEdge)
dag = builder.buildWithoutSourceSinkCheck()
persistedMarkerVertex = Option.apply(ghostVertex)
this
}
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
override def persist(): RDD.this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
override def cache(): RDD.this.type = persist()
/////////////// UNSUPPORTED METHODS ///////////////
// TODO #92: Implement the unimplemented transformations/actions & dataset initialization methods for the Spark frontend.
override protected def getDependencies: Seq[Dependency[_]] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override protected def getPreferredLocations(split: Partition): Seq[String] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def sparkContext: SparkContext = super.sparkContext
override def setName(_name: String): RDD.this.type =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def unpersist(blocking: Boolean): RDD.this.type =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def getStorageLevel: StorageLevel =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def filter(f: (T) => Boolean): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def distinct(numPartitions: Int)(implicit ord: Ordering[T]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def distinct(): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def repartition(numPartitions: Int)(implicit ord: Ordering[T]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def coalesce(numPartitions: Int, shuffle: Boolean, partitionCoalescer: Option[PartitionCoalescer])
(implicit ord: Ordering[T]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def sample(withReplacement: Boolean, fraction: Double, seed: Long): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def randomSplit(weights: Array[Double], seed: Long): Array[org.apache.spark.rdd.RDD[T]] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def takeSample(withReplacement: Boolean, num: Int, seed: Long): Array[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def union(other: org.apache.spark.rdd.RDD[T]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def ++(other: org.apache.spark.rdd.RDD[T]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def sortBy[K](f: (T) => K, ascending: Boolean, numPartitions: Int)
(implicit ord: Ordering[K], ctag: ClassManifest[K]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def intersection(other: org.apache.spark.rdd.RDD[T]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def intersection(other: org.apache.spark.rdd.RDD[T], partitioner: Partitioner)
(implicit ord: Ordering[T]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def intersection(other: org.apache.spark.rdd.RDD[T], numPartitions: Int): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def glom(): RDD[Array[T]] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def cartesian[U](other: org.apache.spark.rdd.RDD[U])(implicit evidence$5: ClassManifest[U]): RDD[(T, U)] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def groupBy[K](f: (T) => K)(implicit kt: ClassManifest[K]): RDD[(K, Iterable[T])] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def groupBy[K](f: (T) => K, numPartitions: Int)(implicit kt: ClassManifest[K]): RDD[(K, Iterable[T])] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def groupBy[K](f: (T) => K, p: Partitioner)
(implicit kt: ClassManifest[K], ord: Ordering[K]): RDD[(K, Iterable[T])] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def pipe(command: String): RDD[String] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def pipe(command: String, env: scala.collection.Map[String, String]): RDD[String] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def pipe(command: Seq[String], env: scala.collection.Map[String, String],
printPipeContext: ((String) => Unit) => Unit,
printRDDElement: (T, (String) => Unit) => Unit, separateWorkingDir: Boolean,
bufferSize: Int, encoding: String): RDD[String] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean)
(implicit evidence$6: ClassManifest[U]): RDD[U] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean)
(implicit evidence$9: ClassManifest[U]): RDD[U] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def zip[U](other: org.apache.spark.rdd.RDD[U])(implicit evidence$10: ClassManifest[U]): RDD[(T, U)] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def zipPartitions[B, V](rdd2: org.apache.spark.rdd.RDD[B], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B]) => Iterator[V])
(implicit evidence$11: ClassManifest[B], evidence$12: ClassManifest[V]): RDD[V] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def zipPartitions[B, V](rdd2: org.apache.spark.rdd.RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])
(implicit evidence$13: ClassManifest[B], evidence$14: ClassManifest[V]): RDD[V] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def zipPartitions[B, C, V](rdd2: org.apache.spark.rdd.RDD[B],
rdd3: org.apache.spark.rdd.RDD[C], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])
(implicit evidence$15: ClassManifest[B], evidence$16: ClassManifest[C],
evidence$17: ClassManifest[V]): RDD[V] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def zipPartitions[B, C, V](rdd2: org.apache.spark.rdd.RDD[B],
rdd3: org.apache.spark.rdd.RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])
(implicit evidence$18: ClassManifest[B], evidence$19: ClassManifest[C],
evidence$20: ClassManifest[V]): RDD[V] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def zipPartitions[B, C, D, V](rdd2: org.apache.spark.rdd.RDD[B],
rdd3: org.apache.spark.rdd.RDD[C],
rdd4: org.apache.spark.rdd.RDD[D], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])
(implicit evidence$21: ClassManifest[B], evidence$22: ClassManifest[C],
evidence$23: ClassManifest[D], evidence$24: ClassManifest[V]): RDD[V] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def zipPartitions[B, C, D, V](rdd2: org.apache.spark.rdd.RDD[B],
rdd3: org.apache.spark.rdd.RDD[C],
rdd4: org.apache.spark.rdd.RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])
(implicit evidence$25: ClassManifest[B], evidence$26: ClassManifest[C],
evidence$27: ClassManifest[D], evidence$28: ClassManifest[V]): RDD[V] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def foreach(f: (T) => Unit): Unit =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def foreachPartition(f: (Iterator[T]) => Unit): Unit =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def toLocalIterator: Iterator[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def collect[U](f: PartialFunction[T, U])(implicit evidence$29: ClassManifest[U]): RDD[U] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def subtract(other: org.apache.spark.rdd.RDD[T]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def subtract(other: org.apache.spark.rdd.RDD[T], numPartitions: Int): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def subtract(other: org.apache.spark.rdd.RDD[T], p: Partitioner)(implicit ord: Ordering[T]): RDD[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def treeReduce(f: (T, T) => T, depth: Int): T =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def fold(zeroValue: T)(op: (T, T) => T): T =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)
(implicit evidence$30: ClassManifest[U]): U =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def treeAggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int)
(implicit evidence$31: ClassManifest[U]): U =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def count(): Long =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def countByValue()(implicit ord: Ordering[T]): Map[T, Long] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def countByValueApprox(timeout: Long, confidence: Double)
(implicit ord: Ordering[T]): PartialResult[scala.collection.Map[T, BoundedDouble]] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def countApproxDistinct(p: Int, sp: Int): Long =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def countApproxDistinct(relativeSD: Double): Long =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def zipWithIndex(): RDD[(T, Long)] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def zipWithUniqueId(): RDD[(T, Long)] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def take(num: Int): Array[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def first(): T =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def top(num: Int)(implicit ord: Ordering[T]): Array[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def max()(implicit ord: Ordering[T]): T =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def min()(implicit ord: Ordering[T]): T =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def isEmpty(): Boolean =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def saveAsObjectFile(path: String): Unit =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def keyBy[K](f: (T) => K): RDD[(K, T)] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def checkpoint(): Unit =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def localCheckpoint(): RDD.this.type =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def isCheckpointed: Boolean =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def getCheckpointFile: Option[String] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def firstParent[U](implicit evidence$32: ClassManifest[U]): RDD[U] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def parent[U](j: Int)(implicit evidence$33: ClassManifest[U]): RDD[U] =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def context: SparkContext =
throw new UnsupportedOperationException("Operation not yet implemented.")
override protected def clearDependencies(): Unit =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def toDebugString: String =
throw new UnsupportedOperationException("Operation not yet implemented.")
override def toString(): String =
throw new UnsupportedOperationException("Operation not yet implemented.")
}
/**
* Defines implicit functions that provide extra functionalities on RDDs of specific types.
*
* For example, [[RDD.rddToPairRDDFunctions]] converts an RDD of key-value pairs into a
* [[PairRDDFunctions]], enabling extra functionality such as `PairRDDFunctions.reduceByKey`.
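*
* For instance (a sketch; assumes `pairs` is an `RDD[(String, Int)]` produced through the Nemo frontend):
* {{{
*   val counts = pairs.reduceByKey(_ + _)  // resolved via rddToPairRDDFunctions
* }}}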
*/
object RDD {
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = {
throw new UnsupportedOperationException("Operation unsupported.")
}
implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V],
keyWritableFactory: WritableFactory,
valueWritableFactory: WritableFactory): SequenceFileRDDFunctions[K, V] = {
throw new UnsupportedOperationException("Operation unsupported.")
}
implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
: OrderedRDDFunctions[K, V, (K, V)] = {
throw new UnsupportedOperationException("Operation unsupported.")
}
implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions = {
throw new UnsupportedOperationException("Operation unsupported.")
}
implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T])
: DoubleRDDFunctions = {
throw new UnsupportedOperationException("Operation unsupported.")
}
}