/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileOwnerAttributeView;
import java.nio.file.attribute.PosixFileAttributeView;
import java.nio.file.attribute.PosixFilePermissions;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"local", "files", "filesystem", "ingest", "ingress", "get", "source", "input"})
@CapabilityDescription("Creates FlowFiles from files in a directory. NiFi will ignore files it doesn't have at least read permissions for.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on disk"),
@WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on disk. For example, "
+ "if the <Input Directory> property is set to /tmp, files picked up from /tmp will have the path attribute set to ./. If "
+ "the <Recurse Subdirectories> property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will "
+ "be set to abc/1/2/3"),
@WritesAttribute(attribute = "file.creationTime", description = "The date and time that the file was created. May not work on all file systems"),
@WritesAttribute(attribute = "file.lastModifiedTime", description = "The date and time that the file was last modified. May not work on all "
+ "file systems"),
@WritesAttribute(attribute = "file.lastAccessTime", description = "The date and time that the file was last accessed. May not work on all "
+ "file systems"),
@WritesAttribute(attribute = "file.owner", description = "The owner of the file. May not work on all file systems"),
@WritesAttribute(attribute = "file.group", description = "The group owner of the file. May not work on all file systems"),
@WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the file. May not work on all file systems"),
@WritesAttribute(attribute = "absolute.path", description = "The full/absolute path from where a file was picked up. The current 'path' "
+ "attribute is still populated, but may be a relative path")})
@SeeAlso({PutFile.class, FetchFile.class})
@Restricted(
restrictions = {
@Restriction(
requiredPermission = RequiredPermission.READ_FILESYSTEM,
explanation = "Provides operator the ability to read from any file that NiFi has access to."),
@Restriction(
requiredPermission = RequiredPermission.WRITE_FILESYSTEM,
explanation = "Provides operator the ability to delete any file that NiFi has access to.")
}
)
public class GetFile extends AbstractProcessor {
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("Input Directory")
.description("The input directory from which to pull files")
.required(true)
.addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
.name("Recurse Subdirectories")
.description("Indicates whether or not to pull files from subdirectories")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
.name("Keep Source File")
.description("If true, the file is not deleted after it has been copied to the Content Repository; "
+ "this causes the file to be picked up continually and is useful for testing purposes. "
+ "If not keeping original NiFi will need write permissions on the directory it is pulling "
+ "from otherwise it will ignore the file.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
.name("File Filter")
.description("Only files whose names match the given regular expression will be picked up")
.required(true)
.defaultValue("[^\\.].*")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
.name("Path Filter")
.description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
.name("Minimum File Age")
.description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")
.build();
public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
.name("Maximum File Age")
.description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
.build();
public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
.name("Minimum File Size")
.description("The minimum size that a file must be in order to be pulled")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("0 B")
.build();
public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
.name("Maximum File Size")
.description("The maximum size that a file can be in order to be pulled")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
.name("Ignore Hidden Files")
.description("Indicates whether or not hidden files should be ignored")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
.name("Polling Interval")
.description("Indicates how long to wait before performing a directory listing")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of files to pull in each iteration")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10")
.build();
public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
public static final String FILE_GROUP_ATTRIBUTE = "file.group";
public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
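    // Shared work-queue state: fileQueue holds candidate files from the most recent directory listing;
    // queueLock guards the queue together with the inProcess and recentlyProcessed sets, and
    // listingLock ensures that only one thread performs a directory listing at a time.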
private final BlockingQueue<File> fileQueue = new LinkedBlockingQueue<>();
private final Set<File> inProcess = new HashSet<>(); // guarded by queueLock
private final Set<File> recentlyProcessed = new HashSet<>(); // guarded by queueLock
private final Lock queueLock = new ReentrantLock();
private final Lock listingLock = new ReentrantLock();
private final AtomicLong queueLastUpdated = new AtomicLong(0L);
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DIRECTORY);
properties.add(FILE_FILTER);
properties.add(PATH_FILTER);
properties.add(BATCH_SIZE);
properties.add(KEEP_SOURCE_FILE);
properties.add(RECURSE);
properties.add(POLLING_INTERVAL);
properties.add(IGNORE_HIDDEN_FILES);
properties.add(MIN_AGE);
properties.add(MAX_AGE);
properties.add(MIN_SIZE);
properties.add(MAX_SIZE);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
fileFilterRef.set(createFileFilter(context));
fileQueue.clear();
}
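    /**
     * Builds a FileFilter that applies, in order: the minimum/maximum size limits, the minimum/maximum
     * age limits (based on last-modified time), the hidden-file setting, the optional Path Filter regex
     * (matched against the path relative to the Input Directory), a readability check, a parent-directory
     * writability check when the source file will be deleted, and finally the File Filter regex against
     * the file name.
     */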
private FileFilter createFileFilter(final ProcessContext context) {
final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
final boolean keepOriginal = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
return new FileFilter() {
@Override
public boolean accept(final File file) {
if (minSize > file.length()) {
return false;
}
if (maxSize != null && maxSize < file.length()) {
return false;
}
final long fileAge = System.currentTimeMillis() - file.lastModified();
if (minAge > fileAge) {
return false;
}
if (maxAge != null && maxAge < fileAge) {
return false;
}
if (ignoreHidden && file.isHidden()) {
return false;
}
if (pathPattern != null) {
Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
if (reldir != null && !reldir.toString().isEmpty()) {
if (!pathPattern.matcher(reldir.toString()).matches()) {
return false;
}
}
}
//Verify that we have at least read permissions on the file we're considering grabbing
if (!Files.isReadable(file.toPath())) {
return false;
}
//Verify that if we're not keeping original that we have write permissions on the directory the file is in
                if (!keepOriginal && !Files.isWritable(file.toPath().getParent())) {
return false;
}
return filePattern.matcher(file.getName()).matches();
}
};
}
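    /**
     * Lists the files in the given directory (and, when requested, its subdirectories) that pass the
     * supplied filter. The directory itself must be both readable and writable, otherwise an
     * IllegalStateException is thrown.
     */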
private Set<File> performListing(final File directory, final FileFilter filter, final boolean recurseSubdirectories) {
Path p = directory.toPath();
if (!Files.isWritable(p) || !Files.isReadable(p)) {
            throw new IllegalStateException("Directory '" + directory + "' does not have sufficient permissions (it must be both readable and writable)");
}
final Set<File> queue = new HashSet<>();
if (!directory.exists()) {
return queue;
}
final File[] children = directory.listFiles();
if (children == null) {
return queue;
}
for (final File child : children) {
if (child.isDirectory()) {
if (recurseSubdirectories) {
queue.addAll(performListing(child, filter, recurseSubdirectories));
}
} else if (filter.accept(child)) {
queue.add(child);
}
}
return queue;
}
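    /**
     * Collects file metadata (timestamps, owner, group, permissions) using whichever of the "basic",
     * "owner", and "posix" attribute views the underlying file store supports; any attribute that
     * cannot be read is simply omitted.
     */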
protected Map<String, String> getAttributesFromFile(final Path file) {
Map<String, String> attributes = new HashMap<>();
try {
FileStore store = Files.getFileStore(file);
if (store.supportsFileAttributeView("basic")) {
try {
final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
BasicFileAttributeView view = Files.getFileAttributeView(file, BasicFileAttributeView.class);
BasicFileAttributes attrs = view.readAttributes();
attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
                } catch (Exception ignore) {
                    // allow other attributes if these fail
                }
}
if (store.supportsFileAttributeView("owner")) {
try {
FileOwnerAttributeView view = Files.getFileAttributeView(file, FileOwnerAttributeView.class);
attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
                } catch (Exception ignore) {
                    // allow other attributes if these fail
                }
}
if (store.supportsFileAttributeView("posix")) {
try {
PosixFileAttributeView view = Files.getFileAttributeView(file, PosixFileAttributeView.class);
attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
                } catch (Exception ignore) {
                    // allow other attributes if these fail
                }
}
} catch (IOException ioe) {
// well then this FlowFile gets none of these attributes
}
return attributes;
}
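    /**
     * Refreshes the file queue from a new directory listing when needed, drains a batch of files,
     * imports each one as a FlowFile with path and file-metadata attributes, and routes it to success.
     */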
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final File directory = new File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
final ComponentLog logger = getLogger();
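        // Refresh the queue only when it is running low (fewer than 100 queued files), the polling
        // interval has elapsed since the last listing, and no other thread is already listing
        // (tryLock does not block).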
if (fileQueue.size() < 100) {
final long pollingMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingMillis) && listingLock.tryLock()) {
try {
final Set<File> listing = performListing(directory, fileFilterRef.get(), context.getProperty(RECURSE).asBoolean().booleanValue());
queueLock.lock();
try {
listing.removeAll(inProcess);
if (!keepingSourceFile) {
listing.removeAll(recentlyProcessed);
}
fileQueue.clear();
fileQueue.addAll(listing);
queueLastUpdated.set(System.currentTimeMillis());
recentlyProcessed.clear();
if (listing.isEmpty()) {
context.yield();
}
} finally {
queueLock.unlock();
}
} finally {
listingLock.unlock();
}
}
}
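        // Pull up to Batch Size files from the queue under the queue lock and mark them as in-process
        // so that a concurrent listing does not re-add them.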
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final List<File> files = new ArrayList<>(batchSize);
queueLock.lock();
try {
fileQueue.drainTo(files, batchSize);
if (files.isEmpty()) {
return;
} else {
inProcess.addAll(files);
}
} finally {
queueLock.unlock();
}
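        // Import each file as a FlowFile, copy the path and file metadata into attributes, report
        // provenance, and transfer to success. If the processor is stopped mid-batch, the remaining
        // files are returned to the queue.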
final ListIterator<File> itr = files.listIterator();
FlowFile flowFile = null;
try {
final Path directoryPath = directory.toPath();
while (itr.hasNext()) {
final File file = itr.next();
final Path filePath = file.toPath();
final Path relativePath = directoryPath.relativize(filePath.getParent());
                String relativePathString = relativePath.toString();
                if (relativePathString.isEmpty()) {
                    relativePathString = "./";
                } else {
                    relativePathString += "/";
                }
final Path absPath = filePath.toAbsolutePath();
final String absPathString = absPath.getParent().toString() + "/";
flowFile = session.create();
final long importStart = System.nanoTime();
flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
final long importNanos = System.nanoTime() - importStart;
final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
Map<String, String> attributes = getAttributesFromFile(filePath);
                if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
session.transfer(flowFile, REL_SUCCESS);
logger.info("added {} to flow", new Object[]{flowFile});
if (!isScheduled()) { // if processor stopped, put the rest of the files back on the queue.
queueLock.lock();
try {
while (itr.hasNext()) {
final File nextFile = itr.next();
fileQueue.add(nextFile);
inProcess.remove(nextFile);
}
} finally {
queueLock.unlock();
}
}
}
} catch (final Exception e) {
logger.error("Failed to retrieve files due to {}", e);
// anything that we've not already processed needs to be put back on the queue
if (flowFile != null) {
session.remove(flowFile);
}
} finally {
queueLock.lock();
try {
inProcess.removeAll(files);
recentlyProcessed.addAll(files);
} finally {
queueLock.unlock();
}
}
}
}