NIFI-8717 Refactoring PutHDFS (#5175)

NIFI-8717 Refactoring PutHDFS

Extracts the shared write logic of PutHDFS into a new abstract base class,
AbstractPutHDFS, in the nifi-hadoop-utils module, so that other processors
writing to HDFS can reuse it. PutHDFS itself now only supplies the
processor-specific pieces (block size, buffer size, replication factor,
locality handling, remote owner/group, and its relationships) by implementing
the new abstract methods.
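
As a sketch of the intended extension point, a minimal subclass only has to
supply the configuration hooks below; the shared onTrigger() logic in
AbstractPutHDFS does the rest. The processor name and the fixed fallback
values are hypothetical, chosen for illustration only:

    package org.apache.nifi.processors.hadoop;

    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;

    // Hypothetical processor reusing the shared PutHDFS write logic.
    public class SimplePutHDFS extends AbstractPutHDFS {

        static final Relationship REL_SUCCESS = new Relationship.Builder()
                .name("success").description("Files written to HDFS").build();
        static final Relationship REL_FAILURE = new Relationship.Builder()
                .name("failure").description("Files that could not be written").build();

        @Override
        protected long getBlockSize(ProcessContext context, ProcessSession session, FlowFile flowFile) {
            // No dedicated property in this sketch: defer to the file system default
            return getFileSystem().getDefaultBlockSize(new Path("/"));
        }

        @Override
        protected int getBufferSize(ProcessContext context, ProcessSession session, FlowFile flowFile) {
            return getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
        }

        @Override
        protected short getReplication(ProcessContext context, ProcessSession session, FlowFile flowFile) {
            return getFileSystem().getDefaultReplication(new Path("/"));
        }

        @Override
        protected boolean shouldIgnoreLocality(ProcessContext context, ProcessSession session) {
            return false;
        }

        @Override
        protected String getOwner(ProcessContext context, FlowFile flowFile) {
            return null; // null: do not change the owner after writing
        }

        @Override
        protected String getGroup(ProcessContext context, FlowFile flowFile) {
            return null; // null: do not change the group after writing
        }

        @Override
        protected Relationship getSuccessRelationship() {
            return REL_SUCCESS;
        }

        @Override
        protected Relationship getFailureRelationship() {
            return REL_FAILURE;
        }
    }

A production processor would additionally expose its relationships via
getRelationships() and declare its property descriptors, as PutHDFS does.
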
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
new file mode 100644
index 0000000..44fae65
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFS.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsCreateModes;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+import org.ietf.jgss.GSSException;
+
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
+    protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+    protected static final int BUFFER_SIZE_DEFAULT = 4096;
+
+    protected static final String REPLACE_RESOLUTION = "replace";
+    protected static final String IGNORE_RESOLUTION = "ignore";
+    protected static final String FAIL_RESOLUTION = "fail";
+    protected static final String APPEND_RESOLUTION = "append";
+
+    protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+            REPLACE_RESOLUTION, "Replaces the existing file if any.");
+    protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+            "Ignores the flow file and routes it to success.");
+    protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+            "Penalizes the flow file and routes it to failure.");
+    protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
+            "Appends to the existing file if any, creates a new file otherwise.");
+
+    protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+            .name("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the same name already exists in the output directory")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION_AV.getValue())
+            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
+            .build();
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final FileSystem hdfs = getFileSystem();
+        final Configuration configuration = getConfiguration();
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        if (configuration == null || hdfs == null || ugi == null) {
+            getLogger().error("HDFS not configured properly");
+            session.transfer(flowFile, getFailureRelationship());
+            context.yield();
+            return;
+        }
+
+        ugi.doAs(new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+                Path tempDotCopyFile = null;
+                FlowFile putFlowFile = flowFile;
+                try {
+                    final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
+                    final Path configuredRootDirPath = new Path(dirValue);
+
+                    final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
+                    final long blockSize = getBlockSize(context, session, putFlowFile);
+                    final int bufferSize = getBufferSize(context, session, putFlowFile);
+                    final short replication = getReplication(context, session, putFlowFile);
+
+                    final CompressionCodec codec = getCompressionCodec(context, configuration);
+
+                    final String filename = codec != null
+                            ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                            : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+                    final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
+                    final Path copyFile = new Path(configuredRootDirPath, filename);
+
+                    // Create destination directory if it does not exist
+                    try {
+                        if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
+                            throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
+                        }
+                    } catch (FileNotFoundException fe) {
+                        if (!hdfs.mkdirs(configuredRootDirPath)) {
+                            throw new IOException(configuredRootDirPath.toString() + " could not be created");
+                        }
+                        changeOwner(context, hdfs, configuredRootDirPath, flowFile);
+                    }
+
+                    final boolean destinationExists = hdfs.exists(copyFile);
+
+                    // If destination file already exists, resolve that based on processor configuration
+                    if (destinationExists) {
+                        switch (conflictResponse) {
+                            case REPLACE_RESOLUTION:
+                                if (hdfs.delete(copyFile, false)) {
+                                    getLogger().info("deleted {} in order to replace with the contents of {}",
+                                            new Object[]{copyFile, putFlowFile});
+                                }
+                                break;
+                            case IGNORE_RESOLUTION:
+                                session.transfer(putFlowFile, getSuccessRelationship());
+                                getLogger().info("transferring {} to success because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            case FAIL_RESOLUTION:
+                                session.transfer(session.penalize(putFlowFile), getFailureRelationship());
+                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
+                                        new Object[]{putFlowFile});
+                                return null;
+                            default:
+                                break;
+                        }
+                    }
+
+                    // Write FlowFile to temp file on HDFS
+                    final StopWatch stopWatch = new StopWatch(true);
+                    session.read(putFlowFile, new InputStreamCallback() {
+
+                        @Override
+                        public void process(InputStream in) throws IOException {
+                            OutputStream fos = null;
+                            Path createdFile = null;
+                            try {
+                                if (conflictResponse.equals(APPEND_RESOLUTION) && destinationExists) {
+                                    fos = hdfs.append(copyFile, bufferSize);
+                                } else {
+                                    final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+
+                                    if (shouldIgnoreLocality(context, session)) {
+                                        cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
+                                    }
+
+                                    fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                            FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                                            null, null);
+                                }
+
+                                if (codec != null) {
+                                    fos = codec.createOutputStream(fos);
+                                }
+                                createdFile = tempCopyFile;
+                                BufferedInputStream bis = new BufferedInputStream(in);
+                                StreamUtils.copy(bis, fos);
+                                bis = null;
+                                fos.flush();
+                            } finally {
+                                try {
+                                    if (fos != null) {
+                                        fos.close();
+                                    }
+                                } catch (Throwable t) {
+                                    // when talking to remote HDFS clusters, we don't notice problems until fos.close()
+                                    if (createdFile != null) {
+                                        try {
+                                            hdfs.delete(createdFile, false);
+                                        } catch (Throwable ignore) {
+                                        }
+                                    }
+                                    throw t;
+                                }
+                                fos = null;
+                            }
+                        }
+
+                    });
+                    stopWatch.stop();
+                    final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
+                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                    tempDotCopyFile = tempCopyFile;
+
+                    // Rename the dot file to its final name unless content was appended to an existing file
+                    if (!conflictResponse.equals(APPEND_RESOLUTION) || !destinationExists) {
+                        boolean renamed = false;
+                        for (int i = 0; i < 10; i++) { // try to rename multiple times.
+                            if (hdfs.rename(tempCopyFile, copyFile)) {
+                                renamed = true;
+                                break;// rename was successful
+                            }
+                            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+                        }
+                        if (!renamed) {
+                            hdfs.delete(tempCopyFile, false);
+                            throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+                                    + " to its final filename");
+                        }
+
+                        changeOwner(context, hdfs, copyFile, flowFile);
+                    }
+
+                    getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
+                            new Object[]{putFlowFile, copyFile, millis, dataRate});
+
+                    final String newFilename = copyFile.getName();
+                    final String hdfsPath = copyFile.getParent().toString();
+                    putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
+                    putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                    final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                    session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
+
+                    session.transfer(putFlowFile, getSuccessRelationship());
+
+                } catch (final IOException e) {
+                    Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+                    if (causeOptional.isPresent()) {
+                        getLogger().warn("An error occurred while connecting to HDFS. "
+                                        + "Rolling back session, and penalizing flow file {}",
+                                new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
+                        session.rollback(true);
+                    } else {
+                        getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
+                        session.transfer(putFlowFile, getFailureRelationship());
+                    }
+                } catch (final Throwable t) {
+                    if (tempDotCopyFile != null) {
+                        try {
+                            hdfs.delete(tempDotCopyFile, false);
+                        } catch (Exception e) {
+                            getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
+                        }
+                    }
+                    getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
+                    session.transfer(session.penalize(putFlowFile), getFailureRelationship());
+                    context.yield();
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Returns the expected block size.
+     */
+    protected abstract long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
+
+    /**
+     * Returns the expected buffer size.
+     */
+    protected abstract int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
+
+    /**
+     * Returns the expected replication factor.
+     */
+    protected abstract short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile);
+
+    /**
+     * Returns whether the file system should ignore locality.
+     */
+    protected abstract boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session);
+
+    /**
+     * If this method returns a non-null value, the uploaded file's owner will be changed to that value after it is
+     * written. This only works if NiFi is running as a user that has the privilege to change the owner.
+     */
+    protected abstract String getOwner(final ProcessContext context, final FlowFile flowFile);
+
+    /**
+     * If this method returns a non-null value, the uploaded file's group will be changed to that value after it is
+     * written. This only works if NiFi is running as a user that has the privilege to change the group.
+     */
+    protected abstract String getGroup(final ProcessContext context, final FlowFile flowFile);
+
+    /**
+     * @return The relationship the flow file will be transferred to in case of successful execution.
+     */
+    protected abstract Relationship getSuccessRelationship();
+
+    /**
+     * @return The relationship the flow file will be transferred to in case of failed execution.
+     */
+    protected abstract Relationship getFailureRelationship();
+
+    /**
+     * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
+     * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
+     * @param t The throwable to inspect for the cause.
+     * @return The first matching cause from the causal chain, or {@link Optional#empty()} if none is found.
+     */
+    private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
+        Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
+        return causalChain
+                .filter(expectedCauseType::isInstance)
+                .map(expectedCauseType::cast)
+                .filter(causePredicate)
+                .findFirst();
+    }
+
+    protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
+        try {
+            // Change owner and group of file if configured to do so
+            final String owner = getOwner(context, flowFile);
+            final String group = getGroup(context, flowFile);
+
+            if (owner != null || group != null) {
+                hdfs.setOwner(name, owner, group);
+            }
+        } catch (Exception e) {
+            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index fbcd3b1..fef0805 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -17,13 +17,8 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -34,42 +29,22 @@
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.RequiredPermission;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.StopWatch;
-import org.ietf.jgss.GSSException;
 
-import com.google.common.base.Throwables;
-
-import java.io.BufferedInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
 
 /**
  * This processor copies FlowFiles to HDFS.
@@ -88,26 +63,9 @@
         requiredPermission = RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM,
         explanation = "Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
 })
-public class PutHDFS extends AbstractHadoopProcessor {
-
-    public static final String REPLACE_RESOLUTION = "replace";
-    public static final String IGNORE_RESOLUTION = "ignore";
-    public static final String FAIL_RESOLUTION = "fail";
-    public static final String APPEND_RESOLUTION = "append";
-
-    public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
-            REPLACE_RESOLUTION, "Replaces the existing file if any.");
-    public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
-            "Ignores the flow file and routes it to success.");
-    public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
-            "Penalizes the flow file and routes it to failure.");
-    public static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
-            "Appends to the existing file if any, creates a new file otherwise.");
-
-    public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
-    public static final int BUFFER_SIZE_DEFAULT = 4096;
-
+public class PutHDFS extends AbstractPutHDFS {
     // relationships
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("Files that have been successfully written to HDFS are transferred to this relationship")
@@ -115,20 +73,11 @@
 
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description(
-                    "Files that could not be written to HDFS for some reason are transferred to this relationship")
+            .description("Files that could not be written to HDFS for some reason are transferred to this relationship")
             .build();
 
     // properties
 
-    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
-            .name("Conflict Resolution Strategy")
-            .description("Indicates what should happen when a file with the same name already exists in the output directory")
-            .required(true)
-.defaultValue(FAIL_RESOLUTION_AV.getValue())
-            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
-            .build();
-
     public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
             .name("Block Size")
             .description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
@@ -230,239 +179,52 @@
     }
 
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final FileSystem hdfs = getFileSystem();
-        final Configuration configuration = getConfiguration();
-        final UserGroupInformation ugi = getUserGroupInformation();
-
-        if (configuration == null || hdfs == null || ugi == null) {
-            getLogger().error("HDFS not configured properly");
-            session.transfer(flowFile, REL_FAILURE);
-            context.yield();
-            return;
-        }
-
-        ugi.doAs(new PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-                Path tempDotCopyFile = null;
-                FlowFile putFlowFile = flowFile;
-                try {
-                    final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(putFlowFile).getValue();
-                    final Path configuredRootDirPath = new Path(dirValue);
-
-                    final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
-
-                    final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
-                    final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
-
-                    final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
-                    final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
-
-                    final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
-                    final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
-                            .getDefaultReplication(configuredRootDirPath);
-
-                    final CompressionCodec codec = getCompressionCodec(context, configuration);
-
-                    final String filename = codec != null
-                            ? putFlowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
-                            : putFlowFile.getAttribute(CoreAttributes.FILENAME.key());
-
-                    final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
-                    final Path copyFile = new Path(configuredRootDirPath, filename);
-
-                    // Create destination directory if it does not exist
-                    try {
-                        if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
-                            throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
-                        }
-                    } catch (FileNotFoundException fe) {
-                        if (!hdfs.mkdirs(configuredRootDirPath)) {
-                            throw new IOException(configuredRootDirPath.toString() + " could not be created");
-                        }
-                        changeOwner(context, hdfs, configuredRootDirPath, flowFile);
-                    }
-
-                    final boolean destinationExists = hdfs.exists(copyFile);
-
-                    // If destination file already exists, resolve that based on processor configuration
-                    if (destinationExists) {
-                        switch (conflictResponse) {
-                        case REPLACE_RESOLUTION:
-                                if (hdfs.delete(copyFile, false)) {
-                                    getLogger().info("deleted {} in order to replace with the contents of {}",
-                                            new Object[]{copyFile, putFlowFile});
-                                }
-                                break;
-                        case IGNORE_RESOLUTION:
-                                session.transfer(putFlowFile, REL_SUCCESS);
-                                getLogger().info("transferring {} to success because file with same name already exists",
-                                        new Object[]{putFlowFile});
-                                return null;
-                        case FAIL_RESOLUTION:
-                                session.transfer(session.penalize(putFlowFile), REL_FAILURE);
-                                getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
-                                        new Object[]{putFlowFile});
-                                return null;
-                            default:
-                                break;
-                        }
-                    }
-
-                    // Write FlowFile to temp file on HDFS
-                    final StopWatch stopWatch = new StopWatch(true);
-                    session.read(putFlowFile, new InputStreamCallback() {
-
-                        @Override
-                        public void process(InputStream in) throws IOException {
-                            OutputStream fos = null;
-                            Path createdFile = null;
-                            try {
-                                if (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) {
-                                    fos = hdfs.append(copyFile, bufferSize);
-                                } else {
-                                  final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-
-                                  final Boolean ignoreLocality = context.getProperty(IGNORE_LOCALITY).asBoolean();
-                                  if (ignoreLocality) {
-                                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
-                                  }
-
-                                  fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
-                                      FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
-                                      null, null);
-                                }
-
-                                if (codec != null) {
-                                    fos = codec.createOutputStream(fos);
-                                }
-                                createdFile = tempCopyFile;
-                                BufferedInputStream bis = new BufferedInputStream(in);
-                                StreamUtils.copy(bis, fos);
-                                bis = null;
-                                fos.flush();
-                            } finally {
-                                try {
-                                    if (fos != null) {
-                                        fos.close();
-                                    }
-                                } catch (Throwable t) {
-                                    // when talking to remote HDFS clusters, we don't notice problems until fos.close()
-                                    if (createdFile != null) {
-                                        try {
-                                            hdfs.delete(createdFile, false);
-                                        } catch (Throwable ignore) {
-                                        }
-                                    }
-                                    throw t;
-                                }
-                                fos = null;
-                            }
-                        }
-
-                    });
-                    stopWatch.stop();
-                    final String dataRate = stopWatch.calculateDataRate(putFlowFile.getSize());
-                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-                    tempDotCopyFile = tempCopyFile;
-
-                    if (!conflictResponse.equals(APPEND_RESOLUTION_AV.getValue())
-                            || (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && !destinationExists)) {
-                        boolean renamed = false;
-                        for (int i = 0; i < 10; i++) { // try to rename multiple times.
-                            if (hdfs.rename(tempCopyFile, copyFile)) {
-                                renamed = true;
-                                break;// rename was successful
-                            }
-                            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
-                        }
-                        if (!renamed) {
-                            hdfs.delete(tempCopyFile, false);
-                            throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
-                                    + " to its final filename");
-                        }
-
-                        changeOwner(context, hdfs, copyFile, flowFile);
-                    }
-
-                    getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
-                            new Object[]{putFlowFile, copyFile, millis, dataRate});
-
-                    final String newFilename = copyFile.getName();
-                    final String hdfsPath = copyFile.getParent().toString();
-                    putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
-                    putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-                    final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
-                    session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
-
-                    session.transfer(putFlowFile, REL_SUCCESS);
-
-                } catch (final IOException e) {
-                  Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
-                  if (causeOptional.isPresent()) {
-                    getLogger().warn("An error occurred while connecting to HDFS. "
-                        + "Rolling back session, and penalizing flow file {}",
-                         new Object[] {putFlowFile.getAttribute(CoreAttributes.UUID.key()), causeOptional.get()});
-                    session.rollback(true);
-                  } else {
-                    getLogger().error("Failed to access HDFS due to {}", new Object[]{e});
-                    session.transfer(putFlowFile, REL_FAILURE);
-                  }
-                } catch (final Throwable t) {
-                    if (tempDotCopyFile != null) {
-                        try {
-                            hdfs.delete(tempDotCopyFile, false);
-                        } catch (Exception e) {
-                            getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
-                        }
-                    }
-                    getLogger().error("Failed to write to HDFS due to {}", new Object[]{t});
-                    session.transfer(session.penalize(putFlowFile), REL_FAILURE);
-                    context.yield();
-                }
-
-                return null;
-            }
-        });
+    protected Relationship getSuccessRelationship() {
+        return REL_SUCCESS;
     }
 
-
-    /**
-     * Returns an optional with the first throwable in the causal chain that is assignable to the provided cause type,
-     * and satisfies the provided cause predicate, {@link Optional#empty()} otherwise.
-     * @param t The throwable to inspect for the cause.
-     * @return
-     */
-    private <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> expectedCauseType, Predicate<T> causePredicate) {
-       Stream<Throwable> causalChain = Throwables.getCausalChain(t).stream();
-       return causalChain
-               .filter(expectedCauseType::isInstance)
-               .map(expectedCauseType::cast)
-               .filter(causePredicate)
-               .findFirst();
+    @Override
+    protected Relationship getFailureRelationship() {
+        return REL_FAILURE;
     }
 
-    protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
-        try {
-            // Change owner and group of file if configured to do so
-            String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
-            String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
-
-            owner = owner == null || owner.isEmpty() ? null : owner;
-            group = group == null || group.isEmpty() ? null : group;
-
-            if (owner != null || group != null) {
-                hdfs.setOwner(name, owner, group);
-            }
-        } catch (Exception e) {
-            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e});
-        }
+    @Override
+    protected long getBlockSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final Path configuredRootDirPath = new Path(dirValue);
+        final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
+        return blockSizeProp != null ? blockSizeProp.longValue() : getFileSystem().getDefaultBlockSize(configuredRootDirPath);
     }
 
+    @Override
+    protected int getBufferSize(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+        return bufferSizeProp != null ? bufferSizeProp.intValue() : getConfiguration().getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
+    }
+
+    @Override
+    protected short getReplication(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
+        final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        final Path configuredRootDirPath = new Path(dirValue);
+        final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
+        return replicationProp != null ? replicationProp.shortValue() : getFileSystem()
+                .getDefaultReplication(configuredRootDirPath);
+    }
+
+    @Override
+    protected boolean shouldIgnoreLocality(final ProcessContext context, final ProcessSession session) {
+        return context.getProperty(IGNORE_LOCALITY).asBoolean();
+    }
+
+    @Override
+    protected String getOwner(final ProcessContext context, final FlowFile flowFile) {
+        final String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
+        return owner == null || owner.isEmpty() ? null : owner;
+    }
+
+    @Override
+    protected String getGroup(final ProcessContext context, final FlowFile flowFile) {
+        final String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
+        return group == null || group.isEmpty() ? null : group;
+    }
 }