blob: 6986baf36caad8496f2d4c7662e09af47b07701f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.sftp;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.ChannelSftp.LsEntry;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** SFTP FileSystem. */
public class SFTPFileSystem extends FileSystem {
public static final Logger LOG =
LoggerFactory.getLogger(SFTPFileSystem.class);
private SFTPConnectionPool connectionPool;
private URI uri;
private static final int DEFAULT_SFTP_PORT = 22;
private static final int DEFAULT_MAX_CONNECTION = 5;
public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
public static final String FS_SFTP_USER_PREFIX = "fs.sftp.user.";
public static final String FS_SFTP_PASSWORD_PREFIX = "fs.sftp.password.";
public static final String FS_SFTP_HOST = "fs.sftp.host";
public static final String FS_SFTP_HOST_PORT = "fs.sftp.host.port";
public static final String FS_SFTP_KEYFILE = "fs.sftp.keyfile";
public static final String FS_SFTP_CONNECTION_MAX = "fs.sftp.connection.max";
public static final String E_SAME_DIRECTORY_ONLY =
"only same directory renames are supported";
public static final String E_HOST_NULL = "Invalid host specified";
public static final String E_USER_NULL =
"No user specified for sftp connection. Expand URI or credential file.";
public static final String E_PATH_DIR = "Path %s is a directory.";
public static final String E_FILE_STATUS = "Failed to get file status";
public static final String E_FILE_NOTFOUND = "File %s does not exist.";
public static final String E_FILE_EXIST = "File already exists: %s";
public static final String E_CREATE_DIR =
"create(): Mkdirs failed to create: %s";
public static final String E_DIR_CREATE_FROMFILE =
"Can't make directory for path %s since it is a file.";
public static final String E_MAKE_DIR_FORPATH =
"Can't make directory for path \"%s\" under \"%s\".";
public static final String E_DIR_NOTEMPTY = "Directory: %s is not empty.";
public static final String E_FILE_CHECK_FAILED = "File check failed";
public static final String E_NOT_SUPPORTED = "Not supported";
public static final String E_SPATH_NOTEXIST = "Source path %s does not exist";
public static final String E_DPATH_EXIST =
"Destination path %s already exist, cannot rename!";
public static final String E_FAILED_GETHOME = "Failed to get home directory";
public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
/**
* Set configuration from UI.
*
* @param uri
* @param conf
* @throws IOException
*/
private void setConfigurationFromURI(URI uriInfo, Configuration conf)
throws IOException {
// get host information from URI
String host = uriInfo.getHost();
host = (host == null) ? conf.get(FS_SFTP_HOST, null) : host;
if (host == null) {
throw new IOException(E_HOST_NULL);
}
conf.set(FS_SFTP_HOST, host);
int port = uriInfo.getPort();
port = (port == -1)
? conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT)
: port;
conf.setInt(FS_SFTP_HOST_PORT, port);
// get user/password information from URI
String userAndPwdFromUri = uriInfo.getUserInfo();
if (userAndPwdFromUri != null) {
String[] userPasswdInfo = userAndPwdFromUri.split(":");
String user = userPasswdInfo[0];
user = URLDecoder.decode(user, "UTF-8");
conf.set(FS_SFTP_USER_PREFIX + host, user);
if (userPasswdInfo.length > 1) {
conf.set(FS_SFTP_PASSWORD_PREFIX + host + "." +
user, userPasswdInfo[1]);
}
}
String user = conf.get(FS_SFTP_USER_PREFIX + host);
if (user == null || user.equals("")) {
throw new IllegalStateException(E_USER_NULL);
}
int connectionMax =
conf.getInt(FS_SFTP_CONNECTION_MAX, DEFAULT_MAX_CONNECTION);
connectionPool = new SFTPConnectionPool(connectionMax);
}
/**
* Connecting by using configuration parameters.
*
* @return An FTPClient instance
* @throws IOException
*/
private ChannelSftp connect() throws IOException {
Configuration conf = getConf();
String host = conf.get(FS_SFTP_HOST, null);
int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
String pwd = conf.get(FS_SFTP_PASSWORD_PREFIX + host + "." + user, null);
String keyFile = conf.get(FS_SFTP_KEYFILE, null);
ChannelSftp channel =
connectionPool.connect(host, port, user, pwd, keyFile);
return channel;
}
/**
* Logout and disconnect the given channel.
*
* @param client
* @throws IOException
*/
private void disconnect(ChannelSftp channel) throws IOException {
connectionPool.disconnect(channel);
}
/**
* Resolve against given working directory.
*
* @param workDir
* @param path
* @return absolute path
*/
private Path makeAbsolute(Path workDir, Path path) {
if (path.isAbsolute()) {
return path;
}
return new Path(workDir, path);
}
/**
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
* the overhead of opening/closing a TCP connection.
* @throws IOException
*/
private boolean exists(ChannelSftp channel, Path file) throws IOException {
try {
getFileStatus(channel, file);
return true;
} catch (FileNotFoundException fnfe) {
return false;
} catch (IOException ioe) {
throw new IOException(E_FILE_STATUS, ioe);
}
}
/**
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
* the overhead of opening/closing a TCP connection.
*/
@SuppressWarnings("unchecked")
private FileStatus getFileStatus(ChannelSftp client, Path file)
throws IOException {
FileStatus fileStat = null;
Path workDir;
try {
workDir = new Path(client.pwd());
} catch (SftpException e) {
throw new IOException(e);
}
Path absolute = makeAbsolute(workDir, file);
Path parentPath = absolute.getParent();
if (parentPath == null) { // root directory
long length = -1; // Length of root directory on server not known
boolean isDir = true;
int blockReplication = 1;
long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
long modTime = -1; // Modification time of root directory not known.
Path root = new Path("/");
return new FileStatus(length, isDir, blockReplication, blockSize,
modTime,
root.makeQualified(this.getUri(), this.getWorkingDirectory()));
}
String pathName = parentPath.toUri().getPath();
Vector<LsEntry> sftpFiles;
try {
sftpFiles = (Vector<LsEntry>) client.ls(pathName);
} catch (SftpException e) {
throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
}
if (sftpFiles != null) {
for (LsEntry sftpFile : sftpFiles) {
if (sftpFile.getFilename().equals(file.getName())) {
// file found in directory
fileStat = getFileStatus(client, sftpFile, parentPath);
break;
}
}
if (fileStat == null) {
throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
}
} else {
throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
}
return fileStat;
}
/**
* Convert the file information in LsEntry to a {@link FileStatus} object. *
*
* @param sftpFile
* @param parentPath
* @return file status
* @throws IOException
*/
private FileStatus getFileStatus(ChannelSftp channel, LsEntry sftpFile,
Path parentPath) throws IOException {
SftpATTRS attr = sftpFile.getAttrs();
long length = attr.getSize();
boolean isDir = attr.isDir();
boolean isLink = attr.isLink();
if (isLink) {
String link = parentPath.toUri().getPath() + "/" + sftpFile.getFilename();
try {
link = channel.realpath(link);
Path linkParent = new Path("/", link);
FileStatus fstat = getFileStatus(channel, linkParent);
isDir = fstat.isDirectory();
length = fstat.getLen();
} catch (Exception e) {
throw new IOException(e);
}
}
int blockReplication = 1;
// Using default block size since there is no way in SFTP channel to know of
// block sizes on server. The assumption could be less than ideal.
long blockSize = DEFAULT_BLOCK_SIZE;
long modTime = attr.getMTime() * 1000L; // convert to milliseconds
long accessTime = attr.getATime() * 1000L;
FsPermission permission = getPermissions(sftpFile);
// not be able to get the real user group name, just use the user and group
// id
String user = Integer.toString(attr.getUId());
String group = Integer.toString(attr.getGId());
Path filePath = new Path(parentPath, sftpFile.getFilename());
return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
accessTime, permission, user, group, filePath.makeQualified(
this.getUri(), this.getWorkingDirectory()));
}
/**
* Return file permission.
*
* @param sftpFile
* @return file permission
*/
private FsPermission getPermissions(LsEntry sftpFile) {
return new FsPermission((short) sftpFile.getAttrs().getPermissions());
}
/**
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
* the overhead of opening/closing a TCP connection.
*/
private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission)
throws IOException {
boolean created = true;
Path workDir;
try {
workDir = new Path(client.pwd());
} catch (SftpException e) {
throw new IOException(e);
}
Path absolute = makeAbsolute(workDir, file);
String pathName = absolute.getName();
if (!exists(client, absolute)) {
Path parent = absolute.getParent();
created =
(parent == null || mkdirs(client, parent, FsPermission.getDefault()));
if (created) {
String parentDir = parent.toUri().getPath();
boolean succeeded = true;
try {
final String previousCwd = client.pwd();
client.cd(parentDir);
client.mkdir(pathName);
client.cd(previousCwd);
} catch (SftpException e) {
throw new IOException(String.format(E_MAKE_DIR_FORPATH, pathName,
parentDir));
}
created = created & succeeded;
}
} else if (isFile(client, absolute)) {
throw new IOException(String.format(E_DIR_CREATE_FROMFILE, absolute));
}
return created;
}
/**
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
* the overhead of opening/closing a TCP connection.
* @throws IOException
*/
private boolean isFile(ChannelSftp channel, Path file) throws IOException {
try {
return !getFileStatus(channel, file).isDirectory();
} catch (FileNotFoundException e) {
return false; // file does not exist
} catch (IOException ioe) {
throw new IOException(E_FILE_CHECK_FAILED, ioe);
}
}
/**
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
* the overhead of opening/closing a TCP connection.
*/
private boolean delete(ChannelSftp channel, Path file, boolean recursive)
throws IOException {
Path workDir;
try {
workDir = new Path(channel.pwd());
} catch (SftpException e) {
throw new IOException(e);
}
Path absolute = makeAbsolute(workDir, file);
String pathName = absolute.toUri().getPath();
FileStatus fileStat = null;
try {
fileStat = getFileStatus(channel, absolute);
} catch (FileNotFoundException e) {
// file not found, no need to delete, return true
return false;
}
if (!fileStat.isDirectory()) {
boolean status = true;
try {
channel.rm(pathName);
} catch (SftpException e) {
status = false;
}
return status;
} else {
boolean status = true;
FileStatus[] dirEntries = listStatus(channel, absolute);
if (dirEntries != null && dirEntries.length > 0) {
if (!recursive) {
throw new IOException(String.format(E_DIR_NOTEMPTY, file));
}
for (int i = 0; i < dirEntries.length; ++i) {
delete(channel, new Path(absolute, dirEntries[i].getPath()),
recursive);
}
}
try {
channel.rmdir(pathName);
} catch (SftpException e) {
status = false;
}
return status;
}
}
/**
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
* the overhead of opening/closing a TCP connection.
*/
@SuppressWarnings("unchecked")
private FileStatus[] listStatus(ChannelSftp client, Path file)
throws IOException {
Path workDir;
try {
workDir = new Path(client.pwd());
} catch (SftpException e) {
throw new IOException(e);
}
Path absolute = makeAbsolute(workDir, file);
FileStatus fileStat = getFileStatus(client, absolute);
if (!fileStat.isDirectory()) {
return new FileStatus[] {fileStat};
}
Vector<LsEntry> sftpFiles;
try {
sftpFiles = (Vector<LsEntry>) client.ls(absolute.toUri().getPath());
} catch (SftpException e) {
throw new IOException(e);
}
ArrayList<FileStatus> fileStats = new ArrayList<FileStatus>();
for (int i = 0; i < sftpFiles.size(); i++) {
LsEntry entry = sftpFiles.get(i);
String fname = entry.getFilename();
// skip current and parent directory, ie. "." and ".."
if (!".".equalsIgnoreCase(fname) && !"..".equalsIgnoreCase(fname)) {
fileStats.add(getFileStatus(client, entry, absolute));
}
}
return fileStats.toArray(new FileStatus[fileStats.size()]);
}
/**
* Convenience method, so that we don't open a new connection when using this
* method from within another method. Otherwise every API invocation incurs
* the overhead of opening/closing a TCP connection.
*
* @param channel
* @param src
* @param dst
* @return rename successful?
* @throws IOException
*/
private boolean rename(ChannelSftp channel, Path src, Path dst)
throws IOException {
Path workDir;
try {
workDir = new Path(channel.pwd());
} catch (SftpException e) {
throw new IOException(e);
}
Path absoluteSrc = makeAbsolute(workDir, src);
Path absoluteDst = makeAbsolute(workDir, dst);
if (!exists(channel, absoluteSrc)) {
throw new IOException(String.format(E_SPATH_NOTEXIST, src));
}
if (exists(channel, absoluteDst)) {
throw new IOException(String.format(E_DPATH_EXIST, dst));
}
boolean renamed = true;
try {
final String previousCwd = channel.pwd();
channel.cd("/");
channel.rename(src.toUri().getPath(), dst.toUri().getPath());
channel.cd(previousCwd);
} catch (SftpException e) {
renamed = false;
}
return renamed;
}
@Override
public void initialize(URI uriInfo, Configuration conf) throws IOException {
super.initialize(uriInfo, conf);
setConfigurationFromURI(uriInfo, conf);
setConf(conf);
this.uri = uriInfo;
}
@Override
public URI getUri() {
return uri;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
ChannelSftp channel = connect();
Path workDir;
try {
workDir = new Path(channel.pwd());
} catch (SftpException e) {
throw new IOException(e);
}
Path absolute = makeAbsolute(workDir, f);
FileStatus fileStat = getFileStatus(channel, absolute);
if (fileStat.isDirectory()) {
disconnect(channel);
throw new IOException(String.format(E_PATH_DIR, f));
}
InputStream is;
try {
// the path could be a symbolic link, so get the real path
absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));
is = channel.get(absolute.toUri().getPath());
} catch (SftpException e) {
throw new IOException(e);
}
FSDataInputStream fis =
new FSDataInputStream(new SFTPInputStream(is, channel, statistics));
return fis;
}
/**
* A stream obtained via this call must be closed before using other APIs of
* this class or else the invocation will block.
*/
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
final ChannelSftp client = connect();
Path workDir;
try {
workDir = new Path(client.pwd());
} catch (SftpException e) {
throw new IOException(e);
}
Path absolute = makeAbsolute(workDir, f);
if (exists(client, f)) {
if (overwrite) {
delete(client, f, false);
} else {
disconnect(client);
throw new IOException(String.format(E_FILE_EXIST, f));
}
}
Path parent = absolute.getParent();
if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
parent = (parent == null) ? new Path("/") : parent;
disconnect(client);
throw new IOException(String.format(E_CREATE_DIR, parent));
}
OutputStream os;
try {
final String previousCwd = client.pwd();
client.cd(parent.toUri().getPath());
os = client.put(f.getName());
client.cd(previousCwd);
} catch (SftpException e) {
throw new IOException(e);
}
FSDataOutputStream fos = new FSDataOutputStream(os, statistics) {
@Override
public void close() throws IOException {
super.close();
disconnect(client);
}
};
return fos;
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress)
throws IOException {
throw new IOException(E_NOT_SUPPORTED);
}
/*
* The parent of source and destination can be different. It is suppose to
* work like 'move'
*/
@Override
public boolean rename(Path src, Path dst) throws IOException {
ChannelSftp channel = connect();
try {
boolean success = rename(channel, src, dst);
return success;
} finally {
disconnect(channel);
}
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
ChannelSftp channel = connect();
try {
boolean success = delete(channel, f, recursive);
return success;
} finally {
disconnect(channel);
}
}
@Override
public FileStatus[] listStatus(Path f) throws IOException {
ChannelSftp client = connect();
try {
FileStatus[] stats = listStatus(client, f);
return stats;
} finally {
disconnect(client);
}
}
@Override
public void setWorkingDirectory(Path newDir) {
// we do not maintain the working directory state
}
@Override
public Path getWorkingDirectory() {
// Return home directory always since we do not maintain state.
return getHomeDirectory();
}
@Override
public Path getHomeDirectory() {
ChannelSftp channel = null;
try {
channel = connect();
Path homeDir = new Path(channel.pwd());
return homeDir;
} catch (Exception ioe) {
return null;
} finally {
try {
disconnect(channel);
} catch (IOException ioe) {
return null;
}
}
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
ChannelSftp client = connect();
try {
boolean success = mkdirs(client, f, permission);
return success;
} finally {
disconnect(client);
}
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
ChannelSftp channel = connect();
try {
FileStatus status = getFileStatus(channel, f);
return status;
} finally {
disconnect(channel);
}
}
}