| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.processor.util.list; |
| |
| import com.fasterxml.jackson.core.JsonParseException; |
| import com.fasterxml.jackson.databind.JsonMappingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import org.apache.nifi.annotation.behavior.Stateful; |
| import org.apache.nifi.annotation.behavior.TriggerSerially; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; |
| import org.apache.nifi.annotation.notification.PrimaryNodeState; |
| import org.apache.nifi.components.AllowableValue; |
| import org.apache.nifi.components.ConfigVerificationResult; |
| import org.apache.nifi.components.ConfigVerificationResult.Outcome; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.PropertyDescriptor.Builder; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.components.state.Scope; |
| import org.apache.nifi.components.state.StateManager; |
| import org.apache.nifi.components.state.StateMap; |
| import org.apache.nifi.context.PropertyContext; |
| import org.apache.nifi.distributed.cache.client.Deserializer; |
| import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; |
| import org.apache.nifi.distributed.cache.client.Serializer; |
| import org.apache.nifi.distributed.cache.client.exception.DeserializationException; |
| import org.apache.nifi.distributed.cache.client.exception.SerializationException; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.processor.AbstractProcessor; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.VerifiableProcessor; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.schema.access.SchemaNotFoundException; |
| import org.apache.nifi.serialization.RecordSetWriter; |
| import org.apache.nifi.serialization.RecordSetWriterFactory; |
| import org.apache.nifi.serialization.WriteResult; |
| import org.apache.nifi.serialization.record.RecordSchema; |
| import org.apache.nifi.util.StringUtils; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| /** |
| * <p> |
| * An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote or local resources. |
| * Those resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that |
 * we identify each entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
| * </p> |
| * <p> |
| * This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities |
| * or entities that have been modified will be emitted from the Processor. |
| * </p> |
| * <p> |
| * In order to make use of this abstract class, the entities listed must meet the following criteria: |
| * </p> |
| * <ul> |
| * <li> |
| * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is |
| * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled. |
| * </li> |
| * <li> |
| * If the timestamp of an entity is before OR equal to the latest timestamp pulled, then the entity is not considered new. If the timestamp is later |
| * than the last timestamp pulled, then the entity is considered new. |
| * </li> |
| * <li> |
| * With 'Tracking Entities' strategy, the size of entity content is also used to determine if an entity is "new". If the size changes the entity is considered "new". |
| * </li> |
| * <li> |
| * Entity must have a user-readable name that can be used for logging purposes. |
| * </li> |
| * </ul> |
| * <p> |
| * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the target system given the above criteria. This is |
| * performed using the {@link StateManager}. This allows the system to be restarted and begin processing where it left off. The state that is stored is the latest timestamp |
 * that has been pulled (as determined by the timestamps of the entities that are returned). See the section above for information about how this information is used in order to
| * determine new entities. |
| * </p> |
| * <p> |
 * NOTE: This processor migrates legacy state mechanisms, namely locally stored, file-based state and the optional <code>Distributed Cache
 * Service</code> property, to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged.
| * </p> |
| * |
| * <p> |
| * For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set |
| * of attributes (defined by the concrete implementation) that can be used to fetch those resources or interact with them in whatever way makes sense for |
| * the configured dataflow. |
| * </p> |
| * <p> |
| * Subclasses are responsible for the following: |
| * </p> |
| * <ul> |
| * <li> |
 * Perform a listing of resources. The subclass will implement the {@link #performListing(ProcessContext, Long, ListingMode)} method, which creates a listing of all
 * entities on the target system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
 * entities will be filtered out by this class. It is therefore not necessary for the implementation to filter by timestamp; the timestamp is provided in order to give the
 * implementation the ability to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so.
| * </li> |
| * <li> |
| * Creating a Map of attributes that are applicable for an entity. The attributes that are assigned to each FlowFile are exactly those returned by the |
| * {@link #createAttributes(ListableEntity, ProcessContext)}. |
| * </li> |
| * <li> |
 * Returning the configured path. Many resources consist of a "path" (or a "container" or "bucket", etc.) as well as a name or identifier that is unique only
 * within that path. The {@link #getPath(ProcessContext)} method is responsible for returning the path that is currently being polled for entities. If this concept
 * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
| * </li> |
| * <li> |
| * Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user |
| * changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning |
| * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared. |
| * </li> |
| * <li> |
 * Provide the target system's timestamp precision, either by letting the user choose the right one (by adding TARGET_SYSTEM_TIMESTAMP_PRECISION to the return value of
 * the getSupportedPropertyDescriptors method) or by overriding the getDefaultTimePrecision method in case the target system has a fixed time precision.
| * </li> |
| * </ul> |
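 * <p>
 * As a minimal illustrative sketch only (the <code>ListMyService</code>, <code>MyEntity</code>, <code>DIRECTORY</code>, and
 * <code>myServiceClient</code> names below are hypothetical, and the remaining abstract methods are omitted for brevity),
 * a concrete implementation might look like:
 * </p>
 * <pre>{@code
 * public class ListMyService extends AbstractListProcessor<MyEntity> {
 *
 *     protected Map<String, String> createAttributes(final MyEntity entity, final ProcessContext context) {
 *         final Map<String, String> attributes = new HashMap<>();
 *         attributes.put("filename", entity.getName());
 *         attributes.put("my.service.timestamp", String.valueOf(entity.getTimestamp()));
 *         return attributes;
 *     }
 *
 *     protected String getPath(final ProcessContext context) {
 *         return context.getProperty(DIRECTORY).getValue(); // DIRECTORY is a hypothetical property
 *     }
 *
 *     protected List<MyEntity> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
 *         // Ideally filter by minTimestamp on the server side; older entities are filtered out by the framework regardless.
 *         return myServiceClient.listSince(minTimestamp == null ? 0L : minTimestamp);
 *     }
 *
 *     protected boolean isListingResetNecessary(final PropertyDescriptor property) {
 *         return DIRECTORY.equals(property);
 *     }
 * }
 * }</pre>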
| */ |
| @TriggerSerially |
| @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. " |
| + "The scope used depends on the implementation.") |
| public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor implements VerifiableProcessor { |
| |
| /** |
| * Indicates the mode when performing a listing. |
| */ |
| protected enum ListingMode { |
| /** |
| * Indicates the listing is being performed during normal processor execution. May use configuration cached in the Processor object. |
| */ |
| EXECUTION, |
| /** |
| * Indicates the listing is being performed during configuration verification. Only use configuration provided in the ProcessContext argument, since the configuration may not |
| * have been applied to the processor yet. |
| */ |
| CONFIGURATION_VERIFICATION |
| } |
| |
| private static final Long IGNORE_MIN_TIMESTAMP_VALUE = 0L; |
| |
| public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new Builder() |
| .name("Distributed Cache Service") |
| .description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. " |
| + "The stored value in the cache service will be migrated into the state when this processor is started at the first time. " |
| + "The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node " |
| + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. " |
| + "This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.") |
| .required(false) |
| .identifiesControllerService(DistributedMapCacheClient.class) |
| .build(); |
| |
| public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect", |
| "Automatically detect time unit deterministically based on candidate entries timestamp." |
| + " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp." |
| + " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', " |
| + "then its precision is determined as 'seconds'."); |
| public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds", |
| "This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options."); |
| public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds", |
| "For a target system that does not have millis precision, but has in seconds."); |
| public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes."); |
| |
| public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new Builder() |
| .name("target-system-timestamp-precision") |
| .displayName("Target System Timestamp Precision") |
| .description("Specify timestamp precision at the target system." |
| + " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.") |
| .required(true) |
| .allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES) |
| .defaultValue(PRECISION_AUTO_DETECT.getValue()) |
| .build(); |
| |
| public static final Relationship REL_SUCCESS = new Relationship.Builder() |
| .name("success") |
| .description("All FlowFiles that are received are routed to success") |
| .build(); |
| |
| public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps", |
| "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." + |
| " Since it only tracks few timestamps, it can manage listing state efficiently." + |
| " However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." + |
| " For example, such situation can happen in a file system if a file with old timestamp" + |
| " is copied or moved into the target directory without its last modified timestamp being updated." + |
| " Also may miss files when multiple subdirectories are being written at the same time while listing is running."); |
| |
| public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities", |
| "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." + |
| " This strategy can pick entities having old timestamp that can be missed with 'Tracking Timestamps'." + |
| " Works even when multiple subdirectories are being written at the same time while listing is running." + |
| " However additional DistributedMapCache controller service is required and more JVM heap memory is used." + |
| " See the description of 'Entity Tracking Time Window' property for further details on how it works."); |
| |
| public static final AllowableValue NO_TRACKING = new AllowableValue("none", "No Tracking", |
| "This strategy lists an entity without any tracking. The same entity will be listed each time" + |
| " on executing this processor. It is recommended to change the default run schedule value." + |
| " Any property that related to the persisting state will be disregarded."); |
| |
| public static final AllowableValue BY_TIME_WINDOW = new AllowableValue("time-window", "Time Window", |
| "This strategy uses a sliding time window. The window starts where the previous window ended and ends with the 'current time'." + |
| " One cycle will list files with modification time falling within the time window." + |
| " Works even when multiple subdirectories are being written at the same time while listing is running." + |
| " IMPORTANT: This strategy works properly only if the time on both the system hosting NiFi and the one hosting the files" + |
| " are accurate."); |
| |
| public static final PropertyDescriptor LISTING_STRATEGY = new Builder() |
| .name("listing-strategy") |
| .displayName("Listing Strategy") |
| .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.") |
| .required(true) |
| .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING) |
| .defaultValue(BY_TIMESTAMPS.getValue()) |
| .build(); |
| |
| public static final PropertyDescriptor RECORD_WRITER = new Builder() |
| .name("record-writer") |
| .displayName("Record Writer") |
| .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. " + |
| "If the Record Writer is specified, all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.") |
| .required(false) |
| .identifiesControllerService(RecordSetWriterFactory.class) |
| .build(); |
| |
| /** |
     * Represents the timestamp of the entity that was the latest one within those listed at the previous cycle.
     * It does not necessarily mean the entity has been processed as well.
     * Whether it was processed or not depends on the target system's time precision and how old the entity timestamp was.
| */ |
| private volatile Long lastListedLatestEntryTimestampMillis = null; |
| /** |
| * Represents the timestamp of an entity which was the latest one |
| * within those picked up and written to the output relationship at the previous cycle. |
| */ |
| private volatile Long lastProcessedLatestEntryTimestampMillis = 0L; |
| private volatile Long lastRunTimeNanos = 0L; |
| private volatile boolean justElectedPrimaryNode = false; |
| private volatile boolean resetState = false; |
| private volatile boolean resetEntityTrackingState = false; |
| private volatile List<String> latestIdentifiersProcessed = new ArrayList<>(); |
| private volatile ListedEntityTracker<T> listedEntityTracker; |
| |
| /* |
| * A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest |
| * files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled |
| * near instantaneously after the prior iteration effectively voiding the built in buffer |
| */ |
| public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS; |
| |
| static { |
        final Map<TimeUnit, Long> millis = new HashMap<>();
        millis.put(TimeUnit.MILLISECONDS, 100L);
        millis.put(TimeUnit.SECONDS, 1_000L);
        millis.put(TimeUnit.MINUTES, 60_000L);
        LISTING_LAG_MILLIS = Collections.unmodifiableMap(millis);
| } |
| |
| static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp"; |
| static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp"; |
| static final String IDENTIFIER_PREFIX = "id"; |
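
    // The persisted state map therefore takes the following shape, for example (values are illustrative only):
    //   listing.timestamp   -> "1547000000000"   (latest entity timestamp seen by a listing)
    //   processed.timestamp -> "1547000000000"   (latest entity timestamp actually emitted)
    //   id.0                -> "/data/file-a.txt"
    //   id.1                -> "/data/file-b.txt"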
| |
| public File getPersistenceFile() { |
| return new File("conf/state/" + getIdentifier()); |
| } |
| |
| @Override |
| public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { |
| if (isConfigurationRestored() && isListingResetNecessary(descriptor)) { |
| resetTimeStates(); // clear lastListingTime so that we have to fetch new time |
| resetState = true; |
| resetEntityTrackingState = true; |
| } |
| } |
| |
| @Override |
| public Set<Relationship> getRelationships() { |
| final Set<Relationship> relationships = new HashSet<>(); |
| relationships.add(REL_SUCCESS); |
| return relationships; |
| } |
| |
| /** |
| * In order to add custom validation at sub-classes, implement {@link #customValidate(ValidationContext, Collection)} method. |
| */ |
| @Override |
| protected final Collection<ValidationResult> customValidate(final ValidationContext context) { |
| final Collection<ValidationResult> results = new ArrayList<>(); |
| |
| final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue(); |
| if (BY_ENTITIES.equals(listingStrategy)) { |
| ListedEntityTracker.validateProperties(context, results, getStateScope(context)); |
| } |
| |
| customValidate(context, results); |
| return results; |
| } |
| |
| /** |
| * Sub-classes can add custom validation by implementing this method. |
| * |
| * @param validationContext the validation context |
| * @param validationResults add custom validation result to this collection |
| */ |
| protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> validationResults) { |
| |
| } |
| |
| @Override |
| public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog logger, final Map<String, String> attributes) { |
| |
| final List<ConfigVerificationResult> results = new ArrayList<>(); |
| |
| final String containerName = getListingContainerName(context); |
| try { |
| final Integer unfilteredListingCount = countUnfilteredListing(context); |
| final int matchingCount = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.CONFIGURATION_VERIFICATION).size(); |
| |
| final String countExplanation; |
| if (unfilteredListingCount == null) { |
| if (matchingCount == 0) { |
| countExplanation = "Found no objects matching the filter."; |
| } else { |
| final String matchingCountText = matchingCount == 1 ? matchingCount + " object" : matchingCount + " objects"; |
| countExplanation = String.format("Found %s matching the filter.", matchingCountText); |
| } |
| } else if (unfilteredListingCount == 0) { |
| countExplanation = "Found no objects."; |
| } else { |
| final String unfilteredListingCountText = unfilteredListingCount == 1 ? unfilteredListingCount + " object" : unfilteredListingCount + " objects"; |
| final String unfilteredDemonstrativePronoun = unfilteredListingCount == 1 ? "that" : "those"; |
| final String matchingCountText = matchingCount == 1 ? matchingCount + " matches" : matchingCount + " match"; |
| countExplanation = String.format("Found %s. Of %s, %s the filter.", |
| unfilteredListingCountText, unfilteredDemonstrativePronoun, matchingCountText); |
| } |
| |
| results.add(new ConfigVerificationResult.Builder() |
| .verificationStepName("Perform Listing") |
| .outcome(Outcome.SUCCESSFUL) |
| .explanation(String.format("Successfully listed contents of %s. %s", containerName, countExplanation)) |
| .build()); |
| |
| logger.info("Successfully verified configuration"); |
| } catch (final IOException e) { |
| logger.warn("Failed to verify configuration. Could not list contents of {}", containerName, e); |
| |
| results.add(new ConfigVerificationResult.Builder() |
| .verificationStepName("Perform Listing") |
| .outcome(Outcome.FAILED) |
| .explanation(String.format("Failed to list contents of %s: %s", containerName, e.getMessage())) |
| .build()); |
| } |
| |
| return results; |
| } |
| |
| @OnPrimaryNodeStateChange |
| public void onPrimaryNodeChange(final PrimaryNodeState newState) { |
| justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE); |
| } |
| |
| @OnScheduled |
| public final void updateState(final ProcessContext context) throws IOException { |
| final String path = getPath(context); |
| final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); |
| |
| // Check if state already exists for this path. If so, we have already migrated the state. |
| final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); |
| if (stateMap.getVersion() == -1L) { |
| try { |
| // Migrate state from the old way of managing state (distributed cache service and local file) |
| // to the new mechanism (State Manager). |
| migrateState(path, client, context.getStateManager(), getStateScope(context)); |
| } catch (final IOException ioe) { |
| throw new IOException("Failed to properly migrate state to State Manager", ioe); |
| } |
| } |
| |
        // When scheduled to run, check whether the associated timestamp is null, signifying a clearing of state, and reset the internal timestamps
| if (lastListedLatestEntryTimestampMillis != null && stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null) { |
| getLogger().info("Detected that state was cleared for this component. Resetting internal values."); |
| resetTimeStates(); |
| } |
| |
| if (resetState) { |
| context.getStateManager().clear(getStateScope(context)); |
| resetState = false; |
| } |
| } |
| |
| /** |
| * This processor used to use the DistributedMapCacheClient in order to store cluster-wide state, before the introduction of |
| * the StateManager. This method will migrate state from that DistributedMapCacheClient, or from a local file, to the StateManager, |
 * if any state already exists. More specifically, this will extract the relevant timestamp for when the processor last ran.
| * |
| * @param path the path to migrate state for |
| * @param client the DistributedMapCacheClient that is capable of obtaining the current state |
| * @param stateManager the StateManager to use in order to store the new state |
| * @param scope the scope to use |
| * @throws IOException if unable to retrieve or store the state |
| */ |
| private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException { |
| Long minTimestamp = null; |
| |
| // Retrieve state from Distributed Cache Client, establishing the latest file seen |
| if (client != null) { |
| final StringSerDe serde = new StringSerDe(); |
| final String serializedState = client.get(getKey(path), serde, serde); |
| if (serializedState != null && !serializedState.isEmpty()) { |
| final EntityListing listing = deserialize(serializedState); |
| minTimestamp = listing.getLatestTimestamp().getTime(); |
| } |
| |
            // remove the entry from the distributed cache server
            try {
                client.remove(path, new StringSerDe());
            } catch (final IOException ioe) {
                getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new "
                        + "State Management service, so the Distributed Cache Service is no longer needed.");
            }
| } |
| |
        // Retrieve state from the locally persisted file, and compare it to the minTimestamp established from the distributed cache, if there was one
| final File persistenceFile = getPersistenceFile(); |
| if (persistenceFile.exists()) { |
| final Properties props = new Properties(); |
| |
| try (final FileInputStream fis = new FileInputStream(persistenceFile)) { |
| props.load(fis); |
| } |
| |
| final String locallyPersistedValue = props.getProperty(path); |
| if (locallyPersistedValue != null) { |
| final EntityListing listing = deserialize(locallyPersistedValue); |
| final long localTimestamp = listing.getLatestTimestamp().getTime(); |
| // if the local file's latest timestamp is beyond that of the value provided from the cache, replace |
| if (minTimestamp == null || localTimestamp > minTimestamp) { |
| minTimestamp = localTimestamp; |
| latestIdentifiersProcessed.clear(); |
| latestIdentifiersProcessed.addAll(listing.getMatchingIdentifiers()); |
| } |
| } |
| |
            // delete the local file, since it is no longer needed
            if (!persistenceFile.delete()) {
| getLogger().warn("Migrated state but failed to delete local persistence file"); |
| } |
| } |
| |
| if (minTimestamp != null) { |
| final Map<String, String> updatedState = createStateMap(minTimestamp, minTimestamp, latestIdentifiersProcessed); |
| stateManager.setState(updatedState, scope); |
| } |
| } |
| |
    private Map<String, String> createStateMap(final long latestListedEntryTimestampThisCycleMillis,
                                               final long lastProcessedLatestEntryTimestampMillis,
                                               final List<String> processedIdentifiersWithLatestTimestamp) throws IOException {

        final Map<String, String> updatedState = new HashMap<>(processedIdentifiersWithLatestTimestamp.size() + 2);
        updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
        updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
        for (int i = 0; i < processedIdentifiersWithLatestTimestamp.size(); i++) {
            updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiersWithLatestTimestamp.get(i));
| } |
| |
| return updatedState; |
| } |
| |
    private void persist(final long latestListedEntryTimestampThisCycleMillis,
                         final long lastProcessedLatestEntryTimestampMillis,
                         final List<String> processedIdentifiersWithLatestTimestamp,
                         final ProcessSession session, final Scope scope) throws IOException {
        final Map<String, String> updatedState = createStateMap(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, processedIdentifiersWithLatestTimestamp);
| session.setState(updatedState, scope); |
| } |
| |
| protected String getKey(final String directory) { |
| return getIdentifier() + ".lastListingTime." + directory; |
| } |
| |
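    // For reference, the legacy serialized state is the JSON rendering of EntityListing, e.g. (illustrative;
    // the exact field names depend on EntityListing's bean properties):
    //   {"latestTimestamp": 1547000000000, "matchingIdentifiers": ["/data/file-a.txt"]}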
| private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { |
| final ObjectMapper mapper = new ObjectMapper(); |
| return mapper.readValue(serializedState, EntityListing.class); |
| } |
| |
| @Override |
| public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { |
| |
| final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue(); |
| if (BY_TIMESTAMPS.equals(listingStrategy)) { |
| listByTrackingTimestamps(context, session); |
| |
| } else if (BY_ENTITIES.equals(listingStrategy)) { |
| listByTrackingEntities(context, session); |
| |
| } else if (NO_TRACKING.equals(listingStrategy)) { |
| listByNoTracking(context, session); |
| |
| } else if (BY_TIME_WINDOW.equals(listingStrategy)) { |
| listByTimeWindow(context, session); |
| |
| } else { |
| throw new ProcessException("Unknown listing strategy: " + listingStrategy); |
| } |
| } |
| |
| protected long getCurrentTime() { |
| return System.currentTimeMillis(); |
| } |
| |
| public void listByNoTracking(final ProcessContext context, final ProcessSession session) { |
| final List<T> entityList; |
| |
| try { |
            // Remove any previous state from the state manager before using the No Tracking strategy.
| context.getStateManager().clear(getStateScope(context)); |
| |
        } catch (final IOException ioe) {
            getLogger().error("Failed to remove previous state from the State Manager.", ioe);
| context.yield(); |
| return; |
| } |
| |
| try { |
            // minTimestamp is 0L by default for this strategy, so that all entities are listed on every
            // execution and no previously seen entity is filtered out by timestamp comparison.
| entityList = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.EXECUTION); |
        } catch (final IOException e) {
            getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
| context.yield(); |
| return; |
| } |
| |
| if (entityList == null || entityList.isEmpty()) { |
| context.yield(); |
| return; |
| } |
| |
| final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>(); |
| for (final T entity : entityList) { |
            List<T> entitiesForTimestamp = orderedEntries.computeIfAbsent(entity.getTimestamp(), k -> new ArrayList<>());
| entitiesForTimestamp.add(entity); |
| } |
| |
        if (!orderedEntries.isEmpty()) {
| for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) { |
| List<T> entities = timestampEntities.getValue(); |
| for (T entity : entities) { |
| // Create the FlowFile for this path. |
| final Map<String, String> attributes = createAttributes(entity, context); |
| FlowFile flowFile = session.create(); |
| flowFile = session.putAllAttributes(flowFile, attributes); |
| session.transfer(flowFile, REL_SUCCESS); |
| } |
| } |
| } |
| } |
| |
| public void listByTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException { |
| if (lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) { |
| try { |
| final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); |
| Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY)) |
| .map(Long::parseLong) |
| .ifPresent(lastTimestamp -> lastListedLatestEntryTimestampMillis = lastTimestamp); |
| |
| justElectedPrimaryNode = false; |
| } catch (final IOException ioe) { |
| getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished."); |
| context.yield(); |
| return; |
| } |
| } |
| |
| long lowerBoundInclusiveTimestamp = Optional.ofNullable(lastListedLatestEntryTimestampMillis).orElse(IGNORE_MIN_TIMESTAMP_VALUE); |
| long upperBoundExclusiveTimestamp; |
| |
| long currentTime = getCurrentTime(); |
| |
| final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>(); |
| try { |
| List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp, ListingMode.EXECUTION); |
| |
| boolean targetSystemHasMilliseconds = false; |
| boolean targetSystemHasSeconds = false; |
| for (final T entity : entityList) { |
| final long entityTimestampMillis = entity.getTimestamp(); |
| if (!targetSystemHasMilliseconds) { |
| targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0; |
| } |
| if (!targetSystemHasSeconds) { |
| targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0; |
| } |
| } |
| |
| // Determine target system time precision. |
| String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue(); |
| if (StringUtils.isBlank(specifiedPrecision)) { |
| // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value. |
| specifiedPrecision = getDefaultTimePrecision(); |
| } |
| final TimeUnit targetSystemTimePrecision |
| = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision) |
| ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES |
| : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS |
| : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES; |
| final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision); |
| |
| upperBoundExclusiveTimestamp = currentTime - listingLagMillis; |
| |
| if (getLogger().isTraceEnabled()) { |
| getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp); |
| getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", "))); |
| } |
| entityList |
| .stream() |
| .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp) |
| .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp) |
| .forEach(entity -> orderedEntries |
| .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>()) |
| .add(entity) |
| ); |
| if (getLogger().isTraceEnabled()) { |
| getLogger().trace("orderedEntries: " + |
| orderedEntries.values().stream() |
| .flatMap(List::stream) |
| .map(entity -> entity.getName() + "_" + entity.getTimestamp()) |
| .collect(Collectors.joining(", ")) |
| ); |
| } |
| } catch (final IOException e) { |
| getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e); |
| context.yield(); |
| return; |
| } |
| |
| if (orderedEntries.isEmpty()) { |
| getLogger().debug("There is no data to list. Yielding."); |
| context.yield(); |
| return; |
| } |
| |
| final boolean writerSet = context.getProperty(RECORD_WRITER).isSet(); |
| if (writerSet) { |
| try { |
| createRecordsForEntities(context, session, orderedEntries); |
| } catch (final IOException | SchemaNotFoundException e) { |
| getLogger().error("Failed to write listing to FlowFile", e); |
| context.yield(); |
| return; |
| } |
| } else { |
| createFlowFilesForEntities(context, session, orderedEntries); |
| } |
| |
| try { |
            if (getLogger().isTraceEnabled()) {
                getLogger().trace("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
            }
| lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp; |
| persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, session, getStateScope(context)); |
| } catch (final IOException ioe) { |
| getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or " |
| + "if another node begins executing this Processor, data duplication may occur.", ioe); |
| } |
| } |
| |
| public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException { |
| Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis; |
| |
| if (lastListedLatestEntryTimestampMillis == null || lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) { |
| try { |
| boolean noUpdateRequired = false; |
| // Attempt to retrieve state from the state manager if a last listing was not yet established or |
| // if just elected the primary node |
| final StateMap stateMap = session.getState(getStateScope(context)); |
| latestIdentifiersProcessed.clear(); |
| for (final Map.Entry<String, String> state : stateMap.toMap().entrySet()) { |
| final String k = state.getKey(); |
| final String v = state.getValue(); |
| if (v == null || v.isEmpty()) { |
| continue; |
| } |
| |
| if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) { |
| minTimestampToListMillis = Long.parseLong(v); |
| // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates |
| if (minTimestampToListMillis.equals(lastListedLatestEntryTimestampMillis)) { |
| noUpdateRequired = true; |
| } else { |
| lastListedLatestEntryTimestampMillis = minTimestampToListMillis; |
| } |
| } else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) { |
| lastProcessedLatestEntryTimestampMillis = Long.parseLong(v); |
| } else if (k.startsWith(IDENTIFIER_PREFIX)) { |
| latestIdentifiersProcessed.add(v); |
| } |
| } |
| justElectedPrimaryNode = false; |
| if (noUpdateRequired) { |
| context.yield(); |
| return; |
| } |
| } catch (final IOException ioe) { |
| getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished."); |
| context.yield(); |
| return; |
| } |
| } |
| |
| final List<T> entityList; |
| final long currentRunTimeNanos = System.nanoTime(); |
| final long currentRunTimeMillis = System.currentTimeMillis(); |
| try { |
            // keep track of when this was last executed, for consideration of the listing lag
| entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION); |
| } catch (final IOException e) { |
| getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e); |
| context.yield(); |
| return; |
| } |
| |
| if (entityList == null || entityList.isEmpty()) { |
| context.yield(); |
| return; |
| } |
| |
| Long latestListedEntryTimestampThisCycleMillis = null; |
| final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>(); |
| |
| // Build a sorted map to determine the latest possible entries |
| boolean targetSystemHasMilliseconds = false; |
| boolean targetSystemHasSeconds = false; |
| for (final T entity : entityList) { |
| final long entityTimestampMillis = entity.getTimestamp(); |
| if (!targetSystemHasMilliseconds) { |
| targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0; |
| } |
| if (!targetSystemHasSeconds) { |
| targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0; |
| } |
            // New entries are all those that occur at or after the associated timestamp
            final boolean newEntry = minTimestampToListMillis == null || (entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis >= lastProcessedLatestEntryTimestampMillis);
| |
            if (newEntry) {
                orderedEntries.computeIfAbsent(entity.getTimestamp(), k -> new ArrayList<>()).add(entity);
            }
| } |
| |
| int entitiesListed = 0; |
| |
        if (!orderedEntries.isEmpty()) {
| latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey(); |
| |
| // Determine target system time precision. |
| String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue(); |
| if (StringUtils.isBlank(specifiedPrecision)) { |
| // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value. |
| specifiedPrecision = getDefaultTimePrecision(); |
| } |
| final TimeUnit targetSystemTimePrecision |
| = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision) |
| ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES |
| : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS |
| : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES; |
| final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision); |
| |
| // If the last listing time is equal to the newest entries previously seen, |
| // another iteration has occurred without new files and special handling is needed to avoid starvation |
| if (latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis)) { |
            /* We need to wait for another cycle when either:
             * - The minimal listing lag has not yet elapsed, because we were triggered too soon after the last run, or
             * - The latest listed entity timestamp is equal to the last processed time, meaning we already handled those items originally passed over, and there is no need to process them again.
             */
| final long listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis); |
| if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos |
| || (latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis) |
| && orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream() |
| .allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier())))) { |
| context.yield(); |
| return; |
| } |
| |
| } else { |
| // Convert minimum reliable timestamp into target system time unit, in order to truncate unreliable digits. |
| final long minimumReliableTimestampInFilesystemTimeUnit = targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS); |
| final long minimumReliableTimestampMillis = targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit); |
| // If the latest listed entity is not old enough, compared with the minimum timestamp, then wait for another cycle. |
| // The minimum timestamp should be reliable to determine that no further entries will be added with the same timestamp based on the target system time precision. |
| if (minimumReliableTimestampMillis < latestListedEntryTimestampThisCycleMillis) { |
                // Otherwise, the newest entries are held back one cycle, to avoid missing data when writes occur at the same moment the listing is being performed
| orderedEntries.remove(latestListedEntryTimestampThisCycleMillis); |
| } |
| } |
| |
| |
| final boolean writerSet = context.getProperty(RECORD_WRITER).isSet(); |
| if (writerSet) { |
| try { |
| entitiesListed = createRecordsForEntities(context, session, orderedEntries); |
| } catch (final IOException | SchemaNotFoundException e) { |
| getLogger().error("Failed to write listing to FlowFile", e); |
| context.yield(); |
| return; |
| } |
| } else { |
| entitiesListed = createFlowFilesForEntities(context, session, orderedEntries); |
| } |
| } |
| |
| // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated |
| if (latestListedEntryTimestampThisCycleMillis != null) { |
| final boolean processedNewFiles = entitiesListed > 0; |
| |
| if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) { |
| // We have performed a listing and pushed any FlowFiles out that may have been generated |
| // Now, we need to persist state about the Last Modified timestamp of the newest file |
| // that we evaluated. We do this in order to avoid pulling in the same file twice. |
| // However, we want to save the state both locally and remotely. |
            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
            // previous Primary Node left off.
            // We also store the state locally so that if the node is restarted, and the node cannot contact
            // the distributed state cache, the node can continue to run (if it is the primary node).
| try { |
| lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis; |
| persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, session, getStateScope(context)); |
| } catch (final IOException ioe) { |
| getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or " |
| + "if another node begins executing this Processor, data duplication may occur.", ioe); |
| } |
| } |
| |
| if (processedNewFiles) { |
| // If there have been files created, update the last timestamp we processed. |
| // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here, |
| // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough. |
| if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) { |
| // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers. |
| // If it didn't change, we need to add identifiers. |
| latestIdentifiersProcessed.clear(); |
| } |
| // Capture latestIdentifierProcessed. |
| latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList())); |
| lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey(); |
| getLogger().info("Successfully created listing with {} new objects", new Object[]{entitiesListed}); |
| session.commitAsync(); |
| } |
| |
| lastRunTimeNanos = currentRunTimeNanos; |
| } else { |
| getLogger().debug("There is no data to list. Yielding."); |
| context.yield(); |
| |
            // Set the timestamp to 0 so that we don't continually poll the state store / target system
| if (lastListedLatestEntryTimestampMillis == null) { |
| lastListedLatestEntryTimestampMillis = 0L; |
| } |
| } |
| } |
| |
| private int createRecordsForEntities(final ProcessContext context, final ProcessSession session, final Map<Long, List<T>> orderedEntries) throws IOException, SchemaNotFoundException { |
| final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); |
| |
| int entitiesListed = 0; |
| FlowFile flowFile = session.create(); |
| final WriteResult writeResult; |
| |
| try (final OutputStream out = session.write(flowFile); |
| final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) { |
| |
| recordSetWriter.beginRecordSet(); |
| for (final Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) { |
| List<T> entities = timestampEntities.getValue(); |
| if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) { |
| // Filter out previously processed entities. |
| entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList()); |
| } |
| |
| for (final T entity : entities) { |
| entitiesListed++; |
| recordSetWriter.write(entity.toRecord()); |
| } |
| } |
| |
| writeResult = recordSetWriter.finishRecordSet(); |
| } |
| |
| if (entitiesListed == 0) { |
| session.remove(flowFile); |
| return 0; |
| } |
| |
| final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes()); |
| attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); |
| flowFile = session.putAllAttributes(flowFile, attributes); |
| |
| session.transfer(flowFile, REL_SUCCESS); |
| return entitiesListed; |
| } |
| |
| private int createFlowFilesForEntities(final ProcessContext context, final ProcessSession session, final Map<Long, List<T>> orderedEntries) { |
| int entitiesListed = 0; |
| for (final Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) { |
| List<T> entities = timestampEntities.getValue(); |
| if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) { |
| // Filter out previously processed entities. |
| entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList()); |
| } |
| |
| for (final T entity : entities) { |
| entitiesListed++; |
| |
| // Create the FlowFile for this path. |
| final Map<String, String> attributes = createAttributes(entity, context); |
| FlowFile flowFile = session.create(); |
| flowFile = session.putAllAttributes(flowFile, attributes); |
| session.transfer(flowFile, REL_SUCCESS); |
| } |
| } |
| |
| return entitiesListed; |
| } |
| |
| /** |
| * This method is intended to be overridden by SubClasses those do not support TARGET_SYSTEM_TIMESTAMP_PRECISION property. |
| * So that it use return different precisions than PRECISION_AUTO_DETECT. |
| * If TARGET_SYSTEM_TIMESTAMP_PRECISION is supported as a valid Processor property, |
| * then PRECISION_AUTO_DETECT will be the default value when not specified by a user. |
| * |
| * @return |
| */ |
| protected String getDefaultTimePrecision() { |
| return TARGET_SYSTEM_TIMESTAMP_PRECISION.getDefaultValue(); |
| } |
| |
| private void resetTimeStates() { |
| lastListedLatestEntryTimestampMillis = null; |
| lastProcessedLatestEntryTimestampMillis = 0L; |
| lastRunTimeNanos = 0L; |
| latestIdentifiersProcessed.clear(); |
| } |
| |
| /** |
| * Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity |
| * (see the documentation for this class for a discussion of how this class determines whether or not an entity is "new"). The FlowFile will contain no |
| * content. The attributes that will be included are exactly the attributes that are returned by this method. |
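     * <p>
     * As an illustrative sketch only (the attribute names below are hypothetical, not defined by this class):
     * </p>
     * <pre>{@code
     * final Map<String, String> attributes = new HashMap<>();
     * attributes.put("filename", entity.getName());
     * attributes.put("path", getPath(context));
     * return attributes;
     * }</pre>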
| * |
| * @param entity the entity represented by the FlowFile |
| * @param context the ProcessContext for obtaining configuration information |
| * @return a Map of attributes for this entity |
| */ |
| protected abstract Map<String, String> createAttributes(T entity, ProcessContext context); |
| |
| /** |
| * Returns the path to perform a listing on. |
     * Many resources consist of a "path" (or a "container" or "bucket", etc.) as well as a name or identifier that is unique only
     * within that path. This method is responsible for returning the path that is currently being polled for entities. If this concept
     * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
     *
     * @param context the ProcessContext to use in order to obtain configuration
| * @return the path that is to be used to perform the listing, or <code>null</code> if not applicable. |
| */ |
| protected abstract String getPath(final ProcessContext context); |
| |
| /** |
| * Performs a listing of the remote entities that can be pulled. If any entity that is returned has already been "discovered" or "emitted" |
| * by this Processor, it will be ignored. A discussion of how the Processor determines those entities that have already been emitted is |
| * provided above in the documentation for this class. Any entity that is returned by this method with a timestamp prior to the minTimestamp |
     * will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering, but it can be more efficient
     * if the filtering is performed on the server side prior to retrieving the information.
| * |
| * @param context the ProcessContext to use in order to pull the appropriate entities |
| * @param minTimestamp the minimum timestamp of entities that should be returned |
| * @param listingMode the listing mode, indicating whether the listing is being performed during configuration verification or normal processor execution |
| * @return a Listing of entities that have a timestamp >= minTimestamp |
| */ |
| protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) |
| throws IOException; |
| |
| /** |
| * Determines whether or not the listing must be reset if the value of the given property is changed |
| * |
| * @param property the property that has changed |
| * @return <code>true</code> if a change in value of the given property necessitates that the listing be reset, <code>false</code> otherwise. |
| */ |
| protected abstract boolean isListingResetNecessary(final PropertyDescriptor property); |
| |
| /** |
| * Returns a Scope that specifies where the state should be managed for this Processor |
| * |
| * @param context the ProcessContext to use in order to make a determination |
| * @return a Scope that specifies where the state should be managed for this Processor |
| */ |
| protected abstract Scope getStateScope(final PropertyContext context); |
| |
| /** |
| * @return the RecordSchema that will be used for any Records that are produced by the Processor |
| */ |
| protected abstract RecordSchema getRecordSchema(); |
| |
| /** |
| * Performs an unfiltered listing and returns the count, or null if this operation is not supported. |
| * |
| * @param context the ProcessContext to use in order to pull the appropriate entities |
| * @return The number of unfiltered entities in the listing, or null if this processor does not support an unfiltered listing |
| */ |
| protected abstract Integer countUnfilteredListing(final ProcessContext context) |
| throws IOException; |
| |
| /** |
| * Provides a human-readable name for the container being listed, for the purpose of displaying readable verification messages during processor configuration verification. |
| * |
| * @param context The process context |
| * @return The user-friendly name for the container |
| */ |
| protected abstract String getListingContainerName(final ProcessContext context); |
| |
| private static class StringSerDe implements Serializer<String>, Deserializer<String> { |
| |
| @Override |
| public String deserialize(final byte[] value) throws DeserializationException, IOException { |
| if (value == null) { |
| return null; |
| } |
| |
| return new String(value, StandardCharsets.UTF_8); |
| } |
| @Override |
| public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { |
| out.write(value.getBytes(StandardCharsets.UTF_8)); |
| } |
| |
| } |
| |
| @OnScheduled |
| public void initListedEntityTracker(ProcessContext context) { |
| final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue()); |
| if (listedEntityTracker != null && (resetEntityTrackingState || !isTrackingEntityStrategy)) { |
| try { |
| listedEntityTracker.clearListedEntities(); |
| } catch (IOException e) { |
| throw new RuntimeException("Failed to reset previously listed entities due to " + e, e); |
| } |
| } |
| resetEntityTrackingState = false; |
| |
| if (isTrackingEntityStrategy) { |
| if (listedEntityTracker == null) { |
| listedEntityTracker = createListedEntityTracker(); |
| } |
| } else { |
| listedEntityTracker = null; |
| } |
| } |
| |
| protected ListedEntityTracker<T> createListedEntityTracker() { |
| return new ListedEntityTracker<>(getIdentifier(), getLogger(), getRecordSchema()); |
| } |
| |
| private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException { |
| listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, getStateScope(context), minTimestampToList -> { |
| try { |
| return performListing(context, minTimestampToList, ListingMode.EXECUTION); |
| } catch (final IOException e) { |
| getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e); |
| return Collections.emptyList(); |
| } |
| }, entity -> createAttributes(entity, context)); |
| justElectedPrimaryNode = false; |
| } |
| } |