blob: b45379a2fb0566ccd9a9e301322bbd4d978a6ed0 [file] [log] [blame]
package org.apache.helix.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@StateModelInfo(states = "{'NOT USED BY HELIX'}", initialState = "INIT")
public class TaskStateModel extends StateModel {
private static final Logger LOG = LoggerFactory.getLogger(TaskStateModel.class);
private final HelixManager _manager;
private final ScheduledExecutorService _taskExecutor;
private final Map<String, TaskFactory> _taskFactoryRegistry;
private ScheduledFuture timeout_task;
private TaskRunner _taskRunner;
private final ScheduledExecutorService _timeoutTaskExecutor;
public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry,
ScheduledExecutorService taskExecutor) {
this(manager, taskFactoryRegistry, taskExecutor, taskExecutor);
}
public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry,
ScheduledExecutorService taskExecutor, ScheduledExecutorService timerTaskExecutor) {
_manager = manager;
_taskFactoryRegistry = taskFactoryRegistry;
_taskExecutor = taskExecutor;
_timeoutTaskExecutor = timerTaskExecutor;
}
public boolean isShutdown() {
return _taskExecutor.isShutdown();
}
public boolean isTerminated() {
return _taskExecutor.isTerminated();
}
public void shutdown() {
reset();
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return _taskExecutor.awaitTermination(timeout, unit);
}
@Transition(to = "RUNNING", from = "INIT")
public void onBecomeRunningFromInit(Message msg, NotificationContext context) {
startTask(msg, msg.getPartitionName());
}
@Transition(to = "STOPPED", from = "RUNNING")
public String onBecomeStoppedFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
throw new IllegalStateException(String.format(
"Invalid state transition. There is no running task for partition %s.", taskPartition));
}
_taskRunner.cancel();
TaskResult r = _taskRunner.waitTillDone();
LOG.info(String.format("Task %s completed with result %s.", msg.getPartitionName(), r));
timeout_task.cancel(false);
return r.getInfo();
}
@Transition(to = "COMPLETED", from = "RUNNING")
public String onBecomeCompletedFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
throw new IllegalStateException(String.format(
"Invalid state transition. There is no running task for partition %s.", taskPartition));
}
TaskResult r = _taskRunner.waitTillDone();
if (r.getStatus() != TaskResult.Status.COMPLETED) {
throw new IllegalStateException(String.format(
"Partition %s received a state transition to %s but the result status code is %s.",
msg.getPartitionName(), msg.getToState(), r.getStatus()));
}
timeout_task.cancel(false);
return r.getInfo();
}
@Transition(to = "TIMED_OUT", from = "RUNNING")
public String onBecomeTimedOutFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
throw new IllegalStateException(String.format(
"Invalid state transition. There is no running task for partition %s.", taskPartition));
}
TaskResult r = _taskRunner.waitTillDone();
if (r.getStatus() != TaskResult.Status.CANCELED) {
throw new IllegalStateException(String.format(
"Partition %s received a state transition to %s but the result status code is %s.",
msg.getPartitionName(), msg.getToState(), r.getStatus()));
}
timeout_task.cancel(false);
return r.getInfo();
}
@Transition(to = "TASK_ERROR", from = "RUNNING")
public String onBecomeTaskErrorFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
throw new IllegalStateException(String.format(
"Invalid state transition. There is no running task for partition %s.", taskPartition));
}
TaskResult r = _taskRunner.waitTillDone();
if (r.getStatus() != TaskResult.Status.ERROR && r.getStatus() != TaskResult.Status.FAILED) {
throw new IllegalStateException(String.format(
"Partition %s received a state transition to %s but the result status code is %s.",
msg.getPartitionName(), msg.getToState(), r.getStatus()));
}
timeout_task.cancel(false);
return r.getInfo();
}
@Transition(to = "TASK_ABORTED", from = "RUNNING")
public String onBecomeTaskAbortedFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
throw new IllegalStateException(String.format(
"Invalid state transition. There is no running task for partition %s.", taskPartition));
}
_taskRunner.cancel();
TaskResult r = _taskRunner.waitTillDone();
if (r.getStatus() != TaskResult.Status.FATAL_FAILED && r.getStatus() != TaskResult.Status.CANCELED) {
throw new IllegalStateException(String.format(
"Partition %s received a state transition to %s but the result status code is %s.",
msg.getPartitionName(), msg.getToState(), r.getStatus()));
}
timeout_task.cancel(false);
return r.getInfo();
}
@Transition(to = "RUNNING", from = "STOPPED")
public void onBecomeRunningFromStopped(Message msg, NotificationContext context) {
startTask(msg, msg.getPartitionName());
}
@Transition(to = "DROPPED", from = "INIT")
public void onBecomeDroppedFromInit(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "DROPPED", from = "RUNNING")
public void onBecomeDroppedFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
if (timeout_task != null) {
timeout_task.cancel(true);
}
LOG.error(
"Participant {}'s thread for task partition {} not found while attempting to cancel the task; Manual cleanup may be required.",
_manager.getInstanceName(), taskPartition);
return;
}
_taskRunner.cancel();
TaskResult r = _taskRunner.waitTillDone();
LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
_taskRunner = null;
timeout_task.cancel(false);
}
@Transition(to = "DROPPED", from = "COMPLETED")
public void onBecomeDroppedFromCompleted(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "DROPPED", from = "STOPPED")
public void onBecomeDroppedFromStopped(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "DROPPED", from = "TIMED_OUT")
public void onBecomeDroppedFromTimedOut(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "DROPPED", from = "TASK_ERROR")
public void onBecomeDroppedFromTaskError(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "DROPPED", from = "TASK_ABORTED")
public void onBecomeDroppedFromTaskAborted(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "INIT", from = "RUNNING")
public void onBecomeInitFromRunning(Message msg, NotificationContext context) {
String taskPartition = msg.getPartitionName();
if (_taskRunner == null) {
throw new IllegalStateException(String
.format("Invalid state transition. There is no running task for partition %s.", taskPartition));
}
_taskRunner.cancel();
TaskResult r = _taskRunner.waitTillDone();
LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r));
_taskRunner = null;
}
@Transition(to = "INIT", from = "COMPLETED")
public void onBecomeInitFromCompleted(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "INIT", from = "STOPPED")
public void onBecomeInitFromStopped(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "INIT", from = "TIMED_OUT")
public void onBecomeInitFromTimedOut(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "INIT", from = "TASK_ERROR")
public void onBecomeInitFromTaskError(Message msg, NotificationContext context) {
reset();
}
@Transition(to = "INIT", from = "TASK_ABORTED")
public void onBecomeInitFromTaskAborted(Message msg, NotificationContext context) {
reset();
}
@Override
public void reset() {
if (_taskRunner != null) {
_taskRunner.cancel();
_taskRunner = null;
}
if (timeout_task != null) {
timeout_task.cancel(false);
timeout_task = null;
}
}
private void startTask(Message msg, String taskPartition) {
JobConfig cfg = TaskUtil.getJobConfig(_manager, msg.getResourceName());
TaskConfig taskConfig = null;
String command = cfg.getCommand();
// Get a task-specific command if specified
JobContext ctx = TaskUtil.getJobContext(_manager, msg.getResourceName());
int pId = Integer.parseInt(taskPartition.substring(taskPartition.lastIndexOf('_') + 1));
if (ctx.getTaskIdForPartition(pId) != null) {
taskConfig = cfg.getTaskConfig(ctx.getTaskIdForPartition(pId));
if (taskConfig != null) {
if (taskConfig.getCommand() != null) {
command = taskConfig.getCommand();
}
}
}
// Report a target if that was used to assign the partition
String target = ctx.getTargetForPartition(pId);
if (taskConfig == null && target != null) {
taskConfig = TaskConfig.Builder.from(target);
}
// Populate a task callback context
TaskCallbackContext callbackContext = new TaskCallbackContext();
callbackContext.setManager(_manager);
callbackContext.setJobConfig(cfg);
callbackContext.setTaskConfig(taskConfig);
// Create a task instance with this command
if (command == null || _taskFactoryRegistry == null
|| !_taskFactoryRegistry.containsKey(command)) {
throw new IllegalStateException("No callback implemented(or not registered) for task " + command);
}
TaskFactory taskFactory = _taskFactoryRegistry.get(command);
Task task = taskFactory.createNewTask(callbackContext);
if (task instanceof UserContentStore) {
((UserContentStore) task).init(_manager, cfg.getWorkflow(), msg.getResourceName(), taskPartition);
}
// Submit the task for execution
_taskRunner =
new TaskRunner(task, msg.getResourceName(), taskPartition, msg.getTgtName(), _manager,
msg.getTgtSessionId());
_taskExecutor.submit(_taskRunner);
_taskRunner.waitTillStarted();
// Set up a timer to cancel the task when its time out expires.
timeout_task = _timeoutTaskExecutor.schedule(new TimerTask() {
@Override
public void run() {
if (_taskRunner != null) {
_taskRunner.timeout();
}
}
}, cfg.getTimeoutPerTask(), TimeUnit.MILLISECONDS);
}
}