| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.gearpump.services |
| |
| import java.io.{File, IOException} |
| import java.nio.charset.StandardCharsets.UTF_8 |
| import java.nio.file.Files |
| import java.nio.file.StandardOpenOption.{APPEND, WRITE} |
| import scala.collection.JavaConverters._ |
| import scala.concurrent.Future |
| import scala.util.{Failure, Success} |
| |
| import akka.actor.{ActorRef, ActorSystem} |
| import akka.http.scaladsl.server.Directives._ |
| import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet |
| import akka.http.scaladsl.unmarshalling.Unmarshaller._ |
| import akka.stream.Materializer |
| import com.typesafe.config.Config |
| |
| import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} |
| import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ReadOption} |
| import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, WorkerList} |
| import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig, SubmitApplicationResultValue} |
| import org.apache.gearpump.cluster.client.ClientContext |
| import org.apache.gearpump.cluster.worker.WorkerSummary |
| import org.apache.gearpump.cluster.{ClusterConfig, UserConfig} |
| import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, JarStoreServer} |
| import org.apache.gearpump.streaming.partitioner.{PartitionerByClassName, PartitionerDescription} |
| import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest} |
| // NOTE: This cannot be removed!!! |
| import org.apache.gearpump.services.util.UpickleUtil._ |
| import org.apache.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication} |
| import org.apache.gearpump.util.ActorUtil._ |
| import FileDirective._ |
| import org.apache.gearpump.util.{Constants, Graph, Util} |
| |
| /** Manages service for master node */ |
| class MasterService(val master: ActorRef, |
| val jarStoreClient: JarStoreClient, override val system: ActorSystem) |
| extends BasicService { |
| |
| import upickle.default.{read, write} |
| |
| private val systemConfig = system.settings.config |
| private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) |
| |
| protected override def doRoute(implicit mat: Materializer) = pathPrefix("master") { |
| pathEnd { |
| get { |
| onComplete(askActor[MasterData](master, GetMasterData)) { |
| case Success(value: MasterData) => complete(write(value)) |
| case Failure(ex) => failWith(ex) |
| } |
| } |
| } ~ |
| path("applist") { |
| onComplete(askActor[AppMastersData](master, AppMastersDataRequest)) { |
| case Success(value: AppMastersData) => |
| complete(write(value)) |
| case Failure(ex) => failWith(ex) |
| } |
| } ~ |
| path("workerlist") { |
| def future: Future[List[WorkerSummary]] = askActor[WorkerList](master, GetAllWorkers) |
| .flatMap { workerList => |
| val workers = workerList.workers |
| val workerDataList = List.empty[WorkerSummary] |
| |
| Future.fold(workers.map { workerId => |
| askWorker[WorkerData](master, workerId, GetWorkerData(workerId)) |
| })(workerDataList) { (workerDataList, workerData) => |
| workerDataList :+ workerData.workerDescription |
| } |
| } |
| onComplete(future) { |
| case Success(result: List[WorkerSummary]) => complete(write(result)) |
| case Failure(ex) => failWith(ex) |
| } |
| } ~ |
| path("config") { |
| onComplete(askActor[MasterConfig](master, QueryMasterConfig)) { |
| case Success(value: MasterConfig) => |
| val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}") |
| complete(config) |
| case Failure(ex) => |
| failWith(ex) |
| } |
| } ~ |
| path("metrics" / RemainingPath) { path => |
| parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { readOption: String => |
| val query = QueryHistoryMetrics(path.head.toString, readOption) |
| onComplete(askActor[HistoryMetrics](master, query)) { |
| case Success(value) => |
| complete(write(value)) |
| case Failure(ex) => |
| failWith(ex) |
| } |
| } |
| } ~ |
| path("submitapp") { |
| post { |
| uploadFile { form => |
| val jar = form.getFileInfo("jar").map(_.file) |
| val configFile = form.getFileInfo("configfile").map(_.file) |
| val configString = form.getValue("configstring").getOrElse("") |
| val executorCount = form.getValue("executorcount").getOrElse("1").toInt |
| val args = form.getValue("args").getOrElse("") |
| |
| val mergedConfigFile = mergeConfig(configFile, configString) |
| |
| onComplete(Future( |
| MasterService.submitGearApp(jar, executorCount, args, systemConfig, mergedConfigFile) |
| )) { |
| case Success(success) => |
| val response = MasterService.AppSubmissionResult(success) |
| complete(write(response)) |
| case Failure(ex) => |
| failWith(ex) |
| } |
| } |
| } |
| } ~ |
| path("submitstormapp") { |
| post { |
| uploadFile { form => |
| val jar = form.getFileInfo("jar").map(_.file) |
| val configFile = form.getFileInfo("configfile").map(_.file) |
| val args = form.getValue("args").getOrElse("") |
| onComplete(Future( |
| MasterService.submitStormApp(jar, configFile, args, systemConfig) |
| )) { |
| case Success(success) => |
| val response = MasterService.AppSubmissionResult(success) |
| complete(write(response)) |
| case Failure(ex) => |
| failWith(ex) |
| } |
| } |
| } |
| } ~ |
| path("submitdag") { |
| post { |
| entity(as[String]) { request => |
| val msg = java.net.URLDecoder.decode(request, "UTF-8") |
| val submitApplicationRequest = read[SubmitApplicationRequest](msg) |
| import submitApplicationRequest.{appName, dag, processors, userConfig} |
| val context = ClientContext(system.settings.config, system, master) |
| |
| val graph = dag.mapVertex { processorId => |
| processors(processorId) |
| }.mapEdge { (node1, edge, node2) => |
| PartitionerDescription(new PartitionerByClassName(edge)) |
| } |
| |
| val effectiveConfig = if (userConfig == null) UserConfig.empty else userConfig |
| val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph)).appId |
| |
| import upickle.default.write |
| val submitApplicationResultValue = SubmitApplicationResultValue(appId) |
| val jsonData = write(submitApplicationResultValue) |
| complete(jsonData) |
| } |
| } |
| } ~ |
| path("uploadjar") { |
| uploadFile { form => |
| val jar = form.getFileInfo("jar").map(_.file) |
| if (jar.isEmpty) { |
| complete(write( |
| MasterService.Status(success = false, reason = "Jar file not found"))) |
| } else { |
| val jarFile = Util.uploadJar(jar.get, jarStoreClient) |
| complete(write(jarFile)) |
| } |
| } |
| } ~ |
| path("partitioners") { |
| get { |
| complete(write(BuiltinPartitioners(org.apache.gearpump.streaming.Constants |
| .BUILTIN_PARTITIONERS.map(_.getName)))) |
| } |
| } |
| } |
| |
| private def mergeConfig(configFile: Option[File], configString: String): Option[File] = { |
| if (configString == null || configString.isEmpty) { |
| configFile |
| } else { |
| configFile match { |
| case Some(file) => |
| Files.write(file.toPath, ("\n" + configString).getBytes(UTF_8), APPEND) |
| Some(file) |
| case None => |
| val file = File.createTempFile("\"userfile_configstring_", ".conf") |
| Files.write(file.toPath, configString.getBytes(UTF_8), WRITE) |
| Some(file) |
| } |
| } |
| } |
| } |
| |
| object MasterService { |
| |
| case class BuiltinPartitioners(partitioners: Array[String]) |
| |
| case class AppSubmissionResult(success: Boolean) |
| |
| case class Status(success: Boolean, reason: String = null) |
| |
| /** |
| * Submits Native Application. |
| */ |
| def submitGearApp( |
| jar: Option[File], executorNum: Int, args: String, |
| systemConfig: Config, userConfigFile: Option[File]): Boolean = { |
| submitAndDeleteTempFiles( |
| "org.apache.gearpump.cluster.main.AppSubmitter", |
| argsArray = Array("-executors", executorNum.toString) ++ spaceSeparatedArgumentsToArray(args), |
| fileMap = Map("jar" -> jar).filter(_._2.isDefined).mapValues(_.get), |
| classPath = getUserApplicationClassPath, |
| systemConfig, |
| userConfigFile |
| ) |
| } |
| |
| /** |
| * Submits Storm application. |
| */ |
| def submitStormApp( |
| jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = { |
| submitAndDeleteTempFiles( |
| "org.apache.gearpump.experiments.storm.main.GearpumpStormClient", |
| argsArray = spaceSeparatedArgumentsToArray(args), |
| fileMap = Map("jar" -> jar, "config" -> stormConf).filter(_._2.isDefined).mapValues(_.get), |
| classPath = getStormApplicationClassPath, |
| systemConfig, |
| userConfigFile = None |
| ) |
| } |
| |
| private def submitAndDeleteTempFiles( |
| mainClass: String, argsArray: Array[String], fileMap: Map[String, File], |
| classPath: Array[String], systemConfig: Config, |
| userConfigFile: Option[File] = None): Boolean = { |
| try { |
| val jar = fileMap.get("jar") |
| if (jar.isEmpty) { |
| throw new IOException("JAR file not supplied") |
| } |
| |
| val process = Util.startProcess( |
| clusterOptions(systemConfig, userConfigFile), |
| classPath, |
| mainClass, |
| arguments = createFilePathArgArray(fileMap) ++ argsArray |
| ) |
| |
| val retval = process.exitValue() |
| if (retval != 0) { |
| throw new IOException(s"Process exit abnormally with exit code $retval.\n" + |
| s"Error message: ${process.logger.error}") |
| } |
| true |
| } finally { |
| fileMap.values.foreach(_.delete) |
| if (userConfigFile.isDefined) { |
| userConfigFile.get.delete() |
| } |
| } |
| } |
| |
| /** |
| * Returns Java options for gearpump cluster |
| */ |
| private def clusterOptions(systemConfig: Config, userConfigFile: Option[File]): Array[String] = { |
| var options = Array( |
| s"-D${Constants.GEARPUMP_HOME}=${systemConfig.getString(Constants.GEARPUMP_HOME)}", |
| s"-D${Constants.GEARPUMP_HOSTNAME}=${systemConfig.getString(Constants.GEARPUMP_HOSTNAME)}", |
| s"-D${Constants.PREFER_IPV4}=true" |
| ) |
| |
| val masters = systemConfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala |
| .toList.flatMap(Util.parseHostList) |
| options ++= masters.zipWithIndex.map { case (master, index) => |
| s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.$index=${master.host}:${master.port}" |
| }.toArray[String] |
| |
| if (userConfigFile.isDefined) { |
| options :+= s"-D${Constants.GEARPUMP_CUSTOM_CONFIG_FILE}=${userConfigFile.get.getPath}" |
| } |
| options |
| } |
| |
| /** |
| * Filter all defined file paths and store their config key and path into an array. |
| */ |
| private def createFilePathArgArray(fileMap: Map[String, File]): Array[String] = { |
| var args = Array.empty[String] |
| fileMap.foreach({ case (key, path) => |
| args ++= Array(s"-$key", path.getPath) |
| }) |
| args |
| } |
| |
| /** |
| * Returns a space separated arguments as an array. |
| */ |
| private def spaceSeparatedArgumentsToArray(str: String): Array[String] = { |
| str.split(" +").filter(_.nonEmpty) |
| } |
| |
| private val homeDir = System.getProperty(Constants.GEARPUMP_HOME) + "/" |
| private val libHomeDir = homeDir + "lib/" |
| |
| private def getUserApplicationClassPath: Array[String] = { |
| Array( |
| homeDir + "conf", |
| libHomeDir + "*" |
| ) |
| } |
| |
| private def getStormApplicationClassPath: Array[String] = { |
| getUserApplicationClassPath ++ Array( |
| libHomeDir + "storm/*" |
| ) |
| } |
| |
| case class SubmitApplicationRequest( |
| appName: String, |
| processors: Map[ProcessorId, ProcessorDescription], |
| dag: Graph[Int, String], |
| userConfig: UserConfig) |
| } |