NIFI-8787: Wrapped hdfs.exists() call in UGI.doAs() in GetHDFS processor (#5217)
* NIFI-8787: Wrapped hdfs.exists() call in UGI.doAs() in GetHDFS processor
* NIFI-8787: Unit tests updated
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 3f08da0..f1e8661 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -429,7 +429,8 @@
final FileSystem hdfs = getFileSystem();
final Path directoryPath = getNormalizedPath(context, DIRECTORY);
- if (!hdfs.exists(directoryPath)) {
+ final boolean directoryExists = getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(directoryPath));
+ if (!directoryExists) {
context.yield();
getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath});
} else {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index fcba96e..91781a1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -17,7 +17,10 @@
package org.apache.nifi.processors.hadoop;
import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties;
@@ -34,17 +37,24 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class GetHDFSTest {
@@ -265,6 +275,62 @@
runner.assertNotValid();
}
+ @Test
+ public void testDirectoryCheckWrappedInUGICallWhenDirectoryExists() throws IOException, InterruptedException {
+ // GIVEN, WHEN
+ boolean directoryExists = true;
+
+ // THEN
+ directoryExistsWrappedInUGICall(directoryExists);
+ }
+
+ @Test
+ public void testDirectoryCheckWrappedInUGICallWhenDirectoryDoesNotExist() throws IOException, InterruptedException {
+ // GIVEN, WHEN
+ boolean directoryExists = false;
+
+ // THEN
+ directoryExistsWrappedInUGICall(directoryExists);
+ }
+
+ private void directoryExistsWrappedInUGICall(boolean directoryExists) throws IOException, InterruptedException {
+ // GIVEN
+ FileSystem mockFileSystem = mock(FileSystem.class);
+ UserGroupInformation mockUserGroupInformation = mock(UserGroupInformation.class);
+
+ GetHDFS testSubject = new TestableGetHDFSForUGI(kerberosProperties, mockFileSystem, mockUserGroupInformation);
+ TestRunner runner = TestRunners.newTestRunner(testSubject);
+ runner.setProperty(GetHDFS.DIRECTORY, "src/test/resources/testdata");
+
+ // WHEN
+ Answer<?> answer = new Answer<Object>() {
+ private int callCounter = 0;
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ final Object result;
+ if (callCounter == 0) {
+ when(mockFileSystem.exists(any(Path.class))).thenReturn(directoryExists);
+ result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+ verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
+ verify(mockFileSystem).exists(any(Path.class));
+ } else {
+ when(mockFileSystem.listStatus(any(Path.class))).thenReturn(new FileStatus[0]);
+ result = ((PrivilegedExceptionAction) invocationOnMock.getArgument(0)).run();
+ verify(mockUserGroupInformation, times(callCounter + 1)).doAs(any(PrivilegedExceptionAction.class));
+ verify(mockFileSystem).listStatus(any(Path.class));
+ }
+ ++callCounter;
+ return result;
+ }
+ };
+ when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenAnswer(answer);
+ runner.run();
+
+ // THEN
+ verify(mockFileSystem).getUri();
+ verifyNoMoreInteractions(mockFileSystem, mockUserGroupInformation);
+ }
+
private static class TestableGetHDFS extends GetHDFS {
private final KerberosProperties testKerberosProperties;
@@ -279,4 +345,24 @@
}
}
+ private static class TestableGetHDFSForUGI extends TestableGetHDFS {
+ private FileSystem mockFileSystem;
+ private UserGroupInformation mockUserGroupInformation;
+
+ public TestableGetHDFSForUGI(KerberosProperties testKerberosProperties, FileSystem mockFileSystem, UserGroupInformation mockUserGroupInformation) {
+ super(testKerberosProperties);
+ this.mockFileSystem = mockFileSystem;
+ this.mockUserGroupInformation = mockUserGroupInformation;
+ }
+
+ @Override
+ protected FileSystem getFileSystem() {
+ return mockFileSystem;
+ }
+
+ @Override
+ protected UserGroupInformation getUserGroupInformation() {
+ return mockUserGroupInformation;
+ }
+ }
}