[SSHD-1079] Async mode on the local port forwarder (#167)
Disabled by default
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
index bdb0b7f..f87d35c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
@@ -27,6 +27,7 @@
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.util.buffer.Buffer;
@@ -100,9 +101,14 @@
}
protected void onMessage(Buffer buffer) throws IOException {
- OutputStream invertedIn = channel.getInvertedIn();
- invertedIn.write(buffer.array(), buffer.rpos(), buffer.available());
- invertedIn.flush();
+ IoOutputStream asyncIn = channel.getAsyncIn();
+ if (asyncIn != null) {
+ asyncIn.writePacket(buffer);
+ } else {
+ OutputStream invertedIn = channel.getInvertedIn();
+ invertedIn.write(buffer.array(), buffer.rpos(), buffer.available());
+ invertedIn.flush();
+ }
}
@Override
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index ae78076..c743948 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -33,8 +33,11 @@
import org.apache.sshd.common.NamedResource;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.channel.Window;
+import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.ValidateUtils;
@@ -161,12 +164,24 @@
@Override
protected synchronized void doOpen() throws IOException {
if (streaming == Streaming.Async) {
- throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+ asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ protected CloseFuture doCloseGracefully() {
+ try {
+ sendEof();
+ } catch (IOException e) {
+ getSession().exceptionCaught(e);
+ }
+ return super.doCloseGracefully();
+ }
+ };
+ asyncOut = new ChannelAsyncInputStream(this);
+ } else {
+ out = new ChannelOutputStream(
+ this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true);
+ invertedIn = out;
}
-
- out = new ChannelOutputStream(
- this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true);
- invertedIn = out;
}
@Override