NIFI-8787: Wrapped hdfs.exists() call in UGI.doAs() in GetHDFS processor (#5217)

* NIFI-8787: Wrapped hdfs.exists() call in UGI.doAs() in GetHDFS processor

* NIFI-8787: Unit tests updated
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 3f08da0..f1e8661 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -429,7 +429,8 @@
                 final FileSystem hdfs = getFileSystem();
                 final Path directoryPath = getNormalizedPath(context, DIRECTORY);
 
-                if (!hdfs.exists(directoryPath)) {
+                final boolean directoryExists = getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(directoryPath));
+                if (!directoryExists) {
                     context.yield();
                     getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath});
                 } else {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index fcba96e..91781a1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -17,7 +17,10 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
@@ -34,17 +37,24 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class GetHDFSTest {
@@ -265,6 +275,62 @@
         runner.assertNotValid();
     }
 
+    @Test
+    public void testDirectoryCheckWrappedInUGICallWhenDirectoryExists() throws IOException, InterruptedException {
+        // GIVEN, WHEN
+        boolean directoryExists = true;
+
+        // THEN
+        directoryExistsWrappedInUGICall(directoryExists);
+    }
+
+    @Test
+    public void testDirectoryCheckWrappedInUGICallWhenDirectoryDoesNotExist() throws IOException, InterruptedException {
+        // GIVEN, WHEN
+        boolean directoryExists = false;
+
+        // THEN
+        directoryExistsWrappedInUGICall(directoryExists);
+    }
+
+    private void directoryExistsWrappedInUGICall(boolean directoryExists) throws IOException, InterruptedException {
+        // GIVEN
+        FileSystem mockFileSystem = mock(FileSystem.class);
+        UserGroupInformation mockUserGroupInformation = mock(UserGroupInformation.class);
+
+        GetHDFS testSubject = new TestableGetHDFSForUGI(kerberosProperties, mockFileSystem, mockUserGroupInformation);
+        TestRunner runner = TestRunners.newTestRunner(testSubject);
+        runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
+
+        // WHEN
+        Answer<?> answer = new Answer<Object>() {
+            private int callCounter = 0;
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                final Object result;
+                if (callCounter == 0) {
+                    when(mockFileSystem.exists(any(Path.class))).thenReturn(directoryExists);
+                    result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+                    verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
+                    verify(mockFileSystem).exists(any(Path.class));
+                } else {
+                    when(mockFileSystem.listStatus(any(Path.class))).thenReturn(new FileStatus[0]);
+                    result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+                    verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
+                    verify(mockFileSystem).listStatus(any(Path.class));
+                }
+                ++callCounter;
+                return result;
+            }
+        };
+        when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenAnswer(answer);
+        runner.run();
+
+        // THEN
+        verify(mockFileSystem).getUri();
+        verifyNoMoreInteractions(mockFileSystem, mockUserGroupInformation);
+    }
+
     private static class TestableGetHDFS extends GetHDFS {
 
         private final KerberosProperties testKerberosProperties;
@@ -279,4 +345,24 @@
         }
     }
 
+    private static class TestableGetHDFSForUGI extends TestableGetHDFS {
+        private FileSystem mockFileSystem;
+        private UserGroupInformation mockUserGroupInformation;
+
+        public TestableGetHDFSForUGI(KerberosProperties testKerberosProperties, FileSystem mockFileSystem, UserGroupInformation mockUserGroupInformation) {
+            super(testKerberosProperties);
+            this.mockFileSystem = mockFileSystem;
+            this.mockUserGroupInformation = mockUserGroupInformation;
+        }
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return mockFileSystem;
+        }
+
+        @Override
+        protected UserGroupInformation getUserGroupInformation() {
+            return mockUserGroupInformation;
+        }
+    }
 }