[FLINK-24213][qs] Use single lock in ServerConnection
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
index ebc8454..dbeafcb 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
@@ -50,7 +50,7 @@
 final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> {
     private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);
 
-    private final Object connectionLock = new Object();
+    private final Object connectionLock;
 
     @GuardedBy("connectionLock")
     private InternalConnection<REQ, RESP> internalConnection;
@@ -60,7 +60,8 @@
 
     private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
-    private ServerConnection(InternalConnection<REQ, RESP> internalConnection) {
+    private ServerConnection(Object lock, InternalConnection<REQ, RESP> internalConnection) {
+        this.connectionLock = lock;
         this.internalConnection = internalConnection;
         forwardCloseFuture();
     }
@@ -119,11 +120,14 @@
                     final String clientName,
                     final MessageSerializer<REQ, RESP> serializer,
                     final KvStateRequestStats stats) {
+        final Object lock = new Object();
+
         return new ServerConnection<>(
+                lock,
                 new PendingConnection<>(
                         channel ->
                                 new EstablishedConnection<>(
-                                        clientName, serializer, channel, stats)));
+                                        lock, clientName, serializer, channel, stats)));
     }
 
     interface InternalConnection<REQ, RESP> {
@@ -280,7 +284,7 @@
     private static class EstablishedConnection<REQ extends MessageBody, RESP extends MessageBody>
             implements ClientHandlerCallback<RESP>, InternalConnection<REQ, RESP> {
 
-        private final Object lock = new Object();
+        private final Object lock;
 
         /** The actual TCP channel. */
         private final Channel channel;
@@ -307,11 +311,13 @@
          * @param channel The actual TCP channel
          */
         EstablishedConnection(
+                final Object lock,
                 final String clientName,
                 final MessageSerializer<REQ, RESP> serializer,
                 final Channel channel,
                 final KvStateRequestStats stats) {
 
+            this.lock = lock;
             this.channel = Preconditions.checkNotNull(channel);
 
             // Add the client handler with the callback