/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.spark.core

import org.apache.streampark.common.conf.ConfigKeys._

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.annotation.meta.getter

/** <b><code>SparkStreaming</code></b> <p/> Entry point trait for Spark Streaming applications <p/> */
trait SparkStreaming extends Spark {

  @(transient @getter)
  protected lazy val context: StreamingContext = {

    /** Construct a StreamingContext with the configured batch duration. */
    def _context(): StreamingContext = {
      val duration = sparkConf.get(KEY_SPARK_BATCH_DURATION).toInt
      new StreamingContext(sparkSession.sparkContext, Seconds(duration))
    }

    checkpoint match {
      // No checkpoint directory configured: always build a fresh context.
      case "" => _context()
      // A checkpoint directory is configured: try to recover the context from
      // it (createOnError controls whether a read failure falls back to a new
      // context), then point the context at the same checkpoint directory.
      case checkpointPath =>
        val tmpContext =
          StreamingContext.getOrCreate(checkpointPath, _context, createOnError = createOnError)
        tmpContext.checkpoint(checkpointPath)
        tmpContext
    }
  }
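
  // How the block above is driven by configuration (a sketch; the concrete
  // property names behind KEY_SPARK_BATCH_DURATION, `checkpoint` and
  // `createOnError` are defined in ConfigKeys / the parent Spark trait, not
  // in this file):
  //
  //   batch duration = 5      -> 5-second micro-batches via Seconds(duration)
  //   checkpoint == ""        -> a fresh StreamingContext is always built
  //   checkpoint == some path -> state is recovered from that path on restart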

  /** Start the streaming context and block until the job terminates. */
  final override def start(): Unit = {
    context.start()
    context.awaitTermination()
  }
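
  // A minimal usage sketch (assuming the parent Spark trait leaves a
  // handle()-style method for subclasses to implement; that method name is
  // an assumption, not confirmed by this file):
  //
  //   object WordCountApp extends SparkStreaming {
  //     override def handle(): Unit = {
  //       val lines = context.socketTextStream("localhost", 9999)
  //       lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _).print()
  //     }
  //   }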

  /**
   * The config phase allows the developer to set additional parameters (beyond the agreed
   * configuration file) by means of this hook, e.g.:
   * {{{
   *   sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   *   sparkConf.registerKryoClasses(Array(classOf[User], classOf[Order], ...))
   * }}}
   */
  override def config(sparkConf: SparkConf): Unit = {}

  /**
   * The ready phase is an entry point for the developer to perform further actions after the
   * parameters have been set; it runs after initialization and before the program starts.
   */
  override def ready(): Unit = {}
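
  // Example override (a sketch): validate settings once configuration is
  // final but before the streaming job starts.
  //
  //   override def ready(): Unit = {
  //     require(
  //       sparkConf.get(KEY_SPARK_BATCH_DURATION).toInt > 0,
  //       "batch duration must be a positive number of seconds")
  //   }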

  /**
   * The destroy phase is the last phase before the JVM exits, after the program has finished
   * running; it is generally used for cleanup work.
   */
  override def destroy(): Unit = {}
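
  // Example override (a sketch): run cleanup before the JVM exits, e.g.
  // closing clients opened in ready(). `httpClient` here is hypothetical.
  //
  //   override def destroy(): Unit = {
  //     httpClient.close()
  //   }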
}