blob: eb993229b3085489a7abbbd3dd60ca149a3c315a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.workflow.engine.runner;
//JDK imports
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
//OODT imports
import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycle;
import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
/**
* Runs a local version of a {@link TaskProcessor} asynchronously.
*
* @author mattmann (Chris Mattmann)
* @author bfoster (Brian Foster)
*/
public class AsynchronousLocalEngineRunner extends AbstractEngineRunnerBase {
private static final Logger LOG = Logger
.getLogger(AsynchronousLocalEngineRunner.class.getName());
public static final int DEFAULT_NUM_THREADS = 25;
private final ExecutorService executor;
private final Map<String, Thread> workerMap;
public AsynchronousLocalEngineRunner() {
this(DEFAULT_NUM_THREADS);
}
public AsynchronousLocalEngineRunner(int numThreads) {
super();
this.executor = Executors.newFixedThreadPool(numThreads);
this.workerMap = new HashMap<String, Thread>();
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.runner.EngineRunner#execute(org.apache
* .oodt.cas.workflow.engine.processor.TaskProcessor)
*/
@Override
public void execute(final TaskProcessor taskProcessor) throws Exception {
Thread worker = new Thread() {
@Override
public void run() {
WorkflowLifecycle lifecycle = getLifecycle(taskProcessor);
WorkflowTask workflowTask = getTaskFromProcessor(taskProcessor);
WorkflowTaskInstance inst = GenericWorkflowObjectFactory
.getTaskObjectFromClassName(workflowTask.getTaskInstanceClassName());
try {
inst.run(taskProcessor.getWorkflowInstance().getSharedContext(),
workflowTask.getTaskConfig());
String msg = "Task: [" + workflowTask.getTaskName()
+ "] for instance id: ["
+ taskProcessor.getWorkflowInstance().getId()
+ "] completed successfully";
LOG.log(Level.INFO, msg);
WorkflowState state = lifecycle.createState("ExecutionComplete", "transition", msg);
taskProcessor.getWorkflowInstance().setState(state);
persist(taskProcessor.getWorkflowInstance());
} catch (Exception e) {
e.printStackTrace();
String msg = "Exception executing task: ["
+ workflowTask.getTaskName() + "]: Message: " + e.getMessage();
LOG.log(Level.WARNING, msg);
WorkflowState state = lifecycle.createState("Failure", "done", msg);
taskProcessor.getWorkflowInstance().setState(state);
persist(taskProcessor.getWorkflowInstance());
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Thread#interrupt()
*/
@SuppressWarnings("deprecation")
@Override
public void interrupt() {
super.interrupt();
this.destroy();
}
};
String id = "";
synchronized (id) {
id = UUID.randomUUID().toString();
this.workerMap.put(id, worker);
this.executor.execute(worker);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.engine.EngineRunner#shutdown()
*/
@Override
public void shutdown() throws Exception {
for (Thread worker : this.workerMap.values()) {
if (worker != null) {
worker.interrupt();
worker = null;
}
}
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.runner.EngineRunner#hasOpenSlots(org
* .apache.oodt.cas.workflow.engine.processor.TaskProcessor)
*/
@Override
public boolean hasOpenSlots(TaskProcessor taskProcessor) throws Exception {
// TODO Auto-generated method stub
return true;
}
/* (non-Javadoc)
* @see org.apache.oodt.cas.workflow.engine.runner.EngineRunner#setInstanceRepository(org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository)
*/
@Override
public void setInstanceRepository(WorkflowInstanceRepository instRep) {
this.instRep = instRep;
}
}