blob: 88ac17307da583022ed307c6c6511ac9cea6cb1f [file] [log] [blame]
package org.apache.carbon.flink
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
abstract class TestSource(val dataCount: Int) extends SourceFunction[String] with CheckpointedFunction {
private var dataIndex = 0
private var dataIndexState: ListState[Integer] = _
private var running = false
@throws[Exception]
def get(index: Int): String
@throws[Exception]
def onFinish(): Unit = {
// to do nothing.
}
@throws[Exception]
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
this.running = true
while ( {
this.running && this.dataIndex < this.dataCount
}) {
sourceContext.collectWithTimestamp(this.get(this.dataIndex), System.currentTimeMillis)
this.dataIndex += 1
}
this.onFinish()
}
override def cancel(): Unit = {
this.running = false
}
@throws[Exception]
override def snapshotState(context: FunctionSnapshotContext): Unit = {
this.dataIndexState.clear()
this.dataIndexState.add(this.dataIndex)
}
@throws[Exception]
override def initializeState(context: FunctionInitializationContext): Unit = {
this.dataIndexState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Integer]("dataIndex", classOf[Integer]))
if (!context.isRestored) return
import scala.collection.JavaConversions._
for (dataIndex <- this.dataIndexState.get) {
this.dataIndex = dataIndex
}
}
}