blob: cdb7cba4d69345207233602ed9f757f5cc243af7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFileAttributeView;
import java.nio.file.attribute.PosixFilePermissions;
import java.nio.file.attribute.UserPrincipalLookupService;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"put", "local", "copy", "archive", "files", "filesystem"})
@CapabilityDescription("Writes the contents of a FlowFile to the local file system")
@SeeAlso({FetchFile.class, GetFile.class})
@ReadsAttribute(attribute = "filename", description = "The filename to use when writing the FlowFile to disk.")
@Restricted(
restrictions = {
@Restriction(
requiredPermission = RequiredPermission.WRITE_FILESYSTEM,
explanation = "Provides operator the ability to write to any file that NiFi has access to.")
}
)
public class PutFile extends AbstractProcessor {
// Allowable values of the "Conflict Resolution Strategy" property; onTrigger switches on these
// when the destination file already exists.
public static final String REPLACE_RESOLUTION = "replace";
public static final String IGNORE_RESOLUTION = "ignore";
public static final String FAIL_RESOLUTION = "fail";
// Attribute name for a file's last-modified time. Not referenced elsewhere in this class;
// presumably kept for other components that read or write this attribute - TODO confirm.
public static final String FILE_MODIFY_DATE_ATTRIBUTE = "file.lastModifiedTime";
// SimpleDateFormat pattern used by onTrigger to parse the "Last Modified Time" property value.
public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
// Nine-character symbolic permissions (e.g. rw-r--r--). For each of the three triplets the
// read/write pair and the execute flag are captured separately: groups 1/2, 3/4 and 5/6.
public static final Pattern RWX_PATTERN = Pattern.compile("^([r-][w-])([x-])([r-][w-])([x-])([r-][w-])([x-])$");
// Exactly three octal digits (e.g. 644).
public static final Pattern NUM_PATTERN = Pattern.compile("^[0-7]{3}$");
/**
 * Validates the "Permissions" property value.
 * <p>
 * A value is accepted when it contains Expression Language (it can only be checked once evaluated
 * against a FlowFile), or when it matches {@link #RWX_PATTERN} or {@link #NUM_PATTERN}. Every result,
 * valid or not, carries the subject and input so that it identifies the property it refers to.
 */
private static final Validator PERMISSIONS_VALIDATOR = new Validator() {
    @Override
    public ValidationResult validate(String subject, String input, ValidationContext context) {
        // Populate subject/input once so that no returned result is anonymous.
        final ValidationResult.Builder vr = new ValidationResult.Builder().subject(subject).input(input);

        if (context.isExpressionLanguagePresent(input)) {
            return vr.explanation("Expression Language Present").valid(true).build();
        }

        if (RWX_PATTERN.matcher(input).matches() || NUM_PATTERN.matcher(input).matches()) {
            return vr.valid(true).build();
        }

        return vr.valid(false)
                .explanation("This must be expressed in rwxr-x--- form or octal triplet form.")
                .build();
    }
};
/**
 * Required target directory. Supports Expression Language evaluated against FlowFile attributes.
 */
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("Directory")
.description("The directory to which files should be written. You may use expression language such as /aa/bb/${path}")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
 * Optional cap on the number of files in the output directory. When set, onTrigger counts the
 * directory entries other than the file being written and routes the FlowFile to failure if that
 * count is greater than or equal to this value.
 */
public static final PropertyDescriptor MAX_DESTINATION_FILES = new PropertyDescriptor.Builder()
.name("Maximum File Count")
.description("Specifies the maximum number of files that can exist in the output directory")
.required(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
/**
 * Required strategy for an already-existing destination file: one of replace, ignore or fail
 * (default fail).
 */
public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the output directory")
.required(true)
.defaultValue(FAIL_RESOLUTION)
.allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION)
.build();
/**
 * Optional last-modified timestamp to apply to the written file. Supports Expression Language
 * evaluated against FlowFile attributes; onTrigger parses the value with
 * FILE_MODIFY_DATE_ATTR_FORMAT.
 */
public static final PropertyDescriptor CHANGE_LAST_MODIFIED_TIME = new PropertyDescriptor.Builder()
.name("Last Modified Time")
.description("Sets the lastModifiedTime on the output file to the value of this attribute. Format must be yyyy-MM-dd'T'HH:mm:ssZ. "
+ "You may also use expression language such as ${file.lastModifiedTime}.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
 * Optional permissions for the written file, in symbolic or three-digit octal form (checked by
 * PERMISSIONS_VALIDATOR). onTrigger also uses the value when it creates missing directories.
 */
public static final PropertyDescriptor CHANGE_PERMISSIONS = new PropertyDescriptor.Builder()
.name("Permissions")
.description("Sets the permissions on the output file to the value of this attribute. Format must be either UNIX rwxrwxrwx with a - in "
+ "place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). You may also use expression language such as "
+ "${file.permissions}.")
.required(false)
.addValidator(PERMISSIONS_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
 * Optional owner to set on the written file and on any directories onTrigger creates.
 */
public static final PropertyDescriptor CHANGE_OWNER = new PropertyDescriptor.Builder()
.name("Owner")
.description("Sets the owner on the output file to the value of this attribute. You may also use expression language such as "
+ "${file.owner}. Note on many operating systems Nifi must be running as a super-user to have the permissions to set the file owner.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
 * Optional group to set on the written file and on any directories onTrigger creates.
 */
public static final PropertyDescriptor CHANGE_GROUP = new PropertyDescriptor.Builder()
.name("Group")
.description("Sets the group on the output file to the value of this attribute. You may also use expression language such "
+ "as ${file.group}.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
 * Required flag (default "true") controlling whether a missing target directory is created or the
 * FlowFile is penalized and routed to failure.
 */
public static final PropertyDescriptor CREATE_DIRS = new PropertyDescriptor.Builder()
.name("Create Missing Directories")
.description("If true, then missing destination directories will be created. If false, flowfiles are penalized and sent to failure.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
// Value 10, which equals the number of rename attempts onTrigger makes before giving up.
public static final int MAX_FILE_LOCK_ATTEMPTS = 10;
/** Destination for FlowFiles whose content was written (or skipped under the "ignore" strategy). */
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Files that have been successfully written to the output directory are transferred to this relationship")
.build();
/** Destination for FlowFiles that could not be written. */
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Files that could not be written to the output directory for some reason are transferred to this relationship")
.build();
// Both collections are assigned once in init() as unmodifiable views and returned by the getters below.
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
/**
 * Populates the unmodifiable relationship set and the unmodifiable, ordered property list that
 * this processor exposes.
 *
 * @param context initialization context (not used here)
 */
@Override
protected void init(final ProcessorInitializationContext context) {
    // Relationships: success and failure.
    final Set<Relationship> relationshipSet = new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE));
    this.relationships = Collections.unmodifiableSet(relationshipSet);

    // Property descriptors, in the order they are reported.
    final List<PropertyDescriptor> descriptorList = new ArrayList<>(Arrays.asList(
            DIRECTORY,
            CONFLICT_RESOLUTION,
            CREATE_DIRS,
            MAX_DESTINATION_FILES,
            CHANGE_LAST_MODIFIED_TIME,
            CHANGE_PERMISSIONS,
            CHANGE_OWNER,
            CHANGE_GROUP));
    this.properties = Collections.unmodifiableList(descriptorList);
}
/**
 * Returns the unmodifiable relationship set assigned in init().
 *
 * @return the relationships of this processor
 */
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
/**
 * Returns the unmodifiable property descriptor list assigned in init().
 *
 * @return the supported property descriptors, in declaration order of init()
 */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
/**
 * Writes the content of one FlowFile to a file in the configured directory.
 * <p>
 * The content is first exported to a temporary file named {@code "." + filename} in the target
 * directory. The optional last-modified time, permissions, owner and group are then applied to that
 * temporary file on a best-effort basis (failures are only logged as warnings), and finally the file
 * is renamed to its real name, retrying up to {@link #MAX_FILE_LOCK_ATTEMPTS} times.
 * <p>
 * The FlowFile is penalized and routed to {@link #REL_FAILURE} when the directory is missing and may
 * not be created, when the directory already holds the configured maximum number of files, when the
 * destination exists and the conflict strategy is "fail", or when anything inside the write sequence
 * throws. It is routed to {@link #REL_SUCCESS} after a successful write, or immediately when the
 * destination exists and the conflict strategy is "ignore".
 * <p>
 * NOTE(review): the {@code filename} attribute is resolved against the directory without any
 * normalization, so a value containing path separators or {@code ..} can resolve outside of it.
 * Confirm whether the attribute is sanitized upstream.
 *
 * @param context provides the configured property values
 * @param session used to obtain, export and route the FlowFile
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    final StopWatch stopWatch = new StopWatch(true);
    final Path configuredRootDirPath = Paths.get(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
    final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
    final Integer maxDestinationFiles = context.getProperty(MAX_DESTINATION_FILES).asInteger();
    final ComponentLog logger = getLogger();

    // Remembered outside the try so the catch block can remove a half-written temporary file.
    Path tempDotCopyFile = null;
    try {
        final Path rootDirPath = configuredRootDirPath.toAbsolutePath();
        String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
        final Path tempCopyFile = rootDirPath.resolve("." + filename);
        final Path copyFile = rootDirPath.resolve(filename);

        final String permissions = context.getProperty(CHANGE_PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
        final String owner = context.getProperty(CHANGE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
        final String group = context.getProperty(CHANGE_GROUP).evaluateAttributeExpressions(flowFile).getValue();

        if (!Files.exists(rootDirPath)) {
            if (context.getProperty(CREATE_DIRS).asBoolean()) {
                // Find the deepest ancestor that already exists, so that owner/group are only applied
                // to the directories created below. getParent() returns null once the root has been
                // passed (e.g. the root itself does not exist), hence the null guard.
                Path existing = rootDirPath;
                while (existing != null && !Files.exists(existing)) {
                    existing = existing.getParent();
                }

                if (permissions != null && !permissions.trim().isEmpty()) {
                    try {
                        String perms = stringPermissions(permissions, true);
                        if (!perms.isEmpty()) {
                            Files.createDirectories(rootDirPath, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString(perms)));
                        } else {
                            Files.createDirectories(rootDirPath);
                        }
                    } catch (Exception e) {
                        flowFile = session.penalize(flowFile);
                        session.transfer(flowFile, REL_FAILURE);
                        logger.error("Could not create directory with permissions {} because {}", new Object[]{permissions, e});
                        return;
                    }
                } else {
                    Files.createDirectories(rootDirPath);
                }

                boolean chOwner = owner != null && !owner.trim().isEmpty();
                boolean chGroup = group != null && !group.trim().isEmpty();
                if (chOwner || chGroup) {
                    // Walk upwards from the target directory over every directory that was just created.
                    Path currentPath = rootDirPath;
                    while (currentPath != null && !currentPath.equals(existing)) {
                        if (chOwner) {
                            try {
                                UserPrincipalLookupService lookupService = currentPath.getFileSystem().getUserPrincipalLookupService();
                                Files.setOwner(currentPath, lookupService.lookupPrincipalByName(owner));
                            } catch (Exception e) {
                                logger.warn("Could not set directory owner to {} because {}", new Object[]{owner, e});
                            }
                        }
                        if (chGroup) {
                            try {
                                UserPrincipalLookupService lookupService = currentPath.getFileSystem().getUserPrincipalLookupService();
                                PosixFileAttributeView view = Files.getFileAttributeView(currentPath, PosixFileAttributeView.class);
                                view.setGroup(lookupService.lookupPrincipalByGroupName(group));
                            } catch (Exception e) {
                                logger.warn("Could not set directory group to {} because {}", new Object[]{group, e});
                            }
                        }
                        currentPath = currentPath.getParent();
                    }
                }
            } else {
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
                logger.error("Penalizing {} and routing to 'failure' because the output directory {} does not exist and Processor is "
                        + "configured not to create missing directories", new Object[]{flowFile, rootDirPath});
                return;
            }
        }

        final Path dotCopyFile = tempCopyFile;
        tempDotCopyFile = dotCopyFile;
        Path finalCopyFile = copyFile;

        final Path finalCopyFileDir = finalCopyFile.getParent();
        if (Files.exists(finalCopyFileDir) && maxDestinationFiles != null) { // check if too many files already
            final long numFiles = getFilesNumberInFolder(finalCopyFileDir, filename);

            if (numFiles >= maxDestinationFiles) {
                flowFile = session.penalize(flowFile);
                logger.warn("Penalizing {} and routing to 'failure' because the output directory {} has {} files, which exceeds the "
                        + "configured maximum number of files", new Object[]{flowFile, finalCopyFileDir, numFiles});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
        }

        if (Files.exists(finalCopyFile)) {
            switch (conflictResponse) {
                case REPLACE_RESOLUTION:
                    Files.delete(finalCopyFile);
                    logger.info("Deleted {} as configured in order to replace with the contents of {}", new Object[]{finalCopyFile, flowFile});
                    break;
                case IGNORE_RESOLUTION:
                    session.transfer(flowFile, REL_SUCCESS);
                    logger.info("Transferring {} to success because file with same name already exists", new Object[]{flowFile});
                    return;
                case FAIL_RESOLUTION:
                    flowFile = session.penalize(flowFile);
                    logger.warn("Penalizing {} and routing to failure as configured because file with the same name already exists", new Object[]{flowFile});
                    session.transfer(flowFile, REL_FAILURE);
                    return;
                default:
                    break;
            }
        }

        session.exportTo(flowFile, dotCopyFile, false);

        final String lastModifiedTime = context.getProperty(CHANGE_LAST_MODIFIED_TIME).evaluateAttributeExpressions(flowFile).getValue();
        if (lastModifiedTime != null && !lastModifiedTime.trim().isEmpty()) {
            try {
                final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
                final Date fileModifyTime = formatter.parse(lastModifiedTime);
                // File.setLastModified reports failure through its return value rather than an exception.
                if (!dotCopyFile.toFile().setLastModified(fileModifyTime.getTime())) {
                    logger.warn("Could not set file lastModifiedTime to {}", new Object[]{lastModifiedTime});
                }
            } catch (Exception e) {
                logger.warn("Could not set file lastModifiedTime to {} because {}", new Object[]{lastModifiedTime, e});
            }
        }

        if (permissions != null && !permissions.trim().isEmpty()) {
            try {
                String perms = stringPermissions(permissions, false);
                if (!perms.isEmpty()) {
                    Files.setPosixFilePermissions(dotCopyFile, PosixFilePermissions.fromString(perms));
                }
            } catch (Exception e) {
                logger.warn("Could not set file permissions to {} because {}", new Object[]{permissions, e});
            }
        }

        if (owner != null && !owner.trim().isEmpty()) {
            try {
                UserPrincipalLookupService lookupService = dotCopyFile.getFileSystem().getUserPrincipalLookupService();
                Files.setOwner(dotCopyFile, lookupService.lookupPrincipalByName(owner));
            } catch (Exception e) {
                logger.warn("Could not set file owner to {} because {}", new Object[]{owner, e});
            }
        }

        if (group != null && !group.trim().isEmpty()) {
            try {
                UserPrincipalLookupService lookupService = dotCopyFile.getFileSystem().getUserPrincipalLookupService();
                PosixFileAttributeView view = Files.getFileAttributeView(dotCopyFile, PosixFileAttributeView.class);
                view.setGroup(lookupService.lookupPrincipalByGroupName(group));
            } catch (Exception e) {
                logger.warn("Could not set file group to {} because {}", new Object[]{group, e});
            }
        }

        boolean renamed = false;
        for (int attempt = 1; attempt <= MAX_FILE_LOCK_ATTEMPTS; attempt++) {
            if (dotCopyFile.toFile().renameTo(finalCopyFile.toFile())) {
                renamed = true;
                break; // rename was successful
            }
            if (attempt < MAX_FILE_LOCK_ATTEMPTS) {
                // Wait briefly to let whatever caused the rename failure resolve; there is no point
                // in waiting after the final attempt.
                Thread.sleep(100L);
            }
        }

        if (!renamed) {
            if (Files.exists(dotCopyFile) && dotCopyFile.toFile().delete()) {
                logger.debug("Deleted dot copy file {}", new Object[]{dotCopyFile});
            }
            throw new ProcessException("Could not rename: " + dotCopyFile);
        } else {
            logger.info("Produced copy of {} at location {}", new Object[]{flowFile, finalCopyFile});
        }

        session.getProvenanceReporter().send(flowFile, finalCopyFile.toFile().toURI().toString(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        session.transfer(flowFile, REL_SUCCESS);
    } catch (final Throwable t) {
        if (tempDotCopyFile != null) {
            try {
                Files.deleteIfExists(tempDotCopyFile);
            } catch (final Exception e) {
                logger.error("Unable to remove temporary file {} due to {}", new Object[]{tempDotCopyFile, e});
            }
        }

        flowFile = session.penalize(flowFile);
        logger.error("Penalizing {} and transferring to failure due to {}", new Object[]{flowFile, t});
        session.transfer(flowFile, REL_FAILURE);

        if (t instanceof InterruptedException) {
            // Thread.sleep cleared the interrupt flag when it threw; restore it so the caller
            // can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Counts the entries in {@code folder} whose name differs from {@code filename}, so that the file
 * about to be written (or replaced) is not counted against the configured maximum.
 *
 * @param folder directory whose entries are counted
 * @param filename entry name to leave out of the count
 * @return number of entries in {@code folder} other than {@code filename}
 * @throws ProcessException if the entries cannot be listed, i.e. {@code folder} is not a directory
 *         or an I/O error occurred
 */
private long getFilesNumberInFolder(Path folder, String filename) {
    final String[] filesInFolder = folder.toFile().list();
    if (filesInFolder == null) {
        // File.list() signals "not a directory" and I/O errors by returning null; fail with a
        // descriptive message instead of a NullPointerException from Arrays.stream.
        throw new ProcessException("Could not list the contents of " + folder
                + " because it is not a directory or an I/O error occurred");
    }
    return Arrays.stream(filesInFolder)
            .filter(eachFilename -> !eachFilename.equals(filename))
            .count();
}
/**
 * Converts a permission specification into the nine-character symbolic form (e.g. {@code rw-r--r--}).
 * <p>
 * For a file, a symbolic input is returned unchanged and an octal input is translated bit by bit.
 * For a directory, execute is additionally granted to every triplet that has read or write, since
 * reading or writing a directory requires access to it; for an octal input the owner triplet of a
 * directory always receives execute.
 *
 * @param perms permissions in symbolic ({@link #RWX_PATTERN}) or three-digit octal ({@link #NUM_PATTERN}) form
 * @param directory whether the permissions are intended for a directory
 * @return the symbolic permission string, or an empty string if {@code perms} matches neither form
 */
protected String stringPermissions(String perms, boolean directory) {
    final Matcher symbolic = RWX_PATTERN.matcher(perms);
    if (symbolic.matches()) {
        if (!directory) {
            return perms;
        }
        // Groups 1/3/5 hold the read-write pairs, groups 2/4/6 the matching execute flags.
        final StringBuilder symbolicResult = new StringBuilder(9);
        for (int rwGroup = 1; rwGroup <= 5; rwGroup += 2) {
            final String readWrite = symbolic.group(rwGroup);
            symbolicResult.append(readWrite);
            // To read or write, directory access will be required
            symbolicResult.append("--".equals(readWrite) ? symbolic.group(rwGroup + 1) : "x");
        }
        return symbolicResult.toString();
    }

    if (!NUM_PATTERN.matcher(perms).matches()) {
        return "";
    }

    // NUM_PATTERN guarantees exactly three octal digits, so parsing cannot fail here.
    final int mode = Integer.parseInt(perms, 8);
    final StringBuilder numericResult = new StringBuilder(9);
    // shift 6 = owner, 3 = group, 0 = other
    for (int shift = 6; shift >= 0; shift -= 3) {
        final int triplet = (mode >> shift) & 7;
        final boolean readOrWrite = (triplet & 6) != 0;
        // To read or write, directory access will be required; the owner of a directory always gets it.
        final boolean execute = (triplet & 1) != 0 || (directory && (shift == 6 || readOrWrite));
        numericResult.append((triplet & 4) != 0 ? 'r' : '-');
        numericResult.append((triplet & 2) != 0 ? 'w' : '-');
        numericResult.append(execute ? 'x' : '-');
    }
    return numericResult.toString();
}
}