/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.streampark.spark.core

import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils}
import org.apache.streampark.spark.core.util.{ParameterTool, SqlCommandParser}

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.annotation.meta.getter
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

/**
 * <b><code>Spark</code></b>
 *
 * Base trait for StreamPark Spark applications. Subclasses are driven through the lifecycle
 * phases init -> config -> ready -> handle -> start -> destroy by the framework's main method.
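 *
 * A minimal usage sketch (the object name <code>MySqlJob</code> is illustrative, not part of
 * this module):
 * {{{
 *   object MySqlJob extends Spark {
 *     // destroy is the only abstract member; stop the session once the job is done
 *     override def destroy(): Unit = if (sparkSession != null) sparkSession.stop()
 *   }
 * }}}
 */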
trait Spark extends Logger {

  @(transient @getter)
  final protected lazy val sparkConf: SparkConf = new SparkConf()

  @(transient @getter)
  final private[this] val sparkListeners = new ArrayBuffer[String]()

  @(transient @getter)
  final protected var sparkSession: SparkSession = _

  // Checkpoint directory
  final protected var checkpoint: String = ""

  // Whether to recreate when recovery from the checkpoint fails
  final protected var createOnError: Boolean = true

  // Guards handle() so that SQL statements are executed one at a time
  private[this] val lock = new ReentrantReadWriteLock().writeLock

  /** Entry point: the whole lifecycle is driven from here by the framework. */
  def main(args: Array[String]): Unit = {
    init(args)
    config(sparkConf)

    // 1) propagate "spark.config.system.properties.*" entries to JVM system properties.
    //    getAllWithPrefix strips the prefix, leaving a leading ".", hence the drop(1).
    val sysProps = sparkConf.getAllWithPrefix("spark.config.system.properties")
    if (sysProps != null) {
      sysProps.foreach(x => System.getProperties.setProperty(x._1.drop(1), x._2))
    }

    // 2) build the SparkSession, optionally with Hive support
    val builder = SparkSession.builder().config(sparkConf)
    val enableHive = sparkConf.getBoolean("spark.config.enable.hive.support", defaultValue = false)
    if (enableHive) {
      builder.enableHiveSupport()
    }
    sparkSession = builder.getOrCreate()

    // 3) apply "spark.config.spark.sql.*" entries as "spark.sql.*" settings on the session's
    //    runtime configuration (SparkContext.getConf only returns a copy, so setting it there
    //    would have no effect).
    val sparkSql = sparkConf.getAllWithPrefix("spark.config.spark.sql")
    if (sparkSql != null) {
      sparkSql.foreach(x => sparkSession.conf.set(s"spark.sql${x._1}", x._2))
    }

    ready()

    // The SQL script is passed on the command line in compressed form; decompress it first.
    val parameterTool = ParameterTool.fromArgs(args)
    val sparkSqls = {
      val sql = parameterTool.get(KEY_FLINK_SQL())
      require(StringUtils.isNotBlank(sql), "Usage: spark sql cannot be empty")
      Try(DeflaterUtils.unzipString(sql)) match {
        case Success(value) => value
        case Failure(_) =>
          throw new IllegalArgumentException("Usage: spark sql is invalid, please check")
      }
    }

    // Parse the script into individual statements and hand each one to handle().
    SqlCommandParser
      .parseSQL(sparkSqls)
      .foreach(
        x => {
          val operand = if (x.operands.isEmpty) null else x.operands.head
          val command = x.command.name
          x.command match {
            case _ =>
              try {
                lock.lock()
                handle(x.originSql)
                logInfo(s"$command:$operand")
              } finally {
                if (lock.isHeldByCurrentThread) {
                  lock.unlock()
                }
              }
          }
        })
    // handle(sparkSqls)

    start()
    destroy()
  }

  /**
   * Initialize sparkConf from the command-line arguments: an optional config file (--conf),
   * checkpoint options, and free-form "--key value" overrides.
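   *
   * An illustrative invocation (paths and values are examples only):
   * {{{
   *   --conf conf/app.yaml --checkpoint hdfs:///tmp/ckpt --createOnError true --spark.executor.memory 2g
   * }}}
   */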
  final private def init(args: Array[String]): Unit = {
    var argv = args.toList
    var conf: String = null
    val userArgs = ArrayBuffer[(String, String)]()
    while (argv.nonEmpty) {
      argv match {
        case "--conf" :: value :: tail =>
          conf = value
          argv = tail
        case "--checkpoint" :: value :: tail =>
          checkpoint = value
          argv = tail
        case "--createOnError" :: value :: tail =>
          createOnError = value.toBoolean
          argv = tail
        case Nil =>
        case other :: value :: tail if other.startsWith(PARAM_PREFIX) =>
          // free-form "--key value" pairs are applied to sparkConf verbatim (prefix stripped)
          userArgs += other.drop(2) -> value
          argv = tail
        case tail =>
          logError(s"Unrecognized options: ${tail.mkString(" ")}")
          printUsageAndExit()
      }
    }
    if (conf != null) {
      // the config file format is inferred from its extension
      val localConf = conf.split("\\.").last match {
        case "conf" => PropertiesUtils.fromHoconFile(conf)
        case "properties" => PropertiesUtils.fromPropertiesFile(conf)
        case "yaml" | "yml" => PropertiesUtils.fromYamlFile(conf)
        case _ =>
          throw new IllegalArgumentException(
            "[StreamPark] Usage: config file error, must be one of [properties|yaml|conf]")
      }
      sparkConf.setAll(localConf)
    }
    // command-line overrides take precedence over the config file
    sparkConf.setAll(userArgs)
    val appMain = sparkConf.get(KEY_SPARK_MAIN_CLASS, "org.apache.streampark.spark.cli.SqlClient")
    if (appMain == null) {
      logError(s"[StreamPark] parameter: $KEY_SPARK_MAIN_CLASS must not be empty!")
      System.exit(1)
    }
    val appName = sparkConf.get(KEY_SPARK_APP_NAME, null) match {
      case null | "" => appMain
      case name => name
    }
    // local debug mode: force a local master and throttle Kafka ingestion
    val localMode = sparkConf.get("spark.master", null) == "local"
    if (localMode) {
      sparkConf.setAppName(s"[LocalDebug] $appName").setMaster("local[*]")
      sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10")
    }
    // stop streaming gracefully on shutdown
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    // merge framework listeners with any user-configured ones, avoiding stray commas
    val extraListeners =
      (sparkListeners ++ sparkConf.get("spark.extraListeners", "").split(","))
        .filter(_.nonEmpty)
        .mkString(",")
    if (extraListeners.nonEmpty) {
      sparkConf.set("spark.extraListeners", extraListeners)
    }
  }

  /**
   * The config phase lets the developer set additional parameters (beyond the agreed
   * configuration file) through this hook, before the SparkSession is created.
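   *
   * A sketch of a typical override (User and Order are placeholder classes from the original
   * comment, not types defined in this module):
   * {{{
   *   override def config(conf: SparkConf): Unit = {
   *     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   *     conf.registerKryoClasses(Array(classOf[User], classOf[Order]))
   *   }
   * }}}
   */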
  def config(sparkConf: SparkConf): Unit = {}

  /**
   * The ready phase is an entry point for the developer to perform additional setup after all
   * parameters have been applied; it runs after initialization and before the program starts.
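   *
   * An illustrative override (the view name "events" and the path are examples only):
   * {{{
   *   override def ready(): Unit = {
   *     sparkSession.read.parquet("/data/events").createOrReplaceTempView("events")
   *   }
   * }}}
   */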
  def ready(): Unit = {}

  /**
   * The handle phase is the entry point for the developer's own logic and is the most important
   * phase; by default each parsed SQL statement is executed against the current SparkSession.
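   *
   * A sketch of an override that also prints each result (illustrative only):
   * {{{
   *   override def handle(sql: String): DataFrame = {
   *     val df = sparkSession.sql(sql)
   *     df.show(truncate = false)
   *     df
   *   }
   * }}}
   */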
  def handle(sql: String = null): DataFrame = sparkSession.sql(sql)

  /** The start phase starts the task, which is executed automatically by the framework. */
  def start(): Unit = {}

  /**
   * The destroy phase is the last phase before the JVM exits, after the program has finished
   * running; it is generally used to wrap up the work (e.g. stopping the SparkSession).
   */
  def destroy(): Unit

  /** Print usage information and exit. */
  private[this] def printUsageAndExit(): Unit = {
    logError(
      """
        |Usage: Streaming [options]
        |
        | Options are:
        | --conf <config file, one of [properties|yaml|conf]>
        | --checkpoint <checkpoint dir>
        | --createOnError <whether to recreate if recovery from the checkpoint fails, true or false>
        |""".stripMargin)
    System.exit(1)
  }

}