| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.metadata.coordination.impl; |
| |
| import com.fasterxml.jackson.databind.type.TypeFactory; |
| import com.google.common.annotations.VisibleForTesting; |
| import java.util.EnumSet; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.common.concurrent.FutureUtils; |
| import org.apache.bookkeeper.common.util.SafeRunnable; |
| import org.apache.pulsar.metadata.api.GetResult; |
| import org.apache.pulsar.metadata.api.MetadataCache; |
| import org.apache.pulsar.metadata.api.MetadataCacheConfig; |
| import org.apache.pulsar.metadata.api.MetadataSerde; |
| import org.apache.pulsar.metadata.api.MetadataStoreException; |
| import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException; |
| import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; |
| import org.apache.pulsar.metadata.api.Notification; |
| import org.apache.pulsar.metadata.api.NotificationType; |
| import org.apache.pulsar.metadata.api.coordination.LeaderElection; |
| import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; |
| import org.apache.pulsar.metadata.api.extended.CreateOption; |
| import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; |
| import org.apache.pulsar.metadata.api.extended.SessionEvent; |
| import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType; |
| |
| @Slf4j |
| class LeaderElectionImpl<T> implements LeaderElection<T> { |
| private final String path; |
| private final MetadataSerde<T> serde; |
| private final MetadataStoreExtended store; |
| private final MetadataCache<T> cache; |
| private final Consumer<LeaderElectionState> stateChangesListener; |
| private final ScheduledFuture<?> updateCachedValueFuture; |
| |
| private LeaderElectionState leaderElectionState; |
| private Optional<Long> version = Optional.empty(); |
| private Optional<T> proposedValue; |
| |
| private final ScheduledExecutorService executor; |
| |
| private enum InternalState { |
| Init, ElectionInProgress, LeaderIsPresent, Closed |
| } |
| |
| private InternalState internalState; |
| |
| private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5; |
| |
| LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String path, |
| Consumer<LeaderElectionState> stateChangesListener, |
| ScheduledExecutorService executor) { |
| this.path = path; |
| this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null)); |
| this.store = store; |
| MetadataCacheConfig metadataCacheConfig = MetadataCacheConfig.builder() |
| .expireAfterWriteMillis(-1L) |
| .build(); |
| this.cache = store.getMetadataCache(clazz, metadataCacheConfig); |
| this.leaderElectionState = LeaderElectionState.NoLeader; |
| this.internalState = InternalState.Init; |
| this.stateChangesListener = stateChangesListener; |
| this.executor = executor; |
| |
| store.registerListener(this::handlePathNotification); |
| store.registerSessionListener(this::handleSessionNotification); |
| updateCachedValueFuture = executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue), |
| metadataCacheConfig.getRefreshAfterWriteMillis() / 2, |
| metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS); |
| } |
| |
| @Override |
| public synchronized CompletableFuture<LeaderElectionState> elect(T proposedValue) { |
| if (leaderElectionState != LeaderElectionState.NoLeader) { |
| return CompletableFuture.completedFuture(leaderElectionState); |
| } |
| |
| this.proposedValue = Optional.of(proposedValue); |
| return elect(); |
| } |
| |
| private synchronized CompletableFuture<LeaderElectionState> elect() { |
| // First check if there's already a leader elected |
| internalState = InternalState.ElectionInProgress; |
| return store.get(path).thenCompose(optLock -> { |
| if (optLock.isPresent()) { |
| return handleExistingLeaderValue(optLock.get()); |
| } else { |
| return tryToBecomeLeader(); |
| } |
| }).thenCompose(leaderElectionState -> { |
| // make sure that the cache contains the current leader |
| // so that getLeaderValueIfPresent works on all brokers |
| cache.refresh(path); |
| return cache.get(path) |
| .thenApply(__ -> leaderElectionState); |
| }); |
| } |
| |
| private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) { |
| T existingValue; |
| try { |
| existingValue = serde.deserialize(path, res.getValue(), res.getStat()); |
| } catch (Throwable t) { |
| return FutureUtils.exception(t); |
| } |
| |
| if (existingValue.equals(proposedValue.orElse(null))) { |
| // If the value is the same as our proposed value, it means this instance was the leader at some |
| // point before. The existing value can either be for this same session or for a previous one. |
| if (res.getStat().isCreatedBySelf()) { |
| // The value is still valid because it was created in the same session |
| changeState(LeaderElectionState.Leading); |
| } else { |
| // Since the value was created in a different session, it might be expiring. We need to delete it |
| // and try the election again. |
| return store.delete(path, Optional.of(res.getStat().getVersion())) |
| .thenCompose(__ -> tryToBecomeLeader()); |
| } |
| } else if (res.getStat().isCreatedBySelf()) { |
| // The existing value is different but was created from the same session |
| return store.delete(path, Optional.of(res.getStat().getVersion())) |
| .thenCompose(__ -> tryToBecomeLeader()); |
| } |
| |
| // If the existing value is different, it means there's already another leader |
| changeState(LeaderElectionState.Following); |
| return CompletableFuture.completedFuture(LeaderElectionState.Following); |
| } |
| |
| private synchronized void changeState(LeaderElectionState les) { |
| internalState = InternalState.LeaderIsPresent; |
| if (this.leaderElectionState != les) { |
| this.leaderElectionState = les; |
| try { |
| stateChangesListener.accept(leaderElectionState); |
| } catch (Throwable t) { |
| log.warn("Exception in state change listener", t); |
| } |
| } |
| } |
| |
| private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader() { |
| byte[] payload; |
| try { |
| payload = serde.serialize(path, proposedValue.get()); |
| } catch (Throwable t) { |
| return FutureUtils.exception(t); |
| } |
| |
| CompletableFuture<LeaderElectionState> result = new CompletableFuture<>(); |
| store.put(path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)) |
| .thenAccept(stat -> { |
| synchronized (LeaderElectionImpl.this) { |
| if (internalState == InternalState.ElectionInProgress) { |
| // Do a get() in order to force a notification later, if the z-node disappears |
| cache.get(path) |
| .thenRun(() -> { |
| synchronized (LeaderElectionImpl.this) { |
| log.info("Acquired leadership on {}", path); |
| internalState = InternalState.LeaderIsPresent; |
| if (leaderElectionState != LeaderElectionState.Leading) { |
| leaderElectionState = LeaderElectionState.Leading; |
| try { |
| stateChangesListener.accept(leaderElectionState); |
| } catch (Throwable t) { |
| log.warn("Exception in state change listener", t); |
| } |
| } |
| result.complete(leaderElectionState); |
| } |
| }).exceptionally(ex -> { |
| // We fail to do the get(), so clean up the leader election fail the whole |
| // operation |
| store.delete(path, Optional.of(stat.getVersion())) |
| .thenRun(() -> result.completeExceptionally(ex)) |
| .exceptionally(ex2 -> { |
| result.completeExceptionally(ex2); |
| return null; |
| }); |
| return null; |
| }); |
| } else { |
| // LeaderElection was closed in between. Release the lock asynchronously |
| store.delete(path, Optional.of(stat.getVersion())) |
| .thenRun(() -> result.completeExceptionally( |
| new AlreadyClosedException("The leader election was already closed"))) |
| .exceptionally(ex -> { |
| result.completeExceptionally(ex); |
| return null; |
| }); |
| } |
| } |
| }).exceptionally(ex -> { |
| if (ex.getCause() instanceof BadVersionException) { |
| // There was a conflict between 2 participants trying to become leaders at same time. Retry |
| // to fetch info on new leader. |
| |
| elect() |
| .thenAccept(lse -> result.complete(lse)) |
| .exceptionally(ex2 -> { |
| result.completeExceptionally(ex2); |
| return null; |
| }); |
| } else { |
| result.completeExceptionally(ex.getCause()); |
| } |
| return null; |
| }); |
| |
| return result; |
| } |
| |
| @Override |
| public void close() throws Exception { |
| updateCachedValueFuture.cancel(true); |
| try { |
| asyncClose().join(); |
| } catch (CompletionException e) { |
| throw MetadataStoreException.unwrap(e); |
| } |
| } |
| |
| @Override |
| public synchronized CompletableFuture<Void> asyncClose() { |
| if (internalState == InternalState.Closed) { |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| internalState = InternalState.Closed; |
| |
| executor.shutdownNow(); |
| |
| if (leaderElectionState != LeaderElectionState.Leading) { |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| return store.delete(path, version); |
| } |
| |
| @Override |
| public synchronized LeaderElectionState getState() { |
| return leaderElectionState; |
| } |
| |
| @Override |
| public CompletableFuture<Optional<T>> getLeaderValue() { |
| return cache.get(path); |
| } |
| |
| @Override |
| public Optional<T> getLeaderValueIfPresent() { |
| return cache.getIfCached(path); |
| } |
| |
| private void handleSessionNotification(SessionEvent event) { |
| // Ensure we're only processing one session event at a time. |
| executor.execute(SafeRunnable.safeRun(() -> { |
| if (event == SessionEvent.SessionReestablished) { |
| log.info("Revalidating leadership for {}", path); |
| |
| try { |
| LeaderElectionState les = elect().get(); |
| log.info("Resynced leadership for {} - State: {}", path, les); |
| } catch (ExecutionException | InterruptedException e) { |
| log.warn("Failure when processing session event", e); |
| } |
| } |
| })); |
| } |
| |
| private void handlePathNotification(Notification notification) { |
| if (!path.equals(notification.getPath())) { |
| // Ignore notifications we don't care about |
| return; |
| } |
| |
| synchronized (this) { |
| if (internalState != InternalState.LeaderIsPresent) { |
| // Ignoring notification since we're not trying to become leader |
| return; |
| } |
| |
| if (notification.getType() == NotificationType.Deleted) { |
| if (leaderElectionState == LeaderElectionState.Leading) { |
| // We've lost the leadership, switch to follower mode |
| log.warn("Leadership released for {}", path); |
| } |
| |
| leaderElectionState = LeaderElectionState.NoLeader; |
| |
| if (proposedValue.isPresent()) { |
| elect() |
| .exceptionally(ex -> { |
| log.warn("Leader election for path {} has failed", path, ex); |
| synchronized (LeaderElectionImpl.this) { |
| try { |
| stateChangesListener.accept(leaderElectionState); |
| } catch (Throwable t) { |
| log.warn("Exception in state change listener", t); |
| } |
| |
| if (internalState != InternalState.Closed) { |
| executor.schedule(() -> { |
| log.info("Retrying Leader election for path {}", path); |
| elect(); |
| }, LEADER_ELECTION_RETRY_DELAY_SECONDS, TimeUnit.SECONDS); |
| } |
| } |
| return null; |
| }); |
| } |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| protected ScheduledExecutorService getSchedulerExecutor() { |
| return executor; |
| } |
| } |