blob: 8e1d2cd4aa756717b52ddb21cdb4e28edb2e92d1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.coordination.impl;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.annotations.VisibleForTesting;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
@Slf4j
class LeaderElectionImpl<T> implements LeaderElection<T> {
private final String path;
private final MetadataSerde<T> serde;
private final MetadataStoreExtended store;
private final MetadataCache<T> cache;
private final Consumer<LeaderElectionState> stateChangesListener;
private final ScheduledFuture<?> updateCachedValueFuture;
private LeaderElectionState leaderElectionState;
private Optional<Long> version = Optional.empty();
private Optional<T> proposedValue;
private final ScheduledExecutorService executor;
private enum InternalState {
Init, ElectionInProgress, LeaderIsPresent, Closed
}
private InternalState internalState;
private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5;
LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String path,
Consumer<LeaderElectionState> stateChangesListener,
ScheduledExecutorService executor) {
this.path = path;
this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null));
this.store = store;
MetadataCacheConfig metadataCacheConfig = MetadataCacheConfig.builder()
.expireAfterWriteMillis(-1L)
.build();
this.cache = store.getMetadataCache(clazz, metadataCacheConfig);
this.leaderElectionState = LeaderElectionState.NoLeader;
this.internalState = InternalState.Init;
this.stateChangesListener = stateChangesListener;
this.executor = executor;
store.registerListener(this::handlePathNotification);
store.registerSessionListener(this::handleSessionNotification);
updateCachedValueFuture = executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS);
}
@Override
public synchronized CompletableFuture<LeaderElectionState> elect(T proposedValue) {
if (leaderElectionState != LeaderElectionState.NoLeader) {
return CompletableFuture.completedFuture(leaderElectionState);
}
this.proposedValue = Optional.of(proposedValue);
return elect();
}
private synchronized CompletableFuture<LeaderElectionState> elect() {
// First check if there's already a leader elected
internalState = InternalState.ElectionInProgress;
return store.get(path).thenCompose(optLock -> {
if (optLock.isPresent()) {
return handleExistingLeaderValue(optLock.get());
} else {
return tryToBecomeLeader();
}
}).thenCompose(leaderElectionState -> {
// make sure that the cache contains the current leader
// so that getLeaderValueIfPresent works on all brokers
cache.refresh(path);
return cache.get(path)
.thenApply(__ -> leaderElectionState);
});
}
private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
T existingValue;
try {
existingValue = serde.deserialize(path, res.getValue(), res.getStat());
} catch (Throwable t) {
return FutureUtils.exception(t);
}
if (existingValue.equals(proposedValue.orElse(null))) {
// If the value is the same as our proposed value, it means this instance was the leader at some
// point before. The existing value can either be for this same session or for a previous one.
if (res.getStat().isCreatedBySelf()) {
// The value is still valid because it was created in the same session
changeState(LeaderElectionState.Leading);
} else {
// Since the value was created in a different session, it might be expiring. We need to delete it
// and try the election again.
return store.delete(path, Optional.of(res.getStat().getVersion()))
.thenCompose(__ -> tryToBecomeLeader());
}
} else if (res.getStat().isCreatedBySelf()) {
// The existing value is different but was created from the same session
return store.delete(path, Optional.of(res.getStat().getVersion()))
.thenCompose(__ -> tryToBecomeLeader());
}
// If the existing value is different, it means there's already another leader
changeState(LeaderElectionState.Following);
return CompletableFuture.completedFuture(LeaderElectionState.Following);
}
private synchronized void changeState(LeaderElectionState les) {
internalState = InternalState.LeaderIsPresent;
if (this.leaderElectionState != les) {
this.leaderElectionState = les;
try {
stateChangesListener.accept(leaderElectionState);
} catch (Throwable t) {
log.warn("Exception in state change listener", t);
}
}
}
private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader() {
byte[] payload;
try {
payload = serde.serialize(path, proposedValue.get());
} catch (Throwable t) {
return FutureUtils.exception(t);
}
CompletableFuture<LeaderElectionState> result = new CompletableFuture<>();
store.put(path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral))
.thenAccept(stat -> {
synchronized (LeaderElectionImpl.this) {
if (internalState == InternalState.ElectionInProgress) {
// Do a get() in order to force a notification later, if the z-node disappears
cache.get(path)
.thenRun(() -> {
synchronized (LeaderElectionImpl.this) {
log.info("Acquired leadership on {}", path);
internalState = InternalState.LeaderIsPresent;
if (leaderElectionState != LeaderElectionState.Leading) {
leaderElectionState = LeaderElectionState.Leading;
try {
stateChangesListener.accept(leaderElectionState);
} catch (Throwable t) {
log.warn("Exception in state change listener", t);
}
}
result.complete(leaderElectionState);
}
}).exceptionally(ex -> {
// We fail to do the get(), so clean up the leader election fail the whole
// operation
store.delete(path, Optional.of(stat.getVersion()))
.thenRun(() -> result.completeExceptionally(ex))
.exceptionally(ex2 -> {
result.completeExceptionally(ex2);
return null;
});
return null;
});
} else {
// LeaderElection was closed in between. Release the lock asynchronously
store.delete(path, Optional.of(stat.getVersion()))
.thenRun(() -> result.completeExceptionally(
new AlreadyClosedException("The leader election was already closed")))
.exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
}
}
}).exceptionally(ex -> {
if (ex.getCause() instanceof BadVersionException) {
// There was a conflict between 2 participants trying to become leaders at same time. Retry
// to fetch info on new leader.
elect()
.thenAccept(lse -> result.complete(lse))
.exceptionally(ex2 -> {
result.completeExceptionally(ex2);
return null;
});
} else {
result.completeExceptionally(ex.getCause());
}
return null;
});
return result;
}
@Override
public void close() throws Exception {
updateCachedValueFuture.cancel(true);
try {
asyncClose().join();
} catch (CompletionException e) {
throw MetadataStoreException.unwrap(e);
}
}
@Override
public synchronized CompletableFuture<Void> asyncClose() {
if (internalState == InternalState.Closed) {
return CompletableFuture.completedFuture(null);
}
internalState = InternalState.Closed;
executor.shutdownNow();
if (leaderElectionState != LeaderElectionState.Leading) {
return CompletableFuture.completedFuture(null);
}
return store.delete(path, version);
}
@Override
public synchronized LeaderElectionState getState() {
return leaderElectionState;
}
@Override
public CompletableFuture<Optional<T>> getLeaderValue() {
return cache.get(path);
}
@Override
public Optional<T> getLeaderValueIfPresent() {
return cache.getIfCached(path);
}
private void handleSessionNotification(SessionEvent event) {
// Ensure we're only processing one session event at a time.
executor.execute(SafeRunnable.safeRun(() -> {
if (event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}", path);
try {
LeaderElectionState les = elect().get();
log.info("Resynced leadership for {} - State: {}", path, les);
} catch (ExecutionException | InterruptedException e) {
log.warn("Failure when processing session event", e);
}
}
}));
}
private void handlePathNotification(Notification notification) {
if (!path.equals(notification.getPath())) {
// Ignore notifications we don't care about
return;
}
synchronized (this) {
if (internalState != InternalState.LeaderIsPresent) {
// Ignoring notification since we're not trying to become leader
return;
}
if (notification.getType() == NotificationType.Deleted) {
if (leaderElectionState == LeaderElectionState.Leading) {
// We've lost the leadership, switch to follower mode
log.warn("Leadership released for {}", path);
}
leaderElectionState = LeaderElectionState.NoLeader;
if (proposedValue.isPresent()) {
elect()
.exceptionally(ex -> {
log.warn("Leader election for path {} has failed", path, ex);
synchronized (LeaderElectionImpl.this) {
try {
stateChangesListener.accept(leaderElectionState);
} catch (Throwable t) {
log.warn("Exception in state change listener", t);
}
if (internalState != InternalState.Closed) {
executor.schedule(() -> {
log.info("Retrying Leader election for path {}", path);
elect();
}, LEADER_ELECTION_RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
}
}
return null;
});
}
}
}
}
@VisibleForTesting
protected ScheduledExecutorService getSchedulerExecutor() {
return executor;
}
}