[FLINK-24213][qs] Use single lock in ServerConnection
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
index ebc8454..dbeafcb 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java
@@ -50,7 +50,7 @@
final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> {
private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);
- private final Object connectionLock = new Object();
+ private final Object connectionLock;
@GuardedBy("connectionLock")
private InternalConnection<REQ, RESP> internalConnection;
@@ -60,7 +60,8 @@
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
- private ServerConnection(InternalConnection<REQ, RESP> internalConnection) {
+ private ServerConnection(Object lock, InternalConnection<REQ, RESP> internalConnection) {
+ this.connectionLock = lock;
this.internalConnection = internalConnection;
forwardCloseFuture();
}
@@ -119,11 +120,14 @@
final String clientName,
final MessageSerializer<REQ, RESP> serializer,
final KvStateRequestStats stats) {
+ final Object lock = new Object();
+
return new ServerConnection<>(
+ lock,
new PendingConnection<>(
channel ->
new EstablishedConnection<>(
- clientName, serializer, channel, stats)));
+ lock, clientName, serializer, channel, stats)));
}
interface InternalConnection<REQ, RESP> {
@@ -280,7 +284,7 @@
private static class EstablishedConnection<REQ extends MessageBody, RESP extends MessageBody>
implements ClientHandlerCallback<RESP>, InternalConnection<REQ, RESP> {
- private final Object lock = new Object();
+ private final Object lock;
/** The actual TCP channel. */
private final Channel channel;
@@ -307,11 +311,13 @@
* @param channel The actual TCP channel
*/
EstablishedConnection(
+ final Object lock,
final String clientName,
final MessageSerializer<REQ, RESP> serializer,
final Channel channel,
final KvStateRequestStats stats) {
+ this.lock = lock;
this.channel = Preconditions.checkNotNull(channel);
// Add the client handler with the callback