[SSHD-1289] Fix lock handling in KeyExchangeMessageHandler

Make sure that a thread does not try to acquire the write lock if
it already holds the read lock. This could happen if a write is not
enqueued and there is an exception during writing, and we then try
to close the session on the same thread.

The read/write lock is used for three purposes: first, it gives the
flushing thread trying to empty the queue of pending packets priority
over other threads trying to enqueue more packets, and second, it is
held during writeOrEnqueue() while writing a packet directly to prevent
that the KEX state changes between being checked and the write being
done, and third, to prevent that the KEX state changes asynchronously
while the flushing thread is checking it. The read/write lock itself
does not serve to ensure mutual exclusion on the KEX state itself.

These three functions can also be fulfilled if update() is executed
when only the read lock is held. If a thread in update() holds the read
lock, this can only occur if it wrote the buffer directly in
writeOrEnqueue(), in which case it is fine to proceed, and the flushing
thread for sure is not in its critical region where it holds the write
lock. Otherwise, any other thread either is the flushing thread and
holds the write lock already, or it's a thread not holding the lock at
all. In both cases it is fine to acquire the write lock.
diff --git a/CHANGES.md b/CHANGES.md
index 57ff237..842a961 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,3 +22,4 @@
 
 * [SSHD-1281](https://issues.apache.org/jira/browse/SSHD-1281) ClientSession.auth().verify() is terminated with timeout
 * [SSHD-1285](https://issues.apache.org/jira/browse/SSHD-1285) 2.9.0 release broken on Java 8
+* [SSHD-1289](https://issues.apache.org/jira/browse/SSHD-1289) Deadlock during session exit
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java
index c8f3437..398f508 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java
@@ -23,10 +23,10 @@
 import java.security.GeneralSecurityException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -97,12 +97,12 @@
     /**
      * Queues up high-level packets written during an ongoing key exchange.
      */
-    protected final Queue<PendingWriteFuture> pendingPackets = new LinkedList<>();
+    protected final Queue<PendingWriteFuture> pendingPackets = new ConcurrentLinkedQueue<>();
 
     /**
      * Indicates that all pending packets have been flushed.
      */
-    protected boolean kexFlushed = true;
+    protected volatile boolean kexFlushed = true;
 
     /**
      * Never {@code null}. Used to block some threads when writing packets while pending packets are still being flushed
@@ -110,7 +110,7 @@
      * of a KEX a new future is installed, which is fulfilled at the end of the KEX once there are no more pending
      * packets to be flushed.
      */
-    protected DefaultKeyExchangeFuture kexFlushedFuture;
+    protected volatile DefaultKeyExchangeFuture kexFlushedFuture;
 
     /**
      * Creates a new {@link KeyExchangeMessageHandler} for the given {@code session}, using the given {@code Logger}.
@@ -127,20 +127,29 @@
     }
 
     public void updateState(Runnable update) {
-        lock.writeLock().lock();
-        try {
+        updateState(() -> {
             update.run();
-        } finally {
-            lock.writeLock().unlock();
-        }
+            return null;
+        });
     }
 
     public <V> V updateState(Supplier<V> update) {
-        lock.writeLock().lock();
+        boolean locked = false;
+        // If we already have 'lock' as a reader, don't try to get the write lock -- the flushing thread is blocked
+        // currently anyway, and lock promotion from a readlock to a writelock is not possible. Contention between
+        // multiple readers is the business of the caller!
+        //
+        // See also writeOrEnqueue() below.
+        if (lock.getReadHoldCount() == 0) {
+            lock.writeLock().lock();
+            locked = true;
+        }
         try {
             return update.get();
         } finally {
-            lock.writeLock().unlock();
+            if (locked) {
+                lock.writeLock().unlock();
+            }
         }
     }