Fixed TestPGETaskInstance test failure due to null FM Url at XMLHelper.fillIn()
diff --git a/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java b/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
index 4f36890..2e266e7 100644
--- a/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
+++ b/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
@@ -16,6 +16,9 @@
  */
 package org.apache.oodt.cas.pge;
 
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.Validate;
 import org.apache.oodt.cas.crawl.AutoDetectProductCrawler;
 import org.apache.oodt.cas.crawl.ProductCrawler;
@@ -54,12 +57,6 @@
 import org.apache.oodt.cas.workflow.util.ScriptFile;
 import org.apache.oodt.commons.exceptions.CommonsException;
 import org.apache.oodt.commons.exec.ExecUtils;
-import org.apache.xmlrpc.XmlRpcException;
-
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
 import org.springframework.context.support.FileSystemXmlApplicationContext;
 
 import java.io.File;
@@ -80,8 +77,14 @@
 import java.util.regex.Pattern;
 
 import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.*;
-import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.*;
-import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.*;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.CONF_FILE_BUILD;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.CRAWLING;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.RUNNING_PGE;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.STAGING_INPUT;
+import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.createConfigFilePropertyAdder;
+import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.createFileStager;
+import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.createPgeConfigBuilder;
+import static org.apache.oodt.cas.pge.util.GenericPgeObjectFactory.createSciPgeConfigFileWriter;
 
 
 /**
@@ -94,24 +97,21 @@
 public class PGETaskInstance implements WorkflowTaskInstance {
 
    protected Logger logger = Logger.getLogger(PGETaskInstance.class.getName());
-   protected WorkflowManagerClient wm;
-   protected String workflowInstId;
+   private WorkflowManagerClient wmClient;
+   private String workflowInstId;
    protected PgeMetadata pgeMetadata;
    protected PgeConfig pgeConfig;
 
    protected PGETaskInstance() {}
 
    @Override
-   public void run(Metadata metadata, WorkflowTaskConfiguration config)
-         throws WorkflowTaskInstanceException {
+   public void run(Metadata metadata, WorkflowTaskConfiguration config) throws WorkflowTaskInstanceException {
       try {
          // Initialize CAS-PGE.
          pgeMetadata = createPgeMetadata(metadata, config);
          pgeConfig = createPgeConfig();
          runPropertyAdders();
-         wm = createWorkflowManagerClient();
-         workflowInstId = getWorkflowInstanceId();
-         logger = createLogger(); // use workflow ID specific logger from now on 
+         logger = createLogger(); // use workflow ID specific logger from now on
 
          // Write out PgeMetadata.
          dumpMetadataIfRequested();
@@ -128,7 +128,9 @@
          runPge();
 
          // Ingest products.
-         runIngestCrawler(createProductCrawler());
+         ProductCrawler productCrawler = createProductCrawler();
+         runIngestCrawler(productCrawler);
+         productCrawler.shutdown();
 
          // Commit dynamic metadata.
          updateDynamicMetadata();
@@ -141,7 +143,7 @@
 
    protected void updateStatus(String status) throws Exception {
       logger.info("Updating status to workflow as [" + status + "]");
-      if (!wm.updateWorkflowInstanceStatus(workflowInstId, status)) {
+      if (!getWorkflowManagerClient().updateWorkflowInstanceStatus(workflowInstId, status)) {
          throw new PGEException(
                "Failed to update workflow status : client returned false");
       }
@@ -250,12 +252,15 @@
             pgeConfig.getPropertyAdderCustomArgs());
    }
 
-   protected WorkflowManagerClient createWorkflowManagerClient()
-       throws MalformedURLException {
-      String url = pgeMetadata.getMetadata(WORKFLOW_MANAGER_URL);
-      logger.info("Creating WorkflowManager client for url [" + url + "]");
-      Validate.notNull(url, "Must specify " + WORKFLOW_MANAGER_URL);
-      return RpcCommunicationFactory.createClient(new URL(url));
+   protected WorkflowManagerClient getWorkflowManagerClient() throws MalformedURLException {
+      if (this.wmClient == null) {
+         String url = pgeMetadata.getMetadata(WORKFLOW_MANAGER_URL);
+         logger.info("Creating WorkflowManager client for url [" + url + "]");
+         Validate.notNull(url, "Must specify " + WORKFLOW_MANAGER_URL);
+         this.wmClient = RpcCommunicationFactory.createClient(new URL(url));
+      }
+
+      return this.wmClient;
    }
 
    protected String getWorkflowInstanceId() {
@@ -586,7 +591,26 @@
 
    protected void updateDynamicMetadata() throws Exception {
       pgeMetadata.commitMarkedDynamicMetadataKeys();
-      wm.updateMetadataForWorkflow(workflowInstId,
-            pgeMetadata.asMetadata(PgeMetadata.Type.DYNAMIC));
+      getWorkflowManagerClient()
+              .updateMetadataForWorkflow(workflowInstId, pgeMetadata.asMetadata(PgeMetadata.Type.DYNAMIC));
+   }
+
+   public String getWorkflowInstId() {
+      return workflowInstId;
+   }
+
+   public void setWorkflowInstId(String workflowInstId) {
+      this.workflowInstId = workflowInstId;
+   }
+
+   public void setWmClient(WorkflowManagerClient wmClient) {
+      this.wmClient = wmClient;
+   }
+
+   @Override
+   public void finalize() throws IOException {
+      if (wmClient != null) {
+         wmClient.close();
+      }
    }
 }
diff --git a/pge/src/main/java/org/apache/oodt/cas/pge/util/XmlHelper.java b/pge/src/main/java/org/apache/oodt/cas/pge/util/XmlHelper.java
index 9f01d99..a373af0 100644
--- a/pge/src/main/java/org/apache/oodt/cas/pge/util/XmlHelper.java
+++ b/pge/src/main/java/org/apache/oodt/cas/pge/util/XmlHelper.java
@@ -37,6 +37,7 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
@@ -438,21 +439,26 @@
 		return fillIn(value, inputMetadata, true);
 	}
 
-	public static String fillIn(String value, Metadata inputMetadata,
-			boolean envReplaceRecur) throws PGEException {
-		try (FileManagerClient fmClient=RpcCommunicationFactory.createClient(
-				new URL(inputMetadata.getMetadata(QUERY_FILE_MANAGER_URL.getName())))){
-			while ((value = PathUtils
-					.doDynamicReplacement(value, inputMetadata)).contains("[")
-					&& envReplaceRecur) {
+	public static String fillIn(String value, Metadata inputMetadata, boolean envReplaceRecur) throws PGEException {
+		FileManagerClient fmClient=null;
+		try {
+			while ((value = PathUtils.doDynamicReplacement(value, inputMetadata)).contains("[") && envReplaceRecur) {
 			}
-			if (value.toUpperCase().matches(
-					"^\\s*SQL\\s*\\(.*\\)\\s*\\{.*\\}\\s*$")) {
+
+			if (value.toUpperCase().matches("^\\s*SQL\\s*\\(.*\\)\\s*\\{.*\\}\\s*$")) {
+				fmClient = RpcCommunicationFactory
+						.createClient(new URL(inputMetadata.getMetadata(QUERY_FILE_MANAGER_URL.getName())));
 				value = QueryUtils.getQueryResultsAsString(fmClient.complexQuery(SqlParser.parseSqlQueryMethod(value)));
 			}
 			return value;
 		} catch (Exception e) {
 			throw new PGEException("Failed to parse value: " + value, e);
+		} finally {
+			if (fmClient != null) {
+				try {
+					fmClient.close();
+				} catch (IOException ignored) { }
+			}
 		}
 	}
 }
diff --git a/pge/src/test/java/org/apache/oodt/cas/pge/TestPGETaskInstance.java b/pge/src/test/java/org/apache/oodt/cas/pge/TestPGETaskInstance.java
index c888290..db60792 100644
--- a/pge/src/test/java/org/apache/oodt/cas/pge/TestPGETaskInstance.java
+++ b/pge/src/test/java/org/apache/oodt/cas/pge/TestPGETaskInstance.java
@@ -18,6 +18,9 @@
 
 //OODT static imports
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.oodt.cas.crawl.AutoDetectProductCrawler;
 import org.apache.oodt.cas.crawl.ProductCrawler;
@@ -35,13 +38,8 @@
 import org.apache.oodt.cas.pge.writers.MockDynamicConfigFileWriter;
 import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
 import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
+import org.apache.oodt.cas.workflow.system.AvroRpcWorkflowManagerClient;
 import org.apache.oodt.cas.workflow.system.WorkflowManagerClient;
-import org.apache.oodt.cas.workflow.system.XmlRpcWorkflowManagerClient;
 import org.junit.After;
 import org.junit.Test;
 import org.w3c.dom.Document;
@@ -50,10 +48,13 @@
 import org.w3c.dom.NodeList;
 import org.xml.sax.InputSource;
 
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.StringReader;
+import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -64,14 +65,18 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
 import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.*;
 import static org.apache.oodt.cas.pge.metadata.PgeTaskStatus.CRAWLING;
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.*;
-import static org.junit.Assume.*;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 //JDK imports
 //JUnit imports
 //Apache imports
@@ -219,25 +224,24 @@
       assertEquals(Level.SEVERE.getLocalizedName() + ": pge2 message1", messages.get(1));
    }
 
-  @Test
+   @Test
    public void testUpdateStatus() throws Exception {
       final Map<String, String> args = Maps.newHashMap();
-      PGETaskInstance pgeTask = createTestInstance();
-      pgeTask.wm = new XmlRpcWorkflowManagerClient(null) {
-         @Override
-         public boolean updateWorkflowInstanceStatus(String instanceId,
-               String status) {
-            args.put("InstanceId", instanceId);
-            args.put("Status", status);
-            return true;
-         }
-      };
-      String instanceId = "Test ID";
-      String status = PgeTaskStatus.CRAWLING.getWorkflowStatusName();
-      pgeTask.workflowInstId = instanceId;
-      pgeTask.updateStatus(status);
-      assertEquals(instanceId, args.get("InstanceId"));
-      assertEquals(status, args.get("Status"));
+       PGETaskInstance pgeTask = createTestInstance();
+       pgeTask.setWmClient(new AvroRpcWorkflowManagerClient(new URL("http://localhost:9001")) {
+           @Override
+           public boolean updateWorkflowInstanceStatus(String instanceId, String status) {
+               args.put("InstanceId", instanceId);
+               args.put("Status", status);
+               return true;
+           }
+       });
+       String instanceId = "Test ID";
+       String status = PgeTaskStatus.CRAWLING.getWorkflowStatusName();
+       pgeTask.setWorkflowInstId(instanceId);
+       pgeTask.updateStatus(status);
+       assertEquals(instanceId, args.get("InstanceId"));
+       assertEquals(status, args.get("Status"));
    }
 
   @Test
@@ -271,11 +275,8 @@
   @Test
    public void testCreateWorkflowManagerClient() throws Exception {
       PGETaskInstance pgeTask = createTestInstance();
-      pgeTask.pgeMetadata.replaceMetadata(WORKFLOW_MANAGER_URL,
-            "http://localhost:8888");
-      WorkflowManagerClient wmClient =
-         pgeTask.createWorkflowManagerClient();
-      assertNotNull(wmClient);
+      pgeTask.pgeMetadata.replaceMetadata(WORKFLOW_MANAGER_URL, "http://localhost:8888");
+      assertNotNull(pgeTask.getWorkflowManagerClient());
    }
 
   @Test
@@ -462,12 +463,13 @@
       pgeTask.pgeConfig.addOuputDirAndExpressions(new OutputDir("/tmp/dir1", true));
       pgeTask.pgeConfig.addOuputDirAndExpressions(new OutputDir("/tmp/dir2", true));
       pgeTask.pgeMetadata.replaceMetadata(ATTEMPT_INGEST_ALL, Boolean.toString(true));
-      pgeTask.workflowInstId = "WorkflowInstanceId";
+      pgeTask.setWorkflowInstId("WorkflowInstanceId");
 
-      pgeTask.wm = createMock(WorkflowManagerClient.class);
-      expect(pgeTask.wm.updateWorkflowInstanceStatus(pgeTask.workflowInstId,
-            CRAWLING.getWorkflowStatusName())).andReturn(true);
-      replay(pgeTask.wm);
+      pgeTask.setWmClient(createMock(WorkflowManagerClient.class));
+      expect(pgeTask.getWorkflowManagerClient()
+              .updateWorkflowInstanceStatus(pgeTask.getWorkflowInstId(), CRAWLING.getWorkflowStatusName())
+      ).andReturn(true);
+      replay(pgeTask.getWorkflowManagerClient());
 
       AutoDetectProductCrawler pc = createMock(AutoDetectProductCrawler.class);
       pc.crawl(new File("/tmp/dir1"));
@@ -477,14 +479,14 @@
 
       pgeTask.runIngestCrawler(pc);
 
-      verify(pgeTask.wm);
+      verify(pgeTask.getWorkflowManagerClient());
       verify(pc);
 
       // Case: UpdateStatus Fail
-      pgeTask.wm = createMock(WorkflowManagerClient.class);
-      expect(pgeTask.wm.updateWorkflowInstanceStatus(pgeTask.workflowInstId,
+      pgeTask.setWmClient(createMock(WorkflowManagerClient.class));
+      expect(pgeTask.getWorkflowManagerClient().updateWorkflowInstanceStatus(pgeTask.getWorkflowInstId(),
             CRAWLING.getWorkflowStatusName())).andReturn(false);
-      replay(pgeTask.wm);
+      replay(pgeTask.getWorkflowManagerClient());
 
       pc = createMock(AutoDetectProductCrawler.class);
       replay(pc);
@@ -494,14 +496,14 @@
          fail("Should have thrown");
       } catch (Exception e) { /* expect throw */ }
 
-      verify(pgeTask.wm);
+      verify(pgeTask.getWorkflowManagerClient());
       verify(pc);
 
       // Case: UpdateStatus Success, VerifyIngest Fail
-      pgeTask.wm = createMock(WorkflowManagerClient.class);
-      expect(pgeTask.wm.updateWorkflowInstanceStatus(pgeTask.workflowInstId,
+      pgeTask.setWmClient(createMock(WorkflowManagerClient.class));
+      expect(pgeTask.getWorkflowManagerClient().updateWorkflowInstanceStatus(pgeTask.getWorkflowInstId(),
             CRAWLING.getWorkflowStatusName())).andReturn(true);
-      replay(pgeTask.wm);
+      replay(pgeTask.getWorkflowManagerClient());
 
       pc = createMock(AutoDetectProductCrawler.class);
       pc.crawl(new File("/tmp/dir1"));
@@ -529,7 +531,7 @@
          fail("Should have thrown");
       } catch (Exception e) { /* expect throw */ }
 
-      verify(pgeTask.wm);
+      verify(pgeTask.getWorkflowManagerClient());
       verify(pc);
    }
 
@@ -634,7 +636,7 @@
    private PGETaskInstance createTestInstance(String workflowInstId)
          throws Exception {
       PGETaskInstance pgeTask = new PGETaskInstance();
-      pgeTask.workflowInstId = workflowInstId;
+       pgeTask.setWorkflowInstId(workflowInstId);
       pgeTask.pgeMetadata = new PgeMetadata();
       pgeTask.pgeMetadata.replaceMetadata(NAME, "TestPGE");
       pgeTask.pgeConfig = new PgeConfig();
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
index adc0bba..337c520 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
@@ -20,13 +20,12 @@
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.oodt.cas.cli.CmdLineUtility;
 import org.apache.oodt.cas.metadata.Metadata;
-import org.apache.oodt.cas.workflow.structs.WorkflowInstancePage;
-import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 import org.apache.oodt.cas.workflow.structs.Workflow;
-import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstancePage;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.util.AvroTypeFactory;
 
 import java.io.IOException;
@@ -45,26 +44,20 @@
  */
 public class AvroRpcWorkflowManagerClient implements WorkflowManagerClient {
 
-    private static Logger LOG = Logger
-            .getLogger(AvroRpcWorkflowManagerClient.class.getName());
+    private static Logger LOG = Logger.getLogger(AvroRpcWorkflowManagerClient.class.getName());
 
-    Transceiver client;
-
-    org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager proxy;
-
-    URL workflowManagerUrl;
-
+    private Transceiver client;
+    private org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager proxy;
+    private URL workflowManagerUrl;
 
     public AvroRpcWorkflowManagerClient(URL url){
         workflowManagerUrl = url;
         try {
             client = new NettyTransceiver(new InetSocketAddress(url.getHost(),url.getPort()));
-        proxy = SpecificRequestor.getClient(org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager.class, client);
-
+            proxy = SpecificRequestor.getClient(org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager.class, client);
         } catch (IOException e) {
-            e.printStackTrace();
+            LOG.severe(String.format("Error occurred when creating client: %s", e.getMessage()));
         }
-
     }
 
     @Override
@@ -251,4 +244,18 @@
         }
 
     }
+
+    @Override
+    public void close() throws IOException {
+        if (client != null) {
+            client.close();
+            client = null;
+            LOG.info("Closed workflow manager client: " + workflowManagerUrl.toString());
+        }
+    }
+
+    @Override
+    public void finalize() throws IOException {
+        close();
+    }
 }
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerClient.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerClient.java
index 9d821bc..ed0cfd1 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerClient.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/WorkflowManagerClient.java
@@ -24,6 +24,7 @@
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
 
+import java.io.Closeable;
 import java.net.URL;
 import java.util.List;
 import java.util.Vector;
@@ -36,10 +37,11 @@
  * Base interface for client RPC implementation.
  * </p>
  */
-public interface WorkflowManagerClient {
+public interface WorkflowManagerClient extends Closeable {
 
     boolean refreshRepository()
             throws Exception;
+
     String executeDynamicWorkflow(List<String> taskIds, Metadata metadata)
             throws Exception;
 
diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java
index 9c29cc4..f9bc6e6 100644
--- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java
+++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManagerClient.java
@@ -534,4 +534,8 @@
         client = new XmlRpcClient(workflowManagerUrl);
     }
 
+    @Override
+    public void close() throws IOException {
+
+    }
 }