| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.sshd.sftp.server; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.StreamCorruptedException; |
| import java.net.UnknownServiceException; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.AccessDeniedException; |
| import java.nio.file.FileSystem; |
| import java.nio.file.FileSystemLoopException; |
| import java.nio.file.FileSystems; |
| import java.nio.file.Files; |
| import java.nio.file.LinkOption; |
| import java.nio.file.NoSuchFileException; |
| import java.nio.file.NotDirectoryException; |
| import java.nio.file.Path; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.Deque; |
| import java.util.LinkedList; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.TreeMap; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.sshd.common.Factory; |
| import org.apache.sshd.common.FactoryManager; |
| import org.apache.sshd.common.channel.AbstractChannel; |
| import org.apache.sshd.common.channel.AbstractChannel.PacketValidator; |
| import org.apache.sshd.common.channel.BufferedIoOutputStream; |
| import org.apache.sshd.common.channel.LocalWindow; |
| import org.apache.sshd.common.digest.BuiltinDigests; |
| import org.apache.sshd.common.digest.DigestFactory; |
| import org.apache.sshd.common.file.FileSystemAware; |
| import org.apache.sshd.common.io.IoInputStream; |
| import org.apache.sshd.common.io.IoOutputStream; |
| import org.apache.sshd.common.random.Random; |
| import org.apache.sshd.common.session.Session; |
| import org.apache.sshd.common.util.ValidateUtils; |
| import org.apache.sshd.common.util.buffer.Buffer; |
| import org.apache.sshd.common.util.buffer.BufferUtils; |
| import org.apache.sshd.common.util.buffer.ByteArrayBuffer; |
| import org.apache.sshd.common.util.io.IoUtils; |
| import org.apache.sshd.common.util.threads.CloseableExecutorService; |
| import org.apache.sshd.common.util.threads.ExecutorServiceCarrier; |
| import org.apache.sshd.common.util.threads.ThreadUtils; |
| import org.apache.sshd.server.Environment; |
| import org.apache.sshd.server.ExitCallback; |
| import org.apache.sshd.server.channel.ChannelDataReceiver; |
| import org.apache.sshd.server.channel.ChannelSession; |
| import org.apache.sshd.server.channel.ChannelSessionAware; |
| import org.apache.sshd.server.command.AsyncCommand; |
| import org.apache.sshd.server.command.AsyncCommandErrorStreamAware; |
| import org.apache.sshd.server.command.Command; |
| import org.apache.sshd.server.command.CommandDirectErrorStreamAware; |
| import org.apache.sshd.server.session.ServerSession; |
| import org.apache.sshd.sftp.SftpModuleProperties; |
| import org.apache.sshd.sftp.client.fs.SftpPath; |
| import org.apache.sshd.sftp.client.impl.SftpPathImpl; |
| import org.apache.sshd.sftp.common.SftpConstants; |
| import org.apache.sshd.sftp.common.SftpException; |
| import org.apache.sshd.sftp.common.SftpHelper; |
| |
| /** |
| * SFTP subsystem |
| * |
| * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a> |
| */ |
| public class SftpSubsystem |
| extends AbstractSftpSubsystemHelper |
| implements Command, Runnable, FileSystemAware, ExecutorServiceCarrier, |
| AsyncCommand, ChannelDataReceiver { |
| protected static final Buffer CLOSE = new ByteArrayBuffer(null, 0, 0); |
| |
| protected final AtomicBoolean closed = new AtomicBoolean(false); |
| protected final AtomicLong requestsCount = new AtomicLong(0L); |
| protected final Map<String, byte[]> extensions = new TreeMap<>(Comparator.naturalOrder()); |
| protected final Map<String, Handle> handles = new ConcurrentHashMap<>(); |
| protected final Buffer buffer = new ByteArrayBuffer(1024); |
| protected final BlockingQueue<Buffer> requests = new LinkedBlockingQueue<>(); |
| |
| protected ExitCallback callback; |
| protected IoOutputStream out; |
| protected Environment env; |
| protected Random randomizer; |
| protected int maxHandleCount = Integer.MAX_VALUE - 1; |
| protected Deque<String> unusedHandles; |
| protected int fileHandleSize = SftpModuleProperties.DEFAULT_FILE_HANDLE_SIZE; |
| protected int maxFileHandleRounds = SftpModuleProperties.DEFAULT_FILE_HANDLE_ROUNDS; |
| protected Future<?> pendingFuture; |
| protected byte[] workBuf = new byte[Math.max(SftpModuleProperties.DEFAULT_FILE_HANDLE_SIZE, Integer.BYTES)]; |
| protected FileSystem fileSystem = FileSystems.getDefault(); |
| protected Path defaultDir = fileSystem.getPath("").toAbsolutePath().normalize(); |
| protected int version; |
| |
| protected CloseableExecutorService executorService; |
| |
| private final ServerSession serverSession; |
| |
| /** |
| * @param channel The {@link ChannelSession} through which the command was received |
| * @param configurator The {@link SftpSubsystemConfigurator} to use |
| */ |
| public SftpSubsystem(ChannelSession channel, SftpSubsystemConfigurator configurator) { |
| super(channel, configurator); |
| |
| CloseableExecutorService executorService = configurator.getExecutorService(); |
| if (executorService == null) { |
| // See SSHD-1148 - use different thread name for concurrently running instances |
| this.executorService = ThreadUtils.newSingleThreadExecutor( |
| getClass().getSimpleName() + "-" + Math.abs(System.nanoTime() & 0xFFFF)); |
| } else { |
| this.executorService = executorService; |
| } |
| |
| serverSession = Objects.requireNonNull(channel.getServerSession(), "No session associated with the channel"); |
| initializeSessionRelatedMember(serverSession, channel); |
| |
| ChannelDataReceiver errorDataChannelReceiver |
| = resolveErrorDataChannelReceiver(channel, configurator.getErrorChannelDataReceiver()); |
| channel.setDataReceiver(this); |
| channel.setExtendedDataWriter(errorDataChannelReceiver); |
| |
| SftpErrorStatusDataHandler errHandler = getErrorStatusDataHandler(); |
| if (errHandler instanceof ChannelSessionAware) { |
| ((ChannelSessionAware) errHandler).setChannelSession(channel); |
| } |
| } |
| |
| protected ChannelDataReceiver resolveErrorDataChannelReceiver(ChannelSession channelSession, ChannelDataReceiver receiver) { |
| return (receiver != null) ? receiver : new ChannelDataReceiver() { |
| @Override |
| @SuppressWarnings("synthetic-access") |
| public void close() throws IOException { |
| if (log.isDebugEnabled()) { |
| log.debug("stderrData({}) closing", getSession()); |
| } |
| |
| } |
| |
| @Override |
| @SuppressWarnings("synthetic-access") |
| public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException { |
| if (log.isDebugEnabled()) { |
| log.debug("stderrData({}) received {} data bytes", channel, len); |
| } |
| return len; |
| } |
| }; |
| } |
| |
| @Override |
| public int getVersion() { |
| return version; |
| } |
| |
| @Override |
| public Path getDefaultDirectory() { |
| return defaultDir; |
| } |
| |
| @Override |
| public CloseableExecutorService getExecutorService() { |
| return executorService; |
| } |
| |
| protected void initializeSessionRelatedMember(ServerSession session, ChannelSession channel) { |
| FactoryManager manager = session.getFactoryManager(); |
| Factory<? extends Random> factory = manager.getRandomFactory(); |
| this.randomizer = factory.create(); |
| |
| this.maxHandleCount = SftpModuleProperties.MAX_OPEN_HANDLES_PER_SESSION.getRequired(session); |
| this.fileHandleSize = SftpModuleProperties.FILE_HANDLE_SIZE.getRequired(channel); |
| this.maxFileHandleRounds = SftpModuleProperties.MAX_FILE_HANDLE_RAND_ROUNDS.getRequired(channel); |
| |
| if (this.fileHandleSize > 4) { |
| // WS_FTP <= 12.9 has a bug concerning SFTP file handle sizes. If the handle size is > 4, it sends (handle |
| // size - 4) bytes too many in the SSH_MSG_CHANNEL_DATA packets. To work around this client-side bug, |
| // install a more lenient {@link PacketValidator} on the channel. |
| PacketValidator validator = (packetSize, maximumPacketSize, extendedData) -> { |
| if (AbstractChannel.DEFAULT_PACKET_VALIDATOR.isValid(packetSize, maximumPacketSize, extendedData)) { |
| return true; |
| } |
| // Packet size is larger. Accept only if it's a SSH_MSG_CHANNEL_DATA packet exactly the file handle size |
| // difference larger. |
| if (!extendedData && packetSize == maximumPacketSize + fileHandleSize - 4) { |
| return true; |
| } |
| return false; |
| }; |
| channel.setPacketValidator(validator); |
| } else { |
| this.unusedHandles = new LinkedList<>(); |
| } |
| if (workBuf.length < this.fileHandleSize) { |
| workBuf = new byte[this.fileHandleSize]; |
| } |
| } |
| |
| @Override |
| public ServerSession getServerSession() { |
| return serverSession; |
| } |
| |
| @Override |
| public void setFileSystem(FileSystem fileSystem) { |
| // reference check on purpose |
| if (fileSystem != this.fileSystem) { |
| this.fileSystem = fileSystem; |
| this.defaultDir = fileSystem.getPath("").toAbsolutePath().normalize(); |
| } |
| } |
| |
| @Override |
| public void setExitCallback(ExitCallback callback) { |
| this.callback = callback; |
| } |
| |
| @Override |
| public void setInputStream(InputStream in) { |
| // Do nothing |
| } |
| |
| @Override |
| public void setOutputStream(OutputStream out) { |
| // Do nothing |
| } |
| |
| @Override |
| public void setErrorStream(OutputStream err) { |
| SftpErrorStatusDataHandler errHandler = getErrorStatusDataHandler(); |
| if (errHandler instanceof CommandDirectErrorStreamAware) { |
| ((CommandDirectErrorStreamAware) errHandler).setErrorStream(err); |
| } |
| } |
| |
| @Override |
| public void setIoInputStream(IoInputStream in) { |
| // Do nothing |
| } |
| |
| @Override |
| public void setIoOutputStream(IoOutputStream out) { |
| ChannelSession channel = getServerChannelSession(); |
| long channelId = channel.getChannelId(); |
| this.out = new BufferedIoOutputStream("sftp-out@" + channelId, channelId, out, channel); |
| } |
| |
| @Override |
| public void setIoErrorStream(IoOutputStream err) { |
| SftpErrorStatusDataHandler errHandler = getErrorStatusDataHandler(); |
| if (errHandler instanceof AsyncCommandErrorStreamAware) { |
| ((AsyncCommandErrorStreamAware) errHandler).setIoErrorStream(err); |
| } |
| } |
| |
| @Override |
| public void start(ChannelSession channel, Environment env) throws IOException { |
| this.env = env; |
| try { |
| CloseableExecutorService executor = getExecutorService(); |
| pendingFuture = executor.submit(this); |
| } catch (RuntimeException e) { // e.g., RejectedExecutionException |
| log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.getMessage(), e); |
| throw new IOException(e); |
| } |
| } |
| |
| @Override |
| public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException { |
| buffer.compact(); |
| buffer.putRawBytes(buf, start, len); |
| while (buffer.available() >= Integer.BYTES) { |
| int rpos = buffer.rpos(); |
| int msglen = buffer.getInt(); |
| if (buffer.available() >= msglen) { |
| Buffer b = new ByteArrayBuffer(msglen + Integer.BYTES + Long.SIZE /* a bit extra */, false); |
| b.putUInt(msglen); |
| b.putRawBytes(buffer.array(), buffer.rpos(), msglen); |
| requests.add(b); |
| buffer.rpos(rpos + msglen + Integer.BYTES); |
| } else { |
| buffer.rpos(rpos); |
| break; |
| } |
| } |
| return 0; |
| } |
| |
| @Override |
| public void run() { |
| int exitCode = 0; |
| long buffersCount = 0L; |
| try { |
| ChannelSession channel = getServerChannelSession(); |
| LocalWindow localWindow = channel.getLocalWindow(); |
| while (true) { |
| Buffer buffer = requests.take(); |
| if (buffer == CLOSE) { |
| break; |
| } |
| int len = buffer.available(); |
| buffersCount++; |
| process(buffer); |
| localWindow.release(len); |
| } |
| } catch (Throwable t) { |
| if (!closed.get()) { // Ignore |
| Session session = getServerSession(); |
| error("run({}) {} caught in SFTP subsystem after {} buffers: {}", |
| session, t.getClass().getSimpleName(), buffersCount, t.getMessage(), t); |
| exitCode = -1; |
| } |
| } finally { |
| closeAllHandles(); |
| callback.onExit(exitCode, exitCode != 0); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| requests.clear(); |
| requests.add(CLOSE); |
| } |
| |
| @Override |
| protected void doProcess(Buffer buffer, int length, int type, int id) throws IOException { |
| super.doProcess(buffer, length, type, id); |
| if (type != SftpConstants.SSH_FXP_INIT) { |
| requestsCount.incrementAndGet(); |
| } |
| } |
| |
| @Override |
| protected void createLink( |
| int id, String existingPath, String linkPath, boolean symLink) |
| throws IOException { |
| Path link = resolveFile(linkPath); |
| Path existing = fileSystem.getPath(existingPath); |
| ServerSession session = getServerSession(); |
| if (log.isDebugEnabled()) { |
| log.debug("createLink({})[id={}], existing={}[{}], link={}[{}], symlink={})", |
| session, id, linkPath, link, existingPath, existing, symLink); |
| } |
| |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| listener.linking(session, link, existing, symLink); |
| try { |
| SftpFileSystemAccessor accessor = getFileSystemAccessor(); |
| accessor.createLink(this, link, existing, symLink); |
| } catch (IOException | RuntimeException | Error e) { |
| listener.linked(session, link, existing, symLink, e); |
| throw e; |
| } |
| listener.linked(session, link, existing, symLink, null); |
| } |
| |
| @Override |
| protected void doTextSeek(int id, String handle, long line) throws IOException { |
| Handle h = handles.get(handle); |
| if (log.isDebugEnabled()) { |
| log.debug("doTextSeek({})[id={}] SSH_FXP_EXTENDED(text-seek) (handle={}[{}], line={})", |
| getServerSession(), id, Handle.safe(handle), h, line); |
| } |
| |
| FileHandle fileHandle = validateHandle(handle, h, FileHandle.class); |
| throw new UnknownServiceException("doTextSeek(" + fileHandle + ")"); |
| } |
| |
| @Override |
| protected void doOpenSSHFsync(int id, String handle) throws IOException { |
| Handle h = handles.get(handle); |
| if (log.isDebugEnabled()) { |
| log.debug("doOpenSSHFsync({})[id={}] {}[{}]", getServerSession(), id, Handle.safe(handle), h); |
| } |
| |
| FileHandle fileHandle = validateHandle(handle, h, FileHandle.class); |
| SftpFileSystemAccessor accessor = getFileSystemAccessor(); |
| accessor.syncFileData( |
| this, fileHandle, fileHandle.getFile(), |
| fileHandle.getFileHandle(), fileHandle.getFileChannel()); |
| } |
| |
| @Override |
| protected void doCheckFileHash( |
| int id, String targetType, String target, Collection<String> algos, |
| long startOffset, long length, int blockSize, Buffer buffer) |
| throws Exception { |
| Path path; |
| if (SftpConstants.EXT_CHECK_FILE_HANDLE.equalsIgnoreCase(targetType)) { |
| Handle h = handles.get(target); |
| FileHandle fileHandle = validateHandle(target, h, FileHandle.class); |
| path = fileHandle.getFile(); |
| |
| /* |
| * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt |
| * section 9.1.2: |
| * |
| * If ACE4_READ_DATA was not included when the file was opened, the server MUST return |
| * STATUS_PERMISSION_DENIED. |
| */ |
| int access = fileHandle.getAccessMask(); |
| if ((access & SftpConstants.ACE4_READ_DATA) == 0) { |
| throw new AccessDeniedException(path.toString(), path.toString(), "File not opened for read"); |
| } |
| } else { |
| path = resolveFile(target); |
| |
| /* |
| * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt |
| * section 9.1.2: |
| * |
| * If 'check-file-name' refers to a SSH_FILEXFER_TYPE_SYMLINK, the target should be opened. |
| */ |
| for (int index = 0; |
| Files.isSymbolicLink(path) && (index < Byte.MAX_VALUE /* TODO make this configurable */); |
| index++) { |
| path = Files.readSymbolicLink(path); |
| } |
| |
| if (Files.isSymbolicLink(path)) { |
| throw new FileSystemLoopException(target); |
| } |
| |
| SftpFileSystemAccessor accessor = getFileSystemAccessor(); |
| LinkOption[] options = accessor.resolveFileAccessLinkOptions( |
| this, path, SftpConstants.SSH_FXP_EXTENDED, targetType, false); |
| if (Files.isDirectory(path, options)) { |
| throw new NotDirectoryException(path.toString()); |
| } |
| } |
| |
| ValidateUtils.checkNotNullAndNotEmpty(algos, "No hash algorithms specified"); |
| |
| DigestFactory factory = null; |
| for (String a : algos) { |
| factory = BuiltinDigests.fromFactoryName(a); |
| if ((factory != null) && factory.isSupported()) { |
| break; |
| } |
| } |
| ValidateUtils.checkNotNull(factory, "No matching digest factory found for %s", algos); |
| |
| doCheckFileHash(id, path, factory, startOffset, length, blockSize, buffer); |
| } |
| |
| @Override |
| protected byte[] doMD5Hash( |
| int id, String targetType, String target, long startOffset, long length, byte[] quickCheckHash) |
| throws Exception { |
| if (log.isDebugEnabled()) { |
| log.debug("doMD5Hash({})({})[{}] offset={}, length={}, quick-hash={}", |
| getServerSession(), targetType, target, startOffset, length, |
| BufferUtils.toHex(':', quickCheckHash)); |
| } |
| |
| Path path; |
| if (SftpConstants.EXT_MD5_HASH_HANDLE.equalsIgnoreCase(targetType)) { |
| Handle h = handles.get(target); |
| FileHandle fileHandle = validateHandle(target, h, FileHandle.class); |
| path = fileHandle.getFile(); |
| |
| /* |
| * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt |
| * section 9.1.1: |
| * |
| * The handle MUST be a file handle, and ACE4_READ_DATA MUST have been included in the desired-access when |
| * the file was opened |
| */ |
| int access = fileHandle.getAccessMask(); |
| if ((access & SftpConstants.ACE4_READ_DATA) == 0) { |
| throw new AccessDeniedException(path.toString(), path.toString(), "File not opened for read"); |
| } |
| } else { |
| path = resolveFile(target); |
| |
| SftpFileSystemAccessor accessor = getFileSystemAccessor(); |
| LinkOption[] options = accessor.resolveFileAccessLinkOptions( |
| this, path, SftpConstants.SSH_FXP_EXTENDED, targetType, true); |
| if (Files.isDirectory(path, options)) { |
| throw new NotDirectoryException(path.toString()); |
| } |
| } |
| |
| /* |
| * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt section |
| * 9.1.1: |
| * |
| * If both start-offset and length are zero, the entire file should be included |
| */ |
| long effectiveLength = length; |
| long totalSize = Files.size(path); |
| if ((startOffset == 0L) && (length == 0L)) { |
| effectiveLength = totalSize; |
| } else { |
| long maxRead = startOffset + effectiveLength; |
| if (maxRead > totalSize) { |
| effectiveLength = totalSize - startOffset; |
| } |
| } |
| |
| return doMD5Hash(id, path, startOffset, effectiveLength, quickCheckHash); |
| } |
| |
| @Override |
| protected void doVersionSelect(Buffer buffer, int id, String proposed) throws IOException { |
| ServerSession session = getServerSession(); |
| /* |
| * The 'version-select' MUST be the first request from the client to the server; if it is not, the server MUST |
| * fail the request and close the channel. |
| */ |
| if (requestsCount.get() > 0L) { |
| sendStatus(prepareReply(buffer), id, SftpConstants.SSH_FX_FAILURE, |
| "Version selection not the 1st request for proposal = " + proposed); |
| session.close(true); |
| return; |
| } |
| |
| Boolean result = validateProposedVersion(buffer, id, proposed); |
| /* |
| * "MUST then close the channel without processing any further requests" |
| */ |
| if (result == null) { // response sent internally |
| session.close(true); |
| return; |
| } |
| if (result) { |
| version = Integer.parseInt(proposed); |
| sendStatus(prepareReply(buffer), id, SftpConstants.SSH_FX_OK, ""); |
| } else { |
| sendStatus(prepareReply(buffer), id, SftpConstants.SSH_FX_FAILURE, "Unsupported version " + proposed); |
| session.close(true); |
| } |
| } |
| |
| @Override |
| protected void doBlock(int id, String handle, long offset, long length, int mask) throws IOException { |
| int vers = getVersion(); |
| if (vers < SftpConstants.SFTP_V6) { |
| throw new UnsupportedOperationException("File locks are not supported in sftp v" + vers + ", need SFTPv6"); |
| } |
| Handle p = handles.get(handle); |
| ServerSession session = getServerSession(); |
| if (log.isDebugEnabled()) { |
| log.debug("doBlock({})[id={}] SSH_FXP_BLOCK (handle={}[{}], offset={}, length={}, mask=0x{})", |
| session, id, Handle.safe(handle), p, offset, length, Integer.toHexString(mask)); |
| } |
| |
| FileHandle fileHandle = validateHandle(handle, p, FileHandle.class); |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| listener.blocking(session, handle, fileHandle, offset, length, mask); |
| try { |
| fileHandle.lock(offset, length, mask); |
| } catch (IOException | RuntimeException | Error e) { |
| listener.blocked(session, handle, fileHandle, offset, length, mask, e); |
| throw e; |
| } |
| listener.blocked(session, handle, fileHandle, offset, length, mask, null); |
| } |
| |
| @Override |
| protected void doUnblock(int id, String handle, long offset, long length) throws IOException { |
| int vers = getVersion(); |
| if (vers < SftpConstants.SFTP_V6) { |
| throw new UnsupportedOperationException("File locks are not supported in sftp v" + vers + ", need SFTPv6"); |
| } |
| Handle p = handles.get(handle); |
| ServerSession session = getServerSession(); |
| if (log.isDebugEnabled()) { |
| log.debug("doUnblock({})[id={}] SSH_FXP_UNBLOCK (handle={}[{}], offset={}, length={})", |
| session, id, Handle.safe(handle), p, offset, length); |
| } |
| |
| FileHandle fileHandle = validateHandle(handle, p, FileHandle.class); |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| listener.unblocking(session, handle, fileHandle, offset, length); |
| try { |
| fileHandle.unlock(offset, length); |
| } catch (IOException | RuntimeException | Error e) { |
| listener.unblocked(session, handle, fileHandle, offset, length, e); |
| throw e; |
| } |
| listener.unblocked(session, handle, fileHandle, offset, length, null); |
| } |
| |
| @Override |
| @SuppressWarnings("resource") |
| protected void doCopyData( |
| int id, String readHandle, long readOffset, long readLength, String writeHandle, long writeOffset) |
| throws IOException { |
| boolean inPlaceCopy = readHandle.equals(writeHandle); |
| Handle rh = handles.get(readHandle); |
| Handle wh = inPlaceCopy ? rh : handles.get(writeHandle); |
| if (log.isDebugEnabled()) { |
| log.debug("doCopyData({})[id={}] SSH_FXP_EXTENDED[{}] read={}[{}]" |
| + ", read-offset={}, read-length={}, write={}[{}], write-offset={})", |
| getServerSession(), id, SftpConstants.EXT_COPY_DATA, |
| Handle.safe(readHandle), rh, readOffset, readLength, Handle.safe(writeHandle), wh, writeOffset); |
| } |
| |
| FileHandle srcHandle = validateHandle(readHandle, rh, FileHandle.class); |
| Path srcPath = srcHandle.getFile(); |
| int srcAccess = srcHandle.getAccessMask(); |
| if ((srcAccess & SftpConstants.ACE4_READ_DATA) != SftpConstants.ACE4_READ_DATA) { |
| throw new AccessDeniedException(srcPath.toString(), srcPath.toString(), "Source file not opened for read"); |
| } |
| |
| ValidateUtils.checkTrue(readLength >= 0L, "Invalid read length: %d", readLength); |
| ValidateUtils.checkTrue(readOffset >= 0L, "Invalid read offset: %d", readOffset); |
| |
| long totalSize = Files.size(srcHandle.getFile()); |
| long effectiveLength = readLength; |
| if (effectiveLength == 0L) { |
| effectiveLength = totalSize - readOffset; |
| } else { |
| long maxRead = readOffset + effectiveLength; |
| if (maxRead > totalSize) { |
| effectiveLength = totalSize - readOffset; |
| } |
| } |
| ValidateUtils.checkTrue(effectiveLength > 0L, |
| "Non-positive effective copy data length: %d", effectiveLength); |
| |
| FileHandle dstHandle = inPlaceCopy ? srcHandle : validateHandle(writeHandle, wh, FileHandle.class); |
| int dstAccess = dstHandle.getAccessMask(); |
| if ((dstAccess & SftpConstants.ACE4_WRITE_DATA) != SftpConstants.ACE4_WRITE_DATA) { |
| throw new AccessDeniedException(srcHandle.toString(), srcHandle.toString(), "Source handle not opened for write"); |
| } |
| |
| ValidateUtils.checkTrue(writeOffset >= 0L, "Invalid write offset: %d", writeOffset); |
| // check if overlapping ranges as per the draft |
| if (inPlaceCopy) { |
| long maxRead = readOffset + effectiveLength; |
| if (maxRead > totalSize) { |
| maxRead = totalSize; |
| } |
| |
| long maxWrite = writeOffset + effectiveLength; |
| if (maxWrite > readOffset) { |
| throw new IllegalArgumentException("Write range end [" + writeOffset + "-" + maxWrite + "]" |
| + " overlaps with read range [" + readOffset + "-" + maxRead + "]"); |
| } else if (maxRead > writeOffset) { |
| throw new IllegalArgumentException("Read range end [" + readOffset + "-" + maxRead + "]" |
| + " overlaps with write range [" + writeOffset + "-" + maxWrite + "]"); |
| } |
| } |
| |
| byte[] copyBuf = new byte[Math.min(IoUtils.DEFAULT_COPY_SIZE, (int) effectiveLength)]; |
| while (effectiveLength > 0L) { |
| int remainLength = Math.min(copyBuf.length, (int) effectiveLength); |
| int readLen = srcHandle.read(copyBuf, 0, remainLength, readOffset); |
| if (readLen < 0) { |
| throw new EOFException("Premature EOF while still remaining " + effectiveLength + " bytes"); |
| } |
| dstHandle.write(copyBuf, 0, readLen, writeOffset); |
| |
| effectiveLength -= readLen; |
| readOffset += readLen; |
| writeOffset += readLen; |
| } |
| } |
| |
| @Override |
| protected void doReadDir(Buffer buffer, int id) throws IOException { |
| String handle = buffer.getString(StandardCharsets.ISO_8859_1); |
| Handle h = handles.get(handle); |
| ServerSession session = getServerSession(); |
| boolean debugEnabled = log.isDebugEnabled(); |
| if (debugEnabled) { |
| log.debug("doReadDir({})[id={}] SSH_FXP_READDIR (handle={}[{}])", session, id, Handle.safe(handle), h); |
| } |
| |
| Buffer reply = null; |
| try { |
| DirectoryHandle dh = validateHandle(handle, h, DirectoryHandle.class); |
| if (dh.isDone()) { |
| sendStatus(prepareReply(buffer), id, SftpConstants.SSH_FX_EOF, "Directory reading is done"); |
| return; |
| } |
| |
| Path file = dh.getFile(); |
| // If it's an SftpPath, don't re-check accessibily or existence. The underlying DirectoryHandle iterator |
| // contacts the upstream server, which should check. This repeated check here is questionable anyway. |
| // We did check in doOpenDir(). If access to the directory has changed in the meantime; it's undefined |
| // anyway what happens. If the directory is local, it depends on what the Java library would do with |
| // the DirectoryStream in that case if it reads the directory lazily. And if it is remote, it may well |
| // be that the upstream server has read the whole list from the local file system and buffered it, so |
| // it could serve the listing even if some other concurrent operation had removed the directory in the |
| // meantime, or had changed its access. It is entirely unspecified what shall happen if files inside |
| // the directory are changed while the directory is listed, and likewise it's entirely unspecified what |
| // shall happen if the directory itself is deleted while being listed. |
| // |
| // As long as the file system is local, this check here is local operations only, but if the directory |
| // is remote; this incurs several (up to three) remote LSTAT calls. We really can skip this here and let |
| // the upstream server decide. |
| if (!(file instanceof SftpPath)) { |
| LinkOption[] options = getPathResolutionLinkOption(SftpConstants.SSH_FXP_READDIR, "", file); |
| Boolean status = IoUtils.checkFileExistsAnySymlinks(file, !IoUtils.followLinks(options)); |
| if (status == null) { |
| throw new AccessDeniedException(file.toString(), file.toString(), "Cannot determine existence of read-dir"); |
| } |
| |
| if (!status) { |
| throw new NoSuchFileException(file.toString(), file.toString(), "Non-existent directory"); |
| } else if (!Files.isDirectory(file, options)) { |
| throw new NotDirectoryException(file.toString()); |
| } else if (!Files.isReadable(file)) { |
| throw new AccessDeniedException(file.toString(), file.toString(), "Not readable"); |
| } |
| } |
| |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| listener.readingEntries(session, handle, dh); |
| |
| if (dh.isSendDot() || dh.isSendDotDot() || dh.hasNext()) { |
| // There is at least one file in the directory or we need to send the "..". |
| // Send only a few files at a time to not create packets of a too |
| // large size or have a timeout to occur. |
| |
| reply = prepareReply(buffer); |
| reply.putByte((byte) SftpConstants.SSH_FXP_NAME); |
| reply.putInt(id); |
| |
| int lenPos = reply.wpos(); |
| reply.putUInt(0L); // save room for actual length |
| |
| int maxDataSize = SftpModuleProperties.MAX_READDIR_DATA_SIZE.getRequired(session); |
| int count = doReadDir(id, handle, dh, reply, maxDataSize, false); |
| BufferUtils.updateLengthPlaceholder(reply, lenPos, count); |
| if ((!dh.isSendDot()) && (!dh.isSendDotDot()) && (!dh.hasNext())) { |
| dh.markDone(); |
| } |
| |
| int sftpVersion = getVersion(); |
| Boolean indicator = SftpHelper.indicateEndOfNamesList(reply, sftpVersion, session, dh.isDone()); |
| if (debugEnabled) { |
| log.debug("doReadDir({})({})[{}] - sending {} entries - eol={} (SFTP version {})", session, |
| Handle.safe(handle), h, |
| count, indicator, sftpVersion); |
| } |
| } else { |
| // empty directory |
| dh.markDone(); |
| sendStatus(prepareReply(buffer), id, SftpConstants.SSH_FX_EOF, "Empty directory"); |
| return; |
| } |
| |
| Objects.requireNonNull(reply, "No reply buffer created"); |
| } catch (IOException | RuntimeException | Error e) { |
| sendStatus(prepareReply(buffer), id, e, SftpConstants.SSH_FXP_READDIR, Handle.safe(handle)); |
| return; |
| } |
| |
| send(reply); |
| } |
| |
| @Override |
| protected String doOpenDir(int id, String path, Path dir, LinkOption... options) throws IOException { |
| SftpPathImpl.withAttributeCache(dir, p -> { |
| Boolean status = IoUtils.checkFileExistsAnySymlinks(p, !IoUtils.followLinks(options)); |
| if (status == null) { |
| throw signalOpenFailure(id, path, p, true, |
| new AccessDeniedException(p.toString(), p.toString(), "Cannot determine open-dir existence")); |
| } |
| |
| if (!status) { |
| throw signalOpenFailure(id, path, p, true, |
| new NoSuchFileException(path, path, "Referenced target directory N/A")); |
| } else if (!Files.isDirectory(p, options)) { |
| throw signalOpenFailure(id, path, p, true, new NotDirectoryException(path)); |
| } else if (!Files.isReadable(p)) { |
| throw signalOpenFailure(id, path, p, true, |
| new AccessDeniedException(p.toString(), p.toString(), "Not readable")); |
| } |
| return null; |
| }); |
| // Directory exists and is readable |
| String handle; |
| try { |
| synchronized (handles) { |
| handle = generateFileHandle(dir); |
| DirectoryHandle dirHandle = new DirectoryHandle(this, dir, handle); |
| handles.put(handle, dirHandle); |
| } |
| } catch (IOException e) { |
| throw signalOpenFailure(id, path, dir, true, e); |
| } |
| |
| return handle; |
| } |
| |
| @Override |
| protected void doFSetStat(int id, String handle, Map<String, ?> attrs) throws IOException { |
| Handle h = handles.get(handle); |
| if (log.isDebugEnabled()) { |
| log.debug("doFsetStat({})[id={}] SSH_FXP_FSETSTAT (handle={}[{}], attrs={})", |
| getServerSession(), id, Handle.safe(handle), h, attrs); |
| } |
| |
| Handle fileHandle = validateHandle(handle, h, Handle.class); |
| Path path = fileHandle.getFile(); |
| boolean followLinks = resolvePathResolutionFollowLinks( |
| SftpConstants.SSH_FXP_FSETSTAT, "", path); |
| doSetAttributes(SftpConstants.SSH_FXP_FSETSTAT, "", path, attrs, followLinks); |
| } |
| |
| @Override |
| protected Map<String, Object> doFStat(int id, String handle, int flags) throws IOException { |
| Handle h = handles.get(handle); |
| if (log.isDebugEnabled()) { |
| log.debug("doFStat({})[id={}] SSH_FXP_FSTAT (handle={}[{}], flags=0x{})", |
| getServerSession(), id, Handle.safe(handle), h, Integer.toHexString(flags)); |
| } |
| |
| Handle fileHandle = validateHandle(handle, h, Handle.class); |
| SftpFileSystemAccessor accessor = getFileSystemAccessor(); |
| Path file = fileHandle.getFile(); |
| LinkOption[] options = accessor.resolveFileAccessLinkOptions( |
| this, file, SftpConstants.SSH_FXP_FSTAT, "", true); |
| |
| boolean followLinks = resolvePathResolutionFollowLinks(SftpConstants.SSH_FXP_FSTAT, handle, file); |
| return resolveFileAttributes(file, flags, followLinks, options); |
| } |
| |
| @Override |
| protected void doWrite( |
| int id, String handle, long offset, int length, byte[] data, int doff, int remaining) |
| throws IOException { |
| Handle h = handles.get(handle); |
| ServerSession session = getServerSession(); |
| int maxAllowed = SftpModuleProperties.MAX_WRITEDATA_PACKET_LENGTH.getRequired(session); |
| if (log.isTraceEnabled()) { |
| log.trace("doWrite({})[id={}] SSH_FXP_WRITE (handle={}[{}], offset={}, length={}, maxAllowed={})", |
| session, id, Handle.safe(handle), h, offset, length, maxAllowed); |
| } |
| |
| FileHandle fh = validateHandle(handle, h, FileHandle.class); |
| if (length < 0) { |
| throw new IllegalStateException("Bad length (" + length + ") for writing to " + fh); |
| } |
| |
| if (remaining < length) { |
| throw new IllegalStateException("Not enough buffer data for writing to " + fh |
| + ": required=" + length + ", available=" + remaining); |
| } |
| |
| if (length > maxAllowed) { |
| throw new IOException("Reuested write size (" + length + ") exceeds max. allowed (" + maxAllowed + ")"); |
| } |
| |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| listener.writing(session, handle, fh, offset, data, doff, length); |
| try { |
| if (fh.isOpenAppend()) { |
| fh.append(data, doff, length); |
| } else { |
| fh.write(data, doff, length, offset); |
| } |
| } catch (IOException | RuntimeException | Error e) { |
| listener.written(session, handle, fh, offset, data, doff, length, e); |
| throw e; |
| } |
| listener.written(session, handle, fh, offset, data, doff, length, null); |
| } |
| |
| @Override |
| protected int doRead( |
| int id, String handle, long offset, int length, byte[] data, int doff, AtomicReference<Boolean> eof) |
| throws IOException { |
| Handle h = handles.get(handle); |
| ServerSession session = getServerSession(); |
| if (log.isTraceEnabled()) { |
| log.trace("doRead({})[id={}] SSH_FXP_READ (handle={}[{}], offset={}, length={})", |
| session, id, Handle.safe(handle), h, offset, length); |
| } |
| |
| ValidateUtils.checkTrue(length > 0L, "Invalid read length: %d", length); |
| FileHandle fh = validateHandle(handle, h, FileHandle.class); |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| int readLen; |
| listener.reading(session, handle, fh, offset, data, doff, length); |
| try { |
| readLen = fh.read(data, doff, length, offset, eof); |
| } catch (IOException | RuntimeException | Error e) { |
| listener.read(session, handle, fh, offset, data, doff, length, -1, e); |
| throw e; |
| } |
| listener.read(session, handle, fh, offset, data, doff, length, readLen, null); |
| return readLen; |
| } |
| |
| @Override |
| protected void doClose(int id, String handle) throws IOException { |
| Handle h; |
| synchronized (handles) { |
| h = handles.remove(handle); |
| if (fileHandleSize == Integer.BYTES) { |
| if (handles.isEmpty()) { |
| // No currently used handles: we may forget about unused handles and start from scratch |
| unusedHandles.clear(); |
| } else if (h != null && handle != null) { |
| // If h is null, the handle was invalid |
| unusedHandles.push(handle); |
| } |
| } |
| } |
| ServerSession session = getServerSession(); |
| if (log.isDebugEnabled()) { |
| log.debug("doClose({})[id={}] SSH_FXP_CLOSE (handle={}[{}])", session, id, Handle.safe(handle), h); |
| } |
| |
| Handle nodeHandle = validateHandle(handle, h, Handle.class); |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| try { |
| listener.closing(session, handle, nodeHandle); |
| nodeHandle.close(); |
| listener.closed(session, handle, nodeHandle, null); |
| } catch (IOException | RuntimeException | Error e) { |
| listener.closed(session, handle, nodeHandle, e); |
| throw e; |
| } finally { |
| nodeHandle.clearAttributes(); |
| } |
| } |
| |
| @Override |
| protected String doOpen( |
| int id, String path, int pflags, int access, Map<String, Object> attrs) |
| throws IOException { |
| ServerSession session = getServerSession(); |
| if (log.isDebugEnabled()) { |
| log.debug("doOpen({})[id={}] SSH_FXP_OPEN (path={}, access=0x{}, pflags=0x{}, attrs={})", |
| session, id, path, Integer.toHexString(access), Integer.toHexString(pflags), attrs); |
| } |
| |
| Path file = resolveFile(path); |
| |
| try { |
| synchronized (handles) { |
| String handle = generateFileHandle(file); |
| FileHandle fileHandle = new FileHandle(this, file, handle, pflags, access, attrs); |
| handles.put(handle, fileHandle); |
| return handle; |
| } |
| } catch (IOException e) { |
| throw signalOpenFailure(id, path, file, false, e); |
| } |
| } |
| |
| /** |
| * Generates a file handle, which is used as key in a {@link Map} to record open file handles across requests. For |
| * historical reasons, the handle is returned as a {@link String}, and other APIs also use {@link String} for |
| * handles, so we're kind of stuck with that. |
| * <p> |
| * As of version 2.10.1, the result <b>must</b> be encoded using {@link StandardCharsets#ISO_8859_1}, and it must |
| * not have more than 256 IOS-8859-1 characters (i.e., bytes). |
| * </p> |
| * <p> |
| * This method is called with the monitor on the {@link #handles} {@link Map} held. |
| * </p> |
| * |
| * @param file to generate a handle for |
| * @return the file handle |
| * @throws IOException if no handle could be generated |
| */ |
| protected String generateFileHandle(Path file) throws IOException { |
| int curHandleCount = handles.size(); |
| if (curHandleCount >= maxHandleCount) { |
| throw new SftpException(SftpConstants.SSH_FX_NO_SPACE_ON_FILESYSTEM, |
| "Too many open handles: current=" + curHandleCount + ", max.=" + maxHandleCount); |
| } |
| boolean traceEnabled = log.isTraceEnabled(); |
| if (fileHandleSize == Integer.BYTES) { |
| String handle = unusedHandles.poll(); |
| if (handle == null) { |
| // No usused previously created handle available: all handles are used and have values in the range of |
| // [0 .. curHandleCount). So we can simply use curHandleCount for the new handle and be sure it is |
| // unique. |
| Arrays.fill(workBuf, (byte) 0); |
| BufferUtils.putUInt(curHandleCount, workBuf); |
| handle = new String(workBuf, 0, Integer.BYTES, StandardCharsets.ISO_8859_1); |
| } |
| if (traceEnabled) { |
| log.trace("generateFileHandle({})[{}] {}", getServerSession(), file, Handle.safe(handle)); |
| } |
| return handle; |
| } |
| // use several rounds in case the file handle size is relatively small so we might get conflicts |
| ServerSession session = getServerSession(); |
| for (int index = 0; index < maxFileHandleRounds; index++) { |
| randomizer.fill(workBuf, 0, fileHandleSize); |
| String handle = new String(workBuf, 0, fileHandleSize, StandardCharsets.ISO_8859_1); |
| |
| if (handles.containsKey(handle)) { |
| if (traceEnabled) { |
| log.trace("generateFileHandle({})[{}] handle={} in use at round {}", |
| session, file, Handle.safe(handle), index); |
| } |
| continue; |
| } |
| |
| if (traceEnabled) { |
| log.trace("generateFileHandle({})[{}] {}", session, file, Handle.safe(handle)); |
| } |
| return handle; |
| } |
| |
| throw new StreamCorruptedException("Failed to generate a unique file handle for " + file); |
| } |
| |
| @Override |
| protected void doInit(Buffer buffer, int id) throws IOException { |
| ServerSession session = getServerSession(); |
| if (log.isDebugEnabled()) { |
| log.debug("doInit({})[id={}] SSH_FXP_INIT (version={})", session, id, id); |
| } |
| |
| Map.Entry<Integer, String> negotiated = checkVersionCompatibility( |
| buffer, id, id, SftpConstants.SSH_FX_OP_UNSUPPORTED); |
| if (negotiated == null) { // i.e. validation failed |
| return; |
| } |
| |
| version = negotiated.getKey(); |
| while (buffer.available() > 0) { |
| String name = buffer.getString(); |
| byte[] data = buffer.getBytes(); |
| extensions.put(name, data); |
| } |
| |
| buffer = prepareReply(buffer); |
| |
| buffer.putByte((byte) SftpConstants.SSH_FXP_VERSION); |
| buffer.putUInt(version); |
| appendExtensions(buffer, negotiated.getValue()); |
| |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| listener.initialized(session, version); |
| |
| send(buffer); |
| } |
| |
| @Override |
| protected Buffer prepareReply(Buffer buffer) { |
| buffer.clear(); |
| buffer.putUInt(0L); // reserve space for actual packet length |
| return buffer; |
| } |
| |
| @Override |
| protected void send(Buffer buffer) throws IOException { |
| BufferUtils.updateLengthPlaceholder(buffer, 0); |
| out.writeBuffer(buffer); |
| } |
| |
| @Override |
| public void destroy(ChannelSession channel) { |
| if (closed.getAndSet(true)) { |
| return; // ignore if already closed |
| } |
| |
| ServerSession session = getServerSession(); |
| boolean debugEnabled = log.isDebugEnabled(); |
| if (debugEnabled) { |
| log.debug("destroy({}) - mark as closed", session); |
| } |
| |
| try { |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| listener.destroying(session); |
| } catch (Exception e) { |
| warn("destroy({}) Failed ({}) to announce destruction event: {}", |
| session, e.getClass().getSimpleName(), e.getMessage(), e); |
| } |
| |
| // if thread has not completed, cancel it |
| if ((pendingFuture != null) && (!pendingFuture.isDone())) { |
| boolean result = pendingFuture.cancel(true); |
| // TODO consider waiting some reasonable (?) amount of time for cancellation |
| if (debugEnabled) { |
| log.debug("destroy({}) - cancel pending future={}", session, result); |
| } |
| } |
| |
| pendingFuture = null; |
| |
| CloseableExecutorService executors = getExecutorService(); |
| if ((executors != null) && (!executors.isShutdown())) { |
| Collection<Runnable> runners = executors.shutdownNow(); |
| if (debugEnabled) { |
| log.debug("destroy({}) - shutdown executor service - runners count={}", session, runners.size()); |
| } |
| } |
| this.executorService = null; |
| |
| try { |
| fileSystem.close(); |
| } catch (UnsupportedOperationException e) { |
| if (debugEnabled) { |
| log.debug("destroy({}) closing the file system is not supported", session); |
| } |
| } catch (IOException e) { |
| if (debugEnabled) { |
| log.warn("destroy({}) failed to close the file system", session, e); |
| } |
| } |
| } |
| |
| protected void closeAllHandles() { |
| boolean debugEnabled = log.isDebugEnabled(); |
| ServerSession session = getServerSession(); |
| SftpEventListener listener = getSftpEventListenerProxy(); |
| for (Map.Entry<String, Handle> fe : handles.entrySet()) { |
| String id = fe.getKey(); |
| Handle handle = fe.getValue(); |
| try { |
| if (debugEnabled) { |
| log.debug("closeAllHandles({}) exiting pending handle {} [{}]", session, Handle.safe(id), handle); |
| } |
| |
| listener.exiting(session, handle); |
| } catch (IOException | RuntimeException e) { |
| log.warn("closeAllHandles({}) failed ({}) to inform listener of exit for handle={}[{}]: {}", |
| session, e.getClass().getSimpleName(), Handle.safe(id), handle, e.getMessage()); |
| } |
| |
| try { |
| handle.close(); |
| if (debugEnabled) { |
| log.debug("closeAllHandles({}) closed pending handle {} [{}]", session, Handle.safe(id), handle); |
| } |
| } catch (IOException | RuntimeException e) { |
| log.warn("closeAllHandles({}) failed ({}) to close handle={}[{}]: {}", |
| session, e.getClass().getSimpleName(), Handle.safe(id), handle, e.getMessage()); |
| } finally { |
| handle.clearAttributes(); |
| } |
| } |
| handles.clear(); |
| } |
| } |