RATIS-2027. Ratis Streaming: Remote Stream copy data to heap. (#1044)
* RATIS-2027. Ratis Streaming: Remote Stream copy data to heap.
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 26d01c3..ba91866 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -40,6 +40,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -169,6 +170,10 @@
return f;
}
+ public CompletableFuture<DataStreamReply> writeAsync(ByteBuf src, Iterable<WriteOption> options) {
+ return writeAsyncImpl(src, src.readableBytes(), options);
+ }
+
@Override
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, Iterable<WriteOption> options) {
return writeAsyncImpl(src, src.remaining(), options);
@@ -235,7 +240,7 @@
}
@Override
- public DataStreamOutputRpc stream(RaftClientRequest request) {
+ public DataStreamOutputImpl stream(RaftClientRequest request) {
return new DataStreamOutputImpl(request);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 989c00c..2757555 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -21,12 +21,14 @@
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
@@ -56,6 +58,8 @@
DataStreamRequest getDataStreamRequest() {
if (header.getDataLength() == 0) {
return new DataStreamRequestByteBuffer(header, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
+ } else if (data instanceof ByteBuf) {
+ return new DataStreamRequestByteBuf(header, (ByteBuf)data);
} else if (data instanceof ByteBuffer) {
return new DataStreamRequestByteBuffer(header, (ByteBuffer)data);
} else if (data instanceof FilePositionCount) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
similarity index 96%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
rename to ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
index 2542b1e..1873bec 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuf.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.ratis.netty.server;
+package org.apache.ratis.datastream.impl;
-import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index bd03fef..aa46cba 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -23,7 +23,7 @@
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
-import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
@@ -128,6 +128,20 @@
out.accept(Unpooled.wrappedBuffer(buffer));
}
+ static void encodeDataStreamRequestByteBuf(DataStreamRequestByteBuf request, Consumer<Object> out,
+ ByteBufAllocator allocator) {
+ encodeDataStreamRequestHeader(request, out, allocator);
+ encodeByteBuf(request.slice(), out);
+ }
+
+ static void encodeByteBuf(ByteBuf buffer, Consumer<Object> out) {
+ if (buffer.readableBytes() == 0) {
+ out.accept(Unpooled.EMPTY_BUFFER); // to avoid EncoderException: must produce at least one message
+ return;
+ }
+ out.accept(buffer);
+ }
+
static void encodeDataStreamRequestFilePositionCount(
DataStreamRequestFilePositionCount request, Consumer<Object> out, ByteBufAllocator allocator) {
encodeDataStreamRequestHeader(request, out, allocator);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 020acc2..b2dc381 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -21,6 +21,7 @@
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
@@ -370,6 +371,7 @@
p.addLast(ENCODER);
p.addLast(ENCODER_FILE_POSITION_COUNT);
p.addLast(ENCODER_BYTE_BUFFER);
+ p.addLast(ENCODER_BYTE_BUF);
p.addLast(newDecoder());
p.addLast(handler);
}
@@ -386,6 +388,16 @@
}
}
+ static final MessageToMessageEncoder<DataStreamRequestByteBuf> ENCODER_BYTE_BUF = new EncoderByteBuf();
+
+ @ChannelHandler.Sharable
+ static class EncoderByteBuf extends MessageToMessageEncoder<DataStreamRequestByteBuf> {
+ @Override
+ protected void encode(ChannelHandlerContext context, DataStreamRequestByteBuf request, List<Object> out) {
+ NettyDataStreamUtils.encodeDataStreamRequestByteBuf(request, out::add, context.alloc());
+ }
+ }
+
static final MessageToMessageEncoder<DataStreamRequestFilePositionCount> ENCODER_FILE_POSITION_COUNT
= new EncoderFilePositionCount();
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 276a365..302aed9 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -18,10 +18,11 @@
package org.apache.ratis.netty.server;
-import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.metrics.Timekeeper;
@@ -111,12 +112,12 @@
}
static class RemoteStream {
- private final DataStreamOutputRpc out;
+ private final DataStreamOutputImpl out;
private final AtomicReference<CompletableFuture<DataStreamReply>> sendFuture
= new AtomicReference<>(CompletableFuture.completedFuture(null));
private final RequestMetrics metrics;
- RemoteStream(DataStreamOutputRpc out, RequestMetrics metrics) {
+ RemoteStream(DataStreamOutputImpl out, RequestMetrics metrics) {
this.metrics = metrics;
this.out = out;
}
@@ -132,7 +133,7 @@
CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) {
final Timekeeper.Context context = metrics.start();
return composeAsync(sendFuture, executor,
- n -> out.writeAsync(request.slice().nioBuffer(), addFlush(request.getWriteOptionList()))
+ n -> out.writeAsync(request.slice().retain(), addFlush(request.getWriteOptionList()))
.whenComplete((l, e) -> metrics.stop(context, e == null)));
}
}
@@ -147,7 +148,7 @@
= new AtomicReference<>(CompletableFuture.completedFuture(null));
StreamInfo(RaftClientRequest request, boolean primary, CompletableFuture<DataStream> stream, RaftServer server,
- CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams,
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams,
Function<RequestType, RequestMetrics> metricsConstructor)
throws IOException {
this.request = request;
@@ -155,7 +156,7 @@
this.local = new LocalStream(stream, metricsConstructor.apply(RequestType.LOCAL_WRITE));
this.server = server;
final Set<RaftPeer> successors = getSuccessors(server.getId());
- final Set<DataStreamOutputRpc> outs = getStreams.apply(request, successors);
+ final Set<DataStreamOutputImpl> outs = getStreams.apply(request, successors);
this.remotes = outs.stream()
.map(o -> new RemoteStream(o, metricsConstructor.apply(RequestType.REMOTE_WRITE)))
.collect(Collectors.toSet());
@@ -315,7 +316,7 @@
}
private StreamInfo newStreamInfo(ByteBuf buf,
- CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams) {
try {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
@@ -449,7 +450,7 @@
}
void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
- CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams) {
LOG.debug("{}: read {}", this, request);
try {
readImpl(request, ctx, getStreams);
@@ -459,7 +460,7 @@
}
private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
- CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
+ CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams) {
final boolean close = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
ClientInvocationId key = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index c5f24b0..451040b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,9 +20,11 @@
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuf;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
@@ -90,8 +92,8 @@
map.addRaftPeers(newPeers);
}
- Set<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers) throws IOException {
- final Set<DataStreamOutputRpc> outs = new HashSet<>();
+ Set<DataStreamOutputImpl> getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers) throws IOException {
+ final Set<DataStreamOutputImpl> outs = new HashSet<>();
try {
getDataStreamOutput(request, peers, outs);
} catch (IOException e) {
@@ -101,11 +103,11 @@
return outs;
}
- private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers, Set<DataStreamOutputRpc> outs)
+ private void getDataStreamOutput(RaftClientRequest request, Set<RaftPeer> peers, Set<DataStreamOutputImpl> outs)
throws IOException {
for (RaftPeer peer : peers) {
try {
- outs.add((DataStreamOutputRpc) map.computeIfAbsent(peer).get().stream(request));
+ outs.add((DataStreamOutputImpl) map.computeIfAbsent(peer).get().stream(request));
} catch (IOException e) {
map.handleException(peer.getId(), e, true);
throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
@@ -238,7 +240,7 @@
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ public void channelInactive(ChannelHandlerContext ctx) {
requests.cleanUpOnChannelInactive(ctx.channel().id(), channelInactiveGracePeriod);
}