Fixed TestPGETaskInstance test failure due to null FM Url at XMLHelper.fillIn()
diff --git a/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java b/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
index 4f36890..2e266e7 100644
--- a/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
+++ b/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
@@ -16,6 +16,9 @@
*/
package org.apache.oodt.cas.pge;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.Validate;
import org.apache.oodt.cas.crawl.AutoDetectProductCrawler;
import org.apache.oodt.cas.crawl.ProductCrawler;
@@ -54,12 +57,6 @@
import org.apache.oodt.cas.workflow.util.ScriptFile;
import org.apache.oodt.commons.exceptions.CommonsException;
import org.apache.oodt.commons.exec.ExecUtils;
-import org.apache.xmlrpc.XmlRpcException;
-
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
import org.springframework.context.support.FileSystemXmlApplicationContext;
import java.io.File;
@@ -80,8 +77,14 @@
import java.util.regex.Pattern;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.*;
-import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.*;
-import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.*;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.CONF_FILE_BUILD;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.CRAWLING;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.RUNNING_PGE;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.STAGING_INPUT;
+import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.createConfigFilePropertyAdder;
+import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.createFileStager;
+import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.createPgeConfigBuilder;
+import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.createSciPgeConfigFileWriter;
/**
@@ -94,24 +97,21 @@
public class PGETaskInstance implements WorkflowTaskInstance {
protected Logger logger = Logger.getLogger(PGETaskInstance.class.getName());
- protected WorkflowManagerClient wm;
- protected String workflowInstId;
+ private WorkflowManagerClient wmClient;
+ private String workflowInstId;
protected PgeMetadata pgeMetadata;
protected PgeConfig pgeConfig;
protected PGETaskInstance() {}
@Override
- public void run(Metadata metadata, WorkflowTaskConfiguration config)
- throws WorkflowTaskInstanceException {
+ public void run(Metadata metadata, WorkflowTaskConfiguration config) throws WorkflowTaskInstanceException {
try {
// Initialize CAS-PGE.
pgeMetadata = createPgeMetadata(metadata, config);
pgeConfig = createPgeConfig();
runPropertyAdders();
- wm = createWorkflowManagerClient();
- workflowInstId = getWorkflowInstanceId();
- logger = createLogger(); // use workflow ID specific logger from now on
+ logger = createLogger(); // use workflow ID specific logger from now on
// Write out PgeMetadata.
dumpMetadataIfRequested();
@@ -128,7 +128,9 @@
runPge();
// Ingest products.
- runIngestCrawler(createProductCrawler());
+ ProductCrawler productCrawler = createProductCrawler();
+ runIngestCrawler(productCrawler);
+ productCrawler.shutdown();
// Commit dynamic metadata.
updateDynamicMetadata();
@@ -141,7 +143,7 @@
protected void updateStatus(String status) throws Exception {
logger.info("Updating status to workflow as [" + status + "]");
- if (!wm.updateWorkflowInstanceStatus(workflowInstId, status)) {
+ if (!getWorkflowManagerClient().updateWorkflowInstanceStatus(workflowInstId, status)) {
throw new PGEException(
"Failed to update workflow status : client returned false");
}
@@ -250,12 +252,15 @@
pgeConfig.getPropertyAdderCustomArgs());
}
- protected WorkflowManagerClient createWorkflowManagerClient()
- throws MalformedURLException {
- String url = pgeMetadata.getMetadata(WORKFLOW_MANAGER_URL);
- logger.info("Creating WorkflowManager client for url [" + url + "]");
- Validate.notNull(url, "Must specify " + WORKFLOW_MANAGER_URL);
- return RpcCommunicationFactory.createClient(new URL(url));
+ protected WorkflowManagerClient getWorkflowManagerClient() throws MalformedURLException {
+ if (this.wmClient == null) {
+ String url = pgeMetadata.getMetadata(WORKFLOW_MANAGER_URL);
+ logger.info("Creating WorkflowManager client for url [" + url + "]");
+ Validate.notNull(url, "Must specify " + WORKFLOW_MANAGER_URL);
+ this.wmClient = RpcCommunicationFactory.createClient(new URL(url));
+ }
+
+ return this.wmClient;
}
protected String getWorkflowInstanceId() {
@@ -586,7 +591,26 @@
protected void updateDynamicMetadata() throws Exception {
pgeMetadata.commitMarkedDynamicMetadataKeys();
- wm.updateMetadataForWorkflow(workflowInstId,
- pgeMetadata.asMetadata(PgeMetadata.Type.DYNAMIC));
+ getWorkflowManagerClient()
+ .updateMetadataForWorkflow(workflowInstId, pgeMetadata.asMetadata(PgeMetadata.Type.DYNAMIC));
+ }
+
+ public String getWorkflowInstId() {
+ return workflowInstId;
+ }
+
+ public void setWorkflowInstId(String workflowInstId) {
+ this.workflowInstId = workflowInstId;
+ }
+
+ public void setWmClient(WorkflowManagerClient wmClient) {
+ this.wmClient = wmClient;
+ }
+
+ @Override
+ public void finalize() throws IOException {
+ if (wmClient != null) {
+ wmClient.close();
+ }
}
}
diff --git a/pge/src/main/java/org/apache/oodt/cas/pge/util/XmlHelper.java b/pge/src/main/java/org/apache/oodt/cas/pge/util/XmlHelper.java
index 9f01d99..a373af0 100644
--- a/pge/src/main/java/org/apache/oodt/cas/pge/util/XmlHelper.java
+++ b/pge/src/main/java/org/apache/oodt/cas/pge/util/XmlHelper.java
@@ -37,6 +37,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
@@ -438,21 +439,26 @@
return fillIn(value, inputMetadata, true);
}
- public static String fillIn(String value, Metadata inputMetadata,
- boolean envReplaceRecur) throws PGEException {
- try (FileManagerClient fmClient=RpcCommunicationFactory.createClient(
- new URL(inputMetadata.getMetadata(QUERY_FILE_MANAGER_URL.getName())))){
- while ((value = PathUtils
- .doDynamicReplacement(value, inputMetadata)).contains("[")
- && envReplaceRecur) {
+ public static String fillIn(String value, Metadata inputMetadata, boolean envReplaceRecur) throws PGEException {
+ FileManagerClient fmClient=null;
+ try {
+ while ((value = PathUtils.doDynamicReplacement(value, inputMetadata)).contains("[") && envReplaceRecur) {
}
- if (value.toUpperCase().matches(
- "^\\s*SQL\\s*\\(.*\\)\\s*\\{.*\\}\\s*$")) {
+
+ if (value.toUpperCase().matches("^\\s*SQL\\s*\\(.*\\)\\s*\\{.*\\}\\s*$")) {
+ fmClient = RpcCommunicationFactory
+ .createClient(new URL(inputMetadata.getMetadata(QUERY_FILE_MANAGER_URL.getName())));
value = QueryUtils.getQueryResultsAsString(fmClient.complexQuery(SqlParser.parseSqlQueryMethod(value)));
}
return value;
} catch (Exception e) {
throw new PGEException("Failed to parse value: " + value, e);
+ } finally {
+ if (fmClient != null) {
+ try {
+ fmClient.close();
+ } catch (IOException ignored) { }
+ }
}
}
}
diff --git a/pge/src/test/java/org/apache/oodt/cas/pge/TestPGETaskInstance.java b/pge/src/test/java/org/apache/oodt/cas/pge/TestPGETaskInstance.java
index c888290..db60792 100644
--- a/pge/src/test/java/org/apache/oodt/cas/pge/TestPGETaskInstance.java
+++ b/pge/src/test/java/org/apache/oodt/cas/pge/TestPGETaskInstance.java
@@ -18,6 +18,9 @@
//OODT static imports
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
import org.apache.oodt.cas.crawl.AutoDetectProductCrawler;
import org.apache.oodt.cas.crawl.ProductCrawler;
@@ -35,13 +38,8 @@
import org.apache.oodt.cas.pge.writers.MockDynamicConfigFileWriter;
import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
+import org.apache.oodt.cas.workflow.system.AvroRpcWorkflowManagerClient;
import org.apache.oodt.cas.workflow.system.WorkflowManagerClient;
-import org.apache.oodt.cas.workflow.system.XmlRpcWorkflowManagerClient;
import org.junit.After;
import org.junit.Test;
import org.w3c.dom.Document;
@@ -50,10 +48,13 @@
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.StringReader;
+import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -64,14 +65,18 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.*;
import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.CRAWLING;
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.*;
-import static org.junit.Assume.*;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
//JDK imports
//JUnit imports
//Apache imports
@@ -219,25 +224,24 @@
assertEquals(Level.SEVERE.getLocalizedName() + ": pge2 message1", messages.get(1));
}
- @Test
+ @Test
public void testUpdateStatus() throws Exception {
final Map<String, String> args = Maps.newHashMap();
- PGETaskInstance pgeTask = createTestInstance();
- pgeTask.wm = new XmlRpcWorkflowManagerClient(null) {
- @Override
- public boolean updateWorkflowInstanceStatus(String instanceId,
- String status) {
- args.put("InstanceId", instanceId);
- args.put("Status", status);
- return true;
- }
- };
- String instanceId = "Test ID";
- String status = PgeTaskStatus.CRAWLING.getWorkflowStatusName();
- pgeTask.workflowInstId = instanceId;
- pgeTask.updateStatus(status);
- assertEquals(instanceId, args.get("InstanceId"));
- assertEquals(status, args.get("Status"));
+ PGETaskInstance pgeTask = createTestInstance();
+ pgeTask.setWmClient(new AvroRpcWorkflowManagerClient(new URL("http://localhost:9001")) {
+ @Override
+ public boolean updateWorkflowInstanceStatus(String instanceId, String status) {
+ args.put("InstanceId", instanceId);
+ args.put("Status", status);
+ return true;
+ }
+ });
+ String instanceId = "Test ID";
+ String status = PgeTaskStatus.CRAWLING.getWorkflowStatusName();
+ pgeTask.setWorkflowInstId(instanceId);
+ pgeTask.updateStatus(status);
+ assertEquals(instanceId, args.get("InstanceId"));
+ assertEquals(status, args.get("Status"));
}
@Test
@@ -271,11 +275,8 @@
@Test
public void testCreateWorkflowManagerClient() throws Exception {
PGETaskInstance pgeTask = createTestInstance();
- pgeTask.pgeMetadata.replaceMetadata(WORKFLOW_MANAGER_URL,
- "http://localhost:8888");
- WorkflowManagerClient wmClient =
- pgeTask.createWorkflowManagerClient();
- assertNotNull(wmClient);
+ pgeTask.pgeMetadata.replaceMetadata(WORKFLOW_MANAGER_URL, "http://localhost:8888");
+ assertNotNull(pgeTask.getWorkflowManagerClient());
}
@Test
@@ -462,12 +463,13 @@
pgeTask.pgeConfig.addOuputDirAndExpressions(new OutputDir("/tmp/dir1", true));
pgeTask.pgeConfig.addOuputDirAndExpressions(new OutputDir("/tmp/dir2", true));
pgeTask.pgeMetadata.replaceMetadata(ATTEMPT_INGEST_ALL, Boolean.toString(true));
- pgeTask.workflowInstId = "WorkflowInstanceId";
+ pgeTask.setWorkflowInstId("WorkflowInstanceId");
- pgeTask.wm = createMock(WorkflowManagerClient.class);
- expect(pgeTask.wm.updateWorkflowInstanceStatus(pgeTask.workflowInstId,
- CRAWLING.getWorkflowStatusName())).andReturn(true);
- replay(pgeTask.wm);
+ pgeTask.setWmClient(createMock(WorkflowManagerClient.class));
+ expect(pgeTask.getWorkflowManagerClient()
+ .updateWorkflowInstanceStatus(pgeTask.getWorkflowInstId(), CRAWLING.getWorkflowStatusName())
+ ).andReturn(true);
+ replay(pgeTask.getWorkflowManagerClient());
AutoDetectProductCrawler pc = createMock(AutoDetectProductCrawler.class);
pc.crawl(new File("/tmp/dir1"));
@@ -477,14 +479,14 @@
pgeTask.runIngestCrawler(pc);
- verify(pgeTask.wm);
+ verify(pgeTask.getWorkflowManagerClient());
verify(pc);
// Case: UpdateStatus Fail
- pgeTask.wm = createMock(WorkflowManagerClient.class);
- expect(pgeTask.wm.updateWorkflowInstanceStatus(pgeTask.workflowInstId,
+ pgeTask.setWmClient(createMock(WorkflowManagerClient.class));
+ expect(pgeTask.getWorkflowManagerClient().updateWorkflowInstanceStatus(pgeTask.getWorkflowInstId(),
CRAWLING.getWorkflowStatusName())).andReturn(false);
- replay(pgeTask.wm);
+ replay(pgeTask.getWorkflowManagerClient());
pc = createMock(AutoDetectProductCrawler.class);
replay(pc);
@@ -494,14 +496,14 @@
fail("Should have thrown");
} catch (Exception e) { /* expect throw */ }
- verify(pgeTask.wm);
+ verify(pgeTask.getWorkflowManagerClient());
verify(pc);
// Case: UpdateStatus Success, VerifyIngest Fail
- pgeTask.wm = createMock(WorkflowManagerClient.class);
- expect(pgeTask.wm.updateWorkflowInstanceStatus(pgeTask.workflowInstId,
+ pgeTask.setWmClient(createMock(WorkflowManagerClient.class));
+ expect(pgeTask.getWorkflowManagerClient().updateWorkflowInstanceStatus(pgeTask.getWorkflowInstId(),
CRAWLING.getWorkflowStatusName())).andReturn(true);
- replay(pgeTask.wm);
+ replay(pgeTask.getWorkflowManagerClient());
pc = createMock(AutoDetectProductCrawler.class);
pc.crawl(new File("/tmp/dir1"));
@@ -529,7 +531,7 @@
fail("Should have thrown");
} catch (Exception e) { /* expect throw */ }
- verify(pgeTask.wm);
+ verify(pgeTask.getWorkflowManagerClient());
verify(pc);
}
@@ -634,7 +636,7 @@
private PGETaskInstance createTestInstance(String workflowInstId)
throws Exception {
PGETaskInstance pgeTask = new PGETaskInstance();
- pgeTask.workflowInstId = workflowInstId;
+ pgeTask.setWorkflowInstId(workflowInstId);
pgeTask.pgeMetadata = new PgeMetadata();
pgeTask.pgeMetadata.replaceMetadata(NAME, "TestPGE");
pgeTask.pgeConfig = new PgeConfig();
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
index adc0bba..337c520 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
@@ -20,13 +20,12 @@
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.oodt.cas.cli.CmdLineUtility;
import org.apache.oodt.cas.metadata.Metadata;
-import org.apache.oodt.cas.workflow.structs.WorkflowInstancePage;
-import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.Workflow;
-import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstancePage;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.util.AvroTypeFactory;
import java.io.IOException;
@@ -45,26 +44,20 @@
*/
public class AvroRpcWorkflowManagerClient implements WorkflowManagerClient {
- private static Logger LOG = Logger
- .getLogger(AvroRpcWorkflowManagerClient.class.getName());
+ private static Logger LOG = Logger.getLogger(AvroRpcWorkflowManagerClient.class.getName());
- Transceiver client;
-
- org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager proxy;
-
- URL workflowManagerUrl;
-
+ private Transceiver client;
+ private org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager proxy;
+ private URL workflowManagerUrl;
public AvroRpcWorkflowManagerClient(URL url){
workflowManagerUrl = url;
try {
client = new NettyTransceiver(new InetSocketAddress(url.getHost(),url.getPort()));
- proxy = SpecificRequestor.getClient(org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager.class, client);
-
+ proxy = SpecificRequestor.getClient(org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager.class, client);
} catch (IOException e) {
- e.printStackTrace();
+ LOG.severe(String.format("Error occurred when creating client: %s", e.getMessage()));
}
-
}
@Override
@@ -251,4 +244,18 @@
}
}
+
+ @Override
+ public void close() throws IOException {
+ if (client != null) {
+ client.close();
+ client = null;
+ LOG.info("Closed workflow manager client: " + workflowManagerUrl.toString());
+ }
+ }
+
+ @Override
+ public void finalize() throws IOException {
+ close();
+ }
}
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerClient.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerClient.java
index 9d821bc..ed0cfd1 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerClient.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerClient.java
@@ -24,6 +24,7 @@
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
+import java.io.Closeable;
import java.net.URL;
import java.util.List;
import java.util.Vector;
@@ -36,10 +37,11 @@
* Base interface for client RPC implementation.
* </p>
*/
-public interface WorkflowManagerClient {
+public interface WorkflowManagerClient extends Closeable {
boolean refreshRepository()
throws Exception;
+
String executeDynamicWorkflow(List<String> taskIds, Metadata metadata)
throws Exception;
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java
index 9c29cc4..f9bc6e6 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java
@@ -534,4 +534,8 @@
client = new XmlRpcClient(workflowManagerUrl);
}
+ @Override
+ public void close() throws IOException {
+
+ }
}