SQOOP-3013: Configuration "tmpjars" is not checked for empty strings before passing to MR
(Erzsebet Szilagyi via Attila Szabo)
diff --git a/src/java/org/apache/sqoop/mapreduce/JobBase.java b/src/java/org/apache/sqoop/mapreduce/JobBase.java
index 7ed2684..256d4f7 100644
--- a/src/java/org/apache/sqoop/mapreduce/JobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/JobBase.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.config.ConfigurationConstants;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.manager.ConnManager;
@@ -195,21 +196,39 @@
}
}
+ String tmpjars = conf.get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM);
+ StringBuilder sb = new StringBuilder();
+
// If we didn't put anything in our set, then there's nothing to cache.
- if (localUrls.isEmpty()) {
+ if (localUrls.isEmpty() && (org.apache.commons.lang.StringUtils.isEmpty(tmpjars))) {
return;
}
+ if (null != tmpjars) {
+ String[] tmpjarsElements = tmpjars.split(",");
+ for (String jarElement : tmpjarsElements) {
+ if (jarElement.isEmpty()) {
+ warn("Empty input is invalid and was removed from tmpjars.");
+ } else {
+ sb.append(jarElement);
+ sb.append(",");
+ }
+ }
+ }
+
+ int lastComma = sb.lastIndexOf(",");
+ if (localUrls.isEmpty() && lastComma >= 0) {
+ sb.deleteCharAt(lastComma);
+ }
+
// Add these to the 'tmpjars' array, which the MR JobSubmitter
// will upload to HDFS and put in the DistributedCache libjars.
- String tmpjars = conf.get("tmpjars");
- StringBuilder sb = new StringBuilder();
- if (null != tmpjars) {
- sb.append(tmpjars);
- sb.append(",");
- }
sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
- conf.set("tmpjars", sb.toString());
+ conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM, sb.toString());
+ }
+
+ protected void warn(String message) {
+ LOG.warn(message);
}
private void addToCache(String file, FileSystem fs, Set<String> localUrls) {
diff --git a/src/test/org/apache/sqoop/mapreduce/TestJobBase.java b/src/test/org/apache/sqoop/mapreduce/TestJobBase.java
new file mode 100644
index 0000000..f228a35
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/TestJobBase.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.config.ConfigurationConstants;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.JobBase;
+
+import junit.framework.TestCase;
+
+public class TestJobBase extends TestCase {
+
+ SqoopOptions options;
+ Configuration conf;
+ JobBase jobBase;
+ Job job;
+
+ @Before
+ public void setUp() {
+ // set Sqoop command line arguments
+ options = new SqoopOptions();
+ conf = options.getConf();
+ jobBase = spy(new JobBase(options));
+ }
+
+ private void tmpjarsValidatingSeed(String tmpjarsInput) throws IOException {
+
+ // call cacheJars(...)
+ conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM, tmpjarsInput);
+ job = jobBase.createJob(conf);
+ jobBase.cacheJars(job, null);
+
+ }
+
+ public void tmpjarsValidatingVerif(String expectedOutput, int numWarnings) throws IOException {
+ // check outputs
+ assertEquals("Expected " + expectedOutput + "but received something different", expectedOutput,
+ job.getConfiguration().get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM));
+
+ // check for warnings
+ verify(jobBase, times(numWarnings))
+ .warn("Empty input is invalid and was removed from tmpjars.");
+ }
+
+ @Test
+ public void testTmpjarsValidatingMultipleValidInputs() throws IOException {
+
+ String tmpjarsInput = "valid,validother";
+ String expectedOutput = "valid,validother";
+
+ tmpjarsValidatingSeed(tmpjarsInput);
+ tmpjarsValidatingVerif(expectedOutput, 0);
+ }
+
+ @Test
+ public void testTmpjarsValidatingFullEmptyInput() throws IOException {
+
+ String tmpjarsInput = "";
+ String expectedOutput = "";
+
+ tmpjarsValidatingSeed(tmpjarsInput);
+ tmpjarsValidatingVerif(expectedOutput, 0);
+ }
+
+ @Test
+ public void testTmpjarsValidatingMixedInput() throws IOException {
+
+ String tmpjarsInput = ",,valid,,,validother,,";
+ String expectedOutput = "valid,validother";
+
+ tmpjarsValidatingSeed(tmpjarsInput);
+ tmpjarsValidatingVerif(expectedOutput, 4);
+ }
+
+}