NIFI-9235 - Log conflicts between umask and ACL in PutHDFS
This closes #5409
Signed-off-by: David Handermann <exceptionfactory@apache.org>
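
The underlying conflict: HDFS applies the client umask only when the target
directory has no default ACL; when a default ACL is defined, the effective
permissions come from the ACL and any configured umask is silently ignored.
The detection added below boils down to scanning the directory's ACL entries
for a DEFAULT-scope entry, roughly as in this standalone sketch (the helper
name is illustrative, not part of the change):

    import org.apache.hadoop.fs.permission.AclEntryScope;
    import org.apache.hadoop.fs.permission.AclStatus;

    // true when the directory carries a default ACL, meaning HDFS will
    // ignore any client-side umask for files created beneath it
    static boolean hasDefaultAcl(final AclStatus aclStatus) {
        return aclStatus.getEntries().stream()
                .anyMatch(entry -> AclEntryScope.DEFAULT.equals(entry.getScope()));
    }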
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 3d8911d..cff199a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -79,6 +79,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>2.9.2</version>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.10.0</version>
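
The new caffeine dependency backs a small per-processor cache of directory
ACL lookups. A minimal sketch of the Caffeine pattern the processor code
below relies on (bounded size, time-based expiry, compute-if-absent; the
key and value types here are placeholders):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.time.Duration;

    // bounded cache: at most 20 entries, each evicted an hour after write
    final Cache<String, String> cache = Caffeine.newBuilder()
            .maximumSize(20L)
            .expireAfterWrite(Duration.ofHours(1))
            .build();

    // compute-if-absent: the mapping function runs only on a cache miss
    final String value = cache.get("key", key -> "loaded:" + key);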
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 0a32930..462033d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -16,11 +16,15 @@
*/
package org.apache.nifi.processors.hadoop;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -35,6 +39,8 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@@ -58,7 +64,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UncheckedIOException;
import java.security.PrivilegedAction;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -93,6 +101,10 @@
protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
protected static final int BUFFER_SIZE_DEFAULT = 4096;
+ // state
+
+ private Cache<Path, AclStatus> aclCache;
+
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -152,7 +164,8 @@
.description(
"A umask represented as an octal number which determines the permissions of files written to HDFS. " +
"This overrides the Hadoop property \"fs.permissions.umask-mode\". " +
- "If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used.")
+ "If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used. "+
+ "If the PutHDFS target folder has a default ACL defined, the umask property is ignored by HDFS.")
.addValidator(HadoopValidators.UMASK_VALIDATOR)
.build();
@@ -229,6 +242,19 @@
FsPermission.setUMask(config, new FsPermission(dfsUmask));
}
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
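+ // bounded, time-limited cache: at most 20 directory ACLs, each expiring an hour
+ // after it was cached, so ACL changes made outside NiFi are eventually observed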
+ aclCache = Caffeine.newBuilder()
+ .maximumSize(20L)
+ .expireAfterWrite(Duration.ofHours(1))
+ .build();
+ }
+
+ @OnStopped
+ public void onStopped() {
+ aclCache.invalidateAll();
+ }
+
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
@@ -254,7 +280,7 @@
FlowFile putFlowFile = flowFile;
try {
final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
-
+ checkAclStatus(getAclStatus(dirPath));
final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
final int bufferSize = getBufferSize(context, session, putFlowFile);
@@ -423,6 +449,25 @@
return null;
}
+
+ private void checkAclStatus(final AclStatus aclStatus) throws IOException {
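+ // HDFS ignores the client umask when the target directory carries a default ACL,
+ // so fail the flow file instead of silently writing with unexpected permissions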
+ final boolean isDefaultACL = aclStatus.getEntries().stream().anyMatch(
+ aclEntry -> AclEntryScope.DEFAULT.equals(aclEntry.getScope()));
+ final boolean isSetUmask = context.getProperty(UMASK).isSet();
+ if (isDefaultACL && isSetUmask) {
+ throw new IOException("PutHDFS umask setting is ignored by HDFS when a default ACL is set on the target directory.");
+ }
+ }
+
+ private AclStatus getAclStatus(final Path dirPath) {
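+ // serve repeated lookups for the same directory from the cache rather than
+ // issuing a remote getAclStatus call for every flow file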
+ return aclCache.get(dirPath, path -> {
+ try {
+ return hdfs.getAclStatus(path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", dirPath), e);
+ }
+ });
+ }
});
}
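
Note that Caffeine's mapping function is a plain java.util.function.Function
and cannot throw checked exceptions, hence the UncheckedIOException wrapper
in getAclStatus above. A caller that needs the original IOException can
unwrap it; a hedged sketch with a hypothetical lookup method:

    import java.io.IOException;
    import java.io.UncheckedIOException;

    try {
        doCachedAclLookup(); // hypothetical method whose cache loader wraps IOException
    } catch (final UncheckedIOException e) {
        // UncheckedIOException#getCause is declared to return IOException
        final IOException cause = e.getCause();
        System.err.println("ACL lookup failed: " + cause.getMessage());
    }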
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 945e24e..abe288d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -22,6 +22,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progressable;
@@ -43,6 +45,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import javax.security.sasl.SaslException;
import java.io.ByteArrayOutputStream;
@@ -51,6 +54,8 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -61,6 +66,9 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class PutHDFSTest {
@@ -468,6 +476,89 @@
fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission());
}
+ /**
+ * Multiple invocations of PutHDFS on the same target directory should query the remote filesystem ACL once, and
+ * use the cached ACL afterwards.
+ */
+ @Test
+ public void testPutHDFSAclCache() {
+ final MockFileSystem fileSystem = Mockito.spy(new MockFileSystem());
+ final Path directory = new Path("/withACL");
+ assertTrue(fileSystem.mkdirs(directory));
+ final String acl = "user::rwx,group::rwx,other::rwx";
+ final String aclDefault = "default:user::rwx,default:group::rwx,default:other::rwx";
+ fileSystem.setAcl(directory, AclEntry.parseAclSpec(String.join(",", acl, aclDefault), true));
+
+ final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(PutHDFS.DIRECTORY, directory.toString());
+ runner.setProperty(PutHDFS.UMASK, "077");
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "empty");
+ runner.enqueue(new byte[16], attributes);
+ runner.enqueue(new byte[16], attributes);
+ runner.enqueue(new byte[16], attributes);
+ runner.run(3); // three triggers: the first queries the filesystem ACL, the next two are served from the cache
+ verify(fileSystem, times(1)).getAclStatus(any(Path.class));
+ }
+
+ /**
+ * When no default ACL is present on the remote directory, the flow file should be written
+ * successfully whether or not {@link PutHDFS#UMASK} is set.
+ */
+ @Test
+ public void testPutFileWithNoDefaultACL() {
+ final List<Boolean> setUmask = Arrays.asList(false, true);
+ for (boolean setUmaskIt : setUmask) {
+ final MockFileSystem fileSystem = new MockFileSystem();
+ final Path directory = new Path("/withNoDACL");
+ assertTrue(fileSystem.mkdirs(directory));
+ final String acl = "user::rwx,group::rwx,other::rwx";
+ fileSystem.setAcl(directory, AclEntry.parseAclSpec(acl, true));
+
+ final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(PutHDFS.DIRECTORY, directory.toString());
+ if (setUmaskIt) {
+ runner.setProperty(PutHDFS.UMASK, "077");
+ }
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "empty");
+ runner.enqueue(new byte[16], attributes);
+ runner.run();
+ assertEquals(1, runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).size());
+ assertEquals(0, runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE).size());
+ }
+ }
+
+ /**
+ * When a default ACL is present on the remote directory, setting {@link PutHDFS#UMASK}
+ * should route the flow file to failure.
+ */
+ @Test
+ public void testPutFileWithDefaultACL() {
+ final List<Boolean> setUmask = Arrays.asList(false, true);
+ for (boolean setUmaskIt : setUmask) {
+ final MockFileSystem fileSystem = new MockFileSystem();
+ final Path directory = new Path("/withACL");
+ assertTrue(fileSystem.mkdirs(directory));
+ final String acl = "user::rwx,group::rwx,other::rwx";
+ final String aclDefault = "default:user::rwx,default:group::rwx,default:other::rwx";
+ fileSystem.setAcl(directory, AclEntry.parseAclSpec(String.join(",", acl, aclDefault), true));
+
+ final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(PutHDFS.DIRECTORY, directory.toString());
+ if (setUmaskIt) {
+ runner.setProperty(PutHDFS.UMASK, "077");
+ }
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "empty");
+ runner.enqueue(new byte[16], attributes);
+ runner.run();
+ assertEquals(setUmaskIt ? 0 : 1, runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).size());
+ assertEquals(setUmaskIt ? 1 : 0, runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE).size());
+ }
+ }
+
@Test
public void testPutFileWithCloseException() throws IOException {
mockFileSystem = new MockFileSystem(true);
@@ -522,8 +613,9 @@
}
}
- private class MockFileSystem extends FileSystem {
+ private static class MockFileSystem extends FileSystem {
private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
+ private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
private final boolean failOnClose;
public MockFileSystem() {
@@ -534,6 +626,15 @@
this.failOnClose = failOnClose;
}
+ public void setAcl(final Path path, final List<AclEntry> aclSpec) {
+ pathToAcl.put(path, aclSpec);
+ }
+
+ @Override
+ public AclStatus getAclStatus(final Path path) {
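+ // return the entries registered via setAcl, or an empty ACL when none were set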
+ return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build();
+ }
+
@Override
public URI getUri() {
return URI.create("file:///");