blob: 53ee69264a382943785e7c5035eb127d8acfe844 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.services
import scala.util.{Failure, Success, Try}
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.{FormData, Multipart}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
import akka.stream.Materializer
import upickle.default.{read, write}
import org.apache.gearpump.cluster.AppMasterToMaster.{AppMasterSummary, GeneralAppMasterSummary}
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
import org.apache.gearpump.cluster.MasterToClient._
import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective}
import org.apache.gearpump.services.AppMasterService.Status
// NOTE: This cannot be removed!!!
import org.apache.gearpump.services.util.UpickleUtil._
import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
import org.apache.gearpump.streaming.appmaster.DagManager._
import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
import org.apache.gearpump.util.ActorUtil.{askActor, askAppMaster}
import FileDirective._
import org.apache.gearpump.util.{Constants, Util}
/**
* Management service for AppMaster
*/
class AppMasterService(val master: ActorRef,
val jarStoreClient: JarStoreClient, override val system: ActorSystem)
extends BasicService {
private val systemConfig = system.settings.config
private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
protected override def doRoute(implicit mat: Materializer) = pathPrefix("appmaster" / IntNumber) {
appId => {
path("dynamicdag") {
parameters(ParamMagnet("args")) { args: String =>
def replaceProcessor(dagOperation: DAGOperation): Route = {
onComplete(askAppMaster[DAGOperationResult](master, appId, dagOperation)) {
case Success(value) =>
complete(write(value))
case Failure(ex) =>
failWith(ex)
}
}
val msg = java.net.URLDecoder.decode(args, "UTF-8")
val dagOperation = read[DAGOperation](msg)
(post & entity(as[Multipart.FormData])) { _ =>
uploadFile { form =>
val jar = form.getFileInfo("jar").map(_.file)
if (jar.nonEmpty) {
dagOperation match {
case replace: ReplaceProcessor =>
val description = replace.newProcessorDescription.copy(jar =
Util.uploadJar(jar.get, jarStoreClient))
val dagOperationWithJar = replace.copy(newProcessorDescription = description)
replaceProcessor(dagOperationWithJar)
}
} else {
replaceProcessor(dagOperation)
}
}
} ~ (post & entity(as[FormData])) { _ =>
replaceProcessor(dagOperation)
}
}
} ~
path("stallingtasks") {
onComplete(askAppMaster[StallingTasks](master, appId, GetStallingTasks(appId))) {
case Success(value) =>
complete(write(value))
case Failure(ex) => failWith(ex)
}
} ~
path("errors") {
onComplete(askAppMaster[LastFailure](master, appId, GetLastFailure(appId))) {
case Success(value) =>
complete(write(value))
case Failure(ex) => failWith(ex)
}
} ~
path("restart") {
post {
onComplete(askActor[SubmitApplicationResult](master, RestartApplication(appId))) {
case Success(_) =>
complete(write(Status(true)))
case Failure(ex) =>
complete(write(Status(false, ex.getMessage)))
}
}
} ~
path("config") {
onComplete(askActor[AppMasterConfig](master, QueryAppMasterConfig(appId))) {
case Success(value: AppMasterConfig) =>
val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
complete(config)
case Failure(ex) =>
failWith(ex)
}
} ~
pathPrefix("executor" / Segment) { executorIdString =>
path("config") {
val executorId = Integer.parseInt(executorIdString)
onComplete(askAppMaster[ExecutorConfig](master, appId, QueryExecutorConfig(executorId))) {
case Success(value) =>
val config = Option(value.config).map(ClusterConfig.render(_, concise))
.getOrElse("{}")
complete(config)
case Failure(ex) =>
failWith(ex)
}
} ~
pathEnd {
get {
val executorId = Integer.parseInt(executorIdString)
onComplete(askAppMaster[ExecutorSummary](master, appId,
GetExecutorSummary(executorId))) {
case Success(value) =>
complete(write(value))
case Failure(ex) =>
failWith(ex)
}
}
}
} ~
path("metrics" / RemainingPath) { path =>
parameterMap { optionMap =>
parameter("aggregator" ? "") { aggregator =>
parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
val query = QueryHistoryMetrics(path.head.toString, readOption, aggregator, optionMap)
onComplete(askAppMaster[HistoryMetrics](master, appId, query)) {
case Success(value) =>
complete(write(value))
case Failure(ex) =>
failWith(ex)
}
}
}
}
} ~
pathEnd {
get {
parameter("detail" ? "false") { detail =>
val queryDetails = Try(detail.toBoolean).getOrElse(false)
val request = AppMasterDataDetailRequest(appId)
queryDetails match {
case true =>
onComplete(askAppMaster[AppMasterSummary](master, appId, request)) {
case Success(value) =>
value match {
case data: GeneralAppMasterSummary =>
complete(write(data))
case data: StreamAppMasterSummary =>
complete(write(data))
}
case Failure(ex) =>
failWith(ex)
}
case false =>
onComplete(askActor[AppMasterData](master, AppMasterDataRequest(appId))) {
case Success(value) =>
complete(write(value))
case Failure(ex) =>
failWith(ex)
}
}
}
}
} ~
pathEnd {
delete {
onComplete(askActor[ShutdownApplicationResult](master, ShutdownApplication(appId))) {
case Success(result) =>
val output = if (result.appId.isSuccess) {
Map("status" -> "success", "info" -> null)
} else {
Map("status" -> "fail", "info" -> result.appId.failed.get.toString)
}
complete(write(output))
case Failure(ex) =>
failWith(ex)
}
}
}
}
}
}
object AppMasterService {
case class Status(success: Boolean, reason: String = null)
}