NIFI-9032 Refactor HDFS processors to increase flexibility

This closes #5295.

Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
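
The change removes AbstractPutHDFS (its logic is folded back into PutHDFS, which now extends
AbstractHadoopProcessor directly) and instead exposes protected extension points on the
processors themselves, e.g. getConfigLocations(), getHadoopConfigurationForValidation(),
getNormalizedPath(String), and per-processor hooks such as getSuccessRelationship() /
getFailureRelationship(), getAttributePrefix(), getPath() and isRecursive(). As a minimal
sketch of the intended flexibility, a hypothetical subclass (the class name, package and
attribute prefix below are illustrative only and not part of this change) could reuse
DeleteHDFS while customizing its behaviour:

    package org.apache.nifi.processors.customfs;  // hypothetical package, for illustration only

    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processors.hadoop.DeleteHDFS;

    public class DeleteCustomFS extends DeleteHDFS {

        @Override
        protected String getAttributePrefix() {
            // Attributes are written as "customfs.filename", "customfs.path" and
            // "customfs.error.message" instead of the default "hdfs." prefix.
            return "customfs";
        }

        @Override
        protected boolean isRecursive(final ProcessContext context, final ProcessSession session) {
            // Default behaviour is still driven by the RECURSIVE property; a subclass
            // could derive this from its own configuration or target file system instead.
            return super.isRecursive(context, session);
        }
    }

Equivalent hooks are added to FetchHDFS, ListHDFS and PutHDFS (compression type, comms-failure
relationship, block/buffer/replication sizing), so the same pattern applies there.
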
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 2838a38..57c57cf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -33,6 +33,7 @@
 import org.apache.nifi.components.resource.ResourceCardinality;
 import org.apache.nifi.components.resource.ResourceReferences;
 import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hadoop.KerberosProperties;
@@ -134,7 +135,7 @@
             .dynamicallyModifiesClasspath(true)
             .build();
 
-    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
             .name("kerberos-credentials-service")
             .displayName("Kerberos Credentials Service")
             .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
@@ -187,7 +188,6 @@
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        final ResourceReferences configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
         final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
         final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
         final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
@@ -204,36 +204,18 @@
         }
 
         final List<ValidationResult> results = new ArrayList<>();
+        final List<String> locations = getConfigLocations(validationContext);
 
-        if (configResources.getCount() == 0) {
+        if (locations.isEmpty()) {
             return results;
         }
 
         try {
-            ValidationResources resources = validationResourceHolder.get();
-
-            // if no resources in the holder, or if the holder has different resources loaded,
-            // then load the Configuration and set the new resources in the holder
-            if (resources == null || !configResources.equals(resources.getConfigResources())) {
-                getLogger().debug("Reloading validation resources");
-                final Configuration config = new ExtendedConfiguration(getLogger());
-                config.setClassLoader(Thread.currentThread().getContextClassLoader());
-                resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources));
-                validationResourceHolder.set(resources);
-            }
-
-            final Configuration conf = resources.getConfiguration();
+            final Configuration conf = getHadoopConfigurationForValidation(locations);
             results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
-                this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
+                    this.getClass().getSimpleName(), conf, resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
 
-            final URI fileSystemUri = FileSystem.getDefaultUri(conf);
-            if (isFileSystemAccessDenied(fileSystemUri)) {
-                results.add(new ValidationResult.Builder()
-                        .valid(false)
-                        .subject("Hadoop File System")
-                        .explanation(DENY_LFS_EXPLANATION)
-                        .build());
-            }
+            results.addAll(validateFileSystem(conf));
         } catch (final IOException e) {
             results.add(new ValidationResult.Builder()
                     .valid(false)
@@ -262,6 +244,36 @@
         return results;
     }
 
+    protected Collection<ValidationResult> validateFileSystem(final Configuration configuration) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        if (isFileSystemAccessDenied(FileSystem.getDefaultUri(configuration))) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject("Hadoop File System")
+                    .explanation(DENY_LFS_EXPLANATION)
+                    .build());
+        }
+
+        return results;
+    }
+
+    protected Configuration getHadoopConfigurationForValidation(final List<String> locations) throws IOException {
+        ValidationResources resources = validationResourceHolder.get();
+
+        // if no resources in the holder, or if the holder has different resources loaded,
+        // then load the Configuration and set the new resources in the holder
+        if (resources == null || !locations.equals(resources.getConfigLocations())) {
+            getLogger().debug("Reloading validation resources");
+            final Configuration config = new ExtendedConfiguration(getLogger());
+            config.setClassLoader(Thread.currentThread().getContextClassLoader());
+            resources = new ValidationResources(locations, getConfigurationFromResources(config, locations));
+            validationResourceHolder.set(resources);
+        }
+
+        return resources.getConfiguration();
+    }
+
     /**
      * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context)
      */
@@ -272,17 +284,22 @@
             // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
             HdfsResources resources = hdfsResources.get();
             if (resources.getConfiguration() == null) {
-                final ResourceReferences configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
-                resources = resetHDFSResources(configResources, context);
+                resources = resetHDFSResources(getConfigLocations(context), context);
                 hdfsResources.set(resources);
             }
         } catch (Exception ex) {
-            getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
+            getLogger().error("HDFS Configuration error - {}", new Object[]{ex});
             hdfsResources.set(EMPTY_HDFS_RESOURCES);
             throw ex;
         }
     }
 
+    protected List<String> getConfigLocations(PropertyContext context) {
+        final ResourceReferences configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().asResources();
+        final List<String> locations = configResources.asLocations();
+        return locations;
+    }
+
     @OnStopped
     public final void abstractOnStopped() {
         final HdfsResources resources = hdfsResources.get();
@@ -345,10 +362,10 @@
         }
     }
 
-    private static Configuration getConfigurationFromResources(final Configuration config, final ResourceReferences resourceReferences) throws IOException {
-        boolean foundResources = resourceReferences.getCount() > 0;
+    private static Configuration getConfigurationFromResources(final Configuration config, final List<String> locations) throws IOException {
+        boolean foundResources = !locations.isEmpty();
+
         if (foundResources) {
-            final List<String> locations = resourceReferences.asLocations();
             for (String resource : locations) {
                 config.addResource(new Path(resource.trim()));
             }
@@ -372,11 +389,11 @@
     /*
      * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
      */
-    HdfsResources resetHDFSResources(final ResourceReferences resourceReferences, ProcessContext context) throws IOException {
+    HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
         Configuration config = new ExtendedConfiguration(getLogger());
         config.setClassLoader(Thread.currentThread().getContextClassLoader());
 
-        getConfigurationFromResources(config, resourceReferences);
+        getConfigurationFromResources(config, resourceLocations);
 
         // give sub-classes a chance to process configuration
         preProcessConfiguration(config, context);
@@ -559,7 +576,7 @@
                 kerberosUser.checkTGTAndRelogin();
             } catch (LoginException e) {
                 throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
-                }
+            }
         } else {
             getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
         }
@@ -577,7 +594,7 @@
         return Boolean.parseBoolean(System.getenv(DENY_LFS_ACCESS));
     }
 
-    private boolean isFileSystemAccessDenied(final URI fileSystemUri) {
+    protected boolean isFileSystemAccessDenied(final URI fileSystemUri) {
         boolean accessDenied;
 
         if (isLocalFileSystemAccessDenied()) {
@@ -590,16 +607,16 @@
     }
 
     static protected class ValidationResources {
-        private final ResourceReferences configResources;
+        private final List<String> configLocations;
         private final Configuration configuration;
 
-        public ValidationResources(final ResourceReferences configResources, Configuration configuration) {
-            this.configResources = configResources;
+        public ValidationResources(final List<String> configLocations, final Configuration configuration) {
+            this.configLocations = configLocations;
             this.configuration = configuration;
         }
 
-        public ResourceReferences getConfigResources() {
-            return configResources;
+        public List<String> getConfigLocations() {
+            return configLocations;
         }
 
         public Configuration getConfiguration() {
@@ -611,7 +628,25 @@
         return getNormalizedPath(context, property, null);
     }
 
-    protected Path getNormalizedPath(ProcessContext context, PropertyDescriptor property, FlowFile flowFile) {
+    protected Path getNormalizedPath(final String rawPath) {
+        final Path path = new Path(rawPath);
+        final URI uri = path.toUri();
+
+        final URI fileSystemUri = getFileSystem().getUri();
+
+        if (uri.getScheme() != null) {
+            if (!uri.getScheme().equals(fileSystemUri.getScheme()) || !uri.getAuthority().equals(fileSystemUri.getAuthority())) {
+                getLogger().warn("The filesystem component of the URI configured ({}) does not match the filesystem URI from the Hadoop configuration file ({}) " +
+                        "and will be ignored.", uri, fileSystemUri);
+            }
+
+            return new Path(uri.getPath());
+        } else {
+            return path;
+        }
+    }
+
+    protected Path getNormalizedPath(final ProcessContext context, final PropertyDescriptor property, final FlowFile flowFile) {
         final String propertyValue = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue();
         final Path path = new Path(propertyValue);
         final URI uri = path.toUri();
@@ -629,4 +664,4 @@
             return path;
         }
     }
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
deleted file mode 100644
index b86b7dc..0000000
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop;
-
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsCreateModes;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.StopWatch;
-import org.ietf.jgss.GSSException;
-
-import java.io.BufferedInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.security.PrivilegedAction;
-import java.util.EnumSet;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
-
-public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
-    protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
-    protected static final int BUFFER_SIZE_DEFAULT = 4096;
-
-    protected static final String REPLACE_RESOLUTION = "replace";
-    protected static final String IGNORE_RESOLUTION = "ignore";
-    protected static final String FAIL_RESOLUTION = "fail";
-    protected static final String APPEND_RESOLUTION = "append";
-
-    protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
-            REPLACE_RESOLUTION, "Replaces the existing file if any.");
-    protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
-            "Ignores the flow file and routes it to success.");
-    protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
-            "Penalizes the flow file and routes it to failure.");
-    protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
-            "Appends to the existing file if any, creates a new file otherwise.");
-
-    protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
-            .name("Conflict Resolution Strategy")
-            .description("Indicates what should happen when a file with the same name already exists in the output directory")
-            .required(true)
-            .defaultValue(FAIL_RESOLUTION_AV.getValue())
-            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
-            .build();
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final FileSystem hdfs = getFileSystem();
-        final Configuration configuration = getConfiguration();
-        final UserGroupInformation ugi = getUserGroupInformation();
-
-        if (configuration == null || hdfs == null || ugi == null) {
-            getLogger().error("HDFS not configured properly");
-            session.transfer(flowFile, getFailureRelationship());
-            context.yield();
-            return;
-        }
-
-        ugi.doAs(new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-                Path tempDotCopyFile = null;
-                FlowFile putFlowFile = flowFile;
-                try {
-                    final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
-
-                    final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
-                    final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
-                    final int bufferSize = getBufferSize(context, session, putFlowFile);
-                    final short replication = getReplication(context, session, putFlowFile, dirPath);
-
-                    final CompressionCodec codec = getCompressionCodec(context, configuration);
-
-                    final String filename = codec != null
-                            ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
-                            : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
-
-                    final Path tempCopyFile = new Path(dirPath, "." + filename);
-                    final Path copyFile = new Path(dirPath, filename);
-
-                    // Create destination directory if it does not exist
-                    try {
-                        if (!hdfs.getFileStatus(dirPath).isDirectory()) {
-                            throw new IOException(dirPath.toString() + " already exists and is not a directory");
-                        }
-                    } catch (FileNotFoundException fe) {
-                        if (!hdfs.mkdirs(dirPath)) {
-                            throw new IOException(dirPath.toString() + " could not be created");
-                        }
-                        changeOwner(context, hdfs, dirPath, flowFile);
-                    }
-
-                    final boolean destinationExists = hdfs.exists(copyFile);
-
-                    // If destination file already exists, resolve that based on processor configuration
-                    if (destinationExists) {
-                        switch (conflictResponse) {
-                            case REPLACE_RESOLUTION:
-                                if (hdfs.delete(copyFile, false)) {
-                                    getLogger().info("deleted {} in order to replace with the contents of {}",
-                                            new Object[]{copyFile, putFlowFile});
-                                }
-                                break;
-                            case IGNORE_RESOLUTION:
-                                session.transfer(putFlowFile, getSuccessRelationship());
-                                getLogger().info("transferring {} to success because file with same name already exists",
-                                        new Object[]{putFlowFile});
-                                return null;
-                            case FAIL_RESOLUTION:
-                                session.transfer(session.penalize(putFlowFile), getFailureRelationship());
-                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
-                                        new Object[]{putFlowFile});
-                                return null;
-                            default:
-                                break;
-                        }
-                    }
-
-                    // Write FlowFile to temp file on HDFS
-                    final StopWatch stopWatch = new StopWatch(true);
-                    session.read(putFlowFile, new InputStreamCallback() {
-
-                        @Override
-                        public void process(InputStream in) throws IOException {
-                            OutputStream fos = null;
-                            Path createdFile = null;
-                            try {
-                                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
-                                    fos = hdfs.append(copyFile, bufferSize);
-                                } else {
-                                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                                    if (shouldIgnoreLocality(context, session)) {
-                                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                                    }
-
-                                    fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                                            null, null);
-                                }
-
-                                if (codec != null) {
-                                    fos = codec.createOutputStream(fos);
-                                }
-                                createdFile = tempCopyFile;
-                                BufferedInputStream bis = new BufferedInputStream(in);
-                                StreamUtils.copy(bis, fos);
-                                bis = null;
-                                fos.flush();
-                            } finally {
-                                try {
-                                    if (fos != null) {
-                                        fos.close();
-                                    }
-                                } catch (Throwable t) {
-                                    // when talking to remote HDFS clusters, we don't notice problems until fos.close()
-                                    if (createdFile != null) {
-                                        try {
-                                            hdfs.delete(createdFile, false);
-                                        } catch (Throwable ignore) {
-                                        }
-                                    }
-                                    throw t;
-                                }
-                                fos = null;
-                            }
-                        }
-
-                    });
-                    stopWatch.stop();
-                    final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
-                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-                    tempDotCopyFile = tempCopyFile;
-
-                    if (!conflictResponse.equals(APPEND_RESOLUTION)
-                            || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
-                        boolean renamed = false;
-                        for (int i = 0; i < 10; i++) { // try to rename multiple times.
-                            if (hdfs.rename(tempCopyFile, copyFile)) {
-                                renamed = true;
-                                break;// rename was successful
-                            }
-                            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
-                        }
-                        if (!renamed) {
-                            hdfs.delete(tempCopyFile, false);
-                            throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
-                                    + " to its final filename");
-                        }
-
-                        changeOwner(context, hdfs, copyFile, flowFile);
-                    }
-
-                    getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
-                            new Object[]{putFlowFile, copyFile, millis, dataRate});
-
-                    final String newFilename = copyFile.getName();
-                    final String hdfsPath = copyFile.getParent().toString();
-                    putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
-                    putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-                    final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
-                    session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
-
-                    session.transfer(putFlowFile, getSuccessRelationship());
-
-                } catch (final IOException e) {
-                    Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
-                    if (causeOptional.isPresent()) {
-                        getLogger().warn("An error occurred while connecting to HDFS. "
-                                        + "Rolling back session, and penalizing flow file {}",
-                                new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
-                        session.rollback(true);
-                    } else {
-                        getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
-                        session.transfer(putFlowFile, getFailureRelationship());
-                    }
-                } catch (final Throwable t) {
-                    if (tempDotCopyFile != null) {
-                        try {
-                            hdfs.delete(tempDotCopyFile, false);
-                        } catch (Exception e) {
-                            getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
-                        }
-                    }
-                    getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
-                    session.transfer(session.penalize(putFlowFile), getFailureRelationship());
-                    context.yield();
-                }
-
-                return null;
-            }
-        });
-    }
-
-    /**
-     * Returns with the expected block size.
-     */
-    protected abstract long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);
-
-    /**
-     * Returns with the expected buffer size.
-     */
-    protected abstract int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
-
-    /**
-     * Returns with the expected replication factor.
-     */
-    protected abstract short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path dirPath);
-
-    /**
-     * Returns if file system should ignore locality.
-     */
-    protected abstract boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session);
-
-    /**
-     * If returns a non-null value, the uploaded file's owner will be changed to this value after it is written. This only
-     * works if NiFi is running as a user that has privilege to change owner.
-     */
-    protected abstract String getOwner(final ProcessContext context, final FlowFile flowFile);
-
-    /**
-     * I returns a non-null value, thee uploaded file's group will be changed to this value after it is written. This only
-     * works if NiFi is running as a user that has privilege to change group.
-     */
-    protected abstract String getGroup(final ProcessContext context, final FlowFile flowFile);
-
-    /**
-     * @return The relationship the flow file will be transferred in case of successful execution.
-     */
-    protected abstract Relationship getSuccessRelationship();
-
-    /**
-     * @return The relationship the flow file will be transferred in case of failed execution.
-     */
-    protected abstract Relationship getFailureRelationship();
-
-    /**
-     * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
-     * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
-     * @param t The throwable to inspect for the cause.
-     * @return
-     */
-    private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
-        Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
-        return causalChain
-                .filter(expectedCauseType::isInstance)
-                .map(expectedCauseType::cast)
-                .filter(causePredicate)
-                .findFirst();
-    }
-
-    protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
-        try {
-            // Change owner and group of file if configured to do so
-            final String owner = getOwner(context, flowFile);
-            final String group = getGroup(context, flowFile);
-
-            if (owner != null || group != null) {
-                hdfs.setOwner(name, owner, group);
-            }
-        } catch (Exception e) {
-            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
-        }
-    }
-}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 9296507..2c9285f 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -144,9 +144,7 @@
 
         // We need a FlowFile to report provenance correctly.
         final FlowFile finalFlowFile = originalFlowFile != null ? originalFlowFile : session.create();
-
-        final String fileOrDirectoryName = getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString();
-
+        final String fileOrDirectoryName = getPath(context, session, finalFlowFile);
         final FileSystem fileSystem = getFileSystem();
         final UserGroupInformation ugi = getUserGroupInformation();
 
@@ -171,11 +169,11 @@
                     if (fileSystem.exists(path)) {
                         try {
                             Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
-                            attributes.put("hdfs.filename", path.getName());
-                            attributes.put("hdfs.path", path.getParent().toString());
+                            attributes.put(getAttributePrefix() + ".filename", path.getName());
+                            attributes.put(getAttributePrefix() + ".path", path.getParent().toString());
                             flowFile = session.putAllAttributes(flowFile, attributes);
 
-                            fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
+                            fileSystem.delete(path, isRecursive(context, session));
                             getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
                             final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
                             session.getProvenanceReporter().invokeRemoteProcess(flowFile, qualifiedPath.toString());
@@ -186,27 +184,47 @@
 
                             Map<String, String> attributes = Maps.newHashMapWithExpectedSize(1);
                             // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
-                            attributes.put("hdfs.error.message", ioe.getMessage());
+                            attributes.put(getAttributePrefix() + ".error.message", ioe.getMessage());
 
-                            session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), REL_FAILURE);
+                            session.transfer(session.putAllAttributes(session.clone(flowFile), attributes), getFailureRelationship());
                             failedPath++;
                         }
                     }
                 }
 
                 if (failedPath == 0) {
-                    session.transfer(flowFile, DeleteHDFS.REL_SUCCESS);
+                    session.transfer(flowFile, getSuccessRelationship());
                 } else {
                     // If any path has been failed to be deleted, remove the FlowFile as it's been cloned and sent to failure.
                     session.remove(flowFile);
                 }
             } catch (IOException e) {
                 getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
-                session.transfer(flowFile, DeleteHDFS.REL_FAILURE);
+                session.transfer(flowFile, getFailureRelationship());
             }
 
             return null;
         });
 
     }
+
+    protected Relationship getSuccessRelationship() {
+        return REL_SUCCESS;
+    }
+
+    protected Relationship getFailureRelationship() {
+        return REL_FAILURE;
+    }
+
+    protected boolean isRecursive(final ProcessContext context, final ProcessSession session) {
+        return context.getProperty(RECURSIVE).asBoolean();
+    }
+
+    protected String getPath(final ProcessContext context, final ProcessSession session, final FlowFile finalFlowFile) {
+        return getNormalizedPath(context, FILE_OR_DIRECTORY, finalFlowFile).toString();
+    }
+
+    protected String getAttributePrefix() {
+        return "hdfs";
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index b60ee5b..e4acaac 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -17,7 +17,7 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -121,16 +121,16 @@
 
         final FileSystem hdfs = getFileSystem();
         final UserGroupInformation ugi = getUserGroupInformation();
-        final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String filenameValue = getPath(context, flowFile);
 
         final Path path;
         try {
-            path = getNormalizedPath(context, FILENAME, flowFile);
+            path = getNormalizedPath(getPath(context, flowFile));
         } catch (IllegalArgumentException e) {
             getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
-            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+            flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
             flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
+            session.transfer(flowFile, getFailureRelationship());
             return;
         }
 
@@ -144,7 +144,7 @@
                 CompressionCodec codec = null;
                 Configuration conf = getConfiguration();
                 final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
-                final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+                final CompressionType compressionType = getCompressionType(context);
                 final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
 
                 if(inferCompressionCodec) {
@@ -174,16 +174,16 @@
                     stopWatch.stop();
                     getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
                     session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
-                    session.transfer(flowFile, REL_SUCCESS);
+                    session.transfer(flowFile, getSuccessRelationship());
                 } catch (final FileNotFoundException | AccessControlException e) {
                     getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
-                    flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+                    flowFile = session.putAttribute(flowFile, getAttributePrefix() + ".failure.reason", e.getMessage());
                     flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, REL_FAILURE);
+                    session.transfer(flowFile, getFailureRelationship());
                 } catch (final IOException e) {
                     getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
                     flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, REL_COMMS_FAILURE);
+                    session.transfer(flowFile, getCommsFailureRelationship());
                 } finally {
                     IOUtils.closeQuietly(stream);
                 }
@@ -191,7 +191,29 @@
                 return null;
             }
         });
-
     }
 
+    protected Relationship getSuccessRelationship() {
+        return REL_SUCCESS;
+    }
+
+    protected Relationship getFailureRelationship() {
+        return REL_FAILURE;
+    }
+
+    protected Relationship getCommsFailureRelationship() {
+        return REL_COMMS_FAILURE;
+    }
+
+    protected String getPath(final ProcessContext context, final FlowFile flowFile) {
+        return context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+    }
+
+    protected String getAttributePrefix() {
+        return "hdfs";
+    }
+
+    protected CompressionType getCompressionType(final ProcessContext context) {
+        return CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 583b0b8..907a141 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -503,7 +503,7 @@
             final Map<String, String> attributes = createAttributes(status);
             FlowFile flowFile = session.create();
             flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, REL_SUCCESS);
+            session.transfer(flowFile, getSuccessRelationship());
         }
     }
 
@@ -528,7 +528,7 @@
         attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
         flowFile = session.putAllAttributes(flowFile, attributes);
 
-        session.transfer(flowFile, REL_SUCCESS);
+        session.transfer(flowFile, getSuccessRelationship());
     }
 
     private Record createRecord(final FileStatus fileStatus) {
@@ -620,15 +620,15 @@
         attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
         attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
 
-        attributes.put("hdfs.owner", status.getOwner());
-        attributes.put("hdfs.group", status.getGroup());
-        attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
-        attributes.put("hdfs.length", String.valueOf(status.getLen()));
-        attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
+        attributes.put(getAttributePrefix() + ".owner", status.getOwner());
+        attributes.put(getAttributePrefix() + ".group", status.getGroup());
+        attributes.put(getAttributePrefix() + ".lastModified", String.valueOf(status.getModificationTime()));
+        attributes.put(getAttributePrefix() + ".length", String.valueOf(status.getLen()));
+        attributes.put(getAttributePrefix() + ".replication", String.valueOf(status.getReplication()));
 
         final FsPermission permission = status.getPermission();
         final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        attributes.put("hdfs.permissions", perms);
+        attributes.put(getAttributePrefix() + ".permissions", perms);
         return attributes;
     }
 
@@ -669,4 +669,11 @@
         };
     }
 
+    protected Relationship getSuccessRelationship() {
+        return REL_SUCCESS;
+    }
+
+    protected String getAttributePrefix() {
+        return "hdfs";
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 00942e3..29327ec 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -16,9 +16,15 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import com.google.common.base.Throwables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -29,22 +35,40 @@
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.RequiredPermission;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+import org.ietf.jgss.GSSException;
 
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
 
 /**
  * This processor copies FlowFiles to HDFS.
@@ -63,7 +87,11 @@
         requiredPermission = RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM,
         explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
 })
-public class PutHDFS extends AbstractPutHDFS {
+public class PutHDFS extends AbstractHadoopProcessor {
+
+    protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+    protected static final int BUFFER_SIZE_DEFAULT = 4096;
+
     // relationships
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -78,6 +106,28 @@
 
     // properties
 
+    protected static final String REPLACE_RESOLUTION = "replace";
+    protected static final String IGNORE_RESOLUTION = "ignore";
+    protected static final String FAIL_RESOLUTION = "fail";
+    protected static final String APPEND_RESOLUTION = "append";
+
+    protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+            REPLACE_RESOLUTION, "Replaces the existing file if any.");
+    protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+            "Ignores the flow file and routes it to success.");
+    protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+            "Penalizes the flow file and routes it to failure.");
+    protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
+            "Appends to the existing file if any, creates a new file otherwise.");
+
+    protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+            .name("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the same name already exists in the output directory")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION_AV.getValue())
+            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
+            .build();
+
     public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
             .name("Block Size")
             .description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
@@ -179,48 +229,263 @@
     }
 
     @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final FileSystem hdfs = getFileSystem();
+        final Configuration configuration = getConfiguration();
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        if (configuration == null || hdfs == null || ugi == null) {
+            getLogger().error("HDFS not configured properly");
+            session.transfer(flowFile, getFailureRelationship());
+            context.yield();
+            return;
+        }
+
+        ugi.doAs(new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+                Path tempDotCopyFile = null;
+                FlowFile putFlowFile = flowFile;
+                try {
+                    final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
+
+                    final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
+                    final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
+                    final int bufferSize = getBufferSize(context, session, putFlowFile);
+                    final short replication = getReplication(context, session, putFlowFile, dirPath);
+
+                    final CompressionCodec codec = getCompressionCodec(context, configuration);
+
+                    final String filename = codec != null
+                            ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                            : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+                    final Path tempCopyFile = new Path(dirPath, "." + filename);
+                    final Path copyFile = new Path(dirPath, filename);
+
+                    // Create destination directory if it does not exist
+                    try {
+                        if (!hdfs.getFileStatus(dirPath).isDirectory()) {
+                            throw new IOException(dirPath.toString() + " already exists and is not a directory");
+                        }
+                    } catch (FileNotFoundException fe) {
+                        if (!hdfs.mkdirs(dirPath)) {
+                            throw new IOException(dirPath.toString() + " could not be created");
+                        }
+                        changeOwner(context, hdfs, dirPath, flowFile);
+                    }
+
+                    final boolean destinationExists = hdfs.exists(copyFile);
+
+                    // If destination file already exists, resolve that based on processor configuration
+                    if (destinationExists) {
+                        switch (conflictResponse) {
+                            case REPLACE_RESOLUTION:
+                                if (hdfs.delete(copyFile, false)) {
+                                    getLogger().info("deleted {} in order to replace with the contents of {}",
+                                            new Object[]{copyFile, putFlowFile});
+                                }
+                                break;
+                            case IGNORE_RESOLUTION:
+                                session.transfer(putFlowFile, getSuccessRelationship());
+                                getLogger().info("transferring {} to success because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            case FAIL_RESOLUTION:
+                                session.transfer(session.penalize(putFlowFile), getFailureRelationship());
+                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            default:
+                                break;
+                        }
+                    }
+
+                    // Write FlowFile to temp file on HDFS
+                    final StopWatch stopWatch = new StopWatch(true);
+                    session.read(putFlowFile, new InputStreamCallback() {
+
+                        @Override
+                        public void process(InputStream in) throws IOException {
+                            OutputStream fos = null;
+                            Path createdFile = null;
+                            try {
+                                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                                    fos = hdfs.append(copyFile, bufferSize);
+                                } else {
+                                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+
+                                    if (shouldIgnoreLocality(context, session)) {
+                                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
+                                    }
+
+                                    fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                                            null, null);
+                                }
+
+                                if (codec != null) {
+                                    fos = codec.createOutputStream(fos);
+                                }
+                                createdFile = tempCopyFile;
+                                BufferedInputStream bis = new BufferedInputStream(in);
+                                StreamUtils.copy(bis, fos);
+                                bis = null;
+                                fos.flush();
+                            } finally {
+                                try {
+                                    if (fos != null) {
+                                        fos.close();
+                                    }
+                                } catch (Throwable t) {
+                                    // when talking to remote HDFS clusters, we don't notice problems until fos.close()
+                                    if (createdFile != null) {
+                                        try {
+                                            hdfs.delete(createdFile, false);
+                                        } catch (Throwable ignore) {
+                                        }
+                                    }
+                                    throw t;
+                                }
+                                fos = null;
+                            }
+                        }
+
+                    });
+                    stopWatch.stop();
+                    final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
+                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                    tempDotCopyFile = tempCopyFile;
+
+                    if (!conflictResponse.equals(APPEND_RESOLUTION)
+                            || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
+                        boolean renamed = false;
+                        for (int i = 0; i < 10; i++) { // try to rename multiple times.
+                            if (hdfs.rename(tempCopyFile, copyFile)) {
+                                renamed = true;
+                                break;// rename was successful
+                            }
+                            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+                        }
+                        if (!renamed) {
+                            hdfs.delete(tempCopyFile, false);
+                            throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+                                    + " to its final filename");
+                        }
+
+                        changeOwner(context, hdfs, copyFile, flowFile);
+                    }
+
+                    getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
+                            new Object[]{putFlowFile, copyFile, millis, dataRate});
+
+                    final String newFilename = copyFile.getName();
+                    final String hdfsPath = copyFile.getParent().toString();
+                    putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
+                    putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                    final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                    session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
+
+                    session.transfer(putFlowFile, getSuccessRelationship());
+
+                } catch (final IOException e) {
+                    Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+                    if (causeOptional.isPresent()) {
+                        getLogger().warn("An error occurred while connecting to HDFS. "
+                                        + "Rolling back session and penalizing flow file {}",
+                                new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
+                        session.rollback(true);
+                    } else {
+                        getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
+                        session.transfer(putFlowFile, getFailureRelationship());
+                    }
+                } catch (final Throwable t) {
+                    if (tempDotCopyFile != null) {
+                        try {
+                            hdfs.delete(tempDotCopyFile, false);
+                        } catch (Exception e) {
+                            getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
+                        }
+                    }
+                    getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
+                    session.transfer(session.penalize(putFlowFile), getFailureRelationship());
+                    context.yield();
+                }
+
+                return null;
+            }
+        });
+    }
+
     protected Relationship getSuccessRelationship() {
         return REL_SUCCESS;
     }
 
-    @Override
     protected Relationship getFailureRelationship() {
         return REL_FAILURE;
     }
 
-    @Override
     protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
         final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
         return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(dirPath);
     }
 
-    @Override
     protected int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
         return bufferSizeProp != null ? bufferSizeProp.intValue() : getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
     }
 
-    @Override
     protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, Path dirPath) {
         final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
         return replicationProp != null ? replicationProp.shortValue() : getFileSystem()
                 .getDefaultReplication(dirPath);
     }
 
-    @Override
     protected boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session) {
         return context.getProperty(IGNORE_LOCALITY).asBoolean();
     }
 
-    @Override
     protected String getOwner(final ProcessContext context, final FlowFile flowFile) {
         final String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
         return owner == null || owner.isEmpty() ? null : owner;
     }
 
-    @Override
     protected String getGroup(final ProcessContext context, final FlowFile flowFile) {
         final String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
         return group == null || group.isEmpty() ? null : group;
     }
+
+    /**
+     * Returns an {@link Optional} containing the first throwable in the causal chain that is assignable to the
+     * provided cause type and satisfies the provided cause predicate, or {@link Optional#empty()} otherwise.
+     * @param t The throwable whose causal chain is inspected.
+     * @param expectedCauseType The type to which the matching cause must be assignable.
+     * @param causePredicate The predicate the matching cause must satisfy.
+     * @return An {@link Optional} with the first matching cause, or {@link Optional#empty()} if none is found.
+     */
+    private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
+        Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
+        return causalChain
+                .filter(expectedCauseType::isInstance)
+                .map(expectedCauseType::cast)
+                .filter(causePredicate)
+                .findFirst();
+    }
+
+    protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
+        try {
+            // Change owner and group of file if configured to do so
+            final String owner = getOwner(context, flowFile);
+            final String group = getGroup(context, flowFile);
+
+            if (owner != null || group != null) {
+                hdfs.setOwner(name, owner, group);
+            }
+        } catch (Exception e) {
+            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
+        }
+    }
 }
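
A minimal usage sketch of the findCause helper above (illustrative only, not part of this patch), assuming a Kerberos failure surfaced as a GSSException with major code NO_CRED wrapped in an IOException; the exception instance and variable names are constructed purely for demonstration:

    // Assumed imports: java.io.IOException, java.util.Optional, org.ietf.jgss.GSSException
    final IOException failure = new IOException(new GSSException(GSSException.NO_CRED));
    final Optional<GSSException> cause =
            findCause(failure, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
    // cause.isPresent() is true here, so the IOException handler above rolls back the session and
    // penalizes the flow file rather than transferring it to the failure relationship.
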
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index 0cd069e..342c71b 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -46,6 +46,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
@@ -124,7 +125,8 @@
             final File brokenCoreSite = new File("src/test/resources/core-site-broken.xml");
             final ResourceReference brokenCoreSiteReference = new FileResourceReference(brokenCoreSite);
             final ResourceReferences references = new StandardResourceReferences(Collections.singletonList(brokenCoreSiteReference));
-            processor.resetHDFSResources(references, runner.getProcessContext());
+            final List<String> locations = references.asLocations();
+            processor.resetHDFSResources(locations, runner.getProcessContext());
             Assert.fail("Should have thrown SocketTimeoutException");
         } catch (IOException e) {
         }
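
For reference, a hedged sketch of the ResourceReferences-to-locations conversion the updated test relies on; only classes already referenced in the hunk above are used, and the variable names are illustrative:

    // Assumed imports: java.io.File, java.util.Collections, java.util.List, and the
    // org.apache.nifi.components.resource classes referenced in the hunk above.
    final File coreSite = new File("src/test/resources/core-site-broken.xml");
    final ResourceReference reference = new FileResourceReference(coreSite);
    final ResourceReferences references = new StandardResourceReferences(Collections.singletonList(reference));
    // asLocations() flattens the references to the plain path strings now accepted by
    // resetHDFSResources(List<String>, ProcessContext).
    final List<String> locations = references.asLocations();
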
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
index ab49d63..b7920b8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
@@ -21,7 +21,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.nifi.components.resource.ResourceReferences;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -35,6 +34,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.List;
 
 import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
@@ -96,7 +96,7 @@
 
     public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
         @Override
-        HdfsResources resetHDFSResources(ResourceReferences configResources, ProcessContext context) throws IOException {
+        HdfsResources resetHDFSResources(final List<String> resourceLocations, ProcessContext context) throws IOException {
             return hdfsResources;
         }
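
As a closing aside, a condensed sketch of the rename-with-retry step from the processor diff above, extracted into a hypothetical helper; the method name and signature are not part of this patch:

    // Hypothetical helper mirroring the loop above: retry the dot-file rename up to ten times,
    // sleeping 200 ms between attempts, and delete the temp file if every attempt fails.
    private void renameWithRetry(final FileSystem hdfs, final Path tempCopyFile, final Path copyFile)
            throws IOException, InterruptedException {
        for (int i = 0; i < 10; i++) {
            if (hdfs.rename(tempCopyFile, copyFile)) {
                return; // rename succeeded
            }
            Thread.sleep(200L);
        }
        hdfs.delete(tempCopyFile, false);
        throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
                + " to its final filename");
    }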