RATIS-2035. Refactor streaming code for Read. (#1046)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java
new file mode 100644
index 0000000..7b0d761
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/ChannelMap.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Map: {@link ChannelId} -> {@link ClientInvocationId}s. */
+class ChannelMap {
+  private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> map = new ConcurrentHashMap<>();
+
+  void add(ChannelId channelId, ClientInvocationId clientInvocationId) {
+    map.computeIfAbsent(channelId, (e) -> new ConcurrentHashMap<>())
+        .put(clientInvocationId, clientInvocationId);
+  }
+
+  void remove(ChannelId channelId, ClientInvocationId clientInvocationId) {
+    Optional.ofNullable(map.get(channelId))
+        .ifPresent((ids) -> ids.remove(clientInvocationId));
+  }
+
+  Set<ClientInvocationId> remove(ChannelId channelId) {
+    return Optional.ofNullable(map.remove(channelId))
+        .map(Map::keySet)
+        .orElse(Collections.emptySet());
+  }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 302aed9..a6e9b81 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -70,13 +70,10 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
@@ -219,52 +216,10 @@
     }
   }
 
-  static class StreamMap {
-    private final ConcurrentMap<ClientInvocationId, StreamInfo> map = new ConcurrentHashMap<>();
-
-    StreamInfo computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, StreamInfo> function) {
-      final StreamInfo info = map.computeIfAbsent(key, function);
-      LOG.debug("computeIfAbsent({}) returns {}", key, info);
-      return info;
-    }
-
-    StreamInfo get(ClientInvocationId key) {
-      final StreamInfo info = map.get(key);
-      LOG.debug("get({}) returns {}", key, info);
-      return info;
-    }
-
-    StreamInfo remove(ClientInvocationId key) {
-      final StreamInfo info = map.remove(key);
-      LOG.debug("remove({}) returns {}", key, info);
-      return info;
-    }
-  }
-
-  public static class ChannelMap {
-    private final Map<ChannelId, Map<ClientInvocationId, ClientInvocationId>> map = new ConcurrentHashMap<>();
-
-    public void add(ChannelId channelId,
-                    ClientInvocationId clientInvocationId) {
-      map.computeIfAbsent(channelId, (e) -> new ConcurrentHashMap<>()).put(clientInvocationId, clientInvocationId);
-    }
-
-    public void remove(ChannelId channelId,
-                       ClientInvocationId clientInvocationId) {
-      Optional.ofNullable(map.get(channelId)).ifPresent((ids) -> ids.remove(clientInvocationId));
-    }
-
-    public Set<ClientInvocationId> remove(ChannelId channelId) {
-      return Optional.ofNullable(map.remove(channelId))
-          .map(Map::keySet)
-          .orElse(Collections.emptySet());
-    }
-  }
-
   private final RaftServer server;
   private final String name;
 
-  private final StreamMap streams = new StreamMap();
+  private final StreamMap<StreamInfo> streams = new StreamMap<>();
   private final ChannelMap channels;
   private final ExecutorService requestExecutor;
   private final ExecutorService writeExecutor;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java
new file mode 100644
index 0000000..073698c
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/StreamMap.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+/**
+ * Map: {@link ClientInvocationId} -> {@link STREAM}.
+ *
+ * @param <STREAM> the stream type.
+ */
+class StreamMap<STREAM> {
+  public static final Logger LOG = LoggerFactory.getLogger(StreamMap.class);
+
+  private final ConcurrentMap<ClientInvocationId, STREAM> map = new ConcurrentHashMap<>();
+
+  STREAM computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, STREAM> function) {
+    final STREAM info = map.computeIfAbsent(key, function);
+    LOG.debug("computeIfAbsent({}) returns {}", key, info);
+    return info;
+  }
+
+  STREAM get(ClientInvocationId key) {
+    final STREAM info = map.get(key);
+    LOG.debug("get({}) returns {}", key, info);
+    return info;
+  }
+
+  STREAM remove(ClientInvocationId key) {
+    final STREAM info = map.remove(key);
+    LOG.debug("remove({}) returns {}", key, info);
+    return info;
+  }
+}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
index 4e948c6..6316ef6 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
@@ -20,18 +20,10 @@
 import org.apache.ratis.protocol.RaftPeer;
 
 import java.io.Closeable;
-import java.net.InetSocketAddress;
 
 /**
  * A server interface handling incoming streams
  * Relays those streams to other servers after persisting
  */
-public interface DataStreamServerRpc extends RaftPeer.Add, Closeable {
-  /**
-   * start server
-   */
-  void start();
-
-  /** @return the address where this RPC server is listening to. */
-  InetSocketAddress getInetSocketAddress();
+public interface DataStreamServerRpc extends ServerRpc, RaftPeer.Add, Closeable {
 }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index d81f9cc..76bd817 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -26,20 +26,13 @@
 import org.apache.ratis.util.JavaUtils;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 
 /**
  * An server-side interface for supporting different RPC implementations
  * such as Netty, gRPC and Hadoop.
  */
-public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, RaftPeer.Add, Closeable {
-  /** Start the RPC service. */
-  void start() throws IOException;
-
-  /** @return the address where this RPC server is listening */
-  InetSocketAddress getInetSocketAddress();
-
+public interface RaftServerRpc extends RaftServerProtocol, ServerRpc, RpcType.Get, RaftPeer.Add, Closeable {
   /** @return the address where this RPC server is listening for client requests */
   default InetSocketAddress getClientServerAddress() {
     return getInetSocketAddress();
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java b/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
new file mode 100644
index 0000000..6ad5eac
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/ServerRpc.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * A general server interface.
+ */
+public interface ServerRpc extends Closeable {
+  /** Start the RPC service. */
+  void start() throws IOException;
+
+  /** @return the address where this RPC server is listening to. */
+  InetSocketAddress getInetSocketAddress();
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 70e26af..2ac01ac 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -67,7 +67,7 @@
       return raftServer;
     }
 
-    void start() {
+    void start() throws IOException {
       dataStreamServer.getServerRpc().start();
     }
 
@@ -90,7 +90,7 @@
     return servers.get(0);
   }
 
-  void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
+  void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) throws Exception {
     raftGroup = RaftGroup.valueOf(groupId, peers);
     this.peers = peers;
     servers = new ArrayList<>(peers.size());