/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.s2jobs

import org.apache.s2graph.s2jobs.udfs.Udf
import org.apache.spark.sql.SparkSession
import play.api.libs.json.{JsValue, Json}

import scala.io.Source
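
/**
 * Command line options for a batch job: a display name plus either a
 * configuration file path ("file" mode) or a job id to look up in the
 * database ("db" mode).
 */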
case class JobOption(
  name: String = "S2BatchJob",
  confType: String = "db",
  jobId: Int = -1,
  confFile: String = ""
)

object JobLauncher extends Logger {
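
  // Parses command line arguments with scopt; prints usage and fails fast when they cannot be parsed.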
  def parseArguments(args: Array[String]): JobOption = {
    val parser = new scopt.OptionParser[JobOption]("run") {
      opt[String]('n', "name").required().action((x, c) =>
        c.copy(name = x)).text("job display name")

      cmd("file").action((_, c) => c.copy(confType = "file"))
        .text("get config from file")
        .children(
          opt[String]('f', "confFile").required().valueName("<file>").action((x, c) =>
            c.copy(confFile = x)).text("configuration file")
        )

      cmd("db").action((_, c) => c.copy(confType = "db"))
        .text("get config from db")
        .children(
          opt[String]('i', "jobId").required().valueName("<jobId>").action((x, c) =>
            c.copy(jobId = x.toInt)).text("job id")
        )
    }

    parser.parse(args, JobOption()) match {
      case Some(o) => o
      case None =>
        parser.showUsage()
        throw new IllegalArgumentException(s"failed to parse options... (${args.mkString(",")})")
    }
  }
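
  // Loads the job configuration as JSON. Only the "file" source is implemented;
  // looking a configuration up by job id in the database is not supported yet.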
  def getConfig(options: JobOption): JsValue = options.confType match {
    case "file" =>
      Json.parse(Source.fromFile(options.confFile).mkString)
    case "db" =>
      throw new IllegalArgumentException("the 'db' option, which reads the job configuration from the database, is not supported yet.")
  }
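
  // Entry point: parse options, load the job description, start a SparkSession,
  // register any configured UDFs, and run the job.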
  def main(args: Array[String]): Unit = {
    val options = parseArguments(args)
    logger.info(s"Job Options : ${options}")

    val jobDescription = JobDescription(getConfig(options))

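    // A single SparkSession with Hive support; spark.driver.maxResultSize is raised to 20g.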
    val ss = SparkSession
      .builder()
      .appName(s"${jobDescription.name}")
      .config("spark.driver.maxResultSize", "20g")
      .enableHiveSupport()
      .getOrCreate()

    // register udfs
    jobDescription.udfs.foreach { udfOption =>
      val udf = Class.forName(udfOption.`class`).newInstance().asInstanceOf[Udf]
      logger.info(s"[udf register] ${udfOption}")
      udf.register(ss, udfOption.name, udfOption.params.getOrElse(Map.empty))
    }
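
    // Build and run the job described by the configuration.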
    val job = new Job(ss, jobDescription)
    job.run()
  }
}