RATIS-2035. Refactor streaming code for Read. (#1046)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java
new file mode 100644
index 0000000..7b0d761
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Map: {@link ChannelId} -> {@link ClientInvocationId}s. */
+class ChannelMap {
+ private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> map = new ConcurrentHashMap<>();
+
+ void add(ChannelId channelId, ClientInvocationId clientInvocationId) {
+ map.computeIfAbsent(channelId, (e) -> new ConcurrentHashMap<>())
+ .put(clientInvocationId, clientInvocationId);
+ }
+
+ void remove(ChannelId channelId, ClientInvocationId clientInvocationId) {
+ Optional.ofNullable(map.get(channelId))
+ .ifPresent((ids) -> ids.remove(clientInvocationId));
+ }
+
+ Set<ClientInvocationId> remove(ChannelId channelId) {
+ return Optional.ofNullable(map.remove(channelId))
+ .map(Map::keySet)
+ .orElse(Collections.emptySet());
+ }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 302aed9..a6e9b81 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -70,13 +70,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@@ -219,52 +216,10 @@
}
}
- static class StreamMap {
- private final ConcurrentMap<ClientInvocationId, StreamInfo> map = new ConcurrentHashMap<>();
-
- StreamInfo computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, StreamInfo> function) {
- final StreamInfo info = map.computeIfAbsent(key, function);
- LOG.debug("computeIfAbsent({}) returns {}", key, info);
- return info;
- }
-
- StreamInfo get(ClientInvocationId key) {
- final StreamInfo info = map.get(key);
- LOG.debug("get({}) returns {}", key, info);
- return info;
- }
-
- StreamInfo remove(ClientInvocationId key) {
- final StreamInfo info = map.remove(key);
- LOG.debug("remove({}) returns {}", key, info);
- return info;
- }
- }
-
- public static class ChannelMap {
- private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> map = new ConcurrentHashMap<>();
-
- public void add(ChannelId channelId,
- ClientInvocationId clientInvocationId) {
- map.computeIfAbsent(channelId, (e) -> new ConcurrentHashMap<>()).put(clientInvocationId, clientInvocationId);
- }
-
- public void remove(ChannelId channelId,
- ClientInvocationId clientInvocationId) {
- Optional.ofNullable(map.get(channelId)).ifPresent((ids) -> ids.remove(clientInvocationId));
- }
-
- public Set<ClientInvocationId> remove(ChannelId channelId) {
- return Optional.ofNullable(map.remove(channelId))
- .map(Map::keySet)
- .orElse(Collections.emptySet());
- }
- }
-
private final RaftServer server;
private final String name;
- private final StreamMap streams = new StreamMap();
+ private final StreamMap<StreamInfo> streams = new StreamMap<>();
private final ChannelMap channels;
private final ExecutorService requestExecutor;
private final ExecutorService writeExecutor;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java
new file mode 100644
index 0000000..073698c
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+/**
+ * Map: {@link ClientInvocationId} -> {@link STREAM}.
+ *
+ * @param <STREAM> the stream type.
+ */
+class StreamMap<STREAM> {
+ public static final Logger LOG = LoggerFactory.getLogger(StreamMap.class);
+
+ private final ConcurrentMap<ClientInvocationId, STREAM> map = new ConcurrentHashMap<>();
+
+ STREAM computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, STREAM> function) {
+ final STREAM info = map.computeIfAbsent(key, function);
+ LOG.debug("computeIfAbsent({}) returns {}", key, info);
+ return info;
+ }
+
+ STREAM get(ClientInvocationId key) {
+ final STREAM info = map.get(key);
+ LOG.debug("get({}) returns {}", key, info);
+ return info;
+ }
+
+ STREAM remove(ClientInvocationId key) {
+ final STREAM info = map.remove(key);
+ LOG.debug("remove({}) returns {}", key, info);
+ return info;
+ }
+}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
index 4e948c6..6316ef6 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
@@ -20,18 +20,10 @@
import org.apache.ratis.protocol.RaftPeer;
import java.io.Closeable;
-import java.net.InetSocketAddress;
/**
* A server interface handling incoming streams
* Relays those streams to other servers after persisting
*/
-public interface DataStreamServerRpc extends RaftPeer.Add, Closeable {
- /**
- * start server
- */
- void start();
-
- /** @return the address where this RPC server is listening to. */
- InetSocketAddress getInetSocketAddress();
+public interface DataStreamServerRpc extends ServerRpc, RaftPeer.Add, Closeable {
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index d81f9cc..76bd817 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -26,20 +26,13 @@
import org.apache.ratis.util.JavaUtils;
import java.io.Closeable;
-import java.io.IOException;
import java.net.InetSocketAddress;
/**
* An server-side interface for supporting different RPC implementations
* such as Netty, gRPC and Hadoop.
*/
-public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, RaftPeer.Add, Closeable {
- /** Start the RPC service. */
- void start() throws IOException;
-
- /** @return the address where this RPC server is listening */
- InetSocketAddress getInetSocketAddress();
-
+public interface RaftServerRpc extends RaftServerProtocol, ServerRpc, RpcType.Get, RaftPeer.Add, Closeable {
/** @return the address where this RPC server is listening for client requests */
default InetSocketAddress getClientServerAddress() {
return getInetSocketAddress();
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
new file mode 100644
index 0000000..6ad5eac
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * A general server interface.
+ */
+public interface ServerRpc extends Closeable {
+ /** Start the RPC service. */
+ void start() throws IOException;
+
+ /** @return the address where this RPC server is listening to. */
+ InetSocketAddress getInetSocketAddress();
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 70e26af..2ac01ac 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -67,7 +67,7 @@
return raftServer;
}
- void start() {
+ void start() throws IOException {
dataStreamServer.getServerRpc().start();
}
@@ -90,7 +90,7 @@
return servers.get(0);
}
- void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
+ void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) throws Exception {
raftGroup = RaftGroup.valueOf(groupId, peers);
this.peers = peers;
servers = new ArrayList<>(peers.size());