| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.processors.standard; |
| |
| import org.apache.nifi.annotation.behavior.InputRequirement; |
| import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; |
| import org.apache.nifi.annotation.behavior.Stateful; |
| import org.apache.nifi.annotation.behavior.TriggerSerially; |
| import org.apache.nifi.annotation.behavior.WritesAttribute; |
| import org.apache.nifi.annotation.behavior.WritesAttributes; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.SeeAlso; |
| import org.apache.nifi.annotation.documentation.Tags; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.components.AllowableValue; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.PropertyDescriptor.Builder; |
| import org.apache.nifi.components.state.Scope; |
| import org.apache.nifi.context.PropertyContext; |
| import org.apache.nifi.expression.ExpressionLanguageScope; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.processor.DataUnit; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessorInitializationContext; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.apache.nifi.processor.util.list.AbstractListProcessor; |
| import org.apache.nifi.processor.util.list.ListedEntityTracker; |
| import org.apache.nifi.processors.standard.util.FileInfo; |
| import org.apache.nifi.serialization.record.RecordSchema; |
| import org.apache.nifi.util.Tuple; |
| |
import java.io.File;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.FileStore;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileOwnerAttributeView;
import java.nio.file.attribute.PosixFileAttributeView;
import java.nio.file.attribute.PosixFileAttributes;
import java.nio.file.attribute.PosixFilePermissions;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
| |
| import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY; |
| import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR; |
| import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; |
| |
| @TriggerSerially |
| @InputRequirement(Requirement.INPUT_FORBIDDEN) |
| @Tags({"file", "get", "list", "ingest", "source", "filesystem"}) |
| @CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " + |
| "creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchFile. This " + |
| "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " + |
| "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " + |
| "GetFile, this Processor does not delete any data from the local filesystem.") |
| @WritesAttributes({ |
| @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."), |
| @WritesAttribute(attribute="path", description="The path is set to the relative path of the file's directory " + |
| "on filesystem compared to the Input Directory property. For example, if Input Directory is set to " + |
| "/tmp, then files picked up from /tmp will have the path attribute set to \"/\". If the Recurse " + |
| "Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path " + |
| "attribute will be set to \"abc/1/2/3/\"."), |
| @WritesAttribute(attribute="absolute.path", description="The absolute.path is set to the absolute path of " + |
| "the file's directory on filesystem. For example, if the Input Directory property is set to /tmp, " + |
| "then files picked up from /tmp will have the path attribute set to \"/tmp/\". If the Recurse " + |
| "Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path " + |
| "attribute will be set to \"/tmp/abc/1/2/3/\"."), |
| @WritesAttribute(attribute=ListFile.FILE_OWNER_ATTRIBUTE, description="The user that owns the file in filesystem"), |
| @WritesAttribute(attribute=ListFile.FILE_GROUP_ATTRIBUTE, description="The group that owns the file in filesystem"), |
| @WritesAttribute(attribute=ListFile.FILE_SIZE_ATTRIBUTE, description="The number of bytes in the file in filesystem"), |
| @WritesAttribute(attribute=ListFile.FILE_PERMISSIONS_ATTRIBUTE, description="The permissions for the file in filesystem. This " + |
| "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " + |
| "rw-rw-r--"), |
| @WritesAttribute(attribute=ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE, description="The timestamp of when the file in filesystem was " + |
| "last modified as 'yyyy-MM-dd'T'HH:mm:ssZ'"), |
| @WritesAttribute(attribute=ListFile.FILE_LAST_ACCESS_TIME_ATTRIBUTE, description="The timestamp of when the file in filesystem was " + |
| "last accessed as 'yyyy-MM-dd'T'HH:mm:ssZ'"), |
| @WritesAttribute(attribute=ListFile.FILE_CREATION_TIME_ATTRIBUTE, description="The timestamp of when the file in filesystem was " + |
| "created as 'yyyy-MM-dd'T'HH:mm:ssZ'") |
| }) |
| @SeeAlso({GetFile.class, PutFile.class, FetchFile.class}) |
| @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. " |
| + "This allows the Processor to list only files that have been added or modified after " |
| + "this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the " |
| + "<Input Directory Location> property.") |
| public class ListFile extends AbstractListProcessor<FileInfo> { |
| static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "Input Directory is located on a local disk. State will be stored locally on each node in the cluster."); |
| static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "Input Directory is located on a remote system. State will be stored across the cluster so that " |
| + "the listing can be performed on Primary Node Only and another node can pick up where the last node left off, if the Primary Node changes"); |
| |
| public static final PropertyDescriptor DIRECTORY = new Builder() |
| .name("Input Directory") |
| .description("The input directory from which files to pull files") |
| .required(true) |
| .addValidator(StandardValidators.createDirectoryExistsValidator(true, false)) |
| .expressionLanguageSupported(VARIABLE_REGISTRY) |
| .build(); |
| |
| public static final PropertyDescriptor RECURSE = new Builder() |
| .name("Recurse Subdirectories") |
| .description("Indicates whether to list files from subdirectories of the directory") |
| .required(true) |
| .allowableValues("true", "false") |
| .defaultValue("true") |
| .build(); |
| |
| public static final PropertyDescriptor DIRECTORY_LOCATION = new Builder() |
| .name("Input Directory Location") |
| .description("Specifies where the Input Directory is located. This is used to determine whether state should be stored locally or across the cluster.") |
| .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE) |
| .defaultValue(LOCATION_LOCAL.getValue()) |
| .required(true) |
| .build(); |
| |
| public static final PropertyDescriptor FILE_FILTER = new Builder() |
| .name("File Filter") |
| .description("Only files whose names match the given regular expression will be picked up") |
| .required(true) |
| .defaultValue("[^\\.].*") |
| .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PATH_FILTER = new Builder() |
| .name("Path Filter") |
| .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned") |
| .required(false) |
| .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor INCLUDE_FILE_ATTRIBUTES = new Builder() |
| .name("Include File Attributes") |
| .description("Whether or not to include information such as the file's Last Modified Time and Owner as FlowFile Attributes. " |
| + "Depending on the File System being used, gathering this information can be expensive and as a result should be disabled. This is especially true of remote file shares.") |
| .allowableValues("true", "false") |
| .defaultValue("true") |
| .required(true) |
| .build(); |
| |
| public static final PropertyDescriptor MIN_AGE = new Builder() |
| .name("Minimum File Age") |
| .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored") |
| .required(true) |
| .addValidator(TIME_PERIOD_VALIDATOR) |
| .defaultValue("0 sec") |
| .build(); |
| |
| public static final PropertyDescriptor MAX_AGE = new Builder() |
| .name("Maximum File Age") |
| .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored") |
| .required(false) |
| .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) |
| .build(); |
| |
| public static final PropertyDescriptor MIN_SIZE = new Builder() |
| .name("Minimum File Size") |
| .description("The minimum size that a file must be in order to be pulled") |
| .required(true) |
| .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) |
| .defaultValue("0 B") |
| .build(); |
| |
| public static final PropertyDescriptor MAX_SIZE = new Builder() |
| .name("Maximum File Size") |
| .description("The maximum size that a file can be in order to be pulled") |
| .required(false) |
| .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new Builder() |
| .name("Ignore Hidden Files") |
| .description("Indicates whether or not hidden files should be ignored") |
| .allowableValues("true", "false") |
| .defaultValue("true") |
| .required(true) |
| .build(); |
| |
| public static final PropertyDescriptor TRACK_PERFORMANCE = new Builder() |
| .name("track-performance") |
| .displayName("Track Performance") |
| .description("Whether or not the Processor should track the performance of disk access operations. If true, all accesses to disk will be recorded, including the file being accessed, the " + |
| "information being obtained, and how long it takes. This is then logged periodically at a DEBUG level. While the amount of data will be capped, " + |
| "this option may still consume a significant amount of heap (controlled by the 'Maximum Number of Files to Track' property), " + |
| "but it can be very useful for troubleshooting purposes if performance is poor is degraded.") |
| .required(true) |
| .allowableValues("true", "false") |
| .defaultValue("false") |
| .build(); |
| |
| public static final PropertyDescriptor MAX_TRACKED_FILES = new Builder() |
| .name("max-performance-metrics") |
| .displayName("Maximum Number of Files to Track") |
| .description("If the 'Track Performance' property is set to 'true', this property indicates the maximum number of files whose performance metrics should be held onto. A smaller value for " + |
| "this property will result in less heap utilization, while a larger value may provide more accurate insights into how the disk access operations are performing") |
| .required(true) |
| .addValidator(POSITIVE_INTEGER_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .defaultValue("100000") |
| .build(); |
| |
| public static final PropertyDescriptor MAX_DISK_OPERATION_TIME = new Builder() |
| .name("max-operation-time") |
| .displayName("Max Disk Operation Time") |
| .description("The maximum amount of time that any single disk operation is expected to take. If any disk operation takes longer than this amount of time, a warning bulletin will be " + |
| "generated for each operation that exceeds this amount of time.") |
| .required(false) |
| .addValidator(TIME_PERIOD_VALIDATOR) |
| .expressionLanguageSupported(VARIABLE_REGISTRY) |
| .defaultValue("10 secs") |
| .build(); |
| |
| public static final PropertyDescriptor MAX_LISTING_TIME = new Builder() |
| .name("max-listing-time") |
| .displayName("Max Directory Listing Time") |
| .description("The maximum amount of time that listing any single directory is expected to take. If the listing for the directory specified by the 'Input Directory' property, " + |
| "or the listing of any subdirectory (if 'Recurse' is set to true) takes longer than this amount of time, a warning bulletin will be generated for each directory listing " + |
| "that exceeds this amount of time.") |
| .required(false) |
| .addValidator(TIME_PERIOD_VALIDATOR) |
| .expressionLanguageSupported(VARIABLE_REGISTRY) |
| .defaultValue("3 mins") |
| .build(); |
| |
| |
| private List<PropertyDescriptor> properties; |
| private Set<Relationship> relationships; |
| |
| private volatile ScheduledExecutorService monitoringThreadPool; |
| private volatile Future<?> monitoringFuture; |
| |
| private volatile boolean includeFileAttributes; |
| private volatile PerformanceTracker performanceTracker; |
| private volatile long performanceLoggingTimestamp = System.currentTimeMillis(); |
| private final AtomicReference<BiPredicate<Path, BasicFileAttributes>> fileFilterRef = new AtomicReference<>(); |
| |
| public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime"; |
| public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime"; |
| public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime"; |
| public static final String FILE_SIZE_ATTRIBUTE = "file.size"; |
| public static final String FILE_OWNER_ATTRIBUTE = "file.owner"; |
| public static final String FILE_GROUP_ATTRIBUTE = "file.group"; |
| public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions"; |
| public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; |
| |
| @Override |
| protected void init(final ProcessorInitializationContext context) { |
| final List<PropertyDescriptor> properties = new ArrayList<>(); |
| properties.add(DIRECTORY); |
| properties.add(LISTING_STRATEGY); |
| properties.add(RECURSE); |
| properties.add(RECORD_WRITER); |
| properties.add(DIRECTORY_LOCATION); |
| properties.add(FILE_FILTER); |
| properties.add(PATH_FILTER); |
| properties.add(INCLUDE_FILE_ATTRIBUTES); |
| properties.add(MIN_AGE); |
| properties.add(MAX_AGE); |
| properties.add(MIN_SIZE); |
| properties.add(MAX_SIZE); |
| properties.add(IGNORE_HIDDEN_FILES); |
| properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION); |
| properties.add(ListedEntityTracker.TRACKING_STATE_CACHE); |
| properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW); |
| properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET); |
| properties.add(ListedEntityTracker.NODE_IDENTIFIER); |
| properties.add(TRACK_PERFORMANCE); |
| properties.add(MAX_TRACKED_FILES); |
| properties.add(MAX_DISK_OPERATION_TIME); |
| properties.add(MAX_LISTING_TIME); |
| this.properties = Collections.unmodifiableList(properties); |
| |
| final Set<Relationship> relationships = new HashSet<>(); |
| relationships.add(REL_SUCCESS); |
| this.relationships = Collections.unmodifiableSet(relationships); |
| |
| monitoringThreadPool = Executors.newScheduledThreadPool(1, r -> { |
| final Thread t = Executors.defaultThreadFactory().newThread(r); |
| t.setName("Monitor ListFile Performance [UUID=" + context.getIdentifier() + "]"); |
| t.setDaemon(true); |
| |
| return t; |
| }); |
| } |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return properties; |
| } |
| |
| @Override |
| public Set<Relationship> getRelationships() { |
| return relationships; |
| } |
| |
| @OnScheduled |
| public void onScheduled(final ProcessContext context) { |
| fileFilterRef.set(createFileFilter(context)); |
| includeFileAttributes = context.getProperty(INCLUDE_FILE_ATTRIBUTES).asBoolean(); |
| |
| final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); |
| final long maxListingMillis = context.getProperty(MAX_LISTING_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); |
| |
| final boolean trackPerformance = context.getProperty(TRACK_PERFORMANCE).asBoolean(); |
| if (trackPerformance) { |
| final int maxEntries = context.getProperty(MAX_TRACKED_FILES).evaluateAttributeExpressions().asInteger(); |
| performanceTracker = new RollingMetricPerformanceTracker(getLogger(), maxDiskOperationMillis, maxEntries); |
| } else { |
| performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis); |
| } |
| |
| final long millisToKeepStats = TimeUnit.MINUTES.toMillis(15); |
| final MonitorActiveTasks monitorTask = new MonitorActiveTasks(performanceTracker, getLogger(), maxDiskOperationMillis, maxListingMillis, millisToKeepStats); |
| monitoringFuture = monitoringThreadPool.scheduleAtFixedRate(monitorTask, 15, 15, TimeUnit.SECONDS); |
| } |
| |
| @OnStopped |
| public void onStopped(final ProcessContext context) { |
| if (monitoringFuture != null) { |
| monitoringFuture.cancel(true); |
| } |
| |
| final boolean trackPerformance = context.getProperty(TRACK_PERFORMANCE).asBoolean(); |
| if (trackPerformance) { |
| logPerformance(); |
| } |
| } |
| |
| protected PerformanceTracker getPerformanceTracker() { |
| return performanceTracker; |
| } |
| |
/**
 * Logs one DEBUG line per {@link DiskOperation}. Each line summarizes the current tracker's statistics:
 * count, average, standard deviation, min, max and outliers. Afterwards it records the time of this pass in
 * {@code performanceLoggingTimestamp}, which the listing matcher consults to decide when to log again.
 * <p>
 * This is a no-op when DEBUG logging is disabled. It is also a no-op when no tracker has been created yet by
 * {@code onScheduled}; previously that case threw a NullPointerException.
 */
public void logPerformance() {
    final ComponentLog logger = getLogger();
    if (!logger.isDebugEnabled()) {
        return;
    }

    // Read the volatile field once so the whole report comes from a single tracker instance.
    final PerformanceTracker tracker = performanceTracker;
    if (tracker == null) {
        return;
    }

    final long earliestTimestamp = tracker.getEarliestTimestamp();
    final long millis = System.currentTimeMillis() - earliestTimestamp;
    final long seconds = TimeUnit.MILLISECONDS.toSeconds(millis);

    for (final DiskOperation operation : DiskOperation.values()) {
        final OperationStatistics stats = tracker.getOperationStatistics(operation);

        final StringBuilder sb = new StringBuilder();
        if (stats.getCount() == 0) {
            sb.append("Over the past ").append(seconds).append(" seconds, for Operation '").append(operation).append("' there were no operations performed");
        } else {
            sb.append("Over the past ").append(seconds).append(" seconds, For Operation '").append(operation).append("' there were ")
                .append(stats.getCount()).append(" operations performed with an average time of ")
                .append(stats.getAverage()).append(" milliseconds; Standard Deviation = ").append(stats.getStandardDeviation()).append(" millis; Min Time = ")
                .append(stats.getMin()).append(" millis, Max Time = ").append(stats.getMax()).append(" millis");

            // DEBUG is already known to be enabled here. Compute the outliers once and reuse the map for
            // both the size and the listing (it was previously computed twice per operation).
            final Map<String, Long> outliers = stats.getOutliers();
            sb.append("; ").append(outliers.size()).append(" significant outliers: ");
            sb.append(outliers);
        }

        logger.debug(sb.toString());
    }

    performanceLoggingTimestamp = System.currentTimeMillis();
}
| |
| |
| @Override |
| protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) { |
| final Map<String, String> attributes = new HashMap<>(); |
| |
| final String fullPath = fileInfo.getFullPathFileName(); |
| final File file = new File(fullPath); |
| final Path filePath = file.toPath(); |
| final Path directoryPath = new File(getPath(context)).toPath(); |
| |
| final Path relativePath = directoryPath.toAbsolutePath().relativize(filePath.getParent()); |
| String relativePathString = relativePath.toString(); |
| relativePathString = relativePathString.isEmpty() ? "." + File.separator : relativePathString + File.separator; |
| |
| final Path absPath = filePath.toAbsolutePath(); |
| final String absPathString = absPath.getParent().toString() + File.separator; |
| |
| final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); |
| |
| attributes.put(CoreAttributes.PATH.key(), relativePathString); |
| attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName()); |
| attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString); |
| attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(fileInfo.getSize())); |
| attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(fileInfo.getLastModifiedTime()))); |
| |
| if (includeFileAttributes) { |
| final TimingInfo timingInfo = performanceTracker.getTimingInfo(relativePath.toString(), file.getName()); |
| |
| try { |
| FileStore store = Files.getFileStore(filePath); |
| |
| timingInfo.timeOperation(DiskOperation.RETRIEVE_BASIC_ATTRIBUTES, () -> { |
| if (store.supportsFileAttributeView("basic")) { |
| try { |
| BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class); |
| BasicFileAttributes attrs = view.readAttributes(); |
| attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis()))); |
| attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis()))); |
| } catch (Exception ignore) { |
| } // allow other attributes if these fail |
| } |
| }); |
| |
| timingInfo.timeOperation(DiskOperation.RETRIEVE_OWNER_ATTRIBUTES, () -> { |
| if (store.supportsFileAttributeView("owner")) { |
| try { |
| FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class); |
| attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName()); |
| } catch (Exception ignore) { |
| } // allow other attributes if these fail |
| } |
| }); |
| |
| timingInfo.timeOperation(DiskOperation.RETRIEVE_POSIX_ATTRIBUTES, () -> { |
| if (store.supportsFileAttributeView("posix")) { |
| try { |
| PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class); |
| attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions())); |
| attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName()); |
| } catch (Exception ignore) { |
| } // allow other attributes if these fail |
| } |
| }); |
| } catch (IOException ioe) { |
| // well then this FlowFile gets none of these attributes |
| getLogger().warn("Error collecting attributes for file {}, message is {}", new Object[] {absPathString, ioe.getMessage()}); |
| } |
| } |
| |
| return attributes; |
| } |
| |
| @Override |
| protected String getPath(final ProcessContext context) { |
| return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue(); |
| } |
| |
| @Override |
| protected Scope getStateScope(final PropertyContext context) { |
| final String location = context.getProperty(DIRECTORY_LOCATION).getValue(); |
| if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) { |
| return Scope.CLUSTER; |
| } |
| |
| return Scope.LOCAL; |
| } |
| |
| @Override |
| protected RecordSchema getRecordSchema() { |
| return FileInfo.getRecordSchema(); |
| } |
| |
| @Override |
| protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { |
| final Path basePath = new File(getPath(context)).toPath(); |
| final Boolean recurse = context.getProperty(RECURSE).asBoolean(); |
| final Map<Path, BasicFileAttributes> lastModifiedMap = new HashMap<>(); |
| |
| final BiPredicate<Path, BasicFileAttributes> fileFilter = fileFilterRef.get(); |
| int maxDepth = recurse ? Integer.MAX_VALUE : 1; |
| |
| final BiPredicate<Path, BasicFileAttributes> matcher = new BiPredicate<Path, BasicFileAttributes>() { |
| private long lastTimestamp = System.currentTimeMillis(); |
| |
| @Override |
| public boolean test(final Path path, final BasicFileAttributes attributes) { |
| if (!isScheduled()) { |
| throw new ProcessorStoppedException(); |
| } |
| |
| final long now = System.currentTimeMillis(); |
| final long timeToList = now - lastTimestamp; |
| lastTimestamp = now; |
| |
| final Path relativeDirectory = basePath.relativize(path).getParent(); |
| final String relativePath = relativeDirectory == null ? "" : relativeDirectory.toString(); |
| final String filename = path.getFileName().toString(); |
| performanceTracker.acceptOperation(DiskOperation.RETRIEVE_NEXT_FILE_FROM_OS, relativePath, filename, timeToList); |
| |
| final boolean isDirectory = attributes.isDirectory(); |
| if (isDirectory) { |
| performanceTracker.setActiveDirectory(relativePath); |
| } |
| |
| final TimedOperationKey operationKey = performanceTracker.beginOperation(DiskOperation.FILTER, relativePath, filename); |
| |
| try { |
| if (!isDirectory && (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp) |
| && fileFilter.test(path, attributes)) { |
| // We store the attributes for each Path we are returning in order to avoid to |
| // retrieve them again later when creating the FileInfo |
| lastModifiedMap.put(path, attributes); |
| |
| return true; |
| } |
| |
| return false; |
| } finally { |
| performanceTracker.completeOperation(operationKey); |
| |
| if (TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - performanceLoggingTimestamp) >= 5) { |
| logPerformance(); |
| } |
| } |
| } |
| }; |
| |
| try { |
| final long start = System.currentTimeMillis(); |
| final List<FileInfo> result = new LinkedList<>(); |
| |
| Files.walkFileTree(basePath, Collections.singleton(FileVisitOption.FOLLOW_LINKS), maxDepth, new FileVisitor<Path>() { |
| @Override |
| public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attributes) throws IOException { |
| if (Files.isReadable(dir)) { |
| return FileVisitResult.CONTINUE; |
| } else { |
| getLogger().debug("The following directory is not readable: {}", new Object[] {dir.toString()}); |
| return FileVisitResult.SKIP_SUBTREE; |
| } |
| } |
| |
| @Override |
| public FileVisitResult visitFile(final Path path, final BasicFileAttributes attributes) throws IOException { |
| if (matcher.test(path, attributes)) { |
| final File file = path.toFile(); |
| final BasicFileAttributes fileAttributes = lastModifiedMap.get(path); |
| final FileInfo fileInfo = new FileInfo.Builder() |
| .directory(false) |
| .filename(file.getName()) |
| .fullPathFileName(file.getAbsolutePath()) |
| .lastModifiedTime(fileAttributes.lastModifiedTime().toMillis()) |
| .size(fileAttributes.size()) |
| .build(); |
| |
| result.add(fileInfo); |
| } |
| |
| return FileVisitResult.CONTINUE; |
| } |
| |
| @Override |
| public FileVisitResult visitFileFailed(final Path path, final IOException e) throws IOException { |
| if (e instanceof AccessDeniedException) { |
| getLogger().debug("The following file is not readable: {}", new Object[] {path.toString()}); |
| return FileVisitResult.SKIP_SUBTREE; |
| } else { |
| getLogger().error("Error during visiting file {}: {}", new Object[] {path.toString(), e.getMessage()}, e); |
| return FileVisitResult.TERMINATE; |
| } |
| } |
| |
| @Override |
| public FileVisitResult postVisitDirectory(final Path dir, final IOException e) throws IOException { |
| if (e != null) { |
| getLogger().error("Error during visiting directory {}: {}", new Object[] {dir.toString(), e.getMessage()}, e); |
| } |
| |
| return FileVisitResult.CONTINUE; |
| } |
| }); |
| |
| final long millis = System.currentTimeMillis() - start; |
| |
| getLogger().debug("Took {} milliseconds to perform listing and gather {} entries", new Object[] {millis, result.size()}); |
| return result; |
| } catch (final ProcessorStoppedException pse) { |
| getLogger().info("Processor was stopped so will not complete listing of Files"); |
| return Collections.emptyList(); |
| } finally { |
| performanceTracker.completeActiveDirectory(); |
| } |
| } |
| |
| @Override |
| protected boolean isListingResetNecessary(final PropertyDescriptor property) { |
| return DIRECTORY.equals(property) |
| || RECURSE.equals(property) |
| || FILE_FILTER.equals(property) |
| || PATH_FILTER.equals(property) |
| || MIN_AGE.equals(property) |
| || MAX_AGE.equals(property) |
| || MIN_SIZE.equals(property) |
| || MAX_SIZE.equals(property) |
| || IGNORE_HIDDEN_FILES.equals(property); |
| } |
| |
| private BiPredicate<Path, BasicFileAttributes> createFileFilter(final ProcessContext context) { |
| final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue(); |
| final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B); |
| final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); |
| final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); |
| final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean(); |
| final String fileFilter = context.getProperty(FILE_FILTER).getValue(); |
| final Pattern filePattern = Pattern.compile(fileFilter); |
| final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue(); |
| final boolean recurseDirs = context.getProperty(RECURSE).asBoolean(); |
| final String pathPatternStr = context.getProperty(PATH_FILTER).getValue(); |
| final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr); |
| |
| final Path basePath = Paths.get(indir); |
| |
| return (path, attributes) -> { |
| if (minSize > attributes.size()) { |
| return false; |
| } |
| if (maxSize != null && maxSize < attributes.size()) { |
| return false; |
| } |
| final long fileAge = System.currentTimeMillis() - attributes.lastModifiedTime().toMillis(); |
| if (minAge > fileAge) { |
| return false; |
| } |
| if (maxAge != null && maxAge < fileAge) { |
| return false; |
| } |
| |
| final Path relativePath = basePath.relativize(path).getParent(); |
| final String relativeDir = relativePath == null ? "" : relativePath.toString(); |
| final String filename = path.getFileName().toString(); |
| final TimingInfo timingInfo = performanceTracker.getTimingInfo(relativeDir, filename); |
| |
| final File file = path.toFile(); |
| |
| if (pathPattern != null) { |
| if (relativePath != null && !relativePath.toString().isEmpty()) { |
| if (!pathPattern.matcher(relativePath.toString()).matches()) { |
| return false; |
| } |
| } |
| } |
| |
| final boolean matchesFilter = filePattern.matcher(filename).matches(); |
| if (!matchesFilter) { |
| return false; |
| } |
| |
| // Verify that we have at least read permissions on the file we're considering grabbing |
| if (!timingInfo.timeOperation(DiskOperation.CHECK_READABLE, () -> Files.isReadable(path))) { |
| return false; |
| } |
| |
| if (ignoreHidden && timingInfo.timeOperation(DiskOperation.CHECK_HIDDEN, file::isHidden)) { |
| return false; |
| } |
| |
| return true; |
| }; |
| } |
| |
| /** |
| * A PerformanceTracker that is capable of tracking which disk access operation is active and which directory is actively being listed, |
| * as well as timing specific operations, but does not track metrics over any amount of time. This implementation does not provide the ability |
| * to glean information such as which operations or files are taking the longest to operate on but uses very little heap. |
| */ |
| public static class UntrackedPerformanceTracker implements PerformanceTracker { |
| private TimedOperationKey activeOperation = null; |
| private String activeDirectory; |
| private long activeDirectoryStartTime = -1L; |
| |
| private final ComponentLog logger; |
| private final long maxDiskOperationMillis; |
| |
| public UntrackedPerformanceTracker(final ComponentLog logger, final long maxDiskOperationMillis) { |
| this.logger = logger; |
| this.maxDiskOperationMillis = maxDiskOperationMillis; |
| } |
| |
| @Override |
| public TimedOperationKey beginOperation(final DiskOperation operation, final String directory, final String filename) { |
| return null; |
| } |
| |
| @Override |
| public void completeOperation(final TimedOperationKey operationKey) { |
| } |
| |
| @Override |
| public void acceptOperation(final DiskOperation operation, final String directory, final String filename, final long millis) { |
| } |
| |
| @Override |
| public TimingInfo getTimingInfo(final String directory, final String filename) { |
| return new TimingInfo(directory, filename, this, logger, maxDiskOperationMillis); |
| } |
| |
| @Override |
| public OperationStatistics getOperationStatistics(final DiskOperation operation) { |
| return OperationStatistics.EMPTY; |
| } |
| |
| @Override |
| public synchronized void setActiveOperation(final TimedOperationKey operationKey) { |
| this.activeOperation = operationKey; |
| } |
| |
| @Override |
| public synchronized void completeActiveOperation() { |
| this.activeOperation = null; |
| } |
| |
| @Override |
| public synchronized TimedOperationKey getActiveOperation() { |
| return activeOperation; |
| } |
| |
| @Override |
| public void purgeTimingInfo(final long cutoff) { |
| } |
| |
| @Override |
| public long getEarliestTimestamp() { |
| return System.currentTimeMillis(); |
| } |
| |
| @Override |
| public synchronized void setActiveDirectory(final String directory) { |
| activeDirectory = directory; |
| activeDirectoryStartTime = System.currentTimeMillis(); |
| } |
| |
| @Override |
| public synchronized void completeActiveDirectory() { |
| activeDirectory = null; |
| activeDirectoryStartTime = -1L; |
| } |
| |
| @Override |
| public synchronized long getActiveDirectoryStartTime() { |
| return activeDirectoryStartTime; |
| } |
| |
| @Override |
| public synchronized String getActiveDirectory() { |
| return activeDirectory; |
| } |
| |
| @Override |
| public int getTrackedFileCount() { |
| return 0; |
| } |
| } |
| |
| /** |
| * Tracks metrics using a rolling window of time, in which older metrics are 'aged off' by calling {@link #purgeTimingInfo(long)}. Tracking these metrics allows information |
| * to be gleaned, such as which files are expensive to operate on or which operations are most expensive. However, the heap utilization is significant. |
| */ |
| public static final class RollingMetricPerformanceTracker implements PerformanceTracker { |
| private final Map<String, String> directoryCanonicalization = new HashMap<>(); |
| private final Map<Tuple<String, String>, TimingInfo> directoryToTimingInfo; |
| private TimedOperationKey activeOperation; |
| private long earliestTimestamp = System.currentTimeMillis(); |
| private final long maxDiskOperationMillis; |
| private final ComponentLog logger; |
| |
| private String activeDirectory; |
| private long activeDirectoryStartTime = -1L; |
| |
| public RollingMetricPerformanceTracker(final ComponentLog logger, final long maxDiskOperationMillis, final int maxEntries) { |
| this.logger = logger; |
| this.maxDiskOperationMillis = maxDiskOperationMillis; |
| |
| directoryToTimingInfo = new LinkedHashMap<Tuple<String, String>, TimingInfo>() { |
| @Override |
| protected boolean removeEldestEntry(final Map.Entry<Tuple<String, String>, TimingInfo> eldest) { |
| return size() > maxEntries; |
| } |
| }; |
| } |
| |
| @Override |
| public synchronized TimedOperationKey beginOperation(final DiskOperation operation, final String directory, final String filename) { |
| return new TimedOperationKey(operation, directory, filename, System.currentTimeMillis()); |
| } |
| |
| @Override |
| public synchronized void completeOperation(final TimedOperationKey operationKey) { |
| final TimingInfo timingInfo = getTimingInfo(operationKey.getDirectory(), operationKey.getFilename()); |
| timingInfo.accept(operationKey.getOperation(), System.currentTimeMillis() - operationKey.getStartTime()); |
| } |
| |
| @Override |
| public synchronized void acceptOperation(final DiskOperation operation, final String directory, final String filename, final long millis) { |
| final String canonicalDirectory = directoryCanonicalization.computeIfAbsent(directory, key -> directory); |
| final Tuple<String, String> key = new Tuple<>(canonicalDirectory, filename); |
| final TimingInfo timingInfo = directoryToTimingInfo.computeIfAbsent(key, k -> new TimingInfo(directory, filename, this, logger, maxDiskOperationMillis)); |
| timingInfo.accept(operation, millis); |
| } |
| |
| @Override |
| public synchronized TimingInfo getTimingInfo(final String directory, final String filename) { |
| final String canonicalDirectory = directoryCanonicalization.computeIfAbsent(directory, key -> directory); |
| final Tuple<String, String> key = new Tuple<>(canonicalDirectory, filename); |
| final TimingInfo timingInfo = directoryToTimingInfo.computeIfAbsent(key, k -> new TimingInfo(directory, filename, this, logger, maxDiskOperationMillis)); |
| |
| return timingInfo; |
| } |
| |
| @Override |
| public void setActiveOperation(final TimedOperationKey activeOperation) { |
| this.activeOperation = activeOperation; |
| } |
| |
| @Override |
| public void completeActiveOperation() { |
| this.activeOperation = null; |
| } |
| |
| @Override |
| public synchronized TimedOperationKey getActiveOperation() { |
| return activeOperation; |
| } |
| |
| @Override |
| public synchronized void setActiveDirectory(final String directory) { |
| activeDirectory = directory; |
| activeDirectoryStartTime = System.currentTimeMillis(); |
| } |
| |
| @Override |
| public synchronized void completeActiveDirectory() { |
| activeDirectory = null; |
| activeDirectoryStartTime = -1L; |
| } |
| |
| @Override |
| public synchronized long getActiveDirectoryStartTime() { |
| return activeDirectoryStartTime; |
| } |
| |
| @Override |
| public synchronized String getActiveDirectory() { |
| return activeDirectory; |
| } |
| |
| @Override |
| public synchronized int getTrackedFileCount() { |
| return directoryToTimingInfo.size(); |
| } |
| |
| @Override |
| public synchronized void purgeTimingInfo(final long cutoff) { |
| logger.debug("Purging any entries from Performance Tracker that is older than {}", new Object[] {new Date(cutoff)}); |
| final Iterator<Map.Entry<Tuple<String, String>, TimingInfo>> itr = directoryToTimingInfo.entrySet().iterator(); |
| |
| int purgedCount = 0; |
| long earliestTimestamp = System.currentTimeMillis(); |
| while (itr.hasNext()) { |
| final Map.Entry<Tuple<String, String>, TimingInfo> entry = itr.next(); |
| final TimingInfo timingInfo = entry.getValue(); |
| final long creationTime = timingInfo.getCreationTimestamp(); |
| |
| if (creationTime < cutoff) { |
| itr.remove(); |
| purgedCount++; |
| |
| directoryCanonicalization.remove(entry.getKey().getKey()); |
| } else { |
| earliestTimestamp = Math.min(earliestTimestamp, creationTime); |
| } |
| } |
| |
| this.earliestTimestamp = earliestTimestamp; |
| logger.debug("Purged {} entries from Performance Tracker; now holding {} entries", new Object[] {purgedCount, directoryToTimingInfo.size()}); |
| } |
| |
| public long getEarliestTimestamp() { |
| return earliestTimestamp; |
| } |
| |
| public synchronized OperationStatistics getOperationStatistics(final DiskOperation operation) { |
| long count = 0L; |
| long sum = 0L; |
| long min = 0L; |
| long max = 0L; |
| |
| // Calculate min/max/mean |
| for (final TimingInfo timingInfo : directoryToTimingInfo.values()) { |
| final long operationTime = timingInfo.getOperationTime(operation); |
| |
| if (operationTime < 0) { // operation not conducted |
| continue; |
| } |
| |
| sum += operationTime; |
| |
| if (count++ == 0) { |
| min = operationTime; |
| max = operationTime; |
| } else { |
| min = Math.min(min, operationTime); |
| max = Math.max(max, operationTime); |
| } |
| } |
| |
| if (count == 0) { |
| return OperationStatistics.EMPTY; |
| } |
| |
| double average = (double) sum / (double) count; |
| |
| // Calculate Standard Deviation |
| final double stdDeviation = calculateStdDev(average, (double) count, operation); |
| final double outlierCutoff = average + 2 * stdDeviation; |
| |
| final Map<String, Long> outliers = new HashMap<>(); |
| for (final TimingInfo timingInfo : directoryToTimingInfo.values()) { |
| final long operationTime = timingInfo.getOperationTime(operation); |
| |
| if (operationTime > 2 && operationTime > outlierCutoff) { |
| final String directory = timingInfo.getDirectory(); |
| final String filename = timingInfo.getFilename(); |
| final String fullPath = directory.endsWith("/") ? directory + filename : directory + "/" + filename; |
| outliers.put(fullPath, operationTime); |
| } |
| } |
| |
| return new StandardOperationStatistics(min, max, count, average, stdDeviation, outliers); |
| } |
| |
| private double calculateStdDev(final double average, final double count, final DiskOperation operation) { |
| double squaredDifferenceSum = 0D; |
| for (final TimingInfo timingInfo : directoryToTimingInfo.values()) { |
| final long operationTime = timingInfo.getOperationTime(operation); |
| if (operationTime < 0) { |
| continue; |
| } |
| |
| final double differenceSquared = Math.pow(((double) operationTime - average), 2); |
| squaredDifferenceSum += differenceSquared; |
| } |
| |
| final double squaredDifferenceAverage = squaredDifferenceSum / count; |
| final double stdDeviation = Math.pow(squaredDifferenceAverage, 0.5); |
| return stdDeviation; |
| } |
| } |
| |
| /** |
| * Provides a mechanism for timing how long a particular operation takes to complete, logging if it takes longer than the configured threshold. |
| */ |
| private static class TimingInfo { |
| private final String directory; |
| private final String filename; |
| private final int[] operationTimes; |
| private final PerformanceTracker tracker; |
| private final long creationTimestamp; |
| private final ComponentLog logger; |
| private final long maxDiskOperationMillis; |
| |
| public TimingInfo(final String directory, final String filename, final PerformanceTracker tracker, final ComponentLog logger, final long maxDiskOperationMillis) { |
| this.directory = directory; |
| this.filename = filename; |
| this.tracker = tracker; |
| this.logger = logger; |
| this.maxDiskOperationMillis = maxDiskOperationMillis; |
| |
| this.creationTimestamp = System.currentTimeMillis(); |
| |
| operationTimes = new int[DiskOperation.values().length]; |
| Arrays.fill(operationTimes, -1); |
| } |
| |
| public String getDirectory() { |
| return directory; |
| } |
| |
| public String getFilename() { |
| return filename; |
| } |
| |
| public void accept(final DiskOperation operation, final long duration) { |
| operationTimes[operation.ordinal()] = (int) duration; |
| |
| if (duration > maxDiskOperationMillis) { |
| final String fullPath = getFullPath(); |
| logger.warn("This Processor completed action {} on {} in {} milliseconds, which exceeds the configured threshold of {} milliseconds", |
| new Object[] {operation, fullPath, duration, maxDiskOperationMillis}); |
| } |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("Performing operation {} on {} took {} milliseconds", new Object[] {operation, getFullPath(), duration}); |
| } |
| } |
| |
| private String getFullPath() { |
| if (directory.isEmpty()) { |
| return filename; |
| } else { |
| return directory.endsWith("/") ? directory + filename : directory + "/" + filename; |
| } |
| } |
| |
| public long getOperationTime(final DiskOperation operation) { |
| return operationTimes[operation.ordinal()]; |
| } |
| |
| private <T> T timeOperation(final DiskOperation operation, final Supplier<T> function) { |
| final long start = System.currentTimeMillis(); |
| final TimedOperationKey operationKey = new TimedOperationKey(operation, directory, filename, start); |
| tracker.setActiveOperation(operationKey); |
| |
| try { |
| final T value = function.get(); |
| final long millis = System.currentTimeMillis() - start; |
| accept(operation, millis); |
| return value; |
| } finally { |
| tracker.completeActiveOperation(); |
| } |
| } |
| |
| private void timeOperation(final DiskOperation operation, final Runnable task) { |
| final long start = System.currentTimeMillis(); |
| final TimedOperationKey operationKey = new TimedOperationKey(operation, directory, filename, start); |
| tracker.setActiveOperation(operationKey); |
| |
| try { |
| task.run(); |
| final long millis = System.currentTimeMillis() - start; |
| accept(operation, millis); |
| } finally { |
| tracker.completeActiveOperation(); |
| } |
| } |
| |
| public long getCreationTimestamp() { |
| return creationTimestamp; |
| } |
| } |
| |
| /** |
| * PerformanceTracker is responsible for providing a mechanism by which any disk operation can be timed and the timing information |
| * can both be used to issue warnings as well as be aggregated for some amount of time, in order to understand how long certain disk operations |
| * take and which files may be responsible for causing longer-than-usual operations to be performed. |
| */ |
| interface PerformanceTracker { |
| TimedOperationKey beginOperation(DiskOperation operation, String directory, String filename); |
| |
| void completeOperation(TimedOperationKey operationKey); |
| |
| void acceptOperation(DiskOperation operation, String directory, String filename, long millis); |
| |
| TimingInfo getTimingInfo(String directory, String filename); |
| |
| OperationStatistics getOperationStatistics(DiskOperation operation); |
| |
| void setActiveOperation(TimedOperationKey operationKey); |
| |
| void completeActiveOperation(); |
| |
| TimedOperationKey getActiveOperation(); |
| |
| void purgeTimingInfo(long cutoff); |
| |
| long getEarliestTimestamp(); |
| |
| void setActiveDirectory(String directory); |
| |
| void completeActiveDirectory(); |
| |
| String getActiveDirectory(); |
| |
| long getActiveDirectoryStartTime(); |
| |
| int getTrackedFileCount(); |
| } |
| |
| |
/**
 * Summary statistics over the recorded durations of a single kind of disk operation.
 */
interface OperationStatistics {

    /** Statistics with every value zero and no outliers, used when no durations have been recorded. */
    OperationStatistics EMPTY = new OperationStatistics() {
        @Override
        public Map<String, Long> getOutliers() {
            return Collections.emptyMap();
        }

        @Override
        public long getCount() {
            return 0L;
        }

        @Override
        public long getMin() {
            return 0L;
        }

        @Override
        public long getMax() {
            return 0L;
        }

        @Override
        public double getAverage() {
            return 0.0D;
        }

        @Override
        public double getStandardDeviation() {
            return 0.0D;
        }
    };

    /** Smallest recorded duration. */
    long getMin();

    /** Largest recorded duration. */
    long getMax();

    /** Number of recorded durations. */
    long getCount();

    /** Arithmetic mean of the recorded durations. */
    double getAverage();

    /** Standard deviation of the recorded durations. */
    double getStandardDeviation();

    /** Unusually slow entries, keyed by file path, with their durations. */
    Map<String, Long> getOutliers();
}
| |
| private static class StandardOperationStatistics implements OperationStatistics { |
| private final long min; |
| private final long max; |
| private final long count; |
| private final double average; |
| private final double stdDev; |
| private final Map<String, Long> outliers; |
| |
| public StandardOperationStatistics(final long min, final long max, final long count, final double average, final double stdDev, final Map<String, Long> outliers) { |
| this.min = min; |
| this.max = max; |
| this.count = count; |
| this.average = average; |
| this.stdDev = stdDev; |
| this.outliers = outliers; |
| } |
| |
| public long getMin() { |
| return min; |
| } |
| |
| public long getMax() { |
| return max; |
| } |
| |
| public long getCount() { |
| return count; |
| } |
| |
| public double getAverage() { |
| return average; |
| } |
| |
| public double getStandardDeviation() { |
| return stdDev; |
| } |
| |
| public Map<String, Long> getOutliers() { |
| return outliers; |
| } |
| } |
| |
| |
| private static class TimedOperationKey { |
| private final DiskOperation operation; |
| private final String directory; |
| private final String filename; |
| private final long startTime; |
| |
| public TimedOperationKey(final DiskOperation operation, final String directory, final String filename, final long startTime) { |
| this.operation = operation; |
| this.startTime = startTime; |
| this.directory = directory; |
| this.filename = filename; |
| } |
| |
| public DiskOperation getOperation() { |
| return operation; |
| } |
| |
| public String getDirectory() { |
| return directory; |
| } |
| |
| public String getFilename() { |
| return filename; |
| } |
| |
| public long getStartTime() { |
| return startTime; |
| } |
| } |
| |
| private enum DiskOperation { |
| RETRIEVE_BASIC_ATTRIBUTES, |
| RETRIEVE_OWNER_ATTRIBUTES, |
| RETRIEVE_POSIX_ATTRIBUTES, |
| CHECK_HIDDEN, |
| CHECK_READABLE, |
| FILTER, |
| RETRIEVE_NEXT_FILE_FROM_OS; |
| } |
| |
| private static class ProcessorStoppedException extends RuntimeException { |
| } |
| |
| static class MonitorActiveTasks implements Runnable { |
| private final PerformanceTracker performanceTracker; |
| private final ComponentLog logger; |
| private final long maxDiskOperationMillis; |
| private final long maxListingMillis; |
| private final long millisToKeepStats; |
| private long lastPurgeTimestamp = 0L; |
| |
| public MonitorActiveTasks(final PerformanceTracker tracker, final ComponentLog logger, final long maxDiskOperationMillis, final long maxListingMillis, final long millisToKeepStats) { |
| this.performanceTracker = tracker; |
| this.logger = logger; |
| this.maxDiskOperationMillis = maxDiskOperationMillis; |
| this.maxListingMillis = maxListingMillis; |
| this.millisToKeepStats = millisToKeepStats; |
| } |
| |
| @Override |
| public void run() { |
| monitorActiveOperation(); |
| monitorActiveDirectory(); |
| |
| final long now = System.currentTimeMillis(); |
| final long millisSincePurge = now - lastPurgeTimestamp; |
| if (millisSincePurge > TimeUnit.SECONDS.toMillis(60)) { |
| performanceTracker.purgeTimingInfo(now - millisToKeepStats); |
| lastPurgeTimestamp = System.currentTimeMillis(); |
| } |
| } |
| |
| private void monitorActiveOperation() { |
| final TimedOperationKey activeOperation = performanceTracker.getActiveOperation(); |
| if (activeOperation == null) { |
| return; |
| } |
| |
| final long activeTime = System.currentTimeMillis() - activeOperation.getStartTime(); |
| if (activeTime > maxDiskOperationMillis) { |
| final String directory = activeOperation.getDirectory(); |
| final String filename = activeOperation.getFilename(); |
| |
| final String fullPath; |
| if (directory.isEmpty()) { |
| fullPath = filename; |
| } else { |
| fullPath = directory.endsWith("/") ? directory + filename : directory + "/" + filename; |
| } |
| |
| logger.warn("This Processor has currently spent {} milliseconds performing the {} action on {}, which exceeds the configured threshold of {} milliseconds", |
| new Object[] {activeTime, activeOperation.getOperation(), fullPath, maxDiskOperationMillis}); |
| } |
| } |
| |
| private void monitorActiveDirectory() { |
| final String activeDirectory = performanceTracker.getActiveDirectory(); |
| final long startTime = performanceTracker.getActiveDirectoryStartTime(); |
| if (startTime <= 0) { |
| return; |
| } |
| |
| final long activeMillis = System.currentTimeMillis() - startTime; |
| if (activeMillis > maxListingMillis) { |
| final String fullPath = activeDirectory.isEmpty() ? "the base directory" : activeDirectory; |
| logger.warn("This processor has currently spent {} milliseconds performing the listing of {}, which exceeds the configured threshold of {} milliseconds", |
| new Object[] {activeMillis, fullPath, maxListingMillis}); |
| } |
| } |
| } |
| } |