blob: efe229cf49d11593bf1a2cf9a7f6c4499fa87efa [file] [log] [blame]
package org.apache.spark.streaming
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import scala.reflect.ClassTag
/**
 * Input stream for tests that replays a fixed, pre-built sequence of batches.
 *
 * For a given batch time, the batch at position
 * `(validTime - zeroTime) / slideDuration - 1` of `input` is turned into an RDD.
 *
 * @param ssc_          streaming context this stream is attached to
 * @param input         the batches to replay, one inner sequence per batch; an inner
 *                      sequence may be `null` to make `compute` return `None`
 * @param numPartitions number of partitions requested for each generated RDD
 * @tparam T            element type of the generated RDDs
 */
class TestInputDStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
  extends InputDStream[T](ssc_) {

  /** Nothing to start: all data is already held in `input`. */
  def start(): Unit = {}

  /** Nothing to stop. */
  def stop(): Unit = {}

  /**
   * Builds the RDD for the batch at `validTime`.
   *
   * @param validTime batch time for which data is requested
   * @return `None` when the selected batch is `null`; otherwise `Some` RDD holding the
   *         selected batch, or an empty batch once the index runs past the end of `input`
   */
  def compute(validTime: Time): Option[RDD[T]] = {
    logInfo("Computing RDD for time " + validTime)
    val batchIndex = ((validTime - zeroTime) / slideDuration - 1).toInt
    // Past the end of the prepared input we keep producing empty batches.
    val batch = if (batchIndex >= input.size) Seq[T]() else input(batchIndex)

    // A null batch lets us test cases where RDDs are not created:
    // Option(null) is None, so nothing below runs and None is returned.
    Option(batch).map { elements =>
      val batchRdd = ssc.sc.makeRDD(elements, numPartitions)
      logInfo("Created RDD " + batchRdd.id + " with " + elements)
      batchRdd
    }
  }
}