CRUNCH-677 Source and Target accept FileSystem
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index cd77373..81842d2 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -129,6 +129,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
new file mode 100644
index 0000000..75a2837
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.commons.io.IOUtils;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+/**
+ * Tests reading and writing from a FileSystem for which the Configuration is separate from the Pipeline's
+ * Configuration.
+ */
+public class ExternalFilesystemIT {
+
+ @ClassRule
+ public static TemporaryPath tmpDir1 = TemporaryPaths.create();
+
+ @ClassRule
+ public static TemporaryPath tmpDir2 = TemporaryPaths.create();
+
+ @ClassRule
+ public static TemporaryPath tmpDir3 = TemporaryPaths.create();
+
+ private static FileSystem dfsCluster1;
+ private static FileSystem dfsCluster2;
+ private static FileSystem defaultFs;
+
+ private static Collection<MiniDFSCluster> miniDFSClusters = new ArrayList<>();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ dfsCluster1 = createHaMiniClusterFs("cluster1", tmpDir1);
+ dfsCluster2 = createHaMiniClusterFs("cluster2", tmpDir2);
+ defaultFs = createHaMiniClusterFs("default", tmpDir3);
+ }
+
+ @AfterClass
+ public static void teardown() throws IOException {
+ dfsCluster1.close();
+ dfsCluster2.close();
+ defaultFs.close();
+ for (MiniDFSCluster miniDFSCluster : miniDFSClusters) {
+ miniDFSCluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testReadWrite() throws Exception {
+ // write a test file outside crunch
+ Path path = new Path("hdfs://cluster1/input.txt");
+ String testString = "Hello world!";
+ try (PrintWriter printWriter = new PrintWriter(dfsCluster1.create(path, true))) {
+ printWriter.println(testString);
+ }
+
+ // assert it can be read back using a Pipeline with config that doesn't know the FileSystem
+ Iterable<String> strings = new MRPipeline(getClass(), minimalConfiguration())
+ .read(From.textFile(path).fileSystem(dfsCluster1)).materialize();
+ Assert.assertEquals(testString, concatStrings(strings));
+
+ // write output with crunch using a Pipeline with config that doesn't know the FileSystem
+ MRPipeline pipeline = new MRPipeline(getClass(), minimalConfiguration());
+ PCollection<String> input = pipeline.read(From.textFile("hdfs://cluster1/input.txt").fileSystem(dfsCluster1));
+ pipeline.write(input, To.textFile("hdfs://cluster2/output").fileSystem(dfsCluster2));
+ pipeline.run();
+
+ // assert the output was written correctly
+ try (FSDataInputStream inputStream = dfsCluster2.open(new Path("hdfs://cluster2/output/out0-m-00000"))) {
+ String readValue = IOUtils.toString(inputStream).trim();
+ Assert.assertEquals(testString, readValue);
+ }
+
+ // make sure the clusters aren't getting mixed up
+ Assert.assertFalse(dfsCluster1.exists(new Path("/output")));
+ }
+
+ /**
+ * Tests that multiple calls to fileSystem() on Source, Target, or SourceTarget results in
+ * IllegalStateException
+ */
+ @Test
+ public void testResetFileSystem() {
+ Source<String> source = From.textFile("/data").fileSystem(defaultFs);
+ try {
+ source.fileSystem(dfsCluster1);
+ Assert.fail("Expected an IllegalStateException");
+ } catch (IllegalStateException e) { }
+
+ Target target = To.textFile("/data").fileSystem(defaultFs);
+ try {
+ target.fileSystem(dfsCluster1);
+ Assert.fail("Expected an IllegalStateException");
+ } catch (IllegalStateException e) { }
+
+ SourceTarget<String> sourceTarget = target.asSourceTarget(source.getType());
+ try {
+ sourceTarget.fileSystem(dfsCluster1);
+ Assert.fail("Expected an IllegalStateException");
+ } catch (IllegalStateException e) { }
+ }
+
+ /**
+ * Tests when supplied Filesystem is not in agreement with Path. For example, Path is "hdfs://cluster1/data"
+ * but FileSystem is hdfs://cluster2.
+ */
+ @Test
+ public void testWrongFs() {
+ Source<String> source = From.textFile("hdfs://cluster1/data");
+ try {
+ source.fileSystem(dfsCluster2);
+ Assert.fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) { }
+
+ Target target = To.textFile("hdfs://cluster1/data");
+ try {
+ target.fileSystem(dfsCluster2);
+ Assert.fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) { }
+
+ SourceTarget<String> sourceTarget = target.asSourceTarget(source.getType());
+ try {
+ sourceTarget.fileSystem(dfsCluster2);
+ Assert.fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) { }
+ }
+
+ private static String concatStrings(Iterable<String> strings) {
+ StringBuilder builder = new StringBuilder();
+ for (String string : strings) {
+ builder.append(string);
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Creates a minimal configuration pointing to an HA HDFS default filesystem to ensure
+ * that configuration for external filesystems used in Sources and Targets doesn't mess up
+ * dfs.nameservices on the Pipeline, which could cause the default filesystem to become
+ * unresolveable.
+ *
+ * @return a minimal configuration with an HDFS HA default fs
+ */
+ private static Configuration minimalConfiguration() {
+ Configuration minimalConfiguration = new Configuration(false);
+ minimalConfiguration.addResource(defaultFs.getConf());
+ minimalConfiguration.set("fs.defaultFS", "hdfs://default");
+ // exposes bugs hidden by filesystem cache
+ minimalConfiguration.set("fs.hdfs.impl.disable.cache", "true");
+ return minimalConfiguration;
+ }
+
+ private static Configuration getDfsConf(String nsName, MiniDFSCluster cluster) {
+ Configuration conf = new Configuration();
+ conf.set("dfs.nameservices", nsName);
+ conf.set("dfs.client.failover.proxy.provider." + nsName,
+ "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+ conf.set("dfs.ha.namenodes." + nsName, "nn1");
+ conf.set("dfs.namenode.rpc-address." + nsName + ".nn1", "localhost:" + cluster.getNameNodePort());
+ return conf;
+ }
+
+ private static FileSystem createHaMiniClusterFs(String clusterName, TemporaryPath temporaryPath)
+ throws IOException {
+ Configuration conf = new Configuration();
+ conf.set(HDFS_MINIDFS_BASEDIR, temporaryPath.getRootFileName());
+ MiniDFSCluster cluster = new Builder(conf).build();
+ miniDFSClusters.add(cluster);
+ return FileSystem.get(URI.create("hdfs://" + clusterName), getDfsConf(clusterName, cluster));
+ }
+}
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
index b209dfc..0cffbf7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Source.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -22,6 +22,7 @@
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Job;
/**
@@ -39,6 +40,26 @@
Source<T> inputConf(String key, String value);
/**
+ * Adds the {@code Configuration} of the given filesystem such that the source can read from it when the {@code
+ * Pipeline} itself does not have that configuration.
+ * </p>
+ * Changing the filesystem after it is set is not supported and will result in {@link
+ * IllegalStateException}
+ *
+ * @param fileSystem the filesystem
+ * @return this Source
+ * @throws IllegalStateException if the filesystem has already been set
+ * @throws IllegalArgumentException if the source is pointing to a fully qualified Path in a different FileSystem
+ */
+ Source<T> fileSystem(FileSystem fileSystem);
+
+ /**
+ * Returns the {@code FileSystem} for this source or null if no explicit filesystem {@link #fileSystem(FileSystem)
+ * has been set}.
+ */
+ FileSystem getFileSystem();
+
+ /**
* Returns the {@code PType} for this source.
*/
PType<T> getType();
diff --git a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
index 80cd730..132c183 100644
--- a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
@@ -17,6 +17,8 @@
*/
package org.apache.crunch;
+import org.apache.hadoop.fs.FileSystem;
+
/**
* An interface for classes that implement both the {@code Source} and the
* {@code Target} interfaces.
@@ -29,4 +31,19 @@
* re-use the same config keys with different values when necessary.
*/
SourceTarget<T> conf(String key, String value);
+
+ /**
+ * Adds the {@code Configuration} of the given filesystem such that the source-target can read/write from/to it when
+ * the {@code Pipeline} itself does not have that configuration.
+ * </p>
+ * Changing the filesystem after it is set is not supported and will result in {@link
+ * IllegalStateException}
+ *
+ * @param fileSystem the filesystem
+ * @return this SourceTarget
+ * @throws IllegalStateException if the filesystem has already been set
+ * @throws IllegalArgumentException if the source/target is pointing to a fully qualified Path in a different
+ * FileSystem
+ */
+ SourceTarget<T> fileSystem(FileSystem fileSystem);
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 4dec831..bd32a77 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -21,6 +21,7 @@
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
/**
* A {@code Target} represents the output destination of a Crunch {@code PCollection}
@@ -68,6 +69,26 @@
Target outputConf(String key, String value);
/**
+ * Adds the {@code Configuration} of the given filesystem such that the target can write to it when the {@code
+ * Pipeline} itself does not have that configuration.
+ * </p>
+ * Changing the filesystem after it is set is not supported and will result in {@link
+ * IllegalStateException}
+ *
+ * @param fileSystem the filesystem
+ * @return this Target
+ * @throws IllegalStateException if the filesystem has already been set
+ * @throws IllegalArgumentException if the target is pointing to a fully qualified Path in a different FileSystem
+ */
+ Target fileSystem(FileSystem fileSystem);
+
+ /**
+ * Returns the {@code FileSystem} for this target or null if no explicit filesystem {@link #fileSystem(FileSystem)
+ * has been set}.
+ */
+ FileSystem getFileSystem();
+
+ /**
* Apply the given {@code WriteMode} to this {@code Target} instance.
*
* @param writeMode The strategy for handling existing outputs
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
index 3259aaf..bbe7f5c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -17,6 +17,7 @@
*/
package org.apache.crunch.io;
+import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@@ -27,12 +28,15 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -94,11 +98,29 @@
public Configuration configure(Configuration conf) {
for (Map.Entry<String, String> e : extraConf.entrySet()) {
- conf.set(e.getKey(), e.getValue());
+ String key = e.getKey();
+ String value = e.getValue();
+ // merge the value if it is DFS_NAMESERVICES to support additional filesystems
+ if (key.equals(DFSConfigKeys.DFS_NAMESERVICES)) {
+ String[] originalValue = conf.getStrings(key);
+ if (originalValue != null) {
+ String[] newValue = value != null ? value.split(",") : new String[0];
+ conf.setStrings(key, mergeValues(originalValue, newValue));
+ continue;
+ }
+ }
+ conf.set(key, value);
}
return conf;
}
+ private static String[] mergeValues(String[] value1, String[] value2) {
+ Set<String> values = Sets.newHashSet();
+ values.addAll(Arrays.asList(value1));
+ values.addAll(Arrays.asList(value2));
+ return values.toArray(new String[0]);
+ }
+
public String serialize() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index 37aa05b..1826302 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -88,7 +88,7 @@
@Override
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
if (ptype instanceof AvroType) {
- return new AvroFileSourceTarget<T>(path, (AvroType<T>) ptype);
+ return new AvroFileSourceTarget<T>(path, (AvroType<T>) ptype).fileSystem(getFileSystem());
}
return null;
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 27a1167..98c0fb8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -18,11 +18,14 @@
package org.apache.crunch.io.impl;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import java.util.Map.Entry;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.ReadableData;
import org.apache.crunch.Source;
@@ -48,10 +51,11 @@
private static final Logger LOG = LoggerFactory.getLogger(FileSourceImpl.class);
@Deprecated
- protected final Path path;
- protected final List<Path> paths;
+ protected Path path;
+ protected List<Path> paths;
protected final PType<T> ptype;
protected final FormatBundle<? extends InputFormat> inputBundle;
+ private FileSystem fileSystem;
public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
this(path, ptype, FormatBundle.forInput(inputFormatClass));
@@ -86,12 +90,40 @@
}
@Override
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ @Override
public Source<T> inputConf(String key, String value) {
inputBundle.set(key, value);
return this;
}
@Override
+ public Source<T> fileSystem(FileSystem fileSystem) {
+ if (this.fileSystem != null) {
+ throw new IllegalStateException("Filesystem already set. Change is not supported.");
+ }
+
+ this.fileSystem = fileSystem;
+
+ if (fileSystem != null) {
+ List<Path> qualifiedPaths = new ArrayList<>(paths.size());
+ for (Path path : paths) {
+ qualifiedPaths.add(fileSystem.makeQualified(path));
+ }
+ paths = qualifiedPaths;
+
+ Configuration fsConf = fileSystem.getConf();
+ for (Entry<String, String> entry : fsConf) {
+ inputBundle.set(entry.getKey(), entry.getValue());
+ }
+ }
+ return this;
+ }
+
+ @Override
public Converter<?, ?, ?, ?> getConverter() {
return ptype.getConverter();
}
@@ -112,12 +144,18 @@
return ptype;
}
+ private Configuration getEffectiveBundleConfig(Configuration configuration) {
+ // overlay the bundle config on top of a copy of the supplied config
+ return getBundle().configure(new Configuration(configuration));
+ }
+
@Override
public long getSize(Configuration configuration) {
long size = 0;
+ Configuration bundleConfig = getEffectiveBundleConfig(configuration);
for (Path path : paths) {
try {
- size += SourceTargetHelper.getPathSize(configuration, path);
+ size += SourceTargetHelper.getPathSize(bundleConfig, path);
} catch (IOException e) {
LOG.warn("Exception thrown looking up size of: {}", path, e);
throw new IllegalStateException("Failed to get the file size of:" + path, e);
@@ -129,8 +167,9 @@
protected Iterable<T> read(Configuration conf, FileReaderFactory<T> readerFactory)
throws IOException {
List<Iterable<T>> iterables = Lists.newArrayList();
+ Configuration bundleConfig = getEffectiveBundleConfig(conf);
for (Path path : paths) {
- FileSystem fs = path.getFileSystem(conf);
+ FileSystem fs = path.getFileSystem(bundleConfig);
iterables.add(CompositePathIterable.create(fs, path, readerFactory));
}
return Iterables.concat(iterables);
@@ -147,9 +186,10 @@
@Override
public long getLastModifiedAt(Configuration conf) {
long lastMod = -1;
+ Configuration bundleConfig = getEffectiveBundleConfig(conf);
for (Path path : paths) {
try {
- FileSystem fs = path.getFileSystem(conf);
+ FileSystem fs = path.getFileSystem(bundleConfig);
long lm = SourceTargetHelper.getLastModifiedAt(fs, path);
if (lm > lastMod) {
lastMod = lm;
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 17efabb..ea7e98a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -19,8 +19,10 @@
import java.io.IOException;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -63,9 +65,10 @@
private static final Logger LOG = LoggerFactory.getLogger(FileTargetImpl.class);
- protected final Path path;
+ protected Path path;
private final FormatBundle<? extends FileOutputFormat> formatBundle;
private final FileNamingScheme fileNamingScheme;
+ private FileSystem fileSystem;
public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
FileNamingScheme fileNamingScheme) {
@@ -91,6 +94,30 @@
}
@Override
+ public Target fileSystem(FileSystem fileSystem) {
+ if (this.fileSystem != null) {
+ throw new IllegalStateException("Filesystem already set. Change is not supported.");
+ }
+
+ if (fileSystem != null) {
+ path = fileSystem.makeQualified(path);
+
+ this.fileSystem = fileSystem;
+
+ Configuration fsConf = fileSystem.getConf();
+ for (Entry<String, String> entry : fsConf) {
+ formatBundle.set(entry.getKey(), entry.getValue());
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ @Override
public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
Converter converter = getConverter(ptype);
Class keyClass = converter.getKeyClass();
@@ -164,7 +191,8 @@
@Override
public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
FileSystem srcFs = workingPath.getFileSystem(conf);
- FileSystem dstFs = path.getFileSystem(conf);
+ Configuration dstFsConf = getEffectiveBundleConfig(conf);
+ FileSystem dstFs = path.getFileSystem(dstFsConf);
if (!dstFs.exists(path)) {
dstFs.mkdirs(path);
}
@@ -178,7 +206,7 @@
if (useDistributedCopy) {
LOG.info("Source and destination are in different file systems, performing distributed copy from {} to {}", srcPattern,
path);
- handeOutputsDistributedCopy(conf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks);
+ handleOutputsDistributedCopy(dstFsConf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks);
} else {
LOG.info("Source and destination are in different file systems, performing asynch copies from {} to {}", srcPattern, path);
handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
@@ -198,8 +226,9 @@
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
maxThreads));
+ Configuration dstFsConf = getEffectiveBundleConfig(conf);
for (Path s : srcs) {
- Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
+ Path d = getDestFile(dstFsConf, s, path, s.getName().contains("-m-"));
renameFutures.add(
executorService.submit(
new WorkingPathFileMover(conf, s, d, srcFs, dstFs, sameFs)));
@@ -356,11 +385,16 @@
return null;
}
+ private Configuration getEffectiveBundleConfig(Configuration configuration) {
+ // overlay the bundle config on top of a copy of the supplied config
+ return formatBundle.configure(new Configuration(configuration));
+ }
+
@Override
public boolean handleExisting(WriteMode strategy, long lastModForSource, Configuration conf) {
FileSystem fs = null;
try {
- fs = path.getFileSystem(conf);
+ fs = path.getFileSystem(getEffectiveBundleConfig(conf));
} catch (IOException e) {
LOG.error("Could not retrieve FileSystem object to check for existing path", e);
throw new CrunchRuntimeException(e);
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index b15a00b..6ef88aa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -27,6 +27,7 @@
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Job;
class SourceTargetImpl<T> implements SourceTarget<T> {
@@ -46,6 +47,19 @@
}
@Override
+ public SourceTarget<T> fileSystem(FileSystem fileSystem) {
+ source.fileSystem(fileSystem);
+ target.fileSystem(fileSystem);
+ return this;
+ }
+
+ @Override
+ public FileSystem getFileSystem() {
+ // could either return source or target filesytem as they are the same
+ return source.getFileSystem();
+ }
+
+ @Override
public PType<T> getType() {
return source.getType();
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
index 2698053..9b1bbf2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
@@ -102,7 +102,7 @@
@Override
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
if (ptype instanceof AvroType && IndexedRecord.class.isAssignableFrom(((AvroType) ptype).getTypeClass())) {
- return new AvroParquetFileSourceTarget(path, (AvroType<T>) ptype);
+ return new AvroParquetFileSourceTarget(path, (AvroType<T>) ptype).fileSystem(getFileSystem());
}
return null;
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
index b23f358..0ec91c9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
@@ -47,9 +47,9 @@
@Override
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
if (ptype instanceof PTableType) {
- return new SeqFileTableSourceTarget(path, (PTableType) ptype);
+ return new SeqFileTableSourceTarget(path, (PTableType) ptype).fileSystem(getFileSystem());
} else {
- return new SeqFileSourceTarget(path, ptype);
+ return new SeqFileSourceTarget(path, ptype).fileSystem(getFileSystem());
}
}
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index 258936e..880ffb5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -98,9 +98,9 @@
return null;
}
if (ptype instanceof PTableType) {
- return new TextFileTableSourceTarget(path, (PTableType) ptype);
+ return new TextFileTableSourceTarget(path, (PTableType) ptype).fileSystem(getFileSystem());
}
- return new TextFileSourceTarget<T>(path, ptype);
+ return new TextFileSourceTarget<T>(path, ptype).fileSystem(getFileSystem());
}
private <T> boolean isTextCompatible(PType<T> ptype) {
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index a8c157d..908bc50 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -35,6 +35,7 @@
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -129,6 +130,12 @@
}
@Override
+ public SourceTarget<Pair<ImmutableBytesWritable, Result>> fileSystem(FileSystem fileSystem) {
+ // not currently supported/applicable for HBase
+ return this;
+ }
+
+ @Override
public PType<Pair<ImmutableBytesWritable, Result>> getType() {
return PTYPE;
}
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index f4f134d..d51dee0 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -32,6 +32,7 @@
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -146,6 +147,18 @@
}
@Override
+ public Target fileSystem(FileSystem fileSystem) {
+ // not currently supported/applicable for HBase
+ return this;
+ }
+
+ @Override
+ public FileSystem getFileSystem() {
+ // not currently supported/applicable for HBase
+ return null;
+ }
+
+ @Override
public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) {
LOG.info("HBaseTarget ignores checks for existing outputs...");
return false;
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java
index 0ad0434..8ff9571 100644
--- a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java
@@ -45,7 +45,7 @@
@Override
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
- return new OrcFileSourceTarget<T>(path, ptype);
+ return new OrcFileSourceTarget<T>(path, ptype).fileSystem(getFileSystem());
}
}