blob: 38f4b7c56b2259b0880ac4f3e9fe52af997e03cd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.amaterasu.leader.execution
import java.util.concurrent.BlockingQueue
import org.apache.amaterasu.common.configuration.enums.ActionStatus
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.logging.Logging
import org.apache.amaterasu.leader.execution.actions.Action
import org.apache.curator.framework.CuratorFramework
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
/**
* The JobManager manages the lifecycle of a job. It queues new actions for execution,
* tracks the state of actions and is in charge of communication with the underlying
* cluster management framework (mesos)
*/
class JobManager extends Logging {
var name: String = _
var jobId: String = _
var client: CuratorFramework = _
var head: Action = _
// TODO: this is not private due to tests, fix this!!!
val registeredActions = new TrieMap[String, Action]
val frameworks = new TrieMap[String, mutable.HashSet[String]]
private var executionQueue: BlockingQueue[ActionData] = _
/**
* The start method initiates the job execution by executing the first action.
* start mast be called once and by the JobManager only
*/
def start(): Unit = {
head.execute()
}
def outOfActions: Boolean = !registeredActions.values.exists(a => a.data.status == ActionStatus.pending ||
a.data.status == ActionStatus.queued ||
a.data.status == ActionStatus.started)
/**
* getNextActionData returns the data of the next action to be executed if such action
* exists
*
* @return the ActionData of the next action, returns null if no such action exists
*/
def getNextActionData: ActionData = {
val nextAction: ActionData = executionQueue.poll()
if (nextAction != null) {
registeredActions(nextAction.id).announceStart
}
nextAction
}
def reQueueAction(actionId: String): Unit = {
val action = registeredActions(actionId)
executionQueue.put(action.data)
registeredActions(actionId).announceQueued
}
/**
* Registers an action with the job
*
* @param action
*/
def registerAction(action: Action): Unit = {
registeredActions.put(action.actionId, action)
}
/**
* announce the completion of an action and executes the next actions
*
* @param actionId
*/
def actionComplete(actionId: String): Unit = {
val action = registeredActions.get(actionId).get
action.announceComplete
action.data.nextActionIds.foreach(id =>
registeredActions.get(id).get.execute())
// we don't need the error action anymore
if (action.data.errorActionId != null)
registeredActions.get(action.data.errorActionId).get.announceCanceled
}
/**
* gets the next action id which can be either the same action or an error action
* and if it exist (we have an error action or a retry)
*
* @param actionId
*/
def actionFailed(actionId: String, message: String): Unit = {
log.warn(message)
val action = registeredActions.get(actionId).get
val id = action.handleFailure(message)
if (id != null)
registeredActions.get(id).get.execute()
//delete all future actions
cancelFutureActions(action)
}
def cancelFutureActions(action: Action): Unit = {
if (action.data.status != ActionStatus.failed)
action.announceCanceled
action.data.nextActionIds.foreach(id =>
cancelFutureActions(registeredActions.get(id).get))
}
/**
* announce the start of execution of the action
*/
def actionStarted(actionId: String): Unit = {
val action = registeredActions.get(actionId).get
action.announceStart
}
def actionsCount(): Int = {
executionQueue.size()
}
}
object JobManager {
/**
* The apply method starts the job execution once the job is created from the maki.yaml file
* it is in charge of creating the internal flow map, setting up ZooKeeper and executing
* the first action
* If the job execution is resumed (a job that was stooped) the init method will restore the
* state of the job from ZooKepper
*
* @param jobId
* @param name
* @param jobsQueue
* @param client
* @return
*/
def apply(
jobId: String,
name: String,
jobsQueue: BlockingQueue[ActionData],
client: CuratorFramework
): JobManager = {
val manager = new JobManager()
manager.name = name
manager.executionQueue = jobsQueue
manager.jobId = jobId
manager.client = client
manager
}
}