blob: e7ea4918beeb29b008194146c2ec13d70f735d9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A base class for FetchSFTP, FetchFTP processors.
*
* Note that implementations of this class should never use the @SupportsBatching annotation! Doing so
* could result in data loss!
*/
public abstract class FetchFileTransfer extends AbstractProcessor {
static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is");
static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property");
static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The fully-qualified hostname or IP address of the host to fetch the data from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("The port to connect to on the remote host to fetch the data from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.description("Username")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor REMOTE_FILENAME = new PropertyDescriptor.Builder()
.name("Remote File")
.description("The fully qualified filename on the remote system")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder()
.name("Completion Strategy")
.description("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be "
+ "logged but the data will still be transferred.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(COMPLETION_NONE, COMPLETION_MOVE, COMPLETION_DELETE)
.defaultValue(COMPLETION_NONE.getValue())
.required(true)
.build();
static final PropertyDescriptor MOVE_CREATE_DIRECTORY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(FileTransfer.CREATE_DIRECTORY).description(String.format("Used when '%s' is '%s'. %s",
COMPLETION_STRATEGY.getDisplayName(),
COMPLETION_MOVE.getDisplayName(),
FileTransfer.CREATE_DIRECTORY.getDescription()))
.required(false)
.build();
static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
.name("Move Destination Directory")
.description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+ "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on "
+ "the remote system if '%s' is disabled, or the rename will fail.",
COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName()))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
.displayName("Log level when file not found")
.name("fetchfiletransfer-notfound-loglevel")
.description("Log level to use in case the file does not exist when the processor is triggered")
.allowableValues(LogLevel.values())
.defaultValue(LogLevel.ERROR.toString()) // backward compatibility support
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are received are routed to success")
.build();
static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
.name("comms.failure")
.description("Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.")
.build();
static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not.found")
.description("Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.")
.build();
static final Relationship REL_PERMISSION_DENIED = new Relationship.Builder()
.name("permission.denied")
.description("Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.")
.build();
private final Map<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> fileTransferMap = new HashMap<>();
private final long IDLE_CONNECTION_MILLIS = TimeUnit.SECONDS.toMillis(10L); // amount of time to wait before closing an idle connection
private volatile long lastClearTime = System.currentTimeMillis();
private LogLevel levelFileNotFound = LogLevel.ERROR;
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_NOT_FOUND);
relationships.add(REL_PERMISSION_DENIED);
relationships.add(REL_COMMS_FAILURE);
return relationships;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
levelFileNotFound = LogLevel.valueOf(context.getProperty(FILE_NOT_FOUND_LOG_LEVEL).getValue());
}
/**
* Close connections that are idle or optionally close all connections.
* Connections are considered "idle" if they have not been used in 10 seconds.
*
* @param closeNonIdleConnections if <code>true</code> will close all connection; if <code>false</code> will close only idle connections
*/
private void closeConnections(final boolean closeNonIdleConnections) {
for (final Map.Entry<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> entry : fileTransferMap.entrySet()) {
final BlockingQueue<FileTransferIdleWrapper> wrapperQueue = entry.getValue();
final List<FileTransferIdleWrapper> putBack = new ArrayList<>();
FileTransferIdleWrapper wrapper;
while ((wrapper = wrapperQueue.poll()) != null) {
final long lastUsed = wrapper.getLastUsed();
final long nanosSinceLastUse = System.nanoTime() - lastUsed;
if (!closeNonIdleConnections && TimeUnit.NANOSECONDS.toMillis(nanosSinceLastUse) < IDLE_CONNECTION_MILLIS) {
putBack.add(wrapper);
} else {
try {
wrapper.getFileTransfer().close();
} catch (final IOException ioe) {
getLogger().warn("Failed to close Idle Connection due to {}", new Object[] {ioe}, ioe);
}
}
}
for (final FileTransferIdleWrapper toPutBack : putBack) {
wrapperQueue.offer(toPutBack);
}
}
}
@OnStopped
public void cleanup() {
// close all connections
closeConnections(true);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HOSTNAME);
properties.add(UNDEFAULTED_PORT);
properties.add(REMOTE_FILENAME);
properties.add(COMPLETION_STRATEGY);
properties.add(MOVE_DESTINATION_DIR);
properties.add(MOVE_CREATE_DIRECTORY);
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final StopWatch stopWatch = new StopWatch(true);
final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
final int port = context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions(flowFile).asInteger();
final String filename = context.getProperty(REMOTE_FILENAME).evaluateAttributeExpressions(flowFile).getValue();
// Try to get a FileTransfer object from our cache.
BlockingQueue<FileTransferIdleWrapper> transferQueue;
synchronized (fileTransferMap) {
final Tuple<String, Integer> tuple = new Tuple<>(host, port);
transferQueue = fileTransferMap.get(tuple);
if (transferQueue == null) {
transferQueue = new LinkedBlockingQueue<>();
fileTransferMap.put(tuple, transferQueue);
}
// periodically close idle connections
if (System.currentTimeMillis() - lastClearTime > IDLE_CONNECTION_MILLIS) {
closeConnections(false);
lastClearTime = System.currentTimeMillis();
}
}
// we have a queue of FileTransfer Objects. Get one from the queue or create a new one.
FileTransfer transfer;
FileTransferIdleWrapper transferWrapper = transferQueue.poll();
if (transferWrapper == null) {
transfer = createFileTransfer(context);
} else {
transfer = transferWrapper.getFileTransfer();
}
boolean closeConnection = false;
try {
// Pull data from remote system.
try {
flowFile = transfer.getRemoteFile(filename, flowFile, session);
} catch (final FileNotFoundException e) {
closeConnection = false;
getLogger().log(levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
new Object[]{flowFile, filename, host, REL_NOT_FOUND.getName()});
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
return;
} catch (final PermissionDeniedException e) {
closeConnection = false;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
new Object[]{flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
return;
} catch (final ProcessException | IOException e) {
closeConnection = true;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure",
new Object[]{flowFile, filename, host, port, e.toString()}, e);
session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
return;
}
// Add FlowFile attributes
final String protocolName = transfer.getProtocolName();
final Map<String, String> attributes = new HashMap<>();
attributes.put(protocolName + ".remote.host", host);
attributes.put(protocolName + ".remote.port", String.valueOf(port));
attributes.put(protocolName + ".remote.filename", filename);
if (filename.contains("/")) {
final String path = StringUtils.substringBeforeLast(filename, "/");
final String filenameOnly = StringUtils.substringAfterLast(filename, "/");
attributes.put(CoreAttributes.PATH.key(), path);
attributes.put(CoreAttributes.FILENAME.key(), filenameOnly);
} else {
attributes.put(CoreAttributes.FILENAME.key(), filename);
}
flowFile = session.putAllAttributes(flowFile, attributes);
// emit provenance event and transfer FlowFile
session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
// it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
// we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
// result in data loss! If we commit the session first, we are safe.
final boolean close = closeConnection;
final BlockingQueue<FileTransferIdleWrapper> queue = transferQueue;
final Runnable cleanupTask = () -> cleanupTransfer(transfer, close, queue, host, port);
final FlowFile flowFileReceived = flowFile;
session.commitAsync(() -> {
performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port);
cleanupTask.run();
}, t -> {
cleanupTask.run();
});
} catch (final Throwable t) {
getLogger().error("Failed to fetch file", t);
if (transfer != null) {
cleanupTransfer(transfer, closeConnection, transferQueue, host, port);
}
}
}
private void cleanupTransfer(final FileTransfer transfer, final boolean closeConnection, final BlockingQueue<FileTransferIdleWrapper> transferQueue, final String host, final int port) {
if (closeConnection) {
getLogger().debug("Closing FileTransfer...");
try {
transfer.close();
} catch (final IOException e) {
getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[]{host, port, e.getMessage()}, e);
}
} else {
getLogger().debug("Returning FileTransfer to pool...");
transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
}
}
private void performCompletionStrategy(final FileTransfer transfer, final ProcessContext context, final FlowFile flowFile, final String filename, final String host, final int port) {
final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
try {
transfer.deleteFile(flowFile, null, filename);
} catch (final FileNotFoundException e) {
// file doesn't exist -- effectively the same as removing it. Move on.
} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
new Object[]{flowFile, host, port, filename, ioe}, ioe);
}
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
try {
final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
final File targetFile = new File(absoluteTargetDirPath, simpleFilename);
if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
// Create the target directory if necessary.
transfer.ensureDirectoryExists(flowFile, targetFile.getParentFile());
}
transfer.rename(flowFile, filename, targetFile.getAbsolutePath());
} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
new Object[]{flowFile, host, port, filename, ioe}, ioe);
}
}
}
/**
* Creates a new instance of a FileTransfer that can be used to pull files from a remote system.
*
* @param context the ProcessContext to use in order to obtain configured properties
* @return a FileTransfer that can be used to pull files from a remote system
*/
protected abstract FileTransfer createFileTransfer(ProcessContext context);
/**
* Wrapper around a FileTransfer object that is used to know when the FileTransfer was last used, so that
* we have the ability to close connections that are "idle," or unused for some period of time.
*/
private static class FileTransferIdleWrapper {
private final FileTransfer fileTransfer;
private final long lastUsed;
public FileTransferIdleWrapper(final FileTransfer fileTransfer, final long lastUsed) {
this.fileTransfer = fileTransfer;
this.lastUsed = lastUsed;
}
public FileTransfer getFileTransfer() {
return fileTransfer;
}
public long getLastUsed() {
return this.lastUsed;
}
}
}