blob: 623f4a3f8dec5e9687ce3eed7754e65ec4a1525c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sshd.sftp.client.impl;
import java.io.IOException;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Objects;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.output.OutputStreamWithChannel;
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClient.CloseableHandle;
import org.apache.sshd.sftp.client.SftpClient.OpenMode;
import org.apache.sshd.sftp.client.SftpClientHolder;
import org.apache.sshd.sftp.common.SftpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Implements an output stream for a given remote file.
 *
 * <p>
 * Written bytes are accumulated in a local packet buffer. Once the buffer is full (or {@link #flush()} is
 * called) its contents are sent as one {@code SSH_FXP_WRITE} request and the request is recorded in
 * {@link #pendingWrites}. {@link #flush()} only polls for acknowledgements that have already arrived, whereas
 * {@link #close()} receives the acknowledgement of every request that is still pending before closing the
 * remote handle.
 * </p>
 *
 * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
 */
public class SftpOutputStreamAsync extends OutputStreamWithChannel implements SftpClientHolder {
    protected final Logger log;
    /** Scratch array used by {@link #write(int)} to delegate to the array-based write. */
    protected final byte[] bb = new byte[1];
    /** Size requested for every packet buffer allocated by {@link #write(byte[], int, int)}. */
    protected final int bufferSize;
    /** Data accumulated since the last flush, or {@code null} if nothing is buffered. */
    protected Buffer buffer;
    /** Remote file handle; set to {@code null} once {@link #close()} has run. */
    protected CloseableHandle handle;
    /** Remote file offset used for the next {@code SSH_FXP_WRITE} request. */
    protected long offset;
    /** Write requests that were sent but whose acknowledgement has not been processed yet (FIFO). */
    protected final Deque<SftpAckData> pendingWrites = new LinkedList<>();

    private final AbstractSftpClient clientInstance;
    private final String path;

    /**
     * Opens the remote file and creates a stream that writes to it.
     *
     * @param  client      The SFTP client used to open the file and send the data - never {@code null}
     * @param  bufferSize  Size of the packet buffers allocated while writing
     * @param  path        The remote file path
     * @param  mode        The modes used to open the remote file
     * @throws IOException If failed to open the remote file
     */
    public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
                                 String path, Collection<OpenMode> mode)
            throws IOException {
        this.log = LoggerFactory.getLogger(getClass());
        this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance");
        this.path = path;
        this.handle = client.open(path, mode);
        this.bufferSize = bufferSize;
    }

    /**
     * Creates a stream that writes to an already opened remote file handle.
     *
     * @param  client      The SFTP client used to send the data - never {@code null}
     * @param  bufferSize  Size of the packet buffers allocated while writing
     * @param  path        The remote file path (used for reporting only by this class)
     * @param  handle      The handle of the remote file; the stream reports itself as closed if it is
     *                     {@code null} or not open
     * @throws IOException Declared for signature compatibility; not thrown by this constructor itself
     */
    public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
                                 String path, CloseableHandle handle)
            throws IOException {
        this.log = LoggerFactory.getLogger(getClass());
        this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance");
        this.path = path;
        this.handle = handle;
        this.bufferSize = bufferSize;
    }

    @Override
    public final AbstractSftpClient getClient() {
        return clientInstance;
    }

    /**
     * Sets the remote file offset at which the next flushed data is written.
     *
     * @param offset The remote file offset
     */
    public void setOffset(long offset) {
        this.offset = offset;
    }

    /**
     * The remotely accessed file path
     *
     * @return Remote file path
     */
    public final String getPath() {
        return path;
    }

    @Override
    public boolean isOpen() {
        return (handle != null) && handle.isOpen();
    }

    @Override
    public void write(int b) throws IOException {
        bb[0] = (byte) b;
        write(bb, 0, 1);
    }

    /**
     * Buffers the given bytes, sending a write request every time the local buffer fills up.
     *
     * @param  b                         The data
     * @param  off                       Start offset of the data in the array
     * @param  len                       Number of bytes to write
     * @throws IOException               If the stream is closed, the configured buffer size leaves no room for
     *                                   any payload, or sending the data failed
     * @throws IndexOutOfBoundsException If {@code off}/{@code len} do not denote a valid range of {@code b}
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Validate the range up-front so that a bad call cannot leave partially buffered data behind
        if ((off < 0) || (len < 0) || (len > (b.length - off))) {
            throw new IndexOutOfBoundsException(
                    "write(" + getPath() + ") bad range: offset=" + off
                                                + ", length=" + len + ", array length=" + b.length);
        }

        // Fail the same way flush() does instead of hitting a NullPointerException on the cleared handle
        if (!isOpen()) {
            throw new IOException("write(" + getPath() + ") stream is closed");
        }

        // Nothing to buffer - avoid allocating a packet buffer that would later be flushed empty
        if (len == 0) {
            return;
        }

        byte[] id = handle.getIdentifier();
        // Maximum payload carried by a single request; the subtracted amount is headroom kept for the
        // packet headers (NOTE(review): the exact breakdown of the constants is not visible here)
        int max = bufferSize - (9 + 16 + id.length + 72);
        if (max <= 0) {
            // With no room for payload the loop below would never consume any input
            throw new IOException(
                    "write(" + getPath() + ") buffer size (" + bufferSize + ") too small to hold any data");
        }

        SftpClient client = getClient();
        Session session = client.getSession();

        boolean traceEnabled = log.isTraceEnabled();
        int writtenCount = 0;
        int totalLen = len;
        do {
            if (buffer == null) {
                if (traceEnabled) {
                    log.trace("write({}) allocate buffer size={} after {}/{} bytes",
                            this, bufferSize, writtenCount, totalLen);
                }

                buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
                // Leave room in front of the payload so that flush() can back-fill the handle,
                // offset and data length (16 + id.length bytes) without copying the data
                int hdr = 9 + 16 + 8 + id.length + buffer.wpos();
                buffer.rpos(hdr);
                buffer.wpos(hdr);
            }

            // Copy as much as still fits into the current buffer
            int nb = Math.min(len, Math.max(0, max - buffer.available()));
            buffer.putRawBytes(b, off, nb);

            off += nb;
            len -= nb;
            writtenCount += nb;

            if (buffer.available() >= max) {
                if (traceEnabled) {
                    log.trace("write({}) flush after {}/{} bytes", this, writtenCount, totalLen);
                }
                flush();
            }
        } while (len > 0);
    }

    /**
     * Processes the acknowledgements that are already available and then sends the currently buffered
     * data (if any) as a new write request. The acknowledgement of that new request is not awaited here.
     *
     * @throws IOException If the stream is closed, a previous write was rejected or sending failed
     */
    @Override
    public void flush() throws IOException {
        if (!isOpen()) {
            throw new IOException("flush(" + getPath() + ") stream is closed");
        }

        boolean debugEnabled = log.isDebugEnabled();
        AbstractSftpClient client = getClient();
        // Drain acknowledgements in request order, stopping at the first one with no response yet
        for (int ackIndex = 0;;) {
            SftpAckData ack = pendingWrites.peek();
            if (ack == null) {
                if (debugEnabled) {
                    log.debug("flush({}) processed {} pending writes", this, ackIndex);
                }
                break;
            }

            ackIndex++;
            if (debugEnabled) {
                log.debug("flush({}) waiting for ack #{}: {}", this, ackIndex, ack);
            }

            // Zero timeout: a null result means the response has not arrived yet
            Buffer response = client.receive(ack.id, 0L);
            if (response == null) {
                if (debugEnabled) {
                    log.debug("flush({}) no response for ack #{}: {}", this, ackIndex, ack);
                }
                break;
            }

            if (debugEnabled) {
                log.debug("flush({}) processing ack #{}: {}", this, ackIndex, ack);
            }

            // Remove before checking so that a failed write is not examined again
            pendingWrites.removeFirst();
            client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
        }

        if (buffer == null) {
            if (debugEnabled) {
                log.debug("flush({}) no pending buffer to flush", this);
            }
            return;
        }

        byte[] id = handle.getIdentifier();
        int avail = buffer.available();
        Buffer buf;
        if (buffer.rpos() >= (16 + id.length)) {
            // Enough headroom: write handle, offset and data length right in front of the payload
            int wpos = buffer.wpos();
            buffer.rpos(buffer.rpos() - 16 - id.length);
            buffer.wpos(buffer.rpos());
            buffer.putBytes(id);
            buffer.putLong(offset);
            buffer.putUInt(avail);
            buffer.wpos(wpos);
            buf = buffer;
        } else {
            // No headroom in front of the payload: assemble the request in a separate buffer
            buf = new ByteArrayBuffer(id.length + avail + Long.SIZE /* some extra fields */, false);
            buf.putBytes(id);
            buf.putLong(offset);
            buf.putBytes(buffer.array(), buffer.rpos(), avail);
        }

        int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf);
        SftpAckData ack = new SftpAckData(reqId, offset, avail);
        if (debugEnabled) {
            log.debug("flush({}) enqueue pending ack={}", this, ack);
        }
        pendingWrites.add(ack);

        offset += avail;
        buffer = null;
    }

    /**
     * Flushes any buffered data, receives the acknowledgement of every pending write request and closes
     * the remote file handle. The handle is closed (and the stream marked as closed) even if flushing or
     * one of the acknowledgements fails. Calling this method on an already closed stream does nothing.
     *
     * @throws IOException If flushing failed, a write was rejected or closing the handle failed
     */
    @Override
    public void close() throws IOException {
        if (!isOpen()) {
            return;
        }

        try {
            boolean debugEnabled = log.isDebugEnabled();

            try {
                int pendingSize = (buffer == null) ? 0 : buffer.available();
                if (pendingSize > 0) {
                    if (debugEnabled) {
                        log.debug("close({}) flushing {} pending bytes", this, pendingSize);
                    }
                    flush();
                }

                AbstractSftpClient client = getClient();
                // Unlike flush(), receive every outstanding acknowledgement (no zero timeout here)
                for (int ackIndex = 1; !pendingWrites.isEmpty(); ackIndex++) {
                    SftpAckData ack = pendingWrites.removeFirst();
                    if (debugEnabled) {
                        log.debug("close({}) processing ack #{}: {}", this, ackIndex, ack);
                    }

                    Buffer response = client.receive(ack.id);
                    if (debugEnabled) {
                        log.debug("close({}) processing ack #{} response for {}", this, ackIndex, ack);
                    }
                    client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
                }
            } finally {
                if (debugEnabled) {
                    log.debug("close({}) closing file handle", this);
                }
                handle.close();
            }
        } finally {
            // Marks the stream as closed regardless of the outcome
            handle = null;
        }
    }

    @Override
    public String toString() {
        SftpClient client = getClient();
        return getClass().getSimpleName()
               + "[" + client.getSession() + "]"
               + "[" + getPath() + "]";
    }
}