[SSHD-1079] Async mode on the local port forwarder (#167)

Disabled by default
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
index bdb0b7f..f87d35c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/SocksProxy.java
@@ -27,6 +27,7 @@
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.io.IoHandler;
+import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.util.buffer.Buffer;
@@ -100,9 +101,14 @@
         }
 
         protected void onMessage(Buffer buffer) throws IOException {
-            OutputStream invertedIn = channel.getInvertedIn();
-            invertedIn.write(buffer.array(), buffer.rpos(), buffer.available());
-            invertedIn.flush();
+            IoOutputStream asyncIn = channel.getAsyncIn();
+            if (asyncIn != null) {
+                asyncIn.writePacket(buffer);
+            } else {
+                OutputStream invertedIn = channel.getInvertedIn();
+                invertedIn.write(buffer.array(), buffer.rpos(), buffer.available());
+                invertedIn.flush();
+            }
         }
 
         @Override
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index ae78076..c743948 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -33,8 +33,11 @@
 import org.apache.sshd.common.NamedResource;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.channel.Window;
+import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.ValidateUtils;
@@ -161,12 +164,24 @@
     @Override
     protected synchronized void doOpen() throws IOException {
         if (streaming == Streaming.Async) {
-            throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+            asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
+                @SuppressWarnings("synthetic-access")
+                @Override
+                protected CloseFuture doCloseGracefully() {
+                    try {
+                        sendEof();
+                    } catch (IOException e) {
+                        getSession().exceptionCaught(e);
+                    }
+                    return super.doCloseGracefully();
+                }
+            };
+            asyncOut = new ChannelAsyncInputStream(this);
+        } else {
+            out = new ChannelOutputStream(
+                    this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true);
+            invertedIn = out;
         }
-
-        out = new ChannelOutputStream(
-                this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true);
-        invertedIn = out;
     }
 
     @Override