RATIS-2027. Ratis Streaming: Remote Stream copy data to heap. (#1044)

* RATIS-2027. Ratis Streaming: Remote Stream copy data to heap.
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 26d01c3..ba91866 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -40,6 +40,7 @@
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -169,6 +170,10 @@
       return f;
     }
 
+    public CompletableFuture<DataStreamReply> writeAsync(ByteBuf src, Iterable<WriteOption> options) {
+      return writeAsyncImpl(src, src.readableBytes(), options);
+    }
+
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, Iterable<WriteOption> options) {
       return writeAsyncImpl(src, src.remaining(), options);
@@ -235,7 +240,7 @@
   }
 
   @Override
-  public DataStreamOutputRpc stream(RaftClientRequest request) {
+  public DataStreamOutputImpl stream(RaftClientRequest request) {
     return new DataStreamOutputImpl(request);
   }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 989c00c..2757555 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -21,12 +21,14 @@
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SlidingWindow;
@@ -56,6 +58,8 @@
     DataStreamRequest getDataStreamRequest() {
       if (header.getDataLength() == 0) {
         return new DataStreamRequestByteBuffer(header, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
+      } else if (data instanceof ByteBuf) {
+        return new DataStreamRequestByteBuf(header, (ByteBuf)data);
       } else if (data instanceof ByteBuffer) {
         return new DataStreamRequestByteBuffer(header, (ByteBuffer)data);
       } else if (data instanceof FilePositionCount) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
similarity index 96%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
rename to ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
index 2542b1e..1873bec 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
@@ -16,9 +16,8 @@
  *  limitations under the License.
  */
 
-package org.apache.ratis.netty.server;
+package org.apache.ratis.datastream.impl;
 
-import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.ClientId;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index bd03fef..aa46cba 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -23,7 +23,7 @@
 import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.io.WriteOption;
-import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
@@ -128,6 +128,20 @@
     out.accept(Unpooled.wrappedBuffer(buffer));
   }
 
+  static void encodeDataStreamRequestByteBuf(DataStreamRequestByteBuf request, Consumer<Object> out,
+      ByteBufAllocator allocator) {
+    encodeDataStreamRequestHeader(request, out, allocator);
+    encodeByteBuf(request.slice(), out);
+  }
+
+  static void encodeByteBuf(ByteBuf buffer, Consumer<Object> out) {
+    if (buffer.readableBytes() == 0) {
+      out.accept(Unpooled.EMPTY_BUFFER); // to avoid EncoderException: must produce at least one message
+      return;
+    }
+    out.accept(buffer);
+  }
+
   static void encodeDataStreamRequestFilePositionCount(
       DataStreamRequestFilePositionCount request, Consumer<Object> out, ByteBufAllocator allocator) {
     encodeDataStreamRequestHeader(request, out, allocator);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 020acc2..b2dc381 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -21,6 +21,7 @@
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.io.StandardWriteOption;
@@ -370,6 +371,7 @@
         p.addLast(ENCODER);
         p.addLast(ENCODER_FILE_POSITION_COUNT);
         p.addLast(ENCODER_BYTE_BUFFER);
+        p.addLast(ENCODER_BYTE_BUF);
         p.addLast(newDecoder());
         p.addLast(handler);
       }
@@ -386,6 +388,16 @@
     }
   }
 
+  static final MessageToMessageEncoder<DataStreamRequestByteBuf> ENCODER_BYTE_BUF = new EncoderByteBuf();
+
+  @ChannelHandler.Sharable
+  static class EncoderByteBuf extends MessageToMessageEncoder<DataStreamRequestByteBuf> {
+    @Override
+    protected void encode(ChannelHandlerContext context, DataStreamRequestByteBuf request, List<Object> out) {
+      NettyDataStreamUtils.encodeDataStreamRequestByteBuf(request, out::add, context.alloc());
+    }
+  }
+
   static final MessageToMessageEncoder<DataStreamRequestFilePositionCount> ENCODER_FILE_POSITION_COUNT
       = new EncoderFilePositionCount();
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 276a365..302aed9 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -18,10 +18,11 @@
 
 package org.apache.ratis.netty.server;
 
-import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.metrics.Timekeeper;
@@ -111,12 +112,12 @@
   }
 
   static class RemoteStream {
-    private final DataStreamOutputRpc out;
+    private final DataStreamOutputImpl out;
     private final AtomicReference<CompletableFuture<DataStreamReply>> sendFuture
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
     private final RequestMetrics metrics;
 
-    RemoteStream(DataStreamOutputRpc out, RequestMetrics metrics) {
+    RemoteStream(DataStreamOutputImpl out, RequestMetrics metrics) {
       this.metrics = metrics;
       this.out = out;
     }
@@ -132,7 +133,7 @@
     CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) {
       final Timekeeper.Context context = metrics.start();
       return composeAsync(sendFuture, executor,
-          n -> out.writeAsync(request.slice().nioBuffer(), addFlush(request.getWriteOptionList()))
+          n -> out.writeAsync(request.slice().retain(), addFlush(request.getWriteOptionList()))
               .whenComplete((l, e) -> metrics.stop(context, e == null)));
     }
   }
@@ -147,7 +148,7 @@
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
     StreamInfo(RaftClientRequest request, boolean primary, CompletableFuture<DataStream> stream, RaftServer server,
-        CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams,
+        CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams,
         Function<RequestType, RequestMetrics> metricsConstructor)
         throws IOException {
       this.request = request;
@@ -155,7 +156,7 @@
       this.local = new LocalStream(stream, metricsConstructor.apply(RequestType.LOCAL_WRITE));
       this.server = server;
       final Set<RaftPeer> successors = getSuccessors(server.getId());
-      final Set<DataStreamOutputRpc> outs = getStreams.apply(request, successors);
+      final Set<DataStreamOutputImpl> outs = getStreams.apply(request, successors);
       this.remotes = outs.stream()
           .map(o -> new RemoteStream(o, metricsConstructor.apply(RequestType.REMOTE_WRITE)))
           .collect(Collectors.toSet());
@@ -315,7 +316,7 @@
   }
 
   private StreamInfo newStreamInfo(ByteBuf buf,
-      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
+      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams) {
     try {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
@@ -449,7 +450,7 @@
   }
 
   void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
-      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
+      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams) {
     LOG.debug("{}: read {}", this, request);
     try {
       readImpl(request, ctx, getStreams);
@@ -459,7 +460,7 @@
   }
 
   private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
-      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
+      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams) {
     final boolean close = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
     ClientInvocationId key =  ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index c5f24b0..451040b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,9 +20,11 @@
 
 import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.netty.NettyUtils;
@@ -90,8 +92,8 @@
       map.addRaftPeers(newPeers);
     }
 
-    Set<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers) throws IOException {
-      final Set<DataStreamOutputRpc> outs = new HashSet<>();
+    Set<DataStreamOutputImpl> getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers) throws IOException {
+      final Set<DataStreamOutputImpl> outs = new HashSet<>();
       try {
         getDataStreamOutput(request, peers, outs);
       } catch (IOException e) {
@@ -101,11 +103,11 @@
       return outs;
     }
 
-    private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers, Set<DataStreamOutputRpc> outs)
+    private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers, Set<DataStreamOutputImpl> outs)
         throws IOException {
       for (RaftPeer peer : peers) {
         try {
-          outs.add((DataStreamOutputRpc) map.computeIfAbsent(peer).get().stream(request));
+          outs.add((DataStreamOutputImpl) map.computeIfAbsent(peer).get().stream(request));
         } catch (IOException e) {
           map.handleException(peer.getId(), e, true);
           throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
@@ -238,7 +240,7 @@
       }
 
       @Override
-      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      public void channelInactive(ChannelHandlerContext ctx) {
         requests.cleanUpOnChannelInactive(ctx.channel().id(), channelInactiveGracePeriod);
       }