blob: 26b98dceb3f487e69adf875632bd04cacd8a190f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.services.main
import java.util.Random
import scala.collection.JavaConverters._
import scala.concurrent.Await
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigValueFactory
import org.slf4j.Logger
import sun.misc.BASE64Encoder
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, Gear}
import org.apache.gearpump.cluster.master.MasterProxy
import org.apache.gearpump.services.{RestServices, SecurityService}
import org.apache.gearpump.util.LogUtil.ProcessType
import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
/**
 * Command line entry point that starts the UI (REST + web) server.
 *
 * Recognised options:
 *  - `master`: optional `<host:port>`; when present it replaces the configured master list.
 *  - `Gear.OPTION_CONFIG`: optional custom configuration file used instead of the default
 *    UI configuration.
 *  - `supervisor`: optional supervisor actor path, defaults to the empty string.
 */
object Services extends AkkaApp with ArgumentsParser {
  private val LOG = LogUtil.getLogger(getClass)

  override val options: Array[(String, CLIOption[Any])] = Array(
    "master" -> CLIOption("<host:port>", required = false),
    Gear.OPTION_CONFIG -> CLIOption("<provide a custom configuration file>", required = false),
    "supervisor" -> CLIOption("<Supervisor Actor Path>", required = false, Some("")))

  override val description: String = "UI Server"

  /** Default configuration used when no custom configuration file is supplied. */
  override def akkaConfig: Config = {
    ClusterConfig.ui()
  }

  /** Prints a short usage banner to the console. */
  override def help(): Unit = {
    // scalastyle:off println
    Console.println("UI Server")
    // scalastyle:on println
  }

  // Written by the thread running `main` and read by whichever thread calls `kill()`,
  // hence volatile so that the registered shutdown hook is visible across threads.
  @volatile private var killFunction: Option[() => Unit] = None

  /**
   * Builds the effective configuration, starts the actor system and the HTTP endpoint, then
   * blocks until the actor system terminates.
   *
   * @param inputAkkaConf configuration to use unless a custom configuration file is given
   *                      through `Gear.OPTION_CONFIG`
   * @param args raw command line arguments, parsed against [[options]]
   */
  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
    import scala.concurrent.duration._
    import scala.util.control.NonFatal

    val argConfig = parse(args)

    val baseConf =
      if (argConfig.exists(Gear.OPTION_CONFIG)) {
        ClusterConfig.ui(argConfig.getString(Gear.OPTION_CONFIG))
      } else {
        inputAkkaConf
      }

    // The logging configuration is loaded first so that the logger used below
    // is obtained after it has been applied.
    val logger: Logger = {
      LogUtil.loadConfiguration(baseConf, ProcessType.UI)
      LogUtil.getLogger(getClass)
    }

    // An explicit "master" argument overrides the master list from the configuration.
    val confWithMasters =
      if (argConfig.exists("master")) {
        val master = argConfig.getString("master")
        baseConf.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
          ConfigValueFactory.fromIterable(List(master).asJava))
      } else {
        baseConf
      }

    val akkaConf = confWithMasters
      .withValue(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH,
        ConfigValueFactory.fromAnyRef(argConfig.getString("supervisor")))
      // Creates a random unique secret key for session manager.
      // All previous stored session token cookies will be invalidated when UI
      // server is restarted.
      .withValue(SecurityService.SESSION_MANAGER_KEY,
        ConfigValueFactory.fromAnyRef(randomServerSecret()))

    val masterCluster = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
      .flatMap(Util.parseHostList)

    implicit val system = ActorSystem("services", akkaConf)
    implicit val executionContext = system.dispatcher

    val master = system.actorOf(MasterProxy.props(masterCluster, 1.day),
      s"masterproxy${system.name}")
    val (host, port) = parseHostPort(system.settings.config)

    implicit val mat = ActorMaterializer()
    val services = new RestServices(master, mat, system)
    val bindFuture = Http().bindAndHandle(Route.handlerFlow(services.route), host, port)

    try {
      Await.result(bindFuture, 15.seconds)
    } catch {
      case NonFatal(ex) =>
        // Without this the already started actor system would be left running
        // after the failure propagates out of main.
        logger.error(s"Failed to bind UI server to $host:$port", ex)
        system.terminate()
        throw ex
    }

    // The wildcard address is not browsable, so point the user to the loopback address.
    val displayHost = if (host == "0.0.0.0") "127.0.0.1" else host
    logger.info(s"Please browse to http://$displayHost:$port to see the web UI")

    // scalastyle:off println
    println(s"Please browse to http://$displayHost:$port to see the web UI")
    // scalastyle:on println

    killFunction = Some { () =>
      logger.info("Shutting down UI Server")
      system.terminate()
    }

    Await.result(system.whenTerminated, Duration.Inf)
  }

  /**
   * Generates a fresh secret for the session manager.
   *
   * Uses a cryptographically strong random source because the value is used as a secret key.
   *
   * @return single-line Base64 text (88 characters) encoding 64 random bytes
   */
  private def randomServerSecret(): String = {
    import java.security.SecureRandom
    import java.util.Base64

    // NOTE(review): 64 bytes was marked as "Required" by the original author, presumably to
    // satisfy the session manager's minimum key length - confirm before changing.
    val length = 64
    val bytes = new Array[Byte](length)
    new SecureRandom().nextBytes(bytes)
    // The basic encoder never inserts line separators, so the key stays on one line.
    Base64.getEncoder.encodeToString(bytes)
  }

  /**
   * Reads the HTTP bind address from the configuration.
   *
   * @return the `(host, port)` pair stored under `Constants.GEARPUMP_SERVICE_HOST` and
   *         `Constants.GEARPUMP_SERVICE_HTTP`
   */
  private def parseHostPort(config: Config): (String, Int) = {
    val port = config.getInt(Constants.GEARPUMP_SERVICE_HTTP)
    val host = config.getString(Constants.GEARPUMP_SERVICE_HOST)
    (host, port)
  }

  // TODO: fix this
  // Hacks around for YARN module, so that we can kill the UI server
  // when application is shutting down.
  /**
   * Shuts the UI server down if it has finished starting; does nothing otherwise.
   */
  def kill(): Unit = {
    killFunction.foreach(shutdown => shutdown())
  }
}