blob: 31f582f4e311ab49d7f4ca85c5dab10db2a99082 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
@CapabilityDescription("Retrieves a listing of files from HDFS. Each time a listing is performed, the files with the latest timestamp will be excluded "
+ "and picked up during the next execution of the processor. This is done to ensure that we do not miss any files, or produce duplicates, in the "
+ "cases where files with the same timestamp are written immediately before and after a single execution of the processor. For each file that is "
+ "listed in HDFS, this processor creates a FlowFile that represents the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is "
+ "designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left "
+ "off without duplicating all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
@WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
@WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
+ "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
+ "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
@WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
@WritesAttribute(attribute="", description="The group that owns the file in HDFS"),
@WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
@WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
@WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
@WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
+ "3 for the group, and 3 for other users. For example rw-rw-r--")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the latest timestamp of all the files listed and the latest "
+ "timestamp of all the files transferred are both stored. This allows the Processor to list only files that have been added or modified after "
+ "this date the next time that the Processor is run, without having to store all of the actual filenames/paths which could lead to performance "
+ "problems. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
+ "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
public class ListHDFS extends AbstractHadoopProcessor {
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("This property is ignored. State will be stored in the " + Scope.LOCAL + " or " + Scope.CLUSTER + " scope by the State Manager based on NiFi's configuration.")
public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
.name("Recurse Subdirectories")
.description("Indicates whether to list files from subdirectories of the HDFS directory")
.allowableValues("true", "false")
public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
.name("File Filter")
.description("Only files whose names match the given regular expression will be picked up")
private static final String FILTER_MODE_DIRECTORIES_AND_FILES = "filter-mode-directories-and-files";
private static final String FILTER_MODE_FILES_ONLY = "filter-mode-files-only";
private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
"Directories and Files",
"Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getDisplayName()
+ " is set to true, only subdirectories with a matching name will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
"Files Only",
"Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getDisplayName()
+ " is set to true, the entire subdirectory tree will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
"Full Path",
"Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+ " against the full path of files with and without the scheme and authority. If "
+ RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+ "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ". See 'Additional Details' for more information.");
public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
.displayName("File Filter Mode")
.description("Determines how the regular expression in " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
.displayName("Minimum File Age")
.description("The minimum age that a file must be in order to be pulled; any file younger than this "
+ "amount of time (based on last modification date) will be ignored")
.addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
.displayName("Maximum File Age")
.description("The maximum age that a file must be in order to be pulled; any file older than this "
+ "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
.addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("All FlowFiles are transferred to this relationship")
private volatile long latestTimestampListed = -1L;
private volatile long latestTimestampEmitted = -1L;
private volatile long lastRunTimestamp = -1L;
private volatile boolean resetState = false;
static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
private Pattern fileFilterRegexPattern;
protected void init(final ProcessorInitializationContext context) {
protected void preProcessConfiguration(Configuration config, ProcessContext context) {
super.preProcessConfiguration(config, context);
// Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER regex can be compiled here rather than during onTrigger processing
fileFilterRegexPattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
protected File getPersistenceFile() {
return new File("conf/state/" + getIdentifier());
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> props = new ArrayList<>(properties);
return props;
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
return relationships;
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
if (minimumAge > maximumAge) {
problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
.explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
return problems;
protected String getKey(final String directory) {
return getIdentifier() + ".lastListingTime." + directory;
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
this.resetState = true;
* Determines which of the given FileStatus's describes a File that should be listed.
* @param statuses the eligible FileStatus objects that we could potentially list
* @param context processor context with properties values
* @return a Set containing only those FileStatus objects that we want to list
Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
final long minTimestamp = this.latestTimestampListed;
final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
// NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
// the future relative to the nifi instance, files are not skipped.
final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
// Build a sorted map to determine the latest possible entries
for (final FileStatus status : statuses) {
if (status.getPath().getName().endsWith("_COPYING_")) {
final long fileAge = System.currentTimeMillis() - status.getModificationTime();
if (minimumAge > fileAge || fileAge > maximumAge) {
final long entityTimestamp = status.getModificationTime();
if (entityTimestamp > latestTimestampListed) {
latestTimestampListed = entityTimestamp;
// New entries are all those that occur at or after the associated timestamp
final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
if (newEntry) {
List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
if (entitiesForTimestamp == null) {
entitiesForTimestamp = new ArrayList<FileStatus>();
orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
final Set<FileStatus> toList = new HashSet<>();
if (orderedEntries.size() > 0) {
long latestListingTimestamp = orderedEntries.lastKey();
// If the last listing time is equal to the newest entries previously seen,
// another iteration has occurred without new files and special handling is needed to avoid starvation
if (latestListingTimestamp == minTimestamp) {
// We are done if the latest listing timestamp is equal to the last processed time,
// meaning we handled those items originally passed over
if (latestListingTimestamp == latestTimestampEmitted) {
return Collections.emptySet();
} else {
// Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
for (List<FileStatus> timestampEntities : orderedEntries.values()) {
for (FileStatus status : timestampEntities) {
return toList;
public void resetStateIfNecessary(final ProcessContext context) throws IOException {
if (resetState) {
getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
this.resetState = false;
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// We have to ensure that we don't continually perform listings, because if we perform two listings within
// the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
// not let that happen.
final long now = System.nanoTime();
if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
lastRunTimestamp = now;
lastRunTimestamp = now;
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
// Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
try {
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
if (stateMap.getVersion() == -1L) {
latestTimestampEmitted = -1L;
latestTimestampListed = -1L;
getLogger().debug("Found no state stored");
} else {
// Determine if state is stored in the 'new' format or the 'old' format
final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
if (emittedString == null) {
latestTimestampEmitted = -1L;
latestTimestampListed = -1L;
getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
} else {
// state is stored in the new format, using just two timestamps
latestTimestampEmitted = Long.parseLong(emittedString);
final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
if (listingTimestmapString != null) {
latestTimestampListed = Long.parseLong(listingTimestmapString);
getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
new Object[] {latestTimestampEmitted, latestTimestampListed});
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
// Pull in any file that is newer than the timestamp that we have.
final FileSystem hdfs = getFileSystem();
final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
final Set<FileStatus> statuses;
try {
final Path rootPath = new Path(directory);
statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
} catch (final IOException | IllegalArgumentException e) {
getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
} catch (final InterruptedException e) {
getLogger().error("Interrupted while performing listing of HDFS", e);
final Set<FileStatus> listable = determineListable(statuses, context);
getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
for (final FileStatus status : listable) {
final Map<String, String> attributes = createAttributes(status);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
final long fileModTime = status.getModificationTime();
if (fileModTime > latestTimestampEmitted) {
latestTimestampEmitted = fileModTime;
final int listCount = listable.size();
if ( listCount > 0 ) {
getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
} else {
getLogger().debug("There is no data to list. Yielding.");
final Map<String, String> updatedState = new HashMap<>(1);
updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
getLogger().debug("New state map: {}", new Object[] {updatedState});
try {
context.getStateManager().setState(updatedState, Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
final Set<FileStatus> statusSet = new HashSet<>();
getLogger().debug("Fetching listing for {}", new Object[] {path});
final FileStatus[] statuses;
if (isPostListingFilterNeeded(filterMode)) {
// For this filter mode, the filter is not passed to listStatus, so that directory names will not be
// filtered out when the listing is recursive.
statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
} else {
statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
for ( final FileStatus status : statuses ) {
if ( status.isDirectory() ) {
if ( recursive ) {
try {
statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter, filterMode));
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
} else {
if (isPostListingFilterNeeded(filterMode)) {
// Filtering explicitly performed here, since it was not able to be done when calling listStatus.
if (filter.accept(status.getPath())) {
} else {
return statusSet;
* Determines if filtering needs to be applied, after calling {@link FileSystem#listStatus(Path)}, based on the
* given filter mode.
* Filter modes that need to be able to search directories regardless of the given filter should return true.
* FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link FileSystem#listStatus(Path)} be invoked
* without a filter so that all directories can be traversed when filtering with these modes.
* FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering can be applied directly with
* {@link FileSystem#listStatus(Path, PathFilter)} regardless of a recursive listing.
* @param filterMode the value of one of the defined AllowableValues representing filter modes
* @return true if results need to be filtered, false otherwise
private boolean isPostListingFilterNeeded(String filterMode) {
return filterMode.equals(FILTER_MODE_FILES_ONLY) || filterMode.equals(FILTER_MODE_FULL_PATH);
private String getAbsolutePath(final Path path) {
final Path parent = path.getParent();
final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
return prefix + "/" + path.getName();
private Map<String, String> createAttributes(final FileStatus status) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
attributes.put("hdfs.owner", status.getOwner());
attributes.put("", status.getGroup());
attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
attributes.put("hdfs.length", String.valueOf(status.getLen()));
attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
final FsPermission permission = status.getPermission();
final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
attributes.put("hdfs.permissions", perms);
return attributes;
private String getPerms(final FsAction action) {
final StringBuilder sb = new StringBuilder();
if (action.implies(FsAction.READ)) {
} else {
if (action.implies(FsAction.WRITE)) {
} else {
if (action.implies(FsAction.EXECUTE)) {
} else {
return sb.toString();
private PathFilter createPathFilter(final ProcessContext context) {
final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
return path -> {
final boolean accepted;
if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
accepted = fileFilterRegexPattern.matcher(path.toString()).matches()
|| fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
} else {
accepted = fileFilterRegexPattern.matcher(path.getName()).matches();
return accepted;