SQOOP-3013: Configuration "tmpjars" is not checked for empty strings before passing to MR

(Erzsebet Szilagyi via Attila Szabo)
diff --git a/src/java/org/apache/sqoop/mapreduce/JobBase.java b/src/java/org/apache/sqoop/mapreduce/JobBase.java
index 7ed2684..256d4f7 100644
--- a/src/java/org/apache/sqoop/mapreduce/JobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/JobBase.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.config.ConfigurationConstants;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.config.ConfigurationHelper;
 import com.cloudera.sqoop.manager.ConnManager;
@@ -195,21 +196,39 @@
       }
     }
 
+    String tmpjars = conf.get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM);
+    StringBuilder sb = new StringBuilder();
+
     // If we didn't put anything in our set, then there's nothing to cache.
-    if (localUrls.isEmpty()) {
+    if (localUrls.isEmpty() && (org.apache.commons.lang.StringUtils.isEmpty(tmpjars))) {
       return;
     }
 
+    if (null != tmpjars) {
+      String[] tmpjarsElements = tmpjars.split(",");
+      for (String jarElement : tmpjarsElements) {
+        if (jarElement.isEmpty()) {
+          warn("Empty input is invalid and was removed from tmpjars.");
+        } else {
+          sb.append(jarElement);
+          sb.append(",");
+        }
+      }
+    }
+
+    int lastComma = sb.lastIndexOf(",");
+    if (localUrls.isEmpty() && lastComma >= 0) {
+      sb.deleteCharAt(lastComma);
+    }
+
     // Add these to the 'tmpjars' array, which the MR JobSubmitter
     // will upload to HDFS and put in the DistributedCache libjars.
-    String tmpjars = conf.get("tmpjars");
-    StringBuilder sb = new StringBuilder();
-    if (null != tmpjars) {
-      sb.append(tmpjars);
-      sb.append(",");
-    }
     sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
-    conf.set("tmpjars", sb.toString());
+    conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM, sb.toString());
+  }
+
+  protected void warn(String message) {
+    LOG.warn(message);
   }
 
   private void addToCache(String file, FileSystem fs, Set<String> localUrls) {
diff --git a/src/test/org/apache/sqoop/mapreduce/TestJobBase.java b/src/test/org/apache/sqoop/mapreduce/TestJobBase.java
new file mode 100644
index 0000000..f228a35
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/TestJobBase.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.JobBase;
+
+import junit.framework.TestCase;
+
+public class TestJobBase extends TestCase {
+
+  SqoopOptions options;
+  Configuration conf;
+  JobBase jobBase;
+  Job job;
+
+  @Before
+  public void setUp() {
+    // set Sqoop command line arguments
+    options = new SqoopOptions();
+    conf = options.getConf();
+    jobBase = spy(new JobBase(options));
+  }
+
+  private void tmpjarsValidatingSeed(String tmpjarsInput) throws IOException {
+
+    // call cacheJars(...)
+    conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM, tmpjarsInput);
+    job = jobBase.createJob(conf);
+    jobBase.cacheJars(job, null);
+
+  }
+
+  public void tmpjarsValidatingVerif(String expectedOutput, int numWarnings) throws IOException {
+    // check outputs
+    assertEquals("Expected " + expectedOutput + "but received something different", expectedOutput,
+        job.getConfiguration().get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM));
+
+    // check for warnings
+    verify(jobBase, times(numWarnings))
+        .warn("Empty input is invalid and was removed from tmpjars.");
+  }
+
+  @Test
+  public void testTmpjarsValidatingMultipleValidInputs() throws IOException {
+
+    String tmpjarsInput = "valid,validother";
+    String expectedOutput = "valid,validother";
+
+    tmpjarsValidatingSeed(tmpjarsInput);
+    tmpjarsValidatingVerif(expectedOutput, 0);
+  }
+
+  @Test
+  public void testTmpjarsValidatingFullEmptyInput() throws IOException {
+
+    String tmpjarsInput = "";
+    String expectedOutput = "";
+
+    tmpjarsValidatingSeed(tmpjarsInput);
+    tmpjarsValidatingVerif(expectedOutput, 0);
+  }
+
+  @Test
+  public void testTmpjarsValidatingMixedInput() throws IOException {
+
+    String tmpjarsInput = ",,valid,,,validother,,";
+    String expectedOutput = "valid,validother";
+
+    tmpjarsValidatingSeed(tmpjarsInput);
+    tmpjarsValidatingVerif(expectedOutput, 4);
+  }
+
+}