| package org.apache.spark.streaming |
| |
| import org.apache.spark.rdd.RDD |
| import org.apache.spark.streaming.dstream.InputDStream |
| |
| import scala.reflect.ClassTag |
| |
/**
 * Input stream for tests that replays a fixed, pre-supplied sequence of batches.
 *
 * Each element of `input` holds the data for one batch. `compute` selects the element that
 * corresponds to the requested batch time and turns it into an RDD.
 *
 * @param ssc_          streaming context passed on to the `InputDStream` parent
 * @param input         per-batch data in batch order; a `null` entry yields no RDD for that batch
 * @param numPartitions number of partitions passed to `makeRDD` for every generated RDD
 * @tparam T element type of the generated RDDs
 */
class TestInputDStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
  extends InputDStream[T](ssc_) {

  /** No-op: all data is already held in `input`. */
  def start(): Unit = {}

  /** No-op: there is nothing to release. */
  def stop(): Unit = {}

  /**
   * Builds the RDD for the batch at `validTime`.
   *
   * @param validTime batch time for which data is requested
   * @return `Some(rdd)` built from the matching entry of `input` (an RDD over an empty sequence
   *         once `input` is exhausted), or `None` when the matching entry is `null`
   */
  def compute(validTime: Time): Option[RDD[T]] = {
    logInfo(s"Computing RDD for time $validTime")

    // The time exactly one slide after zeroTime maps to index 0, hence the "- 1".
    // NOTE(review): a negative index is not guarded here and would make input(index) throw;
    // presumably callers never ask for a time at or before zeroTime -- confirm.
    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
    val selectedInput = if (index < input.size) input(index) else Seq[T]()

    // A null entry lets us test cases where RDDs are not created: Option(null) is None.
    Option(selectedInput).map { batch =>
      val rdd = ssc.sc.makeRDD(batch, numPartitions)
      logInfo(s"Created RDD ${rdd.id} with $batch")
      rdd
    }
  }
}