NIFI-4542 - Add target.dir.created attribute to indicate whether the target directory was created

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5397.
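
For context only (not part of the patch below): a minimal sketch of how a downstream consumer might interpret the new attribute, given a FlowFile's attribute map. The class and method names here are hypothetical illustrations.

    import java.util.Map;

    // Illustrative sketch: decide whether PutHDFS created the destination
    // directory for a given FlowFile, based on the attribute added below.
    final class TargetDirCreatedCheck {
        static boolean wasTargetDirCreated(final Map<String, String> flowFileAttributes) {
            // PutHDFS writes "true" or "false"; a missing attribute is treated as false.
            return Boolean.parseBoolean(flowFileAttributes.get("target.dir.created"));
        }
    }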
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index fd8dd4d..7791eef 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -153,6 +153,8 @@
 
     public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
 
+    protected static final String TARGET_HDFS_DIR_CREATED_ATTRIBUTE = "target.dir.created";
+
     private static final Object RESOURCES_LOCK = new Object();
     private static final HdfsResources EMPTY_HDFS_RESOURCES = new HdfsResources(null, null, null, null);
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 29327ec..0a32930 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -79,7 +79,8 @@
 @ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
 @WritesAttributes({
         @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
-        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")
+        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute."),
+        @WritesAttribute(attribute = "target.dir.created", description = "The result(true/false) indicates if the folder is created by the processor.")
 })
 @SeeAlso(GetHDFS.class)
 @Restricted(restrictions = {
@@ -269,12 +270,14 @@
                     final Path copyFile = new Path(dirPath, filename);
 
                     // Create destination directory if it does not exist
+                    boolean targetDirCreated = false;
                     try {
                         if (!hdfs.getFileStatus(dirPath).isDirectory()) {
                             throw new IOException(dirPath.toString() + " already exists and is not a directory");
                         }
                     } catch (FileNotFoundException fe) {
-                        if (!hdfs.mkdirs(dirPath)) {
+                        targetDirCreated = hdfs.mkdirs(dirPath);
+                        if (!targetDirCreated) {
                             throw new IOException(dirPath.toString() + " could not be created");
                         }
                         changeOwner(context, hdfs, dirPath, flowFile);
@@ -388,6 +391,7 @@
                     final String hdfsPath = copyFile.getParent().toString();
                     putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
                     putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                    putFlowFile = session.putAttribute(putFlowFile, TARGET_HDFS_DIR_CREATED_ATTRIBUTE, String.valueOf(targetDirCreated));
                     final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
                     session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 9413018..945e24e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -210,6 +210,42 @@
         assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
         assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+        assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+        // With a real HDFS the transit URI would start with "hdfs://"; against the local filesystem used here, just assert the path suffix.
+        assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
+    }
+
+    @Test
+    public void testPutFileWhenTargetDirExists() throws IOException {
+        String targetDir = "target/test-classes";
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
+        proc.getFileSystem().mkdirs(new Path(targetDir));
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, targetDir);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        assertTrue(failedFlowFiles.isEmpty());
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
+        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+        assertEquals("false", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
 
         final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
         assertEquals(1, provenanceEvents.size());