[fix][broker] Fix the bug that elected leader thinks it's a follower (#23138)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index aa60608..ab35eb7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -134,8 +134,11 @@
             // If the value is the same as our proposed value, it means this instance was the leader at some
             // point before. The existing value can either be for this same session or for a previous one.
             if (res.getStat().isCreatedBySelf()) {
+                log.info("Keeping the existing value {} for {} as it's from the same session stat={}", existingValue,
+                        path, res.getStat());
                 // The value is still valid because it was created in the same session
                 changeState(LeaderElectionState.Leading);
+                return CompletableFuture.completedFuture(LeaderElectionState.Leading);
             } else {
                 log.info("Conditionally deleting existing equals value {} for {} because it's not created in the "
                         + "current session. stat={}", existingValue, path, res.getStat());
@@ -271,7 +274,13 @@
             return CompletableFuture.completedFuture(null);
         }
 
-        return store.delete(path, version);
+        return store.delete(path, version)
+                .thenAccept(__ -> {
+                            synchronized (LeaderElectionImpl.this) {
+                                leaderElectionState = LeaderElectionState.NoLeader;
+                            }
+                        }
+                );
     }
 
     @Override
@@ -292,8 +301,8 @@
     private void handleSessionNotification(SessionEvent event) {
         // Ensure we're only processing one session event at a time.
         sequencer.sequential(() -> FutureUtil.composeAsync(() -> {
-            if (event == SessionEvent.SessionReestablished) {
-                log.info("Revalidating leadership for {}", path);
+            if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
+                log.info("Revalidating leadership for {}, event:{}", path, event);
                 return elect().thenAccept(leaderState -> {
                     log.info("Resynced leadership for {} - State: {}", path, leaderState);
                 }).exceptionally(ex -> {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
index 6b4f74a..4b48f3c 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
@@ -69,6 +69,8 @@
 
         leaderElection.close();
 
+        assertEquals(leaderElection.getState(), LeaderElectionState.NoLeader);
+
         assertEquals(cache.get("/my/leader-election").join(), Optional.empty());
     }
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 36cb0f1..02d65fd 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.coordination.CoordinationService;
 import org.apache.pulsar.metadata.api.coordination.LeaderElection;
@@ -180,4 +181,58 @@
                 .untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading));
         assertTrue(store.get(path).join().isPresent());
     }
+
+
+    @Test
+    public void testElectAfterReconnected() throws Exception {
+        //  ---  init
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(),
+                MetadataStoreConfig.builder()
+                        .sessionTimeoutMillis(2_000)
+                        .build());
+
+
+        BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
+        store.registerSessionListener(sessionEvents::add);
+        BlockingQueue<LeaderElectionState> leaderElectionEvents = new LinkedBlockingQueue<>();
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService coordinationService = new CoordinationServiceImpl(store);
+        @Cleanup
+        LeaderElection<String> le1 = coordinationService.getLeaderElection(String.class, path,
+                leaderElectionEvents::add);
+
+        // --- test manual elect
+        String proposed = "value-1";
+        le1.elect(proposed).join();
+        assertEquals(le1.getState(), LeaderElectionState.Leading);
+        LeaderElectionState les = leaderElectionEvents.poll(5, TimeUnit.SECONDS);
+        assertEquals(les, LeaderElectionState.Leading);
+
+
+        // simulate no leader state
+        FieldUtils.writeDeclaredField(le1, "leaderElectionState", LeaderElectionState.NoLeader, true);
+
+        // reconnect
+        zks.stop();
+
+        SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.ConnectionLost);
+
+        zks.start();
+
+
+        // --- test  le1 can be leader
+        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        assertEquals(e, SessionEvent.Reconnected);
+        Awaitility.await().atMost(Duration.ofSeconds(15))
+                .untilAsserted(()-> {
+                    assertEquals(le1.getState(),LeaderElectionState.Leading);
+                }); // reacquire leadership
+
+
+        assertTrue(store.get(path).join().isPresent());
+    }
 }