blob: a3ece7f45a2f6aaa56564ee7275dc7147ee684c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.workflow.system;
import com.google.common.base.Preconditions;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.workflow.engine.ThreadPoolWorkflowEngineFactory;
import org.apache.oodt.cas.workflow.engine.WorkflowEngine;
import org.apache.oodt.cas.workflow.repository.DataSourceWorkflowRepositoryFactory;
import org.apache.oodt.cas.workflow.repository.WorkflowRepository;
import org.apache.oodt.cas.workflow.struct.avrotypes.AvroWorkflow;
import org.apache.oodt.cas.workflow.struct.avrotypes.AvroWorkflowCondition;
import org.apache.oodt.cas.workflow.struct.avrotypes.AvroWorkflowInstance;
import org.apache.oodt.cas.workflow.struct.avrotypes.AvroWorkflowInstancePage;
import org.apache.oodt.cas.workflow.struct.avrotypes.AvroWorkflowTask;
import org.apache.oodt.cas.workflow.structs.Workflow;
import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowInstancePage;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
import org.apache.oodt.cas.workflow.structs.exceptions.RepositoryException;
import org.apache.oodt.cas.workflow.util.AvroTypeFactory;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory.getWorkflowEngineFromClassName;
import static org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory.getWorkflowRepositoryFromClassName;
/**
* @author radu
*
* <p>
* The Avro RPC based workflow manager.
* </p>
*/
public class AvroRpcWorkflowManager implements WorkflowManager,org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(AvroRpcWorkflowManager.class);
private Server server;
private final WorkflowEngine engine;
private WorkflowRepository repo;
public AvroRpcWorkflowManager(){
this(DEFAULT_WEB_SERVER_PORT);
}
public AvroRpcWorkflowManager(int port){
logger.info("Starting workflow manager on port: {} as {}",
port, System.getProperty("user.name", "unknown"));
Preconditions.checkArgument(port > 0, "Must specify a port greater than 0");
try {
loadProperties();
} catch (IOException e) {
logger.error("Error occurred when loading properties", e);
}
logger.debug("Getting workflow engine");
engine = getWorkflowEngineFromProperty();
if(engine == null){
throw new IllegalStateException("Null engine");
}
URL workflowManagerUrl = safeGetUrlFromString("http://" + getHostname() + ":" + port);
if(workflowManagerUrl == null){
throw new IllegalStateException("Null workflow manager URL");
}
logger.debug("Setting workflow engine url: {}", workflowManagerUrl.toString());
engine.setWorkflowManagerUrl(safeGetUrlFromString("http://" + getHostname() + ":" + port));
repo = getWorkflowRepositoryFromProperty();
logger.debug("Starting Http Server...");
// start up the server
try {
server = new HttpServer(new SpecificResponder(
org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager.class,this), port);
} catch (IOException e) {
logger.error("Unable to create http server on port: {}", e);
throw new IllegalStateException("Unable to start http server on port: " + port, e);
}
logger.debug("Server created. Starting ...");
server.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
shutdown();
}
});
logger.info("Workflow Manager started by {} for url: {}",
System.getProperty("user.name", "unknown"), workflowManagerUrl);
}
@Override
public boolean shutdown() {
logger.debug("Shutting down");
if (server != null) {
server.close();
server = null;
logger.info("Successfully shutdown");
return true;
} else
return false;
}
@Override
public boolean refreshRepository() throws AvroRemoteException {
repo = getWorkflowRepositoryFromProperty();
return true;
}
@Override
public String executeDynamicWorkflow(List<String> taskIds, Map<String, Object> metadata) throws AvroRemoteException {
logger.debug("Executing dynamic workflow with task IDs: {}", taskIds);
try {
if (taskIds == null || taskIds.size() == 0){
logger.warn("Null or empty task IDs");
throw new RepositoryException("Must specify task identifiers to build dynamic workflows!");
}
Workflow dynamicWorkflow = new Workflow();
for (String taskId : taskIds) {
WorkflowTask task = this.repo.getWorkflowTaskById(taskId);
if (task == null){
throw new RepositoryException("Dynamic workflow task: [" + taskId
+ "] is not defined!");
}
dynamicWorkflow.getTasks().add(task);
}
dynamicWorkflow.setId(this.repo.addWorkflow(dynamicWorkflow));
dynamicWorkflow.setName("Dynamic Workflow-" + dynamicWorkflow.getId());
Metadata met = new Metadata();
met.addMetadata(AvroTypeFactory.getMetadata(metadata));
logger.info("Created dynamic workflow[{}] for task IDs: {}", dynamicWorkflow.getName(), taskIds);
WorkflowInstance inst = this.engine.startWorkflow(dynamicWorkflow, met);
return inst.getId();
}catch (RepositoryException | EngineException e){
logger.error("Error occurred when creating dynamic workflow for taskIDs: {}", taskIds, e);
throw new AvroRemoteException(e);
}
}
@Override
public List<String> getRegisteredEvents() throws AvroRemoteException {
List events = null;
try {
events = repo.getRegisteredEvents();
return events;
} catch (RepositoryException e) {
logger.error("Error occurred when registering events: {}", e.getMessage());
throw new AvroRemoteException(
"Exception getting registered events from repository: Message: " + e.getMessage());
}
}
@Override
public AvroWorkflowInstancePage getFirstPage() throws AvroRemoteException {
WorkflowInstancePage page = engine.getInstanceRepository().getFirstPage();
if (page != null) {
logger.debug("Found first page: {}", page);
populateWorkflows(page.getPageWorkflows());
return AvroTypeFactory.getAvroWorkflowInstancePage(page);
} else
return AvroTypeFactory.getAvroWorkflowInstancePage(WorkflowInstancePage.blankPage());
}
@Override
public AvroWorkflowInstancePage getNextPage(AvroWorkflowInstancePage currentPage) throws AvroRemoteException {
// first unpack current page
WorkflowInstancePage currPage = AvroTypeFactory.getWorkflowInstancePage(currentPage);
WorkflowInstancePage page = engine.getInstanceRepository().getNextPage(
currPage);
if (page != null) {
populateWorkflows(page.getPageWorkflows());
return AvroTypeFactory.getAvroWorkflowInstancePage(page);
} else
return AvroTypeFactory.getAvroWorkflowInstancePage(WorkflowInstancePage
.blankPage());
}
@Override
public AvroWorkflowInstancePage getPrevPage(AvroWorkflowInstancePage currentPage) throws AvroRemoteException {
// first unpack current page
WorkflowInstancePage currPage = AvroTypeFactory.getWorkflowInstancePage(currentPage);
WorkflowInstancePage page = engine.getInstanceRepository().getPrevPage(
currPage);
if (page != null) {
populateWorkflows(page.getPageWorkflows());
return AvroTypeFactory.getAvroWorkflowInstancePage(page);
} else
return AvroTypeFactory.getAvroWorkflowInstancePage(WorkflowInstancePage
.blankPage());
}
@Override
public AvroWorkflowInstancePage getLastPage() throws AvroRemoteException {
WorkflowInstancePage page = engine.getInstanceRepository()
.getLastPage();
if (page != null) {
populateWorkflows(page.getPageWorkflows());
return AvroTypeFactory.getAvroWorkflowInstancePage(page);
} else
return AvroTypeFactory.getAvroWorkflowInstancePage(WorkflowInstancePage
.blankPage());
}
@Override
public AvroWorkflowInstancePage paginateWorkflowInstancesOfStatus(int pageNum, String status)
throws AvroRemoteException{
WorkflowInstancePage page = null;
try {
page = engine.getInstanceRepository()
.getPagedWorkflows(pageNum, status);
if (page != null) {
populateWorkflows(page.getPageWorkflows());
return AvroTypeFactory.getAvroWorkflowInstancePage(page);
} else
return AvroTypeFactory.getAvroWorkflowInstancePage(WorkflowInstancePage
.blankPage());
} catch (InstanceRepositoryException e) {
throw new AvroRemoteException(e);
}
}
@Override
public AvroWorkflowInstancePage paginateWorkflowInstances(int pageNum) throws AvroRemoteException {
WorkflowInstancePage page = null;
try {
page = engine.getInstanceRepository()
.getPagedWorkflows(pageNum);
if (page != null) {
populateWorkflows(page.getPageWorkflows());
return AvroTypeFactory.getAvroWorkflowInstancePage(page);
} else
return AvroTypeFactory.getAvroWorkflowInstancePage(WorkflowInstancePage
.blankPage());
} catch (InstanceRepositoryException e) {
throw new AvroRemoteException(e);
}
}
@Override
public List<AvroWorkflow> getWorkflowsByEvent(String eventName) throws AvroRemoteException {
List workflows = null;
try {
workflows = repo.getWorkflowsForEvent(eventName);
if (workflows != null)
return AvroTypeFactory.getAvroWorkflows(workflows);
else
return new ArrayList();
} catch (Exception e) {
e.printStackTrace();
throw new AvroRemoteException(
"Exception getting workflows for event: " + eventName
+ " from repository: Message: " + e.getMessage());
}
}
@Override
public Map<String, Object> getWorkflowInstanceMetadata(String wInstId) throws AvroRemoteException {
Metadata met = engine.getWorkflowInstanceMetadata(wInstId);
return AvroTypeFactory.getAvroMetadata(met);
}
@Override
public boolean handleEvent(String eventName, Map<String, Object> metadata) throws AvroRemoteException {
logger.info("Received event: {}", eventName);
logger.debug("Reveiced meta data for event: {} -> {}", eventName, metadata);
List workflows = null;
try {
workflows = repo.getWorkflowsForEvent(eventName);
} catch (Exception e) {
logger.error("Couldn't get workflows for event: {}", eventName, e);
throw new AvroRemoteException(
"Exception getting workflows associated with event: "
+ eventName + ": Message: " + e.getMessage());
}
if (workflows != null) {
logger.debug("Found {} workflows for event: {}", workflows.size(), eventName);
for (Iterator i = workflows.iterator(); i.hasNext();) {
Workflow w = (Workflow) i.next();
logger.debug("Workflow {} retrieved for event: {}", w.getName(), eventName);
Metadata m = new Metadata();
m.addMetadata(AvroTypeFactory.getMetadata(metadata));
try {
engine.startWorkflow(w, m);
} catch (Exception e) {
logger.error("Error when starting workflow: {} with metadata: {}", w.getName(), m.getAllKeys(), e);
throw new AvroRemoteException(
"Engine exception when starting workflow: " + w.getName() + ": Message: " + e.getMessage());
}
}
logger.info("Event: {} handled successfully", eventName);
return true;
} else
return false;
}
@Override
public AvroWorkflowInstance getWorkflowInstanceById(String wInstId) throws AvroRemoteException {
WorkflowInstance inst = null;
try {
inst = engine.getInstanceRepository().getWorkflowInstanceById(
wInstId);
} catch (Exception e) {
logger.error("Error obtaining workflow instance with ID: [{}], error: {}", wInstId, e.getMessage());
inst = new WorkflowInstance();
}
return AvroTypeFactory.getAvroWorkflowInstance(inst);
}
@Override
public boolean stopWorkflowInstance(String workflowInstId) throws AvroRemoteException {
engine.stopWorkflow(workflowInstId);
return true;
}
@Override
public boolean pauseWorkflowInstance(String workflowInstId) throws AvroRemoteException {
engine.pauseWorkflowInstance(workflowInstId);
return true;
}
@Override
public boolean resumeWorkflowInstance(String workflowInstId) throws AvroRemoteException {
engine.resumeWorkflowInstance(workflowInstId);
return true;
}
@Override
public double getWorkflowWallClockMinutes(String workflowInstId) throws AvroRemoteException {
return engine.getWallClockMinutes(workflowInstId);
}
@Override
public double getWorkflowCurrentTaskWallClockMinutes(String workflowInstId) throws AvroRemoteException {
return engine.getCurrentTaskWallClockMinutes(workflowInstId);
}
@Override
public int getNumWorkflowInstancesByStatus(String status) throws AvroRemoteException {
try {
return engine.getInstanceRepository().getNumWorkflowInstancesByStatus(
status);
} catch (InstanceRepositoryException e) {
throw new AvroRemoteException(e);
}
}
@Override
public int getNumWorkflowInstances() throws AvroRemoteException {
try {
return engine.getInstanceRepository().getNumWorkflowInstances();
} catch (InstanceRepositoryException e) {
e.printStackTrace();
throw new AvroRemoteException(e);
}
}
@Override
public List<AvroWorkflowInstance> getWorkflowInstancesByStatus(String status) throws AvroRemoteException {
List workflowInsts = null;
List<AvroWorkflowInstance> avroWorkflowInstances = new ArrayList<AvroWorkflowInstance>();
try {
workflowInsts = engine.getInstanceRepository()
.getWorkflowInstancesByStatus(status);
} catch (Exception e) {
logger.error("Error when obtaining workflow instances by status: {}, error: {}", status, e.getMessage());
return avroWorkflowInstances; //AvroTypeFactory.getAvroWorkflowInstances(workflowInsts);
}
if (workflowInsts != null) {
logger.debug("Retrieved {} instances by status: {}", workflowInsts.size(), status);
try {
for (WorkflowInstance wi :(List<WorkflowInstance>) workflowInsts){
// pick up the description of the workflow
Workflow wDesc = repo.getWorkflowById(wi.getWorkflow().getId());
// TODO: hack for now, fix this, we shouldn't have to cast
// here, bad
// design
if(wDesc == null){
//Possible dynamic workflow for instance
//reconsitute it from cache
wDesc = wi.getWorkflow();
repo.addWorkflow(wDesc);
}
wi.setWorkflow(wDesc);
avroWorkflowInstances.add(AvroTypeFactory.getAvroWorkflowInstance(wi));
}
} catch (Exception e) {
logger.error("Error when getting workflow instances by status: {}, error: {}", status, e.getMessage());
throw new AvroRemoteException(
"Exception getting workflow instances by statusfrom workflow engine: Message: "
+ e.getMessage());
}
}
return avroWorkflowInstances;
}
@Override
public List<AvroWorkflowInstance> getWorkflowInstances() throws AvroRemoteException {
List workflowInsts = null;
List<AvroWorkflowInstance> avroWorkflowInstances = new ArrayList<AvroWorkflowInstance>();
try {
workflowInsts = engine.getInstanceRepository()
.getWorkflowInstances();
} catch (Exception e) {
logger.error("Exception getting workflow instances. Message: {}", e.getMessage());
return avroWorkflowInstances;
}
if (workflowInsts != null) {
logger.debug("Retrieved {} workflow instances", workflowInsts.size());
try {
for (WorkflowInstance wi :(List<WorkflowInstance>) workflowInsts){
// pick up the description of the workflow
Workflow wDesc = repo.getWorkflowById(wi.getWorkflow()
.getId());
// TODO: hack for now, fix this, we shouldn't have to cast
// here, bad
// design
if(wDesc == null){
//Possible dynamic workflow for instance
//reconsitute it from cache
wDesc = wi.getWorkflow();
repo.addWorkflow(wDesc);
}
wi.setWorkflow(wDesc);
avroWorkflowInstances.add(AvroTypeFactory.getAvroWorkflowInstance(wi));
}
return avroWorkflowInstances;
} catch (Exception e) {
logger.error("Error getting workflow instances", e);
throw new AvroRemoteException(
"Exception getting workflow instances from workflow engine: Message: "
+ e.getMessage());
}
} else
return null;
}
@Override
public List<AvroWorkflow> getWorkflows() throws AvroRemoteException {
List workflowList = null;
try {
workflowList = repo.getWorkflows();
} catch (RepositoryException e) {
throw new AvroRemoteException(e);
}
if (workflowList != null) {
logger.debug("Retrieved {} workflows", workflowList.size());
try {
return AvroTypeFactory.getAvroWorkflows(workflowList);
} catch (Exception e) {
logger.error("Unable to get workflows: {}", e.getMessage());
throw new AvroRemoteException(
"Exception getting workflows from repository: Message: "
+ e.getMessage());
}
} else
return null;
}
@Override
public AvroWorkflowTask getTaskById(String taskId) throws AvroRemoteException {
try {
WorkflowTask t = repo.getWorkflowTaskById(taskId);
return AvroTypeFactory.getAvroWorkflowTask(t);
} catch (Exception e) {
logger.error("Error when getting task by ID: {} - {}", taskId, e.getMessage());
throw new AvroRemoteException("Exception getting task by id: Message: " + e.getMessage());
}
}
@Override
public AvroWorkflowCondition getConditionById(String conditionId) throws AvroRemoteException {
try {
WorkflowCondition c = repo.getWorkflowConditionById(conditionId);
return AvroTypeFactory.getAvroWorkflowCondition(c);
} catch (Exception e) {
logger.error("Error when getting condition by ID: {} - {}", conditionId, e.getMessage());
throw new AvroRemoteException("Exception getting condition by id: Message: " + e.getMessage());
}
}
@Override
public AvroWorkflow getWorkflowById(String workflowId) throws AvroRemoteException {
try {
Workflow workflow = repo.getWorkflowById(workflowId);
return AvroTypeFactory.getAvroWorkflow(workflow);
} catch (Exception e) {
logger.error("Error getting workflow by ID: {} - {}", workflowId, e.getMessage());
throw new AvroRemoteException(
"Exception getting workflow by id from the repository: Message: "
+ e.getMessage());
}
}
@Override
public synchronized boolean updateMetadataForWorkflow(String workflowInstId, Map<String, Object> metadata) throws AvroRemoteException {
Metadata met = new Metadata();
met.addMetadata(AvroTypeFactory.getMetadata(metadata));
return this.engine.updateMetadata(workflowInstId, met);
}
@Override
public synchronized boolean updateWorkflowInstance(AvroWorkflowInstance instance) throws AvroRemoteException {
WorkflowInstance wInst = AvroTypeFactory.getWorkflowInstance(instance);
return doUpdateWorkflowInstance(wInst);
}
@Override
public boolean setWorkflowInstanceCurrentTaskStartDateTime(String wInstId, String startDateTimeIsoStr) throws AvroRemoteException {
WorkflowInstance wInst = null;
try {
wInst = this.engine.getInstanceRepository()
.getWorkflowInstanceById(wInstId);
} catch (InstanceRepositoryException e) {
e.printStackTrace();
return false;
}
wInst.setCurrentTaskStartDateTimeIsoStr(startDateTimeIsoStr);
return doUpdateWorkflowInstance(wInst);
}
@Override
public synchronized boolean setWorkflowInstanceCurrentTaskEndDateTime(String wInstId, String endDateTimeIsoStr) throws AvroRemoteException {
WorkflowInstance wInst = null;
try {
wInst = this.engine.getInstanceRepository()
.getWorkflowInstanceById(wInstId);
} catch (InstanceRepositoryException e) {
e.printStackTrace();
return false;
}
wInst.setCurrentTaskEndDateTimeIsoStr(endDateTimeIsoStr);
return doUpdateWorkflowInstance(wInst);
}
public static void loadProperties() throws FileNotFoundException, IOException {
String configFile = System.getProperty(PROPERTIES_FILE_PROPERTY);
if (configFile != null) {
logger.info("Loading Workflow Manager Configuration Properties from: {}", configFile);
System.getProperties().load(new FileInputStream(new File(configFile)));
}
}
@Override
public synchronized boolean updateWorkflowInstanceStatus(String workflowInstId, String status) throws AvroRemoteException {
logger.debug("Updating workflow instance[{}] status to {}", workflowInstId, status);
WorkflowInstance wInst = null;
try {
wInst = engine.getInstanceRepository().getWorkflowInstanceById(workflowInstId);
} catch (Exception e) {
logger.error("Unable to updated workflow instance [{}] status to {} - {}",
workflowInstId, status, e.getMessage());
throw new AvroRemoteException(e);
}
wInst.setStatus(status);
return doUpdateWorkflowInstance(wInst);
}
private static WorkflowEngine getWorkflowEngineFromProperty() {
return getWorkflowEngineFromClassName(System.getProperty(
WORKFLOW_ENGINE_FACTORY_PROPERTY,
ThreadPoolWorkflowEngineFactory.class.getCanonicalName()));
}
private static WorkflowRepository getWorkflowRepositoryFromProperty() {
return getWorkflowRepositoryFromClassName(System.getProperty(
WORKFLOW_REPOSITORY_FACTORY_PROPERTY,
DataSourceWorkflowRepositoryFactory.class.getCanonicalName()));
}
private String getHostname() {
try {
// Get hostname by textual representation of IP address
InetAddress addr = InetAddress.getLocalHost();
// Get the host name
String hostname = addr.getHostName();
return hostname;
} catch (UnknownHostException e) {
}
return null;
}
private URL safeGetUrlFromString(String urlStr) {
try {
return new URL(urlStr);
} catch (MalformedURLException e) {
return null;
}
}
private boolean doUpdateWorkflowInstance(WorkflowInstance wInst) {
logger.debug("Updating workflow instance: {}", wInst.getId());
try {
engine.getInstanceRepository().updateWorkflowInstance(wInst);
return true;
} catch (InstanceRepositoryException e) {
logger.error("Error when updating workflow instance: {}", wInst.getId());
return false;
}
}
private void populateWorkflows(List wInsts) {
if (wInsts != null && wInsts.size() > 0) {
for (Iterator i = wInsts.iterator(); i.hasNext();) {
WorkflowInstance wInst = (WorkflowInstance) i.next();
if(wInst.getWorkflow() == null ||
(wInst.getWorkflow() != null &&
(wInst.getWorkflow().getName() == null ||
wInst.getWorkflow().getId() == null))){
wInst.setWorkflow(safeGetWorkflowById(wInst.getWorkflow()
.getId()));
}
else{
// check to see if the workflow exists in the
// repo
try {
if(repo.getWorkflowById(wInst.getWorkflow().getId()) == null){
repo.addWorkflow(wInst.getWorkflow());
}
} catch (RepositoryException e) {
logger.error("Error when attempting to look up workflow[{}] in populate workflows. Message:",
wInst.getWorkflow().getId(), e.getMessage());
}
}
}
}
}
private Workflow safeGetWorkflowById(String workflowId) {
logger.debug("Safe get workflow by ID: {}", workflowId);
try {
return repo.getWorkflowById(workflowId);
} catch (Exception e) {
logger.error("Error getting workflow by id: [{}], error: {}", workflowId, e.getMessage());
return new Workflow();
}
}
}