[fix][broker] Fix the bug that elected leader thinks it's a follower (#23138)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index aa60608..ab35eb7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -134,8 +134,11 @@
// If the value is the same as our proposed value, it means this instance was the leader at some
// point before. The existing value can either be for this same session or for a previous one.
if (res.getStat().isCreatedBySelf()) {
+ log.info("Keeping the existing value {} for {} as it's from the same session stat={}", existingValue,
+ path, res.getStat());
// The value is still valid because it was created in the same session
changeState(LeaderElectionState.Leading);
+ return CompletableFuture.completedFuture(LeaderElectionState.Leading);
} else {
log.info("Conditionally deleting existing equals value {} for {} because it's not created in the "
+ "current session. stat={}", existingValue, path, res.getStat());
@@ -271,7 +274,13 @@
return CompletableFuture.completedFuture(null);
}
- return store.delete(path, version);
+ return store.delete(path, version)
+ .thenAccept(__ -> {
+ synchronized (LeaderElectionImpl.this) {
+ leaderElectionState = LeaderElectionState.NoLeader;
+ }
+ }
+ );
}
@Override
@@ -292,8 +301,8 @@
private void handleSessionNotification(SessionEvent event) {
// Ensure we're only processing one session event at a time.
sequencer.sequential(() -> FutureUtil.composeAsync(() -> {
- if (event == SessionEvent.SessionReestablished) {
- log.info("Revalidating leadership for {}", path);
+ if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
+ log.info("Revalidating leadership for {}, event:{}", path, event);
return elect().thenAccept(leaderState -> {
log.info("Resynced leadership for {} - State: {}", path, leaderState);
}).exceptionally(ex -> {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
index 6b4f74a..4b48f3c 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
@@ -69,6 +69,8 @@
leaderElection.close();
+ assertEquals(leaderElection.getState(), LeaderElectionState.NoLeader);
+
assertEquals(cache.get("/my/leader-election").join(), Optional.empty());
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index 36cb0f1..02d65fd 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -27,6 +27,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
@@ -180,4 +181,58 @@
.untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading));
assertTrue(store.get(path).join().isPresent());
}
+
+
+ @Test
+ public void testElectAfterReconnected() throws Exception {
+ // --- init
+ @Cleanup
+ MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder()
+ .sessionTimeoutMillis(2_000)
+ .build());
+
+
+ BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
+ store.registerSessionListener(sessionEvents::add);
+ BlockingQueue<LeaderElectionState> leaderElectionEvents = new LinkedBlockingQueue<>();
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService coordinationService = new CoordinationServiceImpl(store);
+ @Cleanup
+ LeaderElection<String> le1 = coordinationService.getLeaderElection(String.class, path,
+ leaderElectionEvents::add);
+
+ // --- test manual elect
+ String proposed = "value-1";
+ le1.elect(proposed).join();
+ assertEquals(le1.getState(), LeaderElectionState.Leading);
+ LeaderElectionState les = leaderElectionEvents.poll(5, TimeUnit.SECONDS);
+ assertEquals(les, LeaderElectionState.Leading);
+
+
+ // simulate no leader state
+ FieldUtils.writeDeclaredField(le1, "leaderElectionState", LeaderElectionState.NoLeader, true);
+
+ // reconnect
+ zks.stop();
+
+ SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.ConnectionLost);
+
+ zks.start();
+
+
+ // --- test le1 can be leader
+ e = sessionEvents.poll(10, TimeUnit.SECONDS);
+ assertEquals(e, SessionEvent.Reconnected);
+ Awaitility.await().atMost(Duration.ofSeconds(15))
+ .untilAsserted(()-> {
+ assertEquals(le1.getState(),LeaderElectionState.Leading);
+ }); // reacquire leadership
+
+
+ assertTrue(store.get(path).join().isPresent());
+ }
}