THRIFT-5288: Move Support for ByteBuffer into TTransport
Client: Java
Patch: David Mollitor

This closes #2254
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index d5c608d..a119f23 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -217,9 +217,9 @@
     state = State.ERROR;
   }
 
-  private void doReadingResponseBody(SelectionKey key) throws IOException {
+  private void doReadingResponseBody(SelectionKey key) throws TTransportException {
     if (transport.read(frameBuffer) < 0) {
-      throw new IOException("Read call frame failed");
+      throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed");
     }
     if (frameBuffer.remaining() == 0) {
       cleanUpAndFireCallback(key);
@@ -241,9 +241,9 @@
     }
   }
 
-  private void doReadingResponseSize() throws IOException {
+  private void doReadingResponseSize() throws TTransportException {
     if (transport.read(sizeBuffer) < 0) {
-      throw new IOException("Read call frame size failed");
+      throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame size failed");
     }
     if (sizeBuffer.remaining() == 0) {
       state = State.READING_RESPONSE_BODY;
@@ -251,9 +251,9 @@
     }
   }
 
-  private void doWritingRequestBody(SelectionKey key) throws IOException {
+  private void doWritingRequestBody(SelectionKey key) throws TTransportException {
     if (transport.write(frameBuffer) < 0) {
-      throw new IOException("Write call frame failed");
+      throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed");
     }
     if (frameBuffer.remaining() == 0) {
       if (isOneway) {
@@ -266,9 +266,9 @@
     }
   }
 
-  private void doWritingRequestSize() throws IOException {
+  private void doWritingRequestSize() throws TTransportException {
     if (transport.write(sizeBuffer) < 0) {
-      throw new IOException("Write call frame size failed");
+      throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame size failed");
     }
     if (sizeBuffer.remaining() == 0) {
       state = State.WRITING_REQUEST_BODY;
diff --git a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
index 0dfcf25..4f4e21f 100644
--- a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
+++ b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
@@ -363,20 +363,17 @@
    */
   public void writeString(String str) throws TException {
     byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
-    writeBinary(bytes, 0, bytes.length);
+    writeVarint32(bytes.length);
+    trans_.write(bytes, 0, bytes.length);
   }
 
   /**
    * Write a byte array, using a varint for the size.
    */
   public void writeBinary(ByteBuffer bin) throws TException {
-    int length = bin.limit() - bin.position();
-    writeBinary(bin.array(), bin.position() + bin.arrayOffset(), length);
-  }
-
-  private void writeBinary(byte[] buf, int offset, int length) throws TException {
-    writeVarint32(length);
-    trans_.write(buf, offset, length);
+    ByteBuffer bb = bin.asReadOnlyBuffer();
+    writeVarint32(bb.remaining());
+    trans_.write(bb);
   }
 
   //
@@ -694,12 +691,13 @@
   }
 
   /**
-   * Read a byte[] from the wire.
+   * Read a ByteBuffer from the wire.
    */
   public ByteBuffer readBinary() throws TException {
     int length = readVarint32();
-    
-    if (length == 0) return EMPTY_BUFFER;
+    if (length == 0) {
+      return EMPTY_BUFFER;
+    }
     getTransport().checkReadBytesAvailable(length);
     if (trans_.getBytesRemainingInBuffer() >= length) {
       ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), length);
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 4aae803..f91e825 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -414,8 +414,8 @@
           if (trans_.write(buffer_) < 0) {
             return false;
           }
-        } catch (IOException e) {
-          LOGGER.warn("Got an IOException during write!", e);
+        } catch (TTransportException e) {
+          LOGGER.warn("Got an Exception during write", e);
           return false;
         }
 
@@ -543,8 +543,8 @@
     private boolean internalRead() {
       try {
           return trans_.read(buffer_) >= 0;
-      } catch (IOException e) {
-        LOGGER.warn("Got an IOException in internalRead!", e);
+      } catch (TTransportException e) {
+        LOGGER.warn("Got an Exception in internalRead", e);
         return false;
       }
     }
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
index 76ed02c..13c8586 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
@@ -144,11 +144,14 @@
   /**
    * Perform a nonblocking read into buffer.
    */
-  public int read(ByteBuffer buffer) throws IOException {
-    return socketChannel_.read(buffer);
+  public int read(ByteBuffer buffer) throws TTransportException {
+    try {
+      return socketChannel_.read(buffer);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
   }
 
-
   /**
    * Reads from the underlying input stream if not null.
    */
@@ -167,8 +170,12 @@
   /**
    * Perform a nonblocking write of the data in buffer;
    */
-  public int write(ByteBuffer buffer) throws IOException {
-    return socketChannel_.write(buffer);
+  public int write(ByteBuffer buffer) throws TTransportException {
+    try {
+      return socketChannel_.write(buffer);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
   }
 
   /**
@@ -179,11 +186,7 @@
       throw new TTransportException(TTransportException.NOT_OPEN,
         "Cannot write to write-only socket channel");
     }
-    try {
-      socketChannel_.write(ByteBuffer.wrap(buf, off, len));
-    } catch (IOException iox) {
-      throw new TTransportException(TTransportException.UNKNOWN, iox);
-    }
+    write(ByteBuffer.wrap(buf, off, len));
   }
 
   /**
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
index 255595d..30ec9d2 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
@@ -23,7 +23,6 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 
@@ -47,7 +46,4 @@
 
   public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
 
-  public abstract int read(ByteBuffer buffer) throws IOException;
-
-  public abstract int write(ByteBuffer buffer) throws IOException;
 }
diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java
index 5645f7f..ee07024 100644
--- a/lib/java/src/org/apache/thrift/transport/TTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TTransport.java
@@ -22,6 +22,7 @@
 import org.apache.thrift.TConfiguration;
 
 import java.io.Closeable;
+import java.nio.ByteBuffer;
 
 /**
  * Generic class that encapsulates the I/O layer. This is basically a thin
@@ -60,6 +61,26 @@
   public abstract void close();
 
   /**
+   * Reads a sequence of bytes from this channel into the given buffer. An
+   * attempt is made to read up to the number of bytes remaining in the buffer,
+   * that is, dst.remaining(), at the moment this method is invoked. Upon return
+   * the buffer's position will move forward the number of bytes read; its limit
+   * will not have changed. Subclasses are encouraged to provide a more
+   * efficient implementation of this method.
+   *
+   * @param dst The buffer into which bytes are to be transferred
+   * @return The number of bytes read, possibly zero, or -1 if the channel has
+   *         reached end-of-stream
+   * @throws TTransportException if there was an error reading data
+   */
+  public int read(ByteBuffer dst) throws TTransportException {
+    byte[] arr = new byte[dst.remaining()];
+    int n = read(arr, 0, arr.length);
+    dst.put(arr, 0, n);
+    return n;
+  }
+
+  /**
    * Reads up to len bytes into buffer buf, starting at offset off.
    *
    * @param buf Array to read into
@@ -121,6 +142,24 @@
     throws TTransportException;
 
   /**
+   * Writes a sequence of bytes to the buffer. An attempt is made to write all
+   * remaining bytes in the buffer, that is, src.remaining(), at the moment this
+   * method is invoked. Upon return the buffer's position will updated; its limit
+   * will not have changed. Subclasses are encouraged to provide a more efficient
+   * implementation of this method.
+   *
+   * @param src The buffer from which bytes are to be retrieved
+   * @return The number of bytes written, possibly zero
+   * @throws TTransportException if there was an error writing data
+   */
+  public int write(ByteBuffer src) throws TTransportException {
+    byte[] arr = new byte[src.remaining()];
+    src.get(arr);
+    write(arr, 0, arr.length);
+    return arr.length;
+  }
+
+  /**
    * Flush any pending data out of a transport buffer.
    *
    * @throws TTransportException if there was an error writing out data.
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
index e5feba0..4357f13 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
@@ -23,6 +23,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
 
 /**
  * Write frame (header and payload) to transport in a nonblocking way.
@@ -99,9 +100,9 @@
   /**
    * Nonblocking write to the underlying transport.
    *
-   * @throws IOException
+   * @throws TTransportException
    */
-  public void write(TNonblockingTransport transport) throws IOException {
+  public void write(TNonblockingTransport transport) throws TTransportException {
     transport.write(frameBytes);
   }
 
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
index 4557146..d73c3ec 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
@@ -19,7 +19,6 @@
 
 package org.apache.thrift.transport.sasl;
 
-import java.io.IOException;
 import java.nio.channels.SelectionKey;
 import java.nio.charset.StandardCharsets;
 
@@ -364,7 +363,7 @@
         saslChallenge.clear();
         nextPhase = Phase.READING_SASL_RESPONSE;
       }
-    } catch (IOException e) {
+    } catch (TTransportException e) {
       fail(e);
     }
   }
@@ -378,7 +377,7 @@
         saslResponse = null;
         nextPhase = Phase.READING_REQUEST;
       }
-    } catch (IOException e) {
+    } catch (TTransportException e) {
       fail(e);
     }
   }
@@ -389,7 +388,7 @@
       if (saslChallenge.isComplete()) {
         nextPhase = Phase.CLOSING;
       }
-    } catch (IOException e) {
+    } catch (TTransportException e) {
       fail(e);
     }
   }
@@ -401,7 +400,7 @@
         responseWriter.clear();
         nextPhase = Phase.READING_REQUEST;
       }
-    } catch (IOException e) {
+    } catch (TTransportException e) {
       fail(e);
     }
   }
diff --git a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
index d242593..60fe5c9 100644
--- a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
+++ b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
@@ -72,7 +72,7 @@
   }
 
   @Test
-  public void testWrite() throws IOException {
+  public void testWrite() throws Exception {
     DataFrameWriter frameWriter = new DataFrameWriter();
     frameWriter.withOnlyPayload(BYTES);
     // Slow socket which writes one byte per call.