[OODT-981] Further logging improvements for workflow manager
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
index b431da3..0779a08 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/IterativeWorkflowProcessorThread.java
@@ -17,93 +17,91 @@
package org.apache.oodt.cas.workflow.engine;
-//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.resource.structs.Job;
import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
+import org.apache.oodt.cas.resource.system.ResourceManagerClient;
import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
import org.apache.oodt.cas.workflow.structs.TaskJobInput;
+import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
+import org.apache.oodt.cas.workflow.structs.WorkflowConditionInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
-import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
-import org.apache.oodt.cas.workflow.structs.WorkflowConditionInstance;
import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
-import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.commons.util.DateConvert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-//JDK imports
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+
/**
- * An instance of the {@link WorkflowProcessorThread} that processes through an
+ * A workflow processor thread implementation that processes through an
* iterative {@link WorkflowInstance}. This class keeps an <code>Iterator</code>
- * that allows it to move from one end of a sequential {@link Workflow}
+ * that allows it to move from one end of a sequential {@link org.apache.oodt.cas.workflow.structs.Workflow}
* processing pipeline to another. This class should only be used to process
- * science pipeline style {@link Workflow}s, i.e., those which resemble an
+ * science pipeline style {@link org.apache.oodt.cas.workflow.structs.Workflow}s, i.e., those which resemble an
* iterative processing pipelines, with no forks, or concurrent task executions.
*
* @author mattmann
+ * @author Imesha Sudasingha
* @version $Revision$
*
*/
-public class IterativeWorkflowProcessorThread implements WorkflowStatus,
- CoreMetKeys, Runnable {
+public class IterativeWorkflowProcessorThread implements WorkflowStatus, CoreMetKeys, Runnable {
- /* the default queue name if we're using resmgr job submission */
+ private static final Logger logger = LoggerFactory.getLogger(IterativeWorkflowProcessorThread.class);
+
+ /** the default queue name if we're using resmgr job submission */
private static final String DEFAULT_QUEUE_NAME = "high";
- /* an iterator representing the current task that we are on in the workflow */
+ /** an iterator representing the current task that we are on in the workflow */
private Iterator taskIterator = null;
- /* the workflow instance that this processor thread is processing */
+ /** the workflow instance that this processor thread is processing */
private WorkflowInstance workflowInst = null;
- /* should our workflow processor thread start running? */
+ /** should our workflow processor thread start running? */
private boolean running = false;
- /*
+ /**
* the amount of seconds to wait inbetween checking for task pre-condition
* satisfaction
*/
private long waitForConditionSatisfy = -1;
- /* our instance repository used to persist workflow instance info */
+ /** our instance repository used to persist workflow instance info */
private WorkflowInstanceRepository instanceRepository = null;
- /*
+ /**
* our client to a resource manager: if null, local task execution will be
* performed
*/
- private XmlRpcResourceManagerClient rClient = null;
+ private ResourceManagerClient rClient = null;
- /* polling wait for res mgr */
+ /** polling wait for res mgr */
private long pollingWaitTime = 10L;
- /*
+ /**
* should our workflow processor thread pause, and not move onto the next
* task?
*/
private boolean pause = false;
- /* our log stream */
- private static Logger LOG = Logger
- .getLogger(IterativeWorkflowProcessorThread.class.getName());
-
private Map CONDITION_CACHE = new ConcurrentHashMap();
/* the parent workflow manager url that executed this processor thread */
@@ -112,8 +110,7 @@
/* the currently executing jobId if we're using the resource manager */
private String currentJobId = null;
- public IterativeWorkflowProcessorThread(WorkflowInstance wInst,
- WorkflowInstanceRepository instRep, URL wParentUrl) {
+ public IterativeWorkflowProcessorThread(WorkflowInstance wInst, WorkflowInstanceRepository instRep, URL wParentUrl) {
workflowInst = wInst;
taskIterator = workflowInst.getWorkflow().getTasks().iterator();
this.instanceRepository = instRep;
@@ -132,6 +129,9 @@
"org.apache.oodt.cas.workflow.engine.resourcemgr.pollingWaitTime", 10);
wmgrParentUrl = wParentUrl;
+ logger.info("Thread created for workflowInstance: {}[{}], instanceRepository class: {}, wmgrParentUrl: {}",
+ workflowInst.getId(), workflowInst.getWorkflow().getName(),
+ instanceRepository != null ? instanceRepository.getClass().getName() : null, wmgrParentUrl);
}
/*
@@ -140,6 +140,7 @@
* @see java.lang.Runnable#run()
*/
public void run() {
+ logger.debug("Starting workflow processor thread");
/*
* okay, we got into the run method, mark the start date time for the
* workflow instance here
@@ -151,15 +152,14 @@
while (running && taskIterator.hasNext()) {
if (pause) {
- LOG.log(Level.FINE,
- "IterativeWorkflowProcessorThread: Skipping execution: Paused: CurrentTask: "
- + getTaskNameById(workflowInst.getCurrentTaskId()));
+ logger.debug("Skipping execution: Paused: CurrentTask: {}", getTaskNameById(workflowInst.getCurrentTaskId()));
continue;
}
WorkflowTask task = (WorkflowTask) taskIterator.next();
- workflowInst.setCurrentTaskId(task.getTaskId());
+ logger.debug("Selected task: {} for execution", task.getTaskName());
+ workflowInst.setCurrentTaskId(task.getTaskId());
// now persist it
persistWorkflowInstance();
@@ -176,20 +176,17 @@
// only execute the below code when it's passed all of its
// pre-conditions
if (task.getConditions() != null) {
- while (!satisfied(task.getConditions(), task.getTaskId())
- && !isStopped()) {
+ while (!satisfied(task.getConditions(), task.getTaskId()) && !isStopped()) {
// if we're not paused, go ahead and pause us now
if (!isPaused()) {
pause();
}
- LOG.log(Level.FINEST,
- "Pre-conditions for task: " + task.getTaskName()
- + " unsatisfied: waiting: " + waitForConditionSatisfy
- + " seconds before checking again.");
+ logger.debug("Pre-conditions for task: {} unsatisfied: waiting: {} seconds before checking again.",
+ task.getTaskName(), waitForConditionSatisfy);
try {
- Thread.currentThread().sleep(waitForConditionSatisfy * 1000);
+ Thread.sleep(waitForConditionSatisfy * 1000);
} catch (InterruptedException ignore) {
}
@@ -212,10 +209,7 @@
}
// task execution
- LOG.log(
- Level.FINEST,
- "IterativeWorkflowProcessorThread: Executing task: "
- + task.getTaskName());
+ logger.info("Executing task: {}", task.getTaskName());
WorkflowTaskInstance taskInstance = GenericWorkflowObjectFactory
.getTaskObjectFromClassName(task.getTaskInstanceClassName());
@@ -273,7 +267,7 @@
// sleep for 5 seconds then come back
// and check again
try {
- Thread.currentThread().sleep(pollingWaitTime * 1000);
+ Thread.sleep(pollingWaitTime * 1000);
} catch (InterruptedException ignore) {
}
}
@@ -296,53 +290,39 @@
.getWorkflowInstanceById(workflowInst.getId());
workflowInst = updatedInst;
} catch (InstanceRepositoryException e) {
- LOG.log(Level.SEVERE, e.getMessage());
- LOG.log(Level.WARNING, "Unable to get " + "updated workflow "
- + "instance record " + "when executing remote job: Message: "
- + e.getMessage());
+ logger.error("Unable to get updated workflow instance record, task - {}, workflowInstanceId - {}",
+ task.getTaskName(), workflowInst.getId(), e); // trailing Throwable has no placeholder, so its stack trace is logged
}
-
} catch (JobExecutionException e) {
- LOG.log(Level.WARNING,
- "Job execution exception using resource manager to execute job: Message: "
- + e.getMessage());
+ logger.error("Job execution exception using resource manager to execute job, task {}",
+ task.getTaskName(), e); // trailing Throwable has no placeholder, so its stack trace is logged
}
} else {
+ logger.debug("Updating workflow instance [{}] state as {}", workflowInst.getId(), STARTED);
// we started, so mark it
workflowInst.setStatus(STARTED);
// go ahead and persist the workflow instance, after we
// save the current task start date time
- String currentTaskIsoStartDateTimeStr = DateConvert
- .isoFormat(new Date());
- workflowInst
- .setCurrentTaskStartDateTimeIsoStr(currentTaskIsoStartDateTimeStr);
+ String currentTaskIsoStartDateTimeStr = DateConvert.isoFormat(new Date());
+ workflowInst.setCurrentTaskStartDateTimeIsoStr(currentTaskIsoStartDateTimeStr);
workflowInst.setCurrentTaskEndDateTimeIsoStr(null); /*
* clear this out
* until it's ready
*/
persistWorkflowInstance();
- executeTaskLocally(taskInstance, workflowInst.getSharedContext(),
- task.getTaskConfig(), task.getTaskName());
+ executeTaskLocally(taskInstance, workflowInst.getSharedContext(), task.getTaskConfig(), task.getTaskName());
String currentTaskIsoEndDateTimeStr = DateConvert.isoFormat(new Date());
- workflowInst
- .setCurrentTaskEndDateTimeIsoStr(currentTaskIsoEndDateTimeStr);
+ workflowInst.setCurrentTaskEndDateTimeIsoStr(currentTaskIsoEndDateTimeStr);
persistWorkflowInstance();
}
- LOG.log(
- Level.FINEST,
- "IterativeWorkflowProcessorThread: Completed task: "
- + task.getTaskName());
-
+ logger.info("Completed task: {}", task.getTaskName());
}
- LOG.log(Level.FINEST,
- "IterativeWorkflowProcessorThread: Completed workflow: "
- + workflowInst.getWorkflow().getName());
+ logger.info("Completed workflow: {}", workflowInst.getWorkflow().getName());
if (!isStopped()) {
stop();
}
-
}
public WorkflowInstance getWorkflowInstance() {
@@ -355,8 +335,7 @@
// then kill the current job there
if (this.rClient != null && this.currentJobId != null) {
if (!this.rClient.killJob(this.currentJobId)) {
- LOG.log(Level.WARNING, "Attempt to kill " + "current resmgr job: ["
- + this.currentJobId + "]: failed");
+ logger.warn("Attempt to kill current resmgr job: [{}] failed", this.currentJobId);
}
}
@@ -437,7 +416,7 @@
/**
* @return the rClient
*/
- public XmlRpcResourceManagerClient getRClient() {
+ public ResourceManagerClient getRClient() {
return rClient;
}
@@ -448,8 +427,7 @@
public void setRClient(XmlRpcResourceManagerClient client) {
rClient = client;
if (rClient != null) {
- LOG.log(Level.INFO, "Resource Manager Job Submission enabled to: ["
- + rClient.getResMgrUrl() + "]");
+ logger.info("Resource Manager Job Submission enabled to: [{}]", rClient.getResMgrUrl());
}
}
@@ -472,24 +450,21 @@
Metadata dynMetadata) {
if (task.getRequiredMetFields() == null || (task.getRequiredMetFields()
.size() == 0)) {
- LOG.log(Level.INFO, "Task: [" + task.getTaskName()
- + "] has no required metadata fields");
- return true; /* no required metadata, so we're fine */
+ logger.info("Task: [{}] has no required metadata fields", task.getTaskName());
+ // No required metadata, so we're fine
+ return true;
}
for (Object o : task.getRequiredMetFields()) {
String reqField = (String) o;
if (!dynMetadata.containsKey(reqField)) {
- LOG.log(Level.SEVERE, "Checking metadata key: [" + reqField
- + "] for task: [" + task.getTaskName()
- + "]: failed: aborting workflow");
+ logger.error("Checking metadata key: [{}] for task: [{}]: failed: aborting workflow",
+ reqField, task.getTaskName());
return false;
}
}
- LOG.log(Level.INFO, "All required metadata fields present for task: ["
- + task.getTaskName() + "]");
-
+ logger.info("All required metadata fields present for task: [{}]", task.getTaskName());
return true;
}
@@ -559,22 +534,20 @@
private void persistWorkflowInstance() {
try {
+ logger.debug("Persisting workflow instance: {}[{}]", workflowInst.getId(), workflowInst.getWorkflow().getName());
instanceRepository.updateWorkflowInstance(workflowInst);
} catch (InstanceRepositoryException e) {
- LOG.log(Level.WARNING, "Exception persisting workflow instance: ["
- + workflowInst.getId() + "]: Message: " + e.getMessage());
+ logger.error("Exception persisting workflow instance: [{}]", workflowInst.getId(), e);
}
}
private void executeTaskLocally(WorkflowTaskInstance instance, Metadata met,
WorkflowTaskConfiguration cfg, String taskName) {
try {
- LOG.log(Level.INFO, "Executing task: [" + taskName + "] locally");
+ logger.info("Executing task: [{}], task instance: {} locally", taskName, instance.getClass());
instance.run(met, cfg);
} catch (Exception e) {
- LOG.log(Level.SEVERE, e.getMessage());
- LOG.log(Level.WARNING, "Exception executing task: [" + taskName
- + "] locally: Message: " + e.getMessage());
+ logger.error("Exception executing task: [{}] locally: Message: ", taskName, e);
}
}
@@ -582,10 +555,8 @@
try {
return rClient.isJobComplete(jobId);
} catch (Exception e) {
- LOG.log(Level.WARNING, "Exception checking completion status for job: ["
- + jobId + "]: Messsage: " + e.getMessage());
+ logger.error("Exception checking completion status for job: [{}]", jobId, e);
return false;
}
}
-
}
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/WorkflowInstance.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/WorkflowInstance.java
index 5c541bc..345db69 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/WorkflowInstance.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/structs/WorkflowInstance.java
@@ -17,18 +17,14 @@
package org.apache.oodt.cas.workflow.structs;
-//JDK imports
-
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.workflow.lifecycle.WorkflowState;
import org.apache.oodt.commons.util.DateConvert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.Date;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-//OODT imports
/**
* A WorkflowInstance is an instantiation of the abstract description of a
@@ -61,7 +57,9 @@
*
*/
public class WorkflowInstance {
- private static Logger LOG = Logger.getLogger(WorkflowInstance.class.getName());
+
+ private static final Logger logger = LoggerFactory.getLogger(WorkflowInstance.class);
+
private ParentChildWorkflow workflow;
private String id;
@@ -92,8 +90,8 @@
public WorkflowInstance(Workflow workflow, String id, WorkflowState state,
String currentTaskId, Date startDate, Date endDate,
Metadata sharedContext, int timesBlocked, Priority priority) {
- this.workflow = workflow != null && workflow instanceof ParentChildWorkflow ? (ParentChildWorkflow) workflow
- : new ParentChildWorkflow(workflow != null ? workflow : new Workflow());
+ this.workflow = workflow instanceof ParentChildWorkflow ?
+ (ParentChildWorkflow) workflow : new ParentChildWorkflow(workflow != null ? workflow : new Workflow());
this.id = id;
this.state = state;
this.currentTaskId = currentTaskId;
@@ -138,6 +136,7 @@
WorkflowState state = new WorkflowState();
state.setName(status);
this.state = state;
+ logger.debug("Workflow state updated to: {}", state.getName());
}
/**
@@ -312,7 +311,7 @@
try {
this.endDate = DateConvert.isoParse(endDateTimeIsoStr);
} catch (ParseException e) {
- LOG.log(Level.SEVERE, e.getMessage());
+ logger.error("Error when parsing end time: {}", endDateTimeIsoStr, e);
// fail silently besides this: it's just a setter
}
}
@@ -337,7 +336,7 @@
try {
this.startDate = DateConvert.isoParse(startDateTimeIsoStr);
} catch (ParseException e) {
- LOG.log(Level.SEVERE, e.getMessage());
+ logger.error("Error when parsing start time: {}", startDateTimeIsoStr, e);
// fail silently besides this: it's just a setter
}
}
@@ -368,7 +367,7 @@
this.getTaskById(currentTaskId).
setEndDate(DateConvert.isoParse(currentTaskEndDateTimeIsoStr));
} catch (ParseException e) {
- LOG.log(Level.SEVERE, e.getMessage());
+ logger.error("Error when parsing current task end time: {}", currentTaskEndDateTimeIsoStr, e);
// fail silently besides this: it's just a setter
}
}
@@ -398,7 +397,7 @@
this.getTaskById(currentTaskId).setStartDate(DateConvert
.isoParse(currentTaskStartDateTimeIsoStr));
} catch (ParseException e) {
- LOG.log(Level.SEVERE, e.getMessage());
+ logger.error("Error when parsing current task start time: {}", currentTaskStartDateTimeIsoStr, e);
// fail silently besides this: it's just a setter
}
}
@@ -446,6 +445,4 @@
return null;
}
}
-
-
}
diff --git a/workflow/src/main/resources/log4j2.xml b/workflow/src/main/resources/log4j2.xml
index 8033d98..3214815 100644
--- a/workflow/src/main/resources/log4j2.xml
+++ b/workflow/src/main/resources/log4j2.xml
@@ -23,7 +23,7 @@
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
- <File name="File" fileName="../logs/cas_wmgr.log" immediateFlush="false" append="false">
+ <File name="File" fileName="../logs/cas_wmgr.log" immediateFlush="true" append="false">
<PatternLayout pattern="%d{yyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</File>
</Appenders>