SQOOP-3136: Add support to Sqoop being able to handle different file
system urls (e.g. s3a://some-bucket/tmp/sqoop)
(Illya Yalovyy via Attila Szabo)
diff --git a/src/java/com/cloudera/sqoop/io/LobReaderCache.java b/src/java/com/cloudera/sqoop/io/LobReaderCache.java
index 3394296..89d31d3 100644
--- a/src/java/com/cloudera/sqoop/io/LobReaderCache.java
+++ b/src/java/com/cloudera/sqoop/io/LobReaderCache.java
@@ -59,7 +59,7 @@
*/
public static Path qualify(Path path, Configuration conf)
throws IOException {
- return org.apache.sqoop.io.LobReaderCache.qualify(path, conf);
+ return org.apache.sqoop.util.FileSystemUtil.makeQualified(path, conf);
}
}
diff --git a/src/java/org/apache/sqoop/hive/HiveImport.java b/src/java/org/apache/sqoop/hive/HiveImport.java
index 4828375..153d091 100644
--- a/src/java/org/apache/sqoop/hive/HiveImport.java
+++ b/src/java/org/apache/sqoop/hive/HiveImport.java
@@ -115,7 +115,7 @@
* from where we put it, before running Hive LOAD DATA INPATH.
*/
private void removeTempLogs(Path tablePath) throws IOException {
- FileSystem fs = FileSystem.get(configuration);
+ FileSystem fs = tablePath.getFileSystem(configuration);
Path logsPath = new Path(tablePath, "_logs");
if (fs.exists(logsPath)) {
LOG.info("Removing temporary files from import process: " + logsPath);
@@ -263,7 +263,7 @@
* @throws IOException
*/
private void cleanUp(Path outputPath) throws IOException {
- FileSystem fs = FileSystem.get(configuration);
+ FileSystem fs = outputPath.getFileSystem(configuration);
// HIVE is not always removing input directory after LOAD DATA statement
// (which is our export directory). We're removing export directory in case
diff --git a/src/java/org/apache/sqoop/hive/TableDefWriter.java b/src/java/org/apache/sqoop/hive/TableDefWriter.java
index c9962e9..32fcca3 100644
--- a/src/java/org/apache/sqoop/hive/TableDefWriter.java
+++ b/src/java/org/apache/sqoop/hive/TableDefWriter.java
@@ -36,6 +36,7 @@
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Creates (Hive-specific) SQL DDL statements to create tables to hold data
@@ -271,8 +272,7 @@
} else {
tablePath = warehouseDir + inputTableName;
}
- FileSystem fs = FileSystem.get(configuration);
- return new Path(tablePath).makeQualified(fs);
+ return FileSystemUtil.makeQualified(new Path(tablePath), configuration);
}
/**
diff --git a/src/java/org/apache/sqoop/io/LobReaderCache.java b/src/java/org/apache/sqoop/io/LobReaderCache.java
index bd75374..dbfa4f1 100644
--- a/src/java/org/apache/sqoop/io/LobReaderCache.java
+++ b/src/java/org/apache/sqoop/io/LobReaderCache.java
@@ -24,10 +24,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.io.LobFile;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* A cache of open LobFile.Reader objects.
@@ -55,7 +55,7 @@
throws IOException {
LobFile.Reader reader = null;
- Path canonicalPath = qualify(path, conf);
+ Path canonicalPath = FileSystemUtil.makeQualified(path, conf);
// Look up an entry in the cache.
synchronized(this) {
reader = readerMap.remove(canonicalPath);
@@ -111,24 +111,4 @@
protected LobReaderCache() {
this.readerMap = new TreeMap<Path, LobFile.Reader>();
}
-
- /**
- * Created a fully-qualified path object.
- * @param path the path to fully-qualify with its fs URI.
- * @param conf the current Hadoop FS configuration.
- * @return a new path representing the same location as the input 'path',
- * but with a fully-qualified URI.
- */
- public static Path qualify(Path path, Configuration conf)
- throws IOException {
- if (null == path) {
- return null;
- }
-
- FileSystem fs = path.getFileSystem(conf);
- if (null == fs) {
- fs = FileSystem.get(conf);
- }
- return path.makeQualified(fs);
- }
}
diff --git a/src/java/org/apache/sqoop/io/SplittingOutputStream.java b/src/java/org/apache/sqoop/io/SplittingOutputStream.java
index 5f98192..129b508 100644
--- a/src/java/org/apache/sqoop/io/SplittingOutputStream.java
+++ b/src/java/org/apache/sqoop/io/SplittingOutputStream.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* An output stream that writes to an underlying filesystem, opening
@@ -90,7 +91,7 @@
FileSystem fs = destFile.getFileSystem(conf);
LOG.debug("Opening next output file: " + destFile);
if (fs.exists(destFile)) {
- Path canonicalDest = destFile.makeQualified(fs);
+ Path canonicalDest = fs.makeQualified(destFile);
throw new IOException("Destination file " + canonicalDest
+ " already exists");
}
diff --git a/src/java/org/apache/sqoop/lib/LargeObjectLoader.java b/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
index 70c0f4e..b8525fe 100644
--- a/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
+++ b/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
@@ -79,7 +79,7 @@
throws IOException {
this.conf = conf;
this.workPath = workPath;
- this.fs = FileSystem.get(conf);
+ this.fs = workPath.getFileSystem(conf);
this.curBlobWriter = null;
this.curClobWriter = null;
}
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
index e81588c..e73fd68 100644
--- a/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopUtilities.java
@@ -714,16 +714,16 @@
Path uniqueFileName = null;
try {
- FileSystem fileSystem = FileSystem.get(conf);
-
// NOTE: This code is not thread-safe.
// i.e. A race-condition could still cause this code to 'fail'.
int suffix = 0;
String fileNameTemplate = fileName + "%s";
+ Path outputDirectory = new Path(getOutputDirectory(conf));
+ FileSystem fileSystem = outputDirectory.getFileSystem(conf);
while (true) {
uniqueFileName =
- new Path(getOutputDirectory(conf), String.format(fileNameTemplate,
+ new Path(outputDirectory, String.format(fileNameTemplate,
suffix == 0 ? "" : String.format(" (%d)", suffix)));
if (!fileSystem.exists(uniqueFileName)) {
break;
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
index e08f997..fd2cf89 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* This file was ported from Hadoop 2.0.2-alpha
@@ -224,11 +225,9 @@
// times, one time each for each pool in the next loop.
List<Path> newpaths = new LinkedList<Path>();
for (int i = 0; i < paths.length; i++) {
- FileSystem fs = paths[i].getFileSystem(conf);
-
//the scheme and authority will be kept if the path is
//a valid path for a non-default file system
- Path p = fs.makeQualified(paths[i]);
+ Path p = FileSystemUtil.makeQualified(paths[i], conf);
newpaths.add(p);
}
paths = null;
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 260bc29..dc49282 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -28,6 +28,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -48,6 +49,7 @@
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.apache.sqoop.util.FileSystemUtil;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
@@ -141,8 +143,8 @@
options.getHiveTableName();
return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
} else {
- FileSystem fs = FileSystem.get(conf);
- return "dataset:" + fs.makeQualified(getContext().getDestination());
+ Path destination = getContext().getDestination();
+ return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
}
}
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 27f84da..c7609a5 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -50,6 +50,7 @@
import java.io.IOException;
import java.sql.SQLException;
import java.util.Date;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Base class for running an export MapReduce job.
@@ -232,7 +233,7 @@
}
Path inputPath = new Path(context.getOptions().getExportDir());
Configuration conf = options.getConf();
- inputPath = inputPath.makeQualified(FileSystem.get(conf));
+ inputPath = FileSystemUtil.makeQualified(inputPath, conf);
return inputPath;
}
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
index b32cdd1..2bbfffe 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -92,11 +92,10 @@
protected void completeImport(Job job) throws IOException, ImportException {
super.completeImport(job);
- FileSystem fileSystem = FileSystem.get(job.getConfiguration());
-
// Make the bulk load files source directory accessible to the world
// so that the hbase user can deal with it
Path bulkLoadDir = getContext().getDestination();
+ FileSystem fileSystem = bulkLoadDir.getFileSystem(job.getConfiguration());
setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
FsPermission.createImmutable((short) 00777));
@@ -120,8 +119,9 @@
protected void jobTeardown(Job job) throws IOException, ImportException {
super.jobTeardown(job);
// Delete the hfiles directory after we are finished.
- FileSystem fileSystem = FileSystem.get(job.getConfiguration());
- fileSystem.delete(getContext().getDestination(), true);
+ Path destination = getContext().getDestination();
+ FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+ fileSystem.delete(destination, true);
}
/**
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 626119b..6f9afaf 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -25,7 +25,6 @@
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@@ -38,6 +37,7 @@
import java.io.IOException;
import java.util.Map;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -79,8 +79,7 @@
} else if (fileType == FileType.PARQUET_FILE) {
LOG.debug("Configuring for Parquet export");
configureGenericRecordExportInputFormat(job, tableName);
- FileSystem fs = FileSystem.get(job.getConfiguration());
- String uri = "dataset:" + fs.makeQualified(getInputPath());
+ String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
DatasetKeyInputFormat.configure(job).readFrom(uri);
}
}
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index f911280..d13b560 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -26,7 +26,6 @@
import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@@ -43,6 +42,7 @@
import com.cloudera.sqoop.mapreduce.ExportJobBase;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -187,8 +187,7 @@
} else if (fileType == FileType.PARQUET_FILE) {
LOG.debug("Configuring for Parquet export");
configureGenericRecordExportInputFormat(job, tableName);
- FileSystem fs = FileSystem.get(job.getConfiguration());
- String uri = "dataset:" + fs.makeQualified(getInputPath());
+ String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
DatasetKeyInputFormat.configure(job).readFrom(uri);
}
}
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index 5b6c4df..8b1cba3 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -46,6 +46,7 @@
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.mapreduce.JobBase;
+import org.apache.sqoop.util.FileSystemUtil;
/**
* Run a MapReduce job that merges two datasets.
@@ -111,9 +112,9 @@
Path newPath = new Path(options.getMergeNewPath());
Configuration jobConf = job.getConfiguration();
- FileSystem fs = FileSystem.get(jobConf);
- oldPath = oldPath.makeQualified(fs);
- newPath = newPath.makeQualified(fs);
+
+ oldPath = FileSystemUtil.makeQualified(oldPath, jobConf);
+ newPath = FileSystemUtil.makeQualified(newPath, jobConf);
propagateOptionsToJob(job);
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 258ef79..d1c9749 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -300,7 +300,6 @@
return true;
}
- FileSystem fs = FileSystem.get(options.getConf());
SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
String nextIncrementalValue = null;
@@ -325,11 +324,14 @@
}
break;
case DateLastModified:
- if (options.getMergeKeyCol() == null && !options.isAppendMode()
- && fs.exists(getOutputPath(options, context.getTableName(), false))) {
- throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
- + " is required when using --" + this.INCREMENT_TYPE_ARG
- + " lastmodified and the output directory exists.");
+ if (options.getMergeKeyCol() == null && !options.isAppendMode()) {
+ Path outputPath = getOutputPath(options, context.getTableName(), false);
+ FileSystem fs = outputPath.getFileSystem(options.getConf());
+ if (fs.exists(outputPath)) {
+ throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
+ + " is required when using --" + this.INCREMENT_TYPE_ARG
+ + " lastmodified and the output directory exists.");
+ }
}
checkColumnType = manager.getColumnTypes(options.getTableName(),
options.getSqlQuery()).get(options.getIncrementalTestColumn());
@@ -436,10 +438,14 @@
* Merge HDFS output directories
*/
protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) throws IOException {
- FileSystem fs = FileSystem.get(options.getConf());
- if (context.getDestination() != null && fs.exists(context.getDestination())) {
+ if (context.getDestination() == null) {
+ return;
+ }
+
+ Path userDestDir = getOutputPath(options, context.getTableName(), false);
+ FileSystem fs = userDestDir.getFileSystem(options.getConf());
+ if (fs.exists(context.getDestination())) {
LOG.info("Final destination exists, will run merge job.");
- Path userDestDir = getOutputPath(options, context.getTableName(), false);
if (fs.exists(userDestDir)) {
String tableClassName = null;
if (!context.getConnManager().isORMFacilitySelfManaged()) {
@@ -541,8 +547,8 @@
private void deleteTargetDir(ImportJobContext context) throws IOException {
SqoopOptions options = context.getOptions();
- FileSystem fs = FileSystem.get(options.getConf());
Path destDir = context.getDestination();
+ FileSystem fs = destDir.getFileSystem(options.getConf());
if (fs.exists(destDir)) {
fs.delete(destDir, true);
diff --git a/src/java/org/apache/sqoop/util/FileSystemUtil.java b/src/java/org/apache/sqoop/util/FileSystemUtil.java
new file mode 100644
index 0000000..1493e09
--- /dev/null
+++ b/src/java/org/apache/sqoop/util/FileSystemUtil.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public final class FileSystemUtil {
+ private FileSystemUtil() {
+ }
+
+
+ /**
+ * Creates a fully-qualified path object.
+ * @param path the path to fully-qualify with its file system URI.
+ * @param conf the current Hadoop configuration.
+ * @return a new path representing the same location as the input path,
+ * but with a fully-qualified URI. Returns {@code null} if provided path is {@code null};
+ */
+ public static Path makeQualified(Path path, Configuration conf)
+ throws IOException {
+ if (null == path) {
+ return null;
+ }
+
+ return path.getFileSystem(conf).makeQualified(path);
+ }
+}
diff --git a/src/java/org/apache/sqoop/util/FileUploader.java b/src/java/org/apache/sqoop/util/FileUploader.java
index 155cffc..673a05b 100644
--- a/src/java/org/apache/sqoop/util/FileUploader.java
+++ b/src/java/org/apache/sqoop/util/FileUploader.java
@@ -18,16 +18,11 @@
package org.apache.sqoop.util;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,15 +35,14 @@
public static void uploadFilesToDFS(String srcBasePath, String src,
String destBasePath, String dest, Configuration conf) throws IOException {
- FileSystem fs = FileSystem.get(conf);
- Path targetPath = null;
Path srcPath = new Path(srcBasePath, src);
- if (destBasePath == null || destBasePath.length() == 0) {
+ if (destBasePath == null || destBasePath.isEmpty()) {
destBasePath = ".";
}
- targetPath = new Path(destBasePath, dest);
+ Path targetPath = new Path(destBasePath, dest);
+ FileSystem fs = targetPath.getFileSystem(conf);
if (!fs.exists(targetPath)) {
fs.mkdirs(targetPath);
diff --git a/src/test/org/apache/sqoop/util/TestFileSystemUtil.java b/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
new file mode 100644
index 0000000..fef74af
--- /dev/null
+++ b/src/test/org/apache/sqoop/util/TestFileSystemUtil.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+import org.junit.Before;
+import static org.junit.Assert.*;
+
+public class TestFileSystemUtil {
+ private Configuration conf;
+
+ @Before
+ public void setUp() {
+ conf = new Configuration();
+ conf.set("fs.my.impl", MyFileSystem.class.getTypeName());
+ }
+
+ @Test
+ public void testMakeQualifiedWhenPathIsNullThenReturnsNull() throws IOException {
+ assertNull(FileSystemUtil.makeQualified(null, conf));
+ }
+
+ @Test
+ public void testMakeQualifiedWhenPathIsRelativeThenReturnDefault() throws IOException {
+ Path actual = FileSystemUtil.makeQualified(new Path("foo/bar"), conf);
+ assertEquals("file", actual.toUri().getScheme());
+ }
+
+ @Test
+ public void testMakeQualifiedWhenPathHasCustomSchemaThenReturnSameSchema() throws IOException {
+ Path actual = FileSystemUtil.makeQualified(new Path("my:/foo/bar"), conf);
+ assertEquals("my", actual.toUri().getScheme());
+ }
+
+ @Test(expected = IOException.class)
+ public void testMakeQualifiedWhenPathHasBadSchemaThenThrowsIOException() throws IOException {
+ FileSystemUtil.makeQualified(new Path("nosuchfs://foo/bar"), conf);
+ }
+
+ public static final class MyFileSystem extends RawLocalFileSystem {
+ @Override
+ public URI getUri() { return URI.create("my:///"); }
+ }
+}