CRUNCH-677 Source and Target accept FileSystem
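
Adds a fileSystem(FileSystem) method to Source, Target, and SourceTarget so that a
source or target can read from or write to a filesystem whose Configuration is not
part of the Pipeline's Configuration. Setting the filesystem qualifies the source or
target paths against it and folds the filesystem's Configuration into the
FormatBundle. FormatBundle.configure() now merges dfs.nameservices values instead of
overwriting them, so registering an external HA nameservice does not make the
Pipeline's default filesystem unresolvable (e.g. "default" plus "cluster1" merge to
"default,cluster1"). Calling fileSystem() more than once throws an
IllegalStateException, and supplying a filesystem that does not match an already
fully qualified path throws an IllegalArgumentException. The HBase source and target
treat fileSystem() as a no-op. ExternalFilesystemIT exercises reads and writes across
separate MiniDFSCluster instances.

A rough usage sketch (cluster URIs and paths are illustrative, and the helpers that
build each cluster's Configuration are hypothetical; see ExternalFilesystemIT for a
concrete HA setup):

    import java.net.URI;

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.From;
    import org.apache.crunch.io.To;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class CrossClusterCopy {
      public static void main(String[] args) throws Exception {
        // FileSystems built from each cluster's own Configuration; the Pipeline's
        // Configuration does not need to know about either nameservice.
        FileSystem sourceFs = FileSystem.get(URI.create("hdfs://clusterA"), clusterAConf());
        FileSystem targetFs = FileSystem.get(URI.create("hdfs://clusterB"), clusterBConf());

        Pipeline pipeline = new MRPipeline(CrossClusterCopy.class, new Configuration());
        PCollection<String> lines = pipeline.read(
            From.textFile("hdfs://clusterA/input.txt").fileSystem(sourceFs));
        pipeline.write(lines, To.textFile("hdfs://clusterB/output").fileSystem(targetFs));
        pipeline.done();
      }

      // Hypothetical helpers: each would load the dfs.nameservices and namenode
      // settings for its cluster, as ExternalFilesystemIT does for its mini clusters.
      private static Configuration clusterAConf() { return new Configuration(); }
      private static Configuration clusterBConf() { return new Configuration(); }
    }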
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index cd77373..81842d2 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -129,6 +129,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
new file mode 100644
index 0000000..75a2837
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/ExternalFilesystemIT.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.commons.io.IOUtils;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+/**
+ * Tests reading from and writing to a FileSystem whose Configuration is separate from the Pipeline's
+ * Configuration.
+ */
+public class ExternalFilesystemIT {
+
+    @ClassRule
+    public static TemporaryPath tmpDir1 = TemporaryPaths.create();
+
+    @ClassRule
+    public static TemporaryPath tmpDir2 = TemporaryPaths.create();
+
+    @ClassRule
+    public static TemporaryPath tmpDir3 = TemporaryPaths.create();
+
+    private static FileSystem dfsCluster1;
+    private static FileSystem dfsCluster2;
+    private static FileSystem defaultFs;
+
+    private static Collection<MiniDFSCluster> miniDFSClusters = new ArrayList<>();
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        dfsCluster1 = createHaMiniClusterFs("cluster1", tmpDir1);
+        dfsCluster2 = createHaMiniClusterFs("cluster2", tmpDir2);
+        defaultFs = createHaMiniClusterFs("default", tmpDir3);
+    }
+
+    @AfterClass
+    public static void teardown() throws IOException {
+        dfsCluster1.close();
+        dfsCluster2.close();
+        defaultFs.close();
+        for (MiniDFSCluster miniDFSCluster : miniDFSClusters) {
+            miniDFSCluster.shutdown();
+        }
+    }
+
+    @Test
+    public void testReadWrite() throws Exception {
+        // write a test file outside of Crunch
+        Path path = new Path("hdfs://cluster1/input.txt");
+        String testString = "Hello world!";
+        try (PrintWriter printWriter = new PrintWriter(dfsCluster1.create(path, true))) {
+            printWriter.println(testString);
+        }
+
+        // assert it can be read back using a Pipeline with config that doesn't know the FileSystem
+        Iterable<String> strings = new MRPipeline(getClass(), minimalConfiguration())
+            .read(From.textFile(path).fileSystem(dfsCluster1)).materialize();
+        Assert.assertEquals(testString, concatStrings(strings));
+
+        // write output with Crunch using a Pipeline with config that doesn't know the FileSystem
+        MRPipeline pipeline = new MRPipeline(getClass(), minimalConfiguration());
+        PCollection<String> input = pipeline.read(From.textFile("hdfs://cluster1/input.txt").fileSystem(dfsCluster1));
+        pipeline.write(input, To.textFile("hdfs://cluster2/output").fileSystem(dfsCluster2));
+        pipeline.run();
+
+        // assert the output was written correctly
+        try (FSDataInputStream inputStream = dfsCluster2.open(new Path("hdfs://cluster2/output/out0-m-00000"))) {
+            String readValue = IOUtils.toString(inputStream).trim();
+            Assert.assertEquals(testString, readValue);
+        }
+
+        // make sure the clusters aren't getting mixed up
+        Assert.assertFalse(dfsCluster1.exists(new Path("/output")));
+    }
+
+    /**
+     * Tests that multiple calls to fileSystem() on a Source, Target, or SourceTarget result in an
+     * IllegalStateException.
+     */
+    @Test
+    public void testResetFileSystem() {
+        Source<String> source = From.textFile("/data").fileSystem(defaultFs);
+        try {
+            source.fileSystem(dfsCluster1);
+            Assert.fail("Expected an IllegalStateException");
+        } catch (IllegalStateException e) { }
+
+        Target target = To.textFile("/data").fileSystem(defaultFs);
+        try {
+            target.fileSystem(dfsCluster1);
+            Assert.fail("Expected an IllegalStateException");
+        } catch (IllegalStateException e) { }
+
+        SourceTarget<String> sourceTarget = target.asSourceTarget(source.getType());
+        try {
+            sourceTarget.fileSystem(dfsCluster1);
+            Assert.fail("Expected an IllegalStateException");
+        } catch (IllegalStateException e) { }
+    }
+
+    /**
+     * Tests the case where the supplied FileSystem does not agree with the Path.  For example, the Path is
+     * "hdfs://cluster1/data" but the FileSystem is hdfs://cluster2.
+     */
+    @Test
+    public void testWrongFs() {
+        Source<String> source = From.textFile("hdfs://cluster1/data");
+        try {
+            source.fileSystem(dfsCluster2);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (IllegalArgumentException e) { }
+
+        Target target = To.textFile("hdfs://cluster1/data");
+        try {
+            target.fileSystem(dfsCluster2);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (IllegalArgumentException e) { }
+
+        SourceTarget<String> sourceTarget = target.asSourceTarget(source.getType());
+        try {
+            sourceTarget.fileSystem(dfsCluster2);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (IllegalArgumentException e) { }
+    }
+
+    private static String concatStrings(Iterable<String> strings) {
+        StringBuilder builder = new StringBuilder();
+        for (String string : strings) {
+            builder.append(string);
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Creates a minimal configuration pointing to an HA HDFS default filesystem, to verify that the
+     * configuration added for external filesystems used in Sources and Targets does not clobber
+     * dfs.nameservices on the Pipeline, which could make the default filesystem
+     * unresolvable.
+     *
+     * @return a minimal configuration with an HDFS HA default fs
+     */
+    private static Configuration minimalConfiguration() {
+        Configuration minimalConfiguration = new Configuration(false);
+        minimalConfiguration.addResource(defaultFs.getConf());
+        minimalConfiguration.set("fs.defaultFS", "hdfs://default");
+        // exposes bugs hidden by filesystem cache
+        minimalConfiguration.set("fs.hdfs.impl.disable.cache", "true");
+        return minimalConfiguration;
+    }
+
+    private static Configuration getDfsConf(String nsName, MiniDFSCluster cluster) {
+        Configuration conf = new Configuration();
+        conf.set("dfs.nameservices", nsName);
+        conf.set("dfs.client.failover.proxy.provider." + nsName,
+            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+        conf.set("dfs.ha.namenodes." + nsName, "nn1");
+        conf.set("dfs.namenode.rpc-address." + nsName + ".nn1", "localhost:" + cluster.getNameNodePort());
+        return conf;
+    }
+
+    private static FileSystem createHaMiniClusterFs(String clusterName, TemporaryPath temporaryPath)
+        throws IOException {
+        Configuration conf = new Configuration();
+        conf.set(HDFS_MINIDFS_BASEDIR, temporaryPath.getRootFileName());
+        MiniDFSCluster cluster = new Builder(conf).build();
+        miniDFSClusters.add(cluster);
+        return FileSystem.get(URI.create("hdfs://" + clusterName), getDfsConf(clusterName, cluster));
+    }
+}
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
index b209dfc..0cffbf7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Source.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -22,6 +22,7 @@
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.Job;
 
 /**
@@ -39,6 +40,26 @@
   Source<T> inputConf(String key, String value);
 
   /**
+   * Adds the {@code Configuration} of the given filesystem so that the source can read from it when the
+   * {@code Pipeline} itself does not have that configuration.
+   * <p>
+   * Changing the filesystem after it has been set is not supported and will result in an
+   * {@link IllegalStateException}.
+   *
+   * @param fileSystem the filesystem
+   * @return this Source
+   * @throws IllegalStateException if the filesystem has already been set
+   * @throws IllegalArgumentException if the source is pointing to a fully qualified Path in a different FileSystem
+   */
+  Source<T> fileSystem(FileSystem fileSystem);
+
+  /**
+   * Returns the {@code FileSystem} for this source, or null if no explicit filesystem has been set
+   * via {@link #fileSystem(FileSystem)}.
+   */
+  FileSystem getFileSystem();
+
+  /**
    * Returns the {@code PType} for this source.
    */
   PType<T> getType();
diff --git a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
index 80cd730..132c183 100644
--- a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java
@@ -17,6 +17,8 @@
  */
 package org.apache.crunch;
 
+import org.apache.hadoop.fs.FileSystem;
+
 /**
  * An interface for classes that implement both the {@code Source} and the
  * {@code Target} interfaces.
@@ -29,4 +31,19 @@
    * re-use the same config keys with different values when necessary.
    */
   SourceTarget<T> conf(String key, String value);
+
+  /**
+   * Adds the {@code Configuration} of the given filesystem so that the source-target can read from and write to it
+   * when the {@code Pipeline} itself does not have that configuration.
+   * <p>
+   * Changing the filesystem after it has been set is not supported and will result in an
+   * {@link IllegalStateException}.
+   *
+   * @param fileSystem the filesystem
+   * @return this SourceTarget
+   * @throws IllegalStateException if the filesystem has already been set
+   * @throws IllegalArgumentException if the source/target is pointing to a fully qualified Path in a different
+   * FileSystem
+   */
+  SourceTarget<T> fileSystem(FileSystem fileSystem);
 }
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 4dec831..bd32a77 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -21,6 +21,7 @@
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 
 /**
  * A {@code Target} represents the output destination of a Crunch {@code PCollection}
@@ -68,6 +69,26 @@
   Target outputConf(String key, String value);
 
   /**
+   * Adds the {@code Configuration} of the given filesystem so that the target can write to it when the
+   * {@code Pipeline} itself does not have that configuration.
+   * <p>
+   * Changing the filesystem after it has been set is not supported and will result in an
+   * {@link IllegalStateException}.
+   *
+   * @param fileSystem the filesystem
+   * @return this Target
+   * @throws IllegalStateException if the filesystem has already been set
+   * @throws IllegalArgumentException if the target is pointing to a fully qualified Path in a different FileSystem
+   */
+  Target fileSystem(FileSystem fileSystem);
+
+  /**
+   * Returns the {@code FileSystem} for this target, or null if no explicit filesystem has been set
+   * via {@link #fileSystem(FileSystem)}.
+   */
+  FileSystem getFileSystem();
+
+  /**
    * Apply the given {@code WriteMode} to this {@code Target} instance.
    * 
    * @param writeMode The strategy for handling existing outputs
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
index 3259aaf..bbe7f5c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.io;
 
+import com.google.common.collect.Sets;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
@@ -27,12 +28,15 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Map;
 
+import java.util.Set;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -94,11 +98,29 @@
 
   public Configuration configure(Configuration conf) {
     for (Map.Entry<String, String> e : extraConf.entrySet()) {
-      conf.set(e.getKey(), e.getValue());
+      String key = e.getKey();
+      String value = e.getValue();
+      // merge the value if it is DFS_NAMESERVICES to support additional filesystems
+      if (key.equals(DFSConfigKeys.DFS_NAMESERVICES)) {
+        String[] originalValue = conf.getStrings(key);
+        if (originalValue != null) {
+          String[] newValue = value != null ? value.split(",") : new String[0];
+          conf.setStrings(key, mergeValues(originalValue, newValue));
+          continue;
+        }
+      }
+      conf.set(key, value);
     }
     return conf;
   }
 
+  private static String[] mergeValues(String[] value1, String[] value2) {
+    Set<String> values = Sets.newHashSet();
+    values.addAll(Arrays.asList(value1));
+    values.addAll(Arrays.asList(value2));
+    return values.toArray(new String[0]);
+  }
+
   public String serialize() {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try {
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
index 37aa05b..1826302 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -88,7 +88,7 @@
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
     if (ptype instanceof AvroType) {
-      return new AvroFileSourceTarget<T>(path, (AvroType<T>) ptype);
+      return new AvroFileSourceTarget<T>(path, (AvroType<T>) ptype).fileSystem(getFileSystem());
     }
     return null;
   }
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 27a1167..98c0fb8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -18,11 +18,14 @@
 package org.apache.crunch.io.impl;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import java.util.Map.Entry;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.ReadableData;
 import org.apache.crunch.Source;
@@ -48,10 +51,11 @@
   private static final Logger LOG = LoggerFactory.getLogger(FileSourceImpl.class);
 
   @Deprecated
-  protected final Path path;
-  protected final List<Path> paths;
+  protected Path path;
+  protected List<Path> paths;
   protected final PType<T> ptype;
   protected final FormatBundle<? extends InputFormat> inputBundle;
+  private FileSystem fileSystem;
 
   public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
     this(path, ptype, FormatBundle.forInput(inputFormatClass));
@@ -86,12 +90,40 @@
   }
 
   @Override
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  @Override
   public Source<T> inputConf(String key, String value) {
     inputBundle.set(key, value);
     return this;
   }
 
   @Override
+  public Source<T> fileSystem(FileSystem fileSystem) {
+    if (this.fileSystem != null) {
+      throw new IllegalStateException("Filesystem already set. Change is not supported.");
+    }
+
+    this.fileSystem = fileSystem;
+
+    if (fileSystem != null) {
+      List<Path> qualifiedPaths = new ArrayList<>(paths.size());
+      for (Path path : paths) {
+        qualifiedPaths.add(fileSystem.makeQualified(path));
+      }
+      paths = qualifiedPaths;
+
+      Configuration fsConf = fileSystem.getConf();
+      for (Entry<String, String> entry : fsConf) {
+        inputBundle.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return this;
+  }
+
+  @Override
   public Converter<?, ?, ?, ?> getConverter() {
     return ptype.getConverter();
   }
@@ -112,12 +144,18 @@
     return ptype;
   }
 
+  private Configuration getEffectiveBundleConfig(Configuration configuration) {
+    // overlay the bundle config on top of a copy of the supplied config
+    return getBundle().configure(new Configuration(configuration));
+  }
+
   @Override
   public long getSize(Configuration configuration) {
     long size = 0;
+    Configuration bundleConfig = getEffectiveBundleConfig(configuration);
     for (Path path : paths) {
       try {
-        size += SourceTargetHelper.getPathSize(configuration, path);
+        size += SourceTargetHelper.getPathSize(bundleConfig, path);
       } catch (IOException e) {
         LOG.warn("Exception thrown looking up size of: {}", path, e);
         throw new IllegalStateException("Failed to get the file size of:" + path, e);
@@ -129,8 +167,9 @@
   protected Iterable<T> read(Configuration conf, FileReaderFactory<T> readerFactory)
       throws IOException {
     List<Iterable<T>> iterables = Lists.newArrayList();
+    Configuration bundleConfig = getEffectiveBundleConfig(conf);
     for (Path path : paths) {
-      FileSystem fs = path.getFileSystem(conf);
+      FileSystem fs = path.getFileSystem(bundleConfig);
       iterables.add(CompositePathIterable.create(fs, path, readerFactory));
     }
     return Iterables.concat(iterables);
@@ -147,9 +186,10 @@
   @Override
   public long getLastModifiedAt(Configuration conf) {
     long lastMod = -1;
+    Configuration bundleConfig = getEffectiveBundleConfig(conf);
     for (Path path : paths) {
       try {
-        FileSystem fs = path.getFileSystem(conf);
+        FileSystem fs = path.getFileSystem(bundleConfig);
         long lm = SourceTargetHelper.getLastModifiedAt(fs, path);
         if (lm > lastMod) {
           lastMod = lm;
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 17efabb..ea7e98a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -19,8 +19,10 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -63,9 +65,10 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(FileTargetImpl.class);
   
-  protected final Path path;
+  protected Path path;
   private final FormatBundle<? extends FileOutputFormat> formatBundle;
   private final FileNamingScheme fileNamingScheme;
+  private FileSystem fileSystem;
 
   public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
                         FileNamingScheme fileNamingScheme) {
@@ -91,6 +94,30 @@
   }
 
   @Override
+  public Target fileSystem(FileSystem fileSystem) {
+    if (this.fileSystem != null) {
+      throw new IllegalStateException("Filesystem already set. Change is not supported.");
+    }
+
+    if (fileSystem != null) {
+      path = fileSystem.makeQualified(path);
+
+      this.fileSystem = fileSystem;
+
+      Configuration fsConf = fileSystem.getConf();
+      for (Entry<String, String> entry : fsConf) {
+        formatBundle.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return this;
+  }
+
+  @Override
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  @Override
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
     Converter converter = getConverter(ptype);
     Class keyClass = converter.getKeyClass();
@@ -164,7 +191,8 @@
   @Override
   public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
     FileSystem srcFs = workingPath.getFileSystem(conf);
-    FileSystem dstFs = path.getFileSystem(conf);
+    Configuration dstFsConf = getEffectiveBundleConfig(conf);
+    FileSystem dstFs = path.getFileSystem(dstFsConf);
     if (!dstFs.exists(path)) {
       dstFs.mkdirs(path);
     }
@@ -178,7 +206,7 @@
       if (useDistributedCopy) {
         LOG.info("Source and destination are in different file systems, performing distributed copy from {} to {}", srcPattern,
             path);
-        handeOutputsDistributedCopy(conf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks);
+        handleOutputsDistributedCopy(dstFsConf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks);
       } else {
         LOG.info("Source and destination are in different file systems, performing asynch copies from {} to {}", srcPattern, path);
         handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
@@ -198,8 +226,9 @@
         MoreExecutors.listeningDecorator(
             Executors.newFixedThreadPool(
                 maxThreads));
+    Configuration dstFsConf = getEffectiveBundleConfig(conf);
     for (Path s : srcs) {
-      Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
+      Path d = getDestFile(dstFsConf, s, path, s.getName().contains("-m-"));
       renameFutures.add(
           executorService.submit(
               new WorkingPathFileMover(conf, s, d, srcFs, dstFs, sameFs)));
@@ -356,11 +385,16 @@
     return null;
   }
 
+  private Configuration getEffectiveBundleConfig(Configuration configuration) {
+    // overlay the bundle config on top of a copy of the supplied config
+    return formatBundle.configure(new Configuration(configuration));
+  }
+
   @Override
   public boolean handleExisting(WriteMode strategy, long lastModForSource, Configuration conf) {
     FileSystem fs = null;
     try {
-      fs = path.getFileSystem(conf);
+      fs = path.getFileSystem(getEffectiveBundleConfig(conf));
     } catch (IOException e) {
       LOG.error("Could not retrieve FileSystem object to check for existing path", e);
       throw new CrunchRuntimeException(e);
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index b15a00b..6ef88aa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -27,6 +27,7 @@
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.Job;
 
 class SourceTargetImpl<T> implements SourceTarget<T> {
@@ -46,6 +47,19 @@
   }
 
   @Override
+  public SourceTarget<T> fileSystem(FileSystem fileSystem) {
+    source.fileSystem(fileSystem);
+    target.fileSystem(fileSystem);
+    return this;
+  }
+
+  @Override
+  public FileSystem getFileSystem() {
+    // the source and target filesystems are the same, so either could be returned
+    return source.getFileSystem();
+  }
+
+  @Override
   public PType<T> getType() {
     return source.getType();
   }
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
index 2698053..9b1bbf2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
@@ -102,7 +102,7 @@
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
     if (ptype instanceof AvroType && IndexedRecord.class.isAssignableFrom(((AvroType) ptype).getTypeClass())) {
-      return new AvroParquetFileSourceTarget(path, (AvroType<T>) ptype);
+      return new AvroParquetFileSourceTarget(path, (AvroType<T>) ptype).fileSystem(getFileSystem());
     }
     return null;
   }
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
index b23f358..0ec91c9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTarget.java
@@ -47,9 +47,9 @@
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
     if (ptype instanceof PTableType) {
-      return new SeqFileTableSourceTarget(path, (PTableType) ptype);
+      return new SeqFileTableSourceTarget(path, (PTableType) ptype).fileSystem(getFileSystem());
     } else {
-      return new SeqFileSourceTarget(path, ptype);
+      return new SeqFileSourceTarget(path, ptype).fileSystem(getFileSystem());
     }
   }
 }
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
index 258936e..880ffb5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -98,9 +98,9 @@
       return null;
     }
     if (ptype instanceof PTableType) {
-      return new TextFileTableSourceTarget(path, (PTableType) ptype);
+      return new TextFileTableSourceTarget(path, (PTableType) ptype).fileSystem(getFileSystem());
     }
-    return new TextFileSourceTarget<T>(path, ptype);
+    return new TextFileSourceTarget<T>(path, ptype).fileSystem(getFileSystem());
   }
   
   private <T> boolean isTextCompatible(PType<T> ptype) {
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index a8c157d..908bc50 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -35,6 +35,7 @@
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
@@ -129,6 +130,12 @@
   }
 
   @Override
+  public SourceTarget<Pair<ImmutableBytesWritable, Result>> fileSystem(FileSystem fileSystem) {
+    // not currently supported/applicable for HBase
+    return this;
+  }
+
+  @Override
   public PType<Pair<ImmutableBytesWritable, Result>> getType() {
     return PTYPE;
   }
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index f4f134d..d51dee0 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -32,6 +32,7 @@
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
@@ -146,6 +147,18 @@
   }
 
   @Override
+  public Target fileSystem(FileSystem fileSystem) {
+    // not currently supported/applicable for HBase
+    return this;
+  }
+
+  @Override
+  public FileSystem getFileSystem() {
+    // not currently supported/applicable for HBase
+    return null;
+  }
+
+  @Override
   public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) {
     LOG.info("HBaseTarget ignores checks for existing outputs...");
     return false;
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java
index 0ad0434..8ff9571 100644
--- a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java
@@ -45,7 +45,7 @@
   
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-    return new OrcFileSourceTarget<T>(path, ptype);
+    return new OrcFileSourceTarget<T>(path, ptype).fileSystem(getFileSystem());
   }
   
 }