// blob: 1082dd7ef3c69ab159e6fc42f1ff93e0a28b634f [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard.util;
import net.schmizz.keepalive.KeepAlive;
import net.schmizz.keepalive.KeepAliveProvider;
import net.schmizz.sshj.Config;
import net.schmizz.sshj.DefaultConfig;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.common.Factory;
import net.schmizz.sshj.connection.ConnectionImpl;
import net.schmizz.sshj.sftp.FileAttributes;
import net.schmizz.sshj.sftp.FileMode;
import net.schmizz.sshj.sftp.RemoteFile;
import net.schmizz.sshj.sftp.RemoteResourceFilter;
import net.schmizz.sshj.sftp.RemoteResourceInfo;
import net.schmizz.sshj.sftp.Response;
import net.schmizz.sshj.sftp.SFTPClient;
import net.schmizz.sshj.sftp.SFTPException;
import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
import net.schmizz.sshj.userauth.keyprovider.KeyFormat;
import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
import net.schmizz.sshj.userauth.keyprovider.KeyProviderUtil;
import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
import net.schmizz.sshj.userauth.method.AuthMethod;
import net.schmizz.sshj.userauth.method.AuthPassword;
import net.schmizz.sshj.userauth.method.AuthPublickey;
import net.schmizz.sshj.userauth.method.PasswordResponseProvider;
import net.schmizz.sshj.userauth.password.PasswordFinder;
import net.schmizz.sshj.userauth.password.PasswordUtils;
import net.schmizz.sshj.xfer.FilePermission;
import net.schmizz.sshj.xfer.LocalSourceFile;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Proxy;
import java.net.Socket;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.net.SocketFactory;
import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
public class SFTPTransfer implements FileTransfer {
// Interval, in seconds, between keep-alive messages (see the 'Send Keep Alive On Timeout' property).
private static final int KEEP_ALIVE_INTERVAL_SECONDS = 5;

// Names of every algorithm offered by SSHJ's DefaultConfig; used to document the "... Allowed" properties.
private static final Set<String> DEFAULT_KEY_ALGORITHM_NAMES;
private static final Set<String> DEFAULT_CIPHER_NAMES;
// This constant is assigned in the static initializer below and therefore has to be declared here.
private static final Set<String> DEFAULT_MESSAGE_AUTHENTICATION_CODE_NAMES;
private static final Set<String> DEFAULT_KEY_EXCHANGE_ALGORITHM_NAMES;

static {
    // A throw-away DefaultConfig is only inspected for the names of its factories.
    final DefaultConfig defaultConfig = new DefaultConfig();

    DEFAULT_KEY_ALGORITHM_NAMES = Collections.unmodifiableSet(defaultConfig.getKeyAlgorithms().stream()
            .map(Factory.Named::getName).collect(Collectors.toSet()));
    DEFAULT_CIPHER_NAMES = Collections.unmodifiableSet(defaultConfig.getCipherFactories().stream()
            .map(Factory.Named::getName).collect(Collectors.toSet()));
    DEFAULT_MESSAGE_AUTHENTICATION_CODE_NAMES = Collections.unmodifiableSet(defaultConfig.getMACFactories().stream()
            .map(Factory.Named::getName).collect(Collectors.toSet()));
    DEFAULT_KEY_EXCHANGE_ALGORITHM_NAMES = Collections.unmodifiableSet(defaultConfig.getKeyExchangeFactories().stream()
            .map(Factory.Named::getName).collect(Collectors.toSet()));
}
* Converts a set of names into an alphabetically ordered comma separated value list.
* @param factorySetNames The set of names
* @return An alphabetically ordered comma separated value list of names
private static String convertFactorySetToString(Set<String> factorySetNames) {
return factorySetNames
.collect(Collectors.joining(", "));
// Path of the private key file; read by getAuthMethods to set up public-key authentication.
// NOTE(review): every builder chain in this group looks truncated — no validators, defaults or terminating
// build() call are visible here; confirm against the complete source before editing these descriptors.
public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder()
.name("Private Key Path")
.description("The fully qualified path to the Private Key file")
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
// Passphrase handed to the key loader together with PRIVATE_KEY_PATH (see getKeyProvider).
public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder()
.name("Private Key Passphrase")
.description("Password for the private key")
// Optional known-hosts file; getSFTPClient loads it via SSHClient#loadKnownHosts when set.
public static final PropertyDescriptor HOST_KEY_FILE = new PropertyDescriptor.Builder()
.name("Host Key File")
.description("If supplied, the given file will be used as the Host Key;" +
" otherwise, if 'Strict Host Key Checking' property is applied (set to true)" +
" then uses the 'known_hosts' and 'known_hosts2' files from ~/.ssh directory" +
" else no host key file will be used")
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
// When "false", getSFTPClient installs a PromiscuousVerifier that accepts any host key.
public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new PropertyDescriptor.Builder()
.name("Strict Host Key Checking")
.description("Indicates whether or not strict enforcement of hosts keys should be applied")
.allowableValues("true", "false")
// Remote port; evaluated against the FlowFile in getSFTPClient.
// NOTE(review): no name(...) call is visible for this descriptor — presumably lost; confirm.
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.description("The port that the remote system is listening on for file transfers")
// Toggles keep-alive behaviour; read by getKeepAliveProvider.
public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder()
.name("Send Keep Alive On Timeout")
.description("Send a Keep Alive message every 5 seconds up to 5 times for an overall timeout of 25 seconds.")
.allowableValues("true", "false")
// Optional allow-list of key algorithms; the description enumerates the SSHJ defaults.
public static final PropertyDescriptor KEY_ALGORITHMS_ALLOWED = new PropertyDescriptor.Builder()
.name("Key Algorithms Allowed")
.displayName("Key Algorithms Allowed")
.description("A comma-separated list of Key Algorithms allowed for SFTP connections. Leave unset to allow all. Available options are: "
+ convertFactorySetToString(DEFAULT_KEY_ALGORITHM_NAMES))
// Optional allow-list of ciphers; the description enumerates the SSHJ defaults.
public static final PropertyDescriptor CIPHERS_ALLOWED = new PropertyDescriptor.Builder()
.name("Ciphers Allowed")
.displayName("Ciphers Allowed")
.description("A comma-separated list of Ciphers allowed for SFTP connections. Leave unset to allow all. Available options are: " + convertFactorySetToString(DEFAULT_CIPHER_NAMES))
// Optional allow-list of message authentication codes.
// NOTE(review): the description expression below is left open — the concatenated list of default names appears to be missing.
public static final PropertyDescriptor MESSAGE_AUTHENTICATION_CODES_ALLOWED = new PropertyDescriptor.Builder()
.name("Message Authentication Codes Allowed")
.displayName("Message Authentication Codes Allowed")
.description("A comma-separated list of Message Authentication Codes allowed for SFTP connections. Leave unset to allow all. Available options are: "
// Optional allow-list of key exchange algorithms.
// NOTE(review): the description expression below is left open as well — confirm the missing continuation.
public static final PropertyDescriptor KEY_EXCHANGE_ALGORITHMS_ALLOWED = new PropertyDescriptor.Builder()
.name("Key Exchange Algorithms Allowed")
.displayName("Key Exchange Algorithms Allowed")
.description("A comma-separated list of Key Exchange Algorithms allowed for SFTP connections. Leave unset to allow all. Available options are: "
/**
 * Property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link SFTPClient#ls(String)} before calling
 * {@link SFTPClient#mkdir(String)}. In most cases, the code should call ls before mkdir, but some unusual permission setups (e.g. chmod 100) on a directory would cause the 'ls' to throw a permission
 * exception.
 */
// Read once in the constructor; when true, ensureDirectoryExists skips the existence check and creates the directory blindly.
// NOTE(review): the builder chain appears truncated (no terminating build() call visible) — confirm.
public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder()
.name("Disable Directory Listing")
.description("If set to 'true', directory listing is not performed prior to create missing directories." +
" By default, this processor executes a directory listing command" +
" to see target directory existence before creating missing directories." +
" However, there are situations that you might need to disable the directory listing such as the following." +
" Directory listing might fail with some permission setups (e.g. chmod 100) on a directory." +
" Also, if any other SFTP client created the directory after this processor performed a listing" +
" and before a directory creation request by this processor is finished," +
" then an error is returned because the directory already exists.")
.allowableValues("true", "false")
// Proxy types this transfer accepts; shared by the descriptor below and by validateProxySpec.
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS_AUTH};
// Proxy configuration service descriptor restricted to PROXY_SPECS.
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
// Component logger used for debug/warn/error output from this transfer.
private final ComponentLog logger;
// Source of every configuration property read by this class.
private final PropertyContext ctx;
// Underlying SSH connection; assigned by getSFTPClient after connect and auth succeed, cleared by close().
private SSHClient sshClient;
// SFTP session created from sshClient; null until a connection exists and again after close().
private SFTPClient sftpClient;
// Set to true by close(); reset to false whenever getSFTPClient creates a new session.
private volatile boolean closed = false;
// Result of canonicalize("") on the server, or "" when that lookup failed (see getSFTPClient).
private String homeDir;
// Cached value of DISABLE_DIRECTORY_LISTING, evaluated once in the constructor.
private final boolean disableDirectoryListing;
/**
 * Creates a transfer bound to the given configuration. No connection is opened here;
 * the SSH/SFTP session is created lazily by {@link #getSFTPClient(FlowFile)}.
 *
 * @param propertyContext source of all configuration properties
 * @param logger component logger used for diagnostics
 */
public SFTPTransfer(final PropertyContext propertyContext, final ComponentLog logger) {
    this.ctx = propertyContext;
    this.logger = logger;

    // The property may be absent from the context, or set but unparsable; both cases mean "listing enabled".
    final PropertyValue disableListing = propertyContext.getProperty(DISABLE_DIRECTORY_LISTING);
    this.disableDirectoryListing = disableListing != null && Boolean.TRUE.equals(disableListing.asBoolean());
}
public static void validateProxySpec(ValidationContext context, Collection<ValidationResult> results) {
ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
public String getProtocolName() {
return "sftp";
public List<FileInfo> getListing(final boolean applyFilters) throws IOException {
final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
final int depth = 0;
final int maxResults;
final PropertyValue batchSizeValue = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE);
if (batchSizeValue == null) {
maxResults = Integer.MAX_VALUE;
} else {
final Integer configuredValue = batchSizeValue.asInteger();
maxResults = configuredValue == null ? Integer.MAX_VALUE : configuredValue;
final List<FileInfo> listing = new ArrayList<>(1000);
getListing(path, depth, maxResults, listing, applyFilters);
return listing;
/**
 * Collects remote files found under {@code path} into {@code listing}, descending into the entries gathered in
 * {@code subDirs} by calling itself with {@code depth + 1}.
 *
 * NOTE(review): this method appears to have lost statements during extraction — the early returns after the two
 * guard conditions, the code that fills {@code subDirs}, the actual listing calls and the {@code case} labels of the
 * status-code switch are not visible. Confirm against the complete source before changing any logic here.
 *
 * @param path directory to list; {@code null} is described as "current directory" in error messages
 * @param depth current recursion depth, compared against a limit of 100
 * @param maxResults upper bound on the size of {@code listing}
 * @param listing accumulator that matching files are added to
 * @param applyFilters whether the file and path filter patterns are applied
 * @throws IOException if the listing of {@code path} itself fails
 */
protected void getListing(final String path, final int depth, final int maxResults, final List<FileInfo> listing,
final boolean applyFilters) throws IOException {
// nothing to do when no results are wanted or the accumulator is already full
if (maxResults < 1 || listing.size() >= maxResults) {
// recursion guard
if (depth >= 100) {
logger.warn(this + " had to stop recursively searching directories at a recursive depth of " + depth + " to avoid memory issues");
final boolean ignoreDottedFiles = ctx.getProperty(FileTransfer.IGNORE_DOTTED_FILES).asBoolean();
final boolean recurse = ctx.getProperty(FileTransfer.RECURSIVE_SEARCH).asBoolean();
final boolean symlink = ctx.getProperty(FileTransfer.FOLLOW_SYMLINK).asBoolean();
final String fileFilterRegex = ctx.getProperty(FileTransfer.FILE_FILTER_REGEX).getValue();
final Pattern pattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
final String pathFilterRegex = ctx.getProperty(FileTransfer.PATH_FILTER_REGEX).getValue();
// the path filter is only compiled when recursion is enabled
final Pattern pathPattern = (!recurse || pathFilterRegex == null) ? null : Pattern.compile(pathFilterRegex);
final String remotePath = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
// check if this directory path matches the PATH_FILTER_REGEX
boolean pathFilterMatches = true;
if (pathPattern != null) {
Path reldir = path == null ? Paths.get(".") : Paths.get(path);
// the filter is matched against the path relative to the configured remote path
if (remotePath != null) {
reldir = Paths.get(remotePath).relativize(reldir);
if (reldir != null && !reldir.toString().isEmpty()) {
// backslashes are normalised so the regex always sees forward slashes
if (!pathPattern.matcher(reldir.toString().replace("\\", "/")).matches()) {
pathFilterMatches = false;
final SFTPClient sftpClient = getSFTPClient(null);
// effectively-final copy for use inside the lambda below
final boolean isPathMatch = pathFilterMatches;
//subDirs list is used for both 'sub directories' and 'symlinks'
final List<RemoteResourceInfo> subDirs = new ArrayList<>();
try {
// the filter always returns false; matching files are added to 'listing' as a side effect
final RemoteResourceFilter filter = (entry) -> {
final String entryFilename = entry.getName();
// skip over 'this directory' and 'parent directory' special files regardless of ignoring dot files
if (entryFilename.equals(".") || entryFilename.equals("..")) {
return false;
// skip files and directories that begin with a dot if we're ignoring them
if (ignoreDottedFiles && entryFilename.startsWith(".")) {
return false;
// if is a directory and we're supposed to recurse OR if is a link and we're supposed to follow symlink
// NOTE(review): presumably the entry is recorded in subDirs here before returning — that statement is not visible.
if ((recurse && entry.isDirectory()) || (symlink && (entry.getAttributes().getType() == FileMode.Type.SYMLINK))){
return false;
// Since SSHJ does not have the concept of BREAK that JSCH had, we need to move this before the call to listing.add
// below, otherwise we would keep adding to the listings since returning false here doesn't break
if (listing.size() >= maxResults) {
return false;
// if is not a directory and is not a link and it matches FILE_FILTER_REGEX - then let's add it
if (!entry.isDirectory() && !(entry.getAttributes().getType() == FileMode.Type.SYMLINK) && (!applyFilters || isPathMatch)) {
if (pattern == null || !applyFilters || pattern.matcher(entryFilename).matches()) {
listing.add(newFileInfo(entry, path));
return false;
// NOTE(review): the two listing calls below are garbled (receiver and method name missing) — presumably a listing of "." versus 'path' with 'filter'; confirm.
if (path == null || path.trim().isEmpty()) {".", filter);
} else {, filter);
} catch (final SFTPException e) {
final String pathDesc = path == null ? "current directory" : path;
// NOTE(review): the case labels of this switch are not visible; the three throws presumably map to distinct status codes.
switch (e.getStatusCode()) {
throw new FileNotFoundException("Could not perform listing on " + pathDesc + " because could not find the file on the remote server");
throw new PermissionDeniedException("Could not perform listing on " + pathDesc + " due to insufficient permissions");
throw new IOException(String.format("Failed to obtain file listing for %s due to unexpected SSH_FXP_STATUS (%d)",
pathDesc, e.getStatusCode().getCode()), e);
// descend into each collected sub directory / symlink; a failure in one is logged and skipped
for (final RemoteResourceInfo entry : subDirs) {
final String entryFilename = entry.getName();
final File newFullPath = new File(path, entryFilename);
final String newFullForwardPath = newFullPath.getPath().replace("\\", "/");
try {
getListing(newFullForwardPath, depth + 1, maxResults, listing, applyFilters);
} catch (final IOException e) {
logger.error("Unable to get listing from " + newFullForwardPath + "; skipping", e);
/**
 * Builds a {@code FileInfo} for a remote entry located in {@code path}.
 *
 * NOTE(review): the builder chain and the final return statement are not visible here; only the
 * last-modified-time setter survived. Confirm the remaining attributes against the complete source.
 *
 * @param entry remote entry to describe; {@code null} yields {@code null}
 * @param path directory that contains the entry
 */
private FileInfo newFileInfo(final RemoteResourceInfo entry, String path) {
if (entry == null) {
return null;
// full remote path of the entry, always expressed with forward slashes
final File newFullPath = new File(path, entry.getName());
final String newFullForwardPath = newFullPath.getPath().replace("\\", "/");
// permission string is assembled in user, group, other order
final StringBuilder permsBuilder = new StringBuilder();
final Set<FilePermission> permissions = entry.getAttributes().getPermissions();
appendPermission(permsBuilder, permissions, FilePermission.USR_R, "r");
appendPermission(permsBuilder, permissions, FilePermission.USR_W, "w");
appendPermission(permsBuilder, permissions, FilePermission.USR_X, "x");
appendPermission(permsBuilder, permissions, FilePermission.GRP_R, "r");
appendPermission(permsBuilder, permissions, FilePermission.GRP_W, "w");
appendPermission(permsBuilder, permissions, FilePermission.GRP_X, "x");
appendPermission(permsBuilder, permissions, FilePermission.OTH_R, "r");
appendPermission(permsBuilder, permissions, FilePermission.OTH_W, "w");
appendPermission(permsBuilder, permissions, FilePermission.OTH_X, "x");
final FileInfo.Builder builder = new FileInfo.Builder()
// the mtime attribute is multiplied by 1000 — presumably seconds converted to milliseconds
.lastModifiedTime(entry.getAttributes().getMtime() * 1000L)
private void appendPermission(final StringBuilder builder, final Set<FilePermission> permissions, final FilePermission filePermission, final String permString) {
if (permissions.contains(filePermission)) {
} else {
/**
 * Copies the content of a remote file into the given FlowFile via {@code session.write}.
 *
 * NOTE(review): several statements look damaged — the expression that opens the remote file, the construction of
 * the read-ahead stream, the {@code case} labels of the status switch and the close calls in the finally block are
 * not visible. Confirm against the complete source.
 *
 * @param remoteFileName remote file to read
 * @param origFlowFile FlowFile that receives the content and is used to resolve connection properties
 * @param session session used to write the FlowFile content
 * @return the FlowFile returned by {@code session.write}
 * @throws IOException if the remote file cannot be read
 */
public FlowFile getRemoteFile(final String remoteFileName, final FlowFile origFlowFile, final ProcessSession session) throws ProcessException, IOException {
final SFTPClient sftpClient = getSFTPClient(origFlowFile);
// both handles are declared outside the try so the finally block can release them
RemoteFile rf = null;
RemoteFile.ReadAheadRemoteFileInputStream rfis = null;
FlowFile resultFlowFile;
try {
rf =;
rfis = ReadAheadRemoteFileInputStream(16);
final InputStream in = rfis;
resultFlowFile = session.write(origFlowFile, new OutputStreamCallback() {
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out);
return resultFlowFile;
} catch (final SFTPException e) {
// SFTP status codes are translated into more specific exception types
switch (e.getStatusCode()) {
throw new FileNotFoundException("Could not find file " + remoteFileName + " on remote SFTP Server");
throw new PermissionDeniedException("Insufficient permissions to read file " + remoteFileName + " from remote SFTP Server", e);
throw new IOException("Failed to obtain file content for " + remoteFileName, e);
} finally {
// best-effort cleanup: IOExceptions raised while closing are deliberately ignored
if(rf != null){
}catch(final IOException ioe){
//do nothing
if(rfis != null){
}catch(final IOException ioe){
//do nothing
public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile);
final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName;
try {
} catch (final SFTPException e) {
switch (e.getStatusCode()) {
throw new FileNotFoundException("Could not find file " + remoteFileName + " to remove from remote SFTP Server");
throw new PermissionDeniedException("Insufficient permissions to delete file " + remoteFileName + " from remote SFTP Server", e);
throw new IOException("Failed to delete remote file " + fullPath, e);
public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile);
try {
} catch (final SFTPException e) {
throw new IOException("Failed to delete remote directory " + remoteDirectoryName, e);
/**
 * Makes sure the given directory exists on the remote server, creating missing parents by calling itself on
 * {@code directoryName.getParentFile()}.
 *
 * NOTE(review): the statements that actually create the directory, check its existence, and return early are not
 * visible in this block (the try bodies contain only comments). Confirm against the complete source.
 *
 * @param flowFile FlowFile used to resolve connection properties
 * @param directoryName directory that must exist
 * @throws IOException if the existence check or the creation fails with an unexpected status
 */
public void ensureDirectoryExists(final FlowFile flowFile, final File directoryName) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile);
// normalise to forward slashes and drop a leading "X:" prefix (any single character followed by a colon)
final String remoteDirectory = directoryName.getAbsolutePath().replace("\\", "/").replaceAll("^.\\:", "");
// if we disable the directory listing, we just want to blindly perform the mkdir command,
// eating failure exceptions thrown (like if the directory already exists).
if (disableDirectoryListing) {
try {
// Blindly create the dir.
// The remote directory did not exist, and was created successfully.
} catch (SFTPException e) {
if (e.getStatusCode() == Response.StatusCode.NO_SUCH_FILE) {
// No Such File. This happens when parent directory was not found.
logger.debug(String.format("Could not create %s due to 'No such file'. Will try to create the parent dir.", remoteDirectory));
} else if (e.getStatusCode() == Response.StatusCode.FAILURE) {
// Swallow '4: Failure' including the remote directory already exists.
logger.debug("Could not blindly create remote directory due to " + getMessage(e), e);
} else {
throw new IOException("Could not blindly create remote directory due to " + e.getMessage(), e);
} else {
try {
// Check dir existence.
// The remote directory already exists.
} catch (final SFTPException e) {
// only "no such file" means the directory is missing; anything else is a real error
if (e.getStatusCode() != Response.StatusCode.NO_SUCH_FILE) {
throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + getMessage(e), e);
// first ensure parent directories exist before creating this one
// recursion stops once the parent is the filesystem root (File.separator)
if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
ensureDirectoryExists(flowFile, directoryName.getParentFile());
logger.debug("Remote Directory {} does not exist; creating it", remoteDirectory);
try {
logger.debug("Created {}", remoteDirectory);
} catch (final SFTPException e) {
throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + getMessage(e), e);
private String getMessage(final SFTPException e) {
if (e.getStatusCode() != null) {
return e.getStatusCode().getCode() + ": " + e.getMessage();
} else {
return e.getMessage();
// Provider whose KeepAlive implementation performs no work in doKeepAlive().
// NOTE(review): closing braces of the anonymous classes in this block are not visible.
private static final KeepAliveProvider NO_OP_KEEP_ALIVE = new KeepAliveProvider() {
public KeepAlive provide(final ConnectionImpl connection) {
return new KeepAlive(connection, "no-op-keep-alive") {
protected void doKeepAlive() {
// do nothing;
// Provider that delegates to SSHJ's KeepAliveProvider.KEEP_ALIVE.
// NOTE(review): KEEP_ALIVE_INTERVAL_SECONDS is declared in this class but is not applied to the returned
// KeepAlive here — the interval assignment may have been lost; confirm against the complete source.
private static final KeepAliveProvider DEFAULT_KEEP_ALIVE_PROVIDER = new KeepAliveProvider() {
public KeepAlive provide(final ConnectionImpl connection) {
final KeepAlive keepAlive = KeepAliveProvider.KEEP_ALIVE.provide(connection);
return keepAlive;
protected KeepAliveProvider getKeepAliveProvider() {
final boolean useKeepAliveOnTimeout = ctx.getProperty(USE_KEEPALIVE_ON_TIMEOUT).asBoolean();
/**
 * Returns an SFTP client for the host configured for {@code flowFile}, reusing the current session when the
 * host name matches and otherwise connecting and authenticating a new SSH client.
 *
 * NOTE(review): many statements appear to be missing from this block (closing the previous session, applying the
 * algorithm/keep-alive configuration, loading default known hosts, enabling compression, applying the timeouts,
 * the default branch of the proxy switch). Locals such as {@code connectionTimeoutMillis} and {@code dataTimeout}
 * are computed but never used in what is visible. Confirm against the complete source before editing.
 *
 * @param flowFile FlowFile whose attributes are used to evaluate host, port and user; may be {@code null}
 * @return a connected SFTP client
 * @throws IOException if connecting or authenticating fails
 */
protected SFTPClient getSFTPClient(final FlowFile flowFile) throws IOException {
// If the client is already initialized then compare the host that the client is connected to with the current
// host from the properties/flow-file, and if different then we need to close and reinitialize, if same we can reuse
if (sftpClient != null) {
final String clientHost = sshClient.getRemoteHostname();
final String propertiesHost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
// only the host name is compared; port and user are not part of this check
if (clientHost.equals(propertiesHost)) {
// destination matches so we can keep our current session
return sftpClient;
} else {
// this flowFile is going to a different destination, reset session
// Initialize a new SSHClient...
final DefaultConfig sshClientConfig = new DefaultConfig();
// local variable shadows the field until the connection has been fully established
final SSHClient sshClient = new SSHClient(sshClientConfig);
// Create a Proxy if the config was specified, proxy will be null if type was NO_PROXY
final Proxy proxy;
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
switch (proxyConfig.getProxyType()) {
case HTTP:
case SOCKS:
proxy = proxyConfig.createProxy();
proxy = null;
// If a proxy was specified, configure the client to use a SocketFactory that creates Sockets using the proxy
if (proxy != null) {
// every overload returns an unconnected proxy socket and ignores its host/port arguments
sshClient.setSocketFactory(new SocketFactory() {
public Socket createSocket() {
return new Socket(proxy);
public Socket createSocket(String s, int i) {
return new Socket(proxy);
public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) {
return new Socket(proxy);
public Socket createSocket(InetAddress inetAddress, int i) {
return new Socket(proxy);
public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) {
return new Socket(proxy);
// If strict host key checking is false, add a HostKeyVerifier that always returns true
final boolean strictHostKeyChecking = ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean();
if (!strictHostKeyChecking) {
sshClient.addHostKeyVerifier(new PromiscuousVerifier());
// Load known hosts file if specified, otherwise load default
final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();
if (hostKeyVal != null) {
sshClient.loadKnownHosts(new File(hostKeyVal));
// Load default known_hosts file only when 'Strict Host Key Checking' property is enabled
} else if (strictHostKeyChecking) {
// Enable compression on the client if specified in properties
final PropertyValue compressionValue = ctx.getProperty(FileTransfer.USE_COMPRESSION);
if (compressionValue != null && "true".equalsIgnoreCase(compressionValue.getValue())) {
// Configure connection timeout
final int connectionTimeoutMillis = ctx.getProperty(FileTransfer.CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
// Connect to the host and port
final String hostname = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
final int port = ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger();
sshClient.connect(hostname, port);
// Setup authentication methods...
final List<AuthMethod> authMethods = getAuthMethods(sshClient, flowFile);
// Authenticate...
final String username = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
sshClient.auth(username, authMethods);
// At this point we are connected and can create a new SFTPClient which means everything is good
this.sshClient = sshClient;
this.sftpClient = sshClient.newSFTPClient();
this.closed = false;
// Configure timeout for sftp operations
final int dataTimeout = ctx.getProperty(FileTransfer.DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
// Attempt to get the home dir
try {
this.homeDir = sftpClient.canonicalize("");
} catch (IOException e) {
this.homeDir = "";
// For some combination of server configuration and user home directory, getHome() can fail with "2: File not found"
// Since homeDir is only used for SEND provenance event transit uri, this is harmless. Log and continue.
logger.debug("Failed to retrieve {} home directory due to {}", username, e.getMessage());
return sftpClient;
/**
 * Restricts the algorithms on {@code config} to the allow-lists configured through the "... Allowed" properties.
 *
 * NOTE(review): this block is heavily damaged — the expressions that split the property values, the stream
 * sources, the setters that write the filtered lists back to {@code config}, and the guard for the message
 * authentication code section are not visible. Only the filter predicates survive; confirm everything else
 * against the complete source.
 *
 * @param config SSH client configuration to update
 */
void updateConfigAlgorithms(final Config config) {
// ciphers: keep only factories whose name is in the allow-list
if (ctx.getProperty(CIPHERS_ALLOWED).isSet()) {
Set<String> allowedCiphers =","))
.filter(cipherNamed -> allowedCiphers.contains(cipherNamed.getName()))
// key algorithms: same name-based filtering
if (ctx.getProperty(KEY_ALGORITHMS_ALLOWED).isSet()) {
Set<String> allowedKeyAlgorithms =","))
.filter(keyAlgorithmNamed -> allowedKeyAlgorithms.contains(keyAlgorithmNamed.getName()))
// key exchange algorithms: same name-based filtering
if (ctx.getProperty(KEY_EXCHANGE_ALGORITHMS_ALLOWED).isSet()) {
Set<String> allowedKeyExchangeAlgorithms =","))
.filter(keyExchangeNamed -> allowedKeyExchangeAlgorithms.contains(keyExchangeNamed.getName()))
// message authentication codes — presumably guarded by MESSAGE_AUTHENTICATION_CODES_ALLOWED.isSet(); TODO confirm
Set<String> allowedMessageAuthenticationCodes =","))
.filter(macNamed -> allowedMessageAuthenticationCodes.contains(macNamed.getName()))
public String getHomeDirectory(final FlowFile flowFile) throws IOException {
return this.homeDir;
public void close() throws IOException {
if (closed) {
closed = true;
try {
if (null != sftpClient) {
} catch (final Exception ex) {
logger.warn("Failed to close SFTPClient due to {}", new Object[] {ex.toString()}, ex);
sftpClient = null;
try {
if (null != sshClient) {
} catch (final Exception ex) {
logger.warn("Failed to close SSHClient due to {}", new Object[] {ex.toString()}, ex);
sshClient = null;
public boolean isClosed() {
return closed;
/**
 * Looks up a single file in a remote directory and describes it as a {@code FileInfo}.
 *
 * NOTE(review): the expression assigned to {@code remoteResources} is missing (presumably a listing of
 * {@code path}), and no {@code break} is visible in the search loop — confirm against the complete source.
 *
 * @param flowFile FlowFile used to resolve connection properties
 * @param path remote directory to search
 * @param filename name to look for; compared case-insensitively
 * @return the file's info, or {@code null} when the directory does not exist, nothing matches,
 *         or the match is a directory
 * @throws IOException if the listing fails for a reason other than "no such file"
 */
public FileInfo getRemoteFileInfo(final FlowFile flowFile, final String path, String filename) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile);
final List<RemoteResourceInfo> remoteResources;
try {
remoteResources =;
} catch (final SFTPException e) {
// a missing directory simply means the file is not there
if (e.getStatusCode() == Response.StatusCode.NO_SUCH_FILE) {
return null;
} else {
throw new IOException("Failed to obtain file listing for " + path, e);
RemoteResourceInfo matchingEntry = null;
for (final RemoteResourceInfo entry : remoteResources) {
// NOTE(review): case-insensitive comparison can pick a different file on case-sensitive servers
if (entry.getName().equalsIgnoreCase(filename)) {
matchingEntry = entry;
// Previously JSCH would perform a listing on the full path (path + filename) and would get an exception when it wasn't
// a file and then return null, so to preserve that behavior we return null if the matchingEntry is a directory
if (matchingEntry != null && matchingEntry.isDirectory()) {
return null;
} else {
// newFileInfo returns null for a null entry, which covers the "no match" case
return newFileInfo(matchingEntry, path);
public String put(final FlowFile flowFile, final String path, final String filename, final InputStream content) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile);
// destination path + filename
final String fullPath = (path == null) ? filename : (path.endsWith("/")) ? path + filename : path + "/" + filename;
// temporary path + filename
String tempFilename = ctx.getProperty(TEMP_FILENAME).evaluateAttributeExpressions(flowFile).getValue();
if (tempFilename == null) {
final boolean dotRename = ctx.getProperty(DOT_RENAME).asBoolean();
tempFilename = dotRename ? "." + filename : filename;
final String tempPath = (path == null) ? tempFilename : (path.endsWith("/")) ? path + tempFilename : path + "/" + tempFilename;
int perms;
final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
if (permissions == null || permissions.trim().isEmpty()) {
sftpClient.getFileTransfer().setPreserveAttributes(false); //We will accept whatever the default permissions are of the destination
perms = 0;
} else {
sftpClient.getFileTransfer().setPreserveAttributes(true); //We will use the permissions supplied by evaluating processor property expression
perms = numberPermissions(permissions);
try {
final LocalSourceFile sourceFile = new SFTPFlowFileSourceFile(filename, content, perms);
sftpClient.put(sourceFile, tempPath);
} catch (final SFTPException e) {
throw new IOException("Unable to put content to " + fullPath + " due to " + getMessage(e), e);
final String lastModifiedTime = ctx.getProperty(LAST_MODIFIED_TIME).evaluateAttributeExpressions(flowFile).getValue();
if (lastModifiedTime != null && !lastModifiedTime.trim().isEmpty()) {
try {
final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
final Date fileModifyTime = formatter.parse(lastModifiedTime);
int time = (int) (fileModifyTime.getTime() / 1000L);
final FileAttributes tempAttributes = sftpClient.stat(tempPath);
final FileAttributes modifiedAttributes = new FileAttributes.Builder()
.withAtimeMtime(tempAttributes.getAtime(), time)
sftpClient.setattr(tempPath, modifiedAttributes);
} catch (final Exception e) {
logger.error("Failed to set lastModifiedTime on {} to {} due to {}", tempPath, lastModifiedTime, e);
final String owner = ctx.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
if (owner != null && !owner.trim().isEmpty()) {
try {
sftpClient.chown(tempPath, Integer.parseInt(owner));
} catch (final Exception e) {
logger.error("Failed to set owner on {} to {} due to {}", tempPath, owner, e);
final String group = ctx.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
if (group != null && !group.trim().isEmpty()) {
try {
sftpClient.chgrp(tempPath, Integer.parseInt(group));
} catch (final Exception e) {
logger.error("Failed to set group on {} to {} due to {}", tempPath, group, e);
if (!filename.equals(tempFilename)) {
try {
sftpClient.rename(tempPath, fullPath);
} catch (final SFTPException e) {
try {
throw new IOException("Failed to rename dot-file to " + fullPath + " due to " + getMessage(e), e);
} catch (final SFTPException e1) {
throw new IOException("Failed to rename dot-file to " + fullPath + " and failed to delete it when attempting to clean up", e1);
return fullPath;
public void rename(final FlowFile flowFile, final String source, final String target) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile);
try {
sftpClient.rename(source, target);
} catch (final SFTPException e) {
switch (e.getStatusCode()) {
throw new FileNotFoundException("No such file or directory");
throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions");
throw new IOException(e);
protected int numberPermissions(String perms) {
int number = -1;
final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");
final Pattern numPattern = Pattern.compile("\\d+");
if (rwxPattern.matcher(perms).matches()) {
number = 0;
if (perms.charAt(0) == 'r') {
number |= 0x100;
if (perms.charAt(1) == 'w') {
number |= 0x80;
if (perms.charAt(2) == 'x') {
number |= 0x40;
if (perms.charAt(3) == 'r') {
number |= 0x20;
if (perms.charAt(4) == 'w') {
number |= 0x10;
if (perms.charAt(5) == 'x') {
number |= 0x8;
if (perms.charAt(6) == 'r') {
number |= 0x4;
if (perms.charAt(7) == 'w') {
number |= 0x2;
if (perms.charAt(8) == 'x') {
number |= 0x1;
} else if (numPattern.matcher(perms).matches()) {
try {
number = Integer.parseInt(perms, 8);
} catch (NumberFormatException ignore) {
return number;
/**
 * Assembles the authentication methods to offer to the server, based on the configured private key and password.
 *
 * NOTE(review): the statements that add the created methods to {@code authMethods} are not visible, and the
 * initializer of {@code methods} in the debug branch is missing. Confirm against the complete source.
 *
 * @param client SSH client used to load the private key
 * @param flowFile FlowFile used to evaluate property expressions
 * @return the list of authentication methods
 */
protected List<AuthMethod> getAuthMethods(final SSHClient client, final FlowFile flowFile) {
final List<AuthMethod> authMethods = new ArrayList<>();
// public-key authentication is set up only when a private key path is configured
final String privateKeyPath = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
if (privateKeyPath != null) {
final String privateKeyPassphrase = ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue();
final KeyProvider keyProvider = getKeyProvider(client, privateKeyPath, privateKeyPassphrase);
final AuthMethod authPublicKey = new AuthPublickey(keyProvider);
// a configured password yields both a password method and a keyboard-interactive method
final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
if (password != null) {
final AuthMethod authPassword = new AuthPassword(getPasswordFinder(password));
final PasswordResponseProvider passwordProvider = new PasswordResponseProvider(getPasswordFinder(password));
final AuthMethod authKeyboardInteractive = new AuthKeyboardInteractive(passwordProvider);
if (logger.isDebugEnabled()) {
final List<String> methods =;
logger.debug("Authentication Methods Configured {}", methods);
return authMethods;
private KeyProvider getKeyProvider(final SSHClient client, final String privateKeyLocation, final String privateKeyPassphrase) {
final KeyFormat keyFormat = getKeyFormat(privateKeyLocation);
logger.debug("Loading Private Key File [{}] Format [{}]", privateKeyLocation, keyFormat);
try {
return privateKeyPassphrase == null ? client.loadKeys(privateKeyLocation) : client.loadKeys(privateKeyLocation, privateKeyPassphrase);
} catch (final IOException e) {
throw new ProcessException(String.format("Loading Private Key File [%s] Format [%s] Failed", privateKeyLocation, keyFormat), e);
private KeyFormat getKeyFormat(final String privateKeyLocation) {
try {
final File privateKeyFile = new File(privateKeyLocation);
return KeyProviderUtil.detectKeyFileFormat(privateKeyFile);
} catch (final IOException e) {
throw new ProcessException(String.format("Reading Private Key File [%s] Format Failed", privateKeyLocation), e);
private PasswordFinder getPasswordFinder(final String password) {
return PasswordUtils.createOneOff(password.toCharArray());