Merge branch '0.11' of https://github.com/apache/falcon into 0.11
diff --git a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
index 7929dd7..1287333 100644
--- a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
+++ b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
@@ -92,7 +92,7 @@
             <arg>-sourcePaths</arg>
             <arg>${sourceDir}</arg>
             <arg>-targetPath</arg>
-            <arg>${targetClusterFS}${targetDir}</arg>
+            <arg>${targetDir}</arg>
             <arg>-falconFeedStorageType</arg>
             <arg>FILESYSTEM</arg>
             <arg>-availabilityFlag</arg>
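
The dropped ${targetClusterFS} prefix is redundant now that HdfsMirroringExtension (further down in this patch) hands the workflow a fully qualified targetDir. A minimal Java sketch of the qualification rule it applies; the class name and values are illustrative:

    import org.apache.hadoop.fs.Path;

    public class QualifyTargetSketch {
        public static void main(String[] args) {
            String targetDir = "/apps/dr";                  // user-supplied, no scheme or authority
            String targetClusterFS = "hdfs://nn-host:8020"; // stand-in for ClusterHelper.getStorageUrl(targetCluster)
            // Bare paths get the cluster write endpoint prepended; paths that already carry
            // an authority (e.g. s3n://bucket/apps/dr) pass through untouched, so the
            // workflow's ${targetDir} arrives ready to use.
            String qualified = new Path(targetDir).toUri().getAuthority() == null
                    ? targetClusterFS + targetDir
                    : targetDir;
            System.out.println(qualified); // hdfs://nn-host:8020/apps/dr
        }
    }
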
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index 5cd7e74..fabc9ed 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -42,7 +42,7 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
     private EventUtils eventUtils;
-    ScheduledThreadPoolExecutor timer;
+    private ScheduledThreadPoolExecutor timer;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
index 7c415c3..c894b13 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
@@ -48,10 +48,10 @@
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
         Configuration conf = context.getConfiguration();
-        FileSystem fs= FileSystem.get(
+        FileSystem fs = FileSystem.get(
                 FileUtils.getConfiguration(context.getConfiguration(),
-                conf.get(HiveDRArgs.TARGET_NN.getName()),
-                conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName())));
+                        conf.get(HiveDRArgs.TARGET_NN.getName()),
+                        conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName())));
         hiveDRStore = new HiveDRStatusStore(fs);
     }
 
@@ -67,7 +67,7 @@
 
     @Override
     protected void reduce(Text key, Iterable<Text> values, final Context context)
-            throws IOException, InterruptedException {
+        throws IOException, InterruptedException {
         List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
         ReplicationStatus rs;
         timer = new ScheduledThreadPoolExecutor(1);
diff --git a/common/src/main/java/org/apache/falcon/util/FSDRUtils.java b/common/src/main/java/org/apache/falcon/util/FSDRUtils.java
new file mode 100644
index 0000000..17da193
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/FSDRUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Util class for FS DR.
+ */
+public final class FSDRUtils {
+    private FSDRUtils() {
+    }
+
+    private static final List<String> HDFS_SCHEME_PREFIXES =
+            Arrays.asList("file", "hdfs", "hftp", "hsftp", "webhdfs", "swebhdfs");
+
+    private static Configuration defaultConf = new Configuration();
+
+    public static Configuration getDefaultConf() {
+        return defaultConf;
+    }
+
+    public static void setDefaultConf(Configuration conf) {
+        defaultConf = conf;
+    }
+
+    public static boolean isHCFS(Path filePath) throws FalconException {
+        if (filePath == null) {
+            throw new FalconException("filePath cannot be empty");
+        }
+
+        String scheme;
+        try {
+            FileSystem f = FileSystem.get(filePath.toUri(), getDefaultConf());
+            scheme = f.getScheme();
+            if (StringUtils.isBlank(scheme)) {
+                throw new FalconException("Cannot get valid scheme for " + filePath);
+            }
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+
+        return !HDFS_SCHEME_PREFIXES.contains(scheme.toLowerCase().trim());
+    }
+}
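
Illustrative behaviour of the new utility, assuming the default Configuration can instantiate the named filesystems (as the unit test below arranges); the wrapper class and paths are examples:

    import org.apache.falcon.util.FSDRUtils;
    import org.apache.hadoop.fs.Path;

    public class IsHCFSSketch {
        public static void main(String[] args) throws Exception {
            // Schemes listed in HDFS_SCHEME_PREFIXES are not HCFS:
            System.out.println(FSDRUtils.isHCFS(new Path("hdfs://nn:8020/apps/dr"))); // false
            // A bare path resolves against the default FS ("file" or "hdfs"), also not HCFS:
            System.out.println(FSDRUtils.isHCFS(new Path("/apps/dr")));               // false
            // Any other scheme is treated as HCFS:
            System.out.println(FSDRUtils.isHCFS(new Path("s3n://bucket/apps/dr")));   // true
        }
    }
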
diff --git a/common/src/test/java/org/apache/falcon/util/FSDRUtilsTest.java b/common/src/test/java/org/apache/falcon/util/FSDRUtilsTest.java
new file mode 100644
index 0000000..7f63cda
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/util/FSDRUtilsTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.falcon.FalconException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests FSDRUtils.
+ */
+public final class FSDRUtilsTest {
+
+    @BeforeClass
+    private void setup() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set("fs.s3n.awsAccessKeyId", "testS3KeyId");
+        conf.set("fs.s3n.awsSecretAccessKey", "testS3AccessKey");
+        conf.set("fs.azure.account.key.mystorage.blob.core.windows.net", "dGVzdEF6dXJlQWNjZXNzS2V5");
+        FSDRUtils.setDefaultConf(conf);
+    }
+
+    @Test(expectedExceptions = FalconException.class, expectedExceptionsMessageRegExp = "filePath cannot be empty")
+    public void testIsHCFSEmptyPath() throws Exception {
+        FSDRUtils.isHCFS(null);
+    }
+
+    @Test
+    public void testIsHCFS() throws Exception {
+        boolean isHCFSPath = FSDRUtils.isHCFS(new Path("/apps/dr"));
+        Assert.assertFalse(isHCFSPath);
+
+        isHCFSPath = FSDRUtils.isHCFS(new Path("hdfs://localhost:54136/apps/dr"));
+        Assert.assertFalse(isHCFSPath);
+
+        isHCFSPath = FSDRUtils.isHCFS(new Path("hftp://localhost:54136/apps/dr"));
+        Assert.assertFalse(isHCFSPath);
+
+        isHCFSPath = FSDRUtils.isHCFS(new Path("s3n://testBucket/apps/dr"));
+        Assert.assertTrue(isHCFSPath);
+    }
+}
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
index 74d217c..fd823cf 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
@@ -23,6 +23,8 @@
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.extensions.AbstractExtension;
+import org.apache.falcon.util.FSDRUtils;
+import org.apache.hadoop.fs.Path;
 
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -48,11 +50,30 @@
                 throw new FalconException("Missing extension property: " + option.getName());
             }
         }
+
+        String srcPaths = extensionProperties.getProperty(HdfsMirroringExtensionProperties
+                .SOURCE_DIR.getName());
+        if (!isHCFSPath(srcPaths)) {
+            if (extensionProperties.getProperty(HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName()) == null) {
+                throw new FalconException("Missing extension property: " + HdfsMirroringExtensionProperties.
+                        SOURCE_CLUSTER.getName());
+            }
+        }
+        String targetDir = extensionProperties.getProperty(HdfsMirroringExtensionProperties
+                .TARGET_DIR.getName());
+
+        if (!FSDRUtils.isHCFS(new Path(targetDir.trim()))) {
+            if (extensionProperties.getProperty(HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName()) == null) {
+                throw new FalconException("Missing extension property: " + HdfsMirroringExtensionProperties.
+                        TARGET_CLUSTER.getName());
+            }
+        }
     }
 
     @Override
     public Properties getAdditionalProperties(final Properties extensionProperties) throws FalconException {
         Properties additionalProperties = new Properties();
+        boolean isHCFSDR = false;
 
         // Add default properties if not passed
         String distcpMaxMaps = extensionProperties.getProperty(
@@ -67,23 +88,31 @@
             additionalProperties.put(HdfsMirroringExtensionProperties.DISTCP_MAP_BANDWIDTH_IN_MB.getName(), "100");
         }
 
-        // Construct fully qualified hdfs src path
         String srcPaths = extensionProperties.getProperty(HdfsMirroringExtensionProperties
                 .SOURCE_DIR.getName());
-        StringBuilder absoluteSrcPaths = new StringBuilder();
-        String sourceClusterName = extensionProperties.getProperty(
-                HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName());
+        String sourceClusterFS = "";
+        if (isHCFSPath(srcPaths)) {
+            // Make sure path is fully qualified
+            // For HCFS only one path
+            URI pathUri = new Path(srcPaths).toUri();
+            if (pathUri.getAuthority() == null) {
+                throw new FalconException("getAdditionalProperties: " + srcPaths + " is not fully qualified path");
+            }
+            isHCFSDR = true;
+        } else {
+            StringBuilder absoluteSrcPaths = new StringBuilder();
+            String sourceClusterName = extensionProperties.getProperty(
+                    HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName());
 
-        // Since source cluster get read interface
-        Cluster srcCluster = ClusterHelper.getCluster(sourceClusterName);
-        if (srcCluster == null) {
-            throw new FalconException("Cluster entity " + sourceClusterName + " not found");
-        }
-        String srcClusterEndPoint = ClusterHelper.getReadOnlyStorageUrl(srcCluster);
+            // Since source cluster get read interface
+            Cluster srcCluster = ClusterHelper.getCluster(sourceClusterName.trim());
+            if (srcCluster == null) {
+                throw new FalconException("Source Cluster entity " + sourceClusterName + " not found");
+            }
+            String srcClusterEndPoint = ClusterHelper.getReadOnlyStorageUrl(srcCluster);
 
-        if (StringUtils.isNotBlank(srcPaths)) {
+            // Construct fully qualified hdfs src path
             String[] paths = srcPaths.split(COMMA_SEPARATOR);
-
             URI pathUri;
             for (String path : paths) {
                 try {
@@ -92,48 +121,78 @@
                     throw new FalconException(e);
                 }
                 String authority = pathUri.getAuthority();
-                StringBuilder srcpath = new StringBuilder();
+                StringBuilder srcpath;
                 if (authority == null) {
-                    srcpath.append(srcClusterEndPoint);
+                    srcpath = new StringBuilder(srcClusterEndPoint);
+                } else {
+                    srcpath = new StringBuilder();
                 }
-
                 srcpath.append(path.trim());
                 srcpath.append(COMMA_SEPARATOR);
                 absoluteSrcPaths.append(srcpath);
             }
+            additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(),
+                    StringUtils.removeEnd(absoluteSrcPaths.toString(), COMMA_SEPARATOR));
+            sourceClusterFS = ClusterHelper.getReadOnlyStorageUrl(srcCluster);
         }
-        additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(),
-                StringUtils.removeEnd(absoluteSrcPaths.toString(), COMMA_SEPARATOR));
 
-        // Target dir shouldn't have the namenode
+
         String targetDir = extensionProperties.getProperty(HdfsMirroringExtensionProperties
                 .TARGET_DIR.getName());
+        String targetClusterFS = "";
+        if (FSDRUtils.isHCFS(new Path(targetDir.trim()))) {
+            // Make sure path is fully qualified
+            URI pathUri = new Path(targetDir).toUri();
+            if (pathUri.getAuthority() == null) {
+                throw new FalconException("getAdditionalProperties: " + targetDir + " is not fully qualified path");
+            }
+            isHCFSDR = true;
+        } else {
+            String targetClusterName = extensionProperties.getProperty(
+                    HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName());
 
-        URI targetPathUri;
-        try {
-            targetPathUri = new URI(targetDir.trim());
-        } catch (URISyntaxException e) {
-            throw new FalconException(e);
-        }
+            Cluster targetCluster = ClusterHelper.getCluster(targetClusterName.trim());
+            if (targetCluster == null) {
+                throw new FalconException("Target Cluster entity " + targetClusterName + " not found");
+            }
 
-        if (targetPathUri.getScheme() != null) {
+            targetClusterFS = ClusterHelper.getStorageUrl(targetCluster);
+
+            // Construct fully qualified hdfs target path
+            URI pathUri;
+            try {
+                pathUri = new URI(targetDir.trim());
+            } catch (URISyntaxException e) {
+                throw new FalconException(e);
+            }
+
+            StringBuilder targetPath;
+            String authority = pathUri.getAuthority();
+            if (authority == null) {
+                targetPath = new StringBuilder(targetClusterFS);
+            } else {
+                targetPath = new StringBuilder();
+            }
+            targetPath.append(targetDir.trim());
+
             additionalProperties.put(HdfsMirroringExtensionProperties.TARGET_DIR.getName(),
-                    targetPathUri.getPath());
+                    targetPath.toString());
         }
 
-        // add sourceClusterFS and targetClusterFS
-        additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_CLUSTER_FS_WRITE_ENDPOINT.getName(),
-                ClusterHelper.getStorageUrl(srcCluster));
-
-        String targetClusterName = extensionProperties.getProperty(
-                HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName());
-
-        Cluster targetCluster = ClusterHelper.getCluster(targetClusterName);
-        if (targetCluster == null) {
-            throw new FalconException("Cluster entity " + targetClusterName + " not found");
+        // Oozie doesn't accept a null or empty string for a WF arg. For HCFS, fill the unused FS endpoint with the other one
+        if (isHCFSDR) {
+            if (StringUtils.isBlank(sourceClusterFS)) {
+                sourceClusterFS = targetClusterFS;
+            } else if (StringUtils.isBlank(targetClusterFS)) {
+                targetClusterFS = sourceClusterFS;
+            }
         }
+        // Add sourceClusterFS
+        additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_CLUSTER_FS_READ_ENDPOINT.getName(),
+                sourceClusterFS);
+        // Add targetClusterFS
         additionalProperties.put(HdfsMirroringExtensionProperties.TARGET_CLUSTER_FS_WRITE_ENDPOINT.getName(),
-                ClusterHelper.getStorageUrl(targetCluster));
+                targetClusterFS);
 
         if (StringUtils.isBlank(
                 extensionProperties.getProperty(HdfsMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName()))) {
@@ -144,4 +203,16 @@
         return additionalProperties;
     }
 
+    private static boolean isHCFSPath(String srcPaths) throws FalconException {
+        if (StringUtils.isNotBlank(srcPaths)) {
+            String[] paths = srcPaths.split(COMMA_SEPARATOR);
+
+            // We expect all paths to be of the same type, so checking the first one is enough
+            // (split on a non-blank string always yields at least one element)
+            return FSDRUtils.isHCFS(new Path(paths[0].trim()));
+        }
+
+        return false;
+    }
 }
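
Net effect of the reworked validate(): a cluster property is mandatory only when the corresponding dir is not an HCFS path. A sketch of the two modes — the property names are the real ones from HdfsMirroringExtensionProperties, while the harness class, helper, and values are hypothetical:

    import java.util.Properties;

    public class ValidateModesSketch {
        // Mirrors the patched validate() logic: a cluster property is required only
        // when its dir is not an HCFS path (here stubbed by a crude scheme check
        // standing in for FSDRUtils.isHCFS).
        static void requireClusterFor(Properties props, String dirKey, String clusterKey) throws Exception {
            String dir = props.getProperty(dirKey);
            boolean hcfs = dir.contains("://") && !dir.startsWith("hdfs://");
            if (!hcfs && props.getProperty(clusterKey) == null) {
                throw new Exception("Missing extension property: " + clusterKey);
            }
        }

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("sourceDir", "s3n://bucket/apps/dr"); // HCFS: sourceCluster may be omitted
            props.setProperty("targetDir", "/falcon/dr");           // on-premise: targetCluster is mandatory
            props.setProperty("targetCluster", "backupCluster");
            requireClusterFor(props, "sourceDir", "sourceCluster"); // passes
            requireClusterFor(props, "targetDir", "targetCluster"); // passes; would throw without targetCluster
        }
    }
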
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
index 52ae0c0..47ebca0 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
@@ -23,13 +23,13 @@
  */
 public enum HdfsMirroringExtensionProperties {
     SOURCE_DIR("sourceDir", "Location of source data to replicate"),
-    SOURCE_CLUSTER("sourceCluster", "Source cluster"),
-    SOURCE_CLUSTER_FS_WRITE_ENDPOINT("sourceClusterFS", "Source cluster end point", false),
+    SOURCE_CLUSTER("sourceCluster", "Source cluster", false),
+    SOURCE_CLUSTER_FS_READ_ENDPOINT("sourceClusterFS", "Source cluster end point", false),
     TARGET_DIR("targetDir", "Location on target cluster for replication"),
-    TARGET_CLUSTER("targetCluster", "Target cluster"),
+    TARGET_CLUSTER("targetCluster", "Target cluster", false),
     TARGET_CLUSTER_FS_WRITE_ENDPOINT("targetClusterFS", "Target cluster end point", false),
     DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during replication", false),
-    DISTCP_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication",
+    DISTCP_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s usCOed by each mapper during replication",
             false),
     TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false);
 
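
The extra boolean these lines pass is the enum's optionality flag. A condensed sketch of the assumed constructor shape (the real enum declares more constants and accessors):

    public enum PropertySketch {
        SOURCE_CLUSTER("sourceCluster", "Source cluster", false); // false marks the property optional

        private final String propName;
        private final String description;
        private final boolean isRequired;

        PropertySketch(String propName, String description) {
            this(propName, description, true); // the two-arg form keeps a property mandatory
        }

        PropertySketch(String propName, String description, boolean isRequired) {
            this.propName = propName;
            this.description = description;
            this.isRequired = isRequired;
        }

        public String getName() {
            return propName;
        }

        public boolean isRequired() {
            return isRequired;
        }
    }
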
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
index 4763db8..f6bfe17 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
@@ -213,7 +213,7 @@
         Assert.assertEquals(extensionStorePath + "/hdfs-mirroring/libs",
                 processEntity.getWorkflow().getLib());
         Assert.assertEquals(extensionStorePath
-                + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml",
+                        + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml",
                 processEntity.getWorkflow().getPath());
 
         Properties props = EntityUtil.getEntityProperties(processEntity);
@@ -221,7 +221,8 @@
         String srcClusterEndPoint = ClusterHelper.getReadOnlyStorageUrl(ClusterHelper.getCluster(SOURCE_CLUSTER));
         Assert.assertEquals(srcClusterEndPoint + SOURCEDIR, props.getProperty("sourceDir"));
         Assert.assertEquals(SOURCE_CLUSTER, props.getProperty("sourceCluster"));
-        Assert.assertEquals(TARGETDIR, props.getProperty("targetDir"));
+        String tgtClusterEndPoint = ClusterHelper.getStorageUrl(ClusterHelper.getCluster(TARGET_CLUSTER));
+        Assert.assertEquals(tgtClusterEndPoint + TARGETDIR, props.getProperty("targetDir"));
         Assert.assertEquals(TARGET_CLUSTER, props.getProperty("targetCluster"));
 
         //retry
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index b862111..9672272 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -54,8 +54,9 @@
 
     private static final String[] LIBS = StartupProperties.get().getProperty("shared.libs").split(",");
 
-    private static class FalconLibPath implements  FalconPathFilter {
+    private static class FalconLibPath implements FalconPathFilter {
         private String[] shareLibs;
+
         FalconLibPath(String[] libList) {
             this.shareLibs = Arrays.copyOf(libList, libList.length);
         }
@@ -79,7 +80,8 @@
             }
             throw new IllegalArgumentException(path + " is not accepted!");
         }
-    };
+    }
+
 
     private void pushExtensionArtifactsToCluster(final Cluster cluster,
                                                  final FileSystem clusterFs) throws FalconException {
@@ -95,6 +97,7 @@
             return;
         }
 
+        final String filterPath = "/apps/falcon/extensions/mirroring/";
         Path extensionStorePath = store.getExtensionStorePath();
         LOG.info("extensionStorePath :{}", extensionStorePath);
         FileSystem falconFileSystem =
@@ -119,6 +122,12 @@
                 LocatedFileStatus srcfileStatus = fileStatusListIterator.next();
                 Path filePath = Path.getPathWithoutSchemeAndAuthority(srcfileStatus.getPath());
 
+                if (filePath != null && filePath.toString().startsWith(filterPath)) {
+                    /* HiveDR uses the filter path as its store path in DRStatusStore, so skip it and
+                     copy only the extension artifacts */
+                    continue;
+                }
+
                 if (srcfileStatus.isDirectory()) {
                     if (!clusterFs.exists(filePath)) {
                         HadoopClientFactory.mkdirs(clusterFs, filePath, srcfileStatus.getPermission());
@@ -160,13 +169,13 @@
                     nonFalconJarFilter);
             pushLibsToHDFS(fs, properties.getProperty("libext.paths"), libext, null);
             pushLibsToHDFS(fs, properties.getProperty("libext.feed.paths"),
-                    new Path(libext, EntityType.FEED.name()) , null);
+                    new Path(libext, EntityType.FEED.name()), null);
             pushLibsToHDFS(fs, properties.getProperty("libext.feed.replication.paths"),
                     new Path(libext, EntityType.FEED.name() + "/replication"), null);
             pushLibsToHDFS(fs, properties.getProperty("libext.feed.retention.paths"),
                     new Path(libext, EntityType.FEED.name() + "/retention"), null);
             pushLibsToHDFS(fs, properties.getProperty("libext.process.paths"),
-                    new Path(libext, EntityType.PROCESS.name()) , null);
+                    new Path(libext, EntityType.PROCESS.name()), null);
         } catch (IOException e) {
             throw new FalconException("Failed to copy shared libs to cluster" + cluster.getName(), e);
         }
@@ -181,9 +190,9 @@
         LOG.debug("Copying libs from {}", src);
         createTargetPath(fs, target);
 
-        for(String srcPaths : src.split(",")) {
+        for (String srcPaths : src.split(",")) {
             File srcFile = new File(srcPaths);
-            File[] srcFiles = new File[] { srcFile };
+            File[] srcFiles = new File[]{srcFile};
             if (srcFiles != null) {
                 if (srcFile.isDirectory()) {
                     srcFiles = srcFile.listFiles();
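
The filterPath guard works because DRStatusStore keeps HiveDR state under the mirroring extension directory, and stripping scheme and authority first makes the prefix test independent of the store's namenode URI. A standalone sketch with an example stored path and a hypothetical class name:

    import org.apache.hadoop.fs.Path;

    public class FilterPathSketch {
        public static void main(String[] args) {
            final String filterPath = "/apps/falcon/extensions/mirroring/";
            Path stored = new Path("hdfs://nn:8020/apps/falcon/extensions/mirroring/drstatus/job1");
            Path bare = Path.getPathWithoutSchemeAndAuthority(stored);
            boolean skipped = bare != null && bare.toString().startsWith(filterPath);
            System.out.println(skipped); // true -> the DR status file is not pushed to the cluster
        }
    }
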
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 6148979..b0c6edf 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -333,7 +333,7 @@
             entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream,
                     packagePath);
         } catch (FalconException | IOException | URISyntaxException e) {
-            throw new FalconCLIException("Failed in generating entities for job:" + jobName);
+            throw new FalconCLIException("Failed in generating entities for job:" + jobName, e);
         }
         return entities;
     }
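
The added argument chains the underlying exception so the root cause survives into the client's stack trace. A minimal sketch of the difference; the nested FalconCLIException here is a stand-in for the real class:

    import java.io.IOException;

    public class CauseChainSketch {
        static class FalconCLIException extends Exception {
            FalconCLIException(String msg, Throwable cause) {
                super(msg, cause);
            }
        }

        public static void main(String[] args) {
            try {
                throw new IOException("underlying parse failure");
            } catch (IOException e) {
                // Before the patch the cause was dropped; with (msg, e) the printed trace
                // ends in "Caused by: java.io.IOException: underlying parse failure".
                new FalconCLIException("Failed in generating entities for job: sampleJob", e).printStackTrace();
            }
        }
    }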