NIFI-4542 - add target.dir.created attribute to indicate whether the target directory was created
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #5397.
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index fd8dd4d..7791eef 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -153,6 +153,8 @@
public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
+ protected static final String TARGET_HDFS_DIR_CREATED_ATTRIBUTE = "target.dir.created";
+
private static final Object RESOURCES_LOCK = new Object();
private static final HdfsResources EMPTY_HDFS_RESOURCES = new HdfsResources(null, null, null, null);
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 29327ec..0a32930 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -79,7 +79,8 @@
@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
- @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")
+ @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute."),
+ @WritesAttribute(attribute = "target.dir.created", description = "The result(true/false) indicates if the folder is created by the processor.")
})
@SeeAlso(GetHDFS.class)
@Restricted(restrictions = {
@@ -269,12 +270,14 @@
final Path copyFile = new Path(dirPath, filename);
// Create destination directory if it does not exist
+ boolean targetDirCreated = false;
try {
if (!hdfs.getFileStatus(dirPath).isDirectory()) {
throw new IOException(dirPath.toString() + " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
- if (!hdfs.mkdirs(dirPath)) {
+ targetDirCreated = hdfs.mkdirs(dirPath);
+ if (!targetDirCreated) {
throw new IOException(dirPath.toString() + " could not be created");
}
changeOwner(context, hdfs, dirPath, flowFile);
@@ -388,6 +391,7 @@
final String hdfsPath = copyFile.getParent().toString();
putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+ putFlowFile = session.putAttribute(putFlowFile, TARGET_HDFS_DIR_CREATED_ATTRIBUTE, String.valueOf(targetDirCreated));
final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 9413018..945e24e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -210,6 +210,42 @@
assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+ assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+
+ final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+ // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+ assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
+ }
+
+ @Test
+ public void testPutFileWhenTargetDirExists() throws IOException {
+ String targetDir = "target/test-classes";
+ PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
+ proc.getFileSystem().mkdirs(new Path(targetDir));
+ TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutHDFS.DIRECTORY, targetDir);
+ runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+ try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+ runner.enqueue(fis, attributes);
+ runner.run();
+ }
+
+ List<MockFlowFile> failedFlowFiles = runner
+ .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+ assertTrue(failedFlowFiles.isEmpty());
+
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+ MockFlowFile flowFile = flowFiles.get(0);
+ assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
+ assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+ assertEquals("false", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());