// blob: e7816f06aacc8df3d839bf3f6af7a1c55ecf683b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
/**
 * Takes part in leader election among function workers.
 *
 * <p>Election is done by subscribing to the cluster coordination topic with a
 * {@link SubscriptionType#Failover} subscription: the worker whose consumer is
 * the active one is the leader. This class reacts to the resulting
 * {@link ConsumerEventListener} callbacks by moving the scheduler, the
 * assignment tailer and the function metadata manager into or out of leader
 * mode.
 *
 * <p>Both leadership transitions run while holding this object's monitor, and
 * {@link #isLeader()} reads a {@code volatile} flag without taking it.
 */
@Slf4j
public class LeaderService implements AutoCloseable, ConsumerEventListener {

    /** Name (and {@code id} property) under which this worker subscribes: {@code workerId:hostname:port}. */
    private final String consumerName;
    private final FunctionAssignmentTailer functionAssignmentTailer;
    private final ErrorNotifier errorNotifier;
    private final SchedulerManager schedulerManager;
    private final FunctionRuntimeManager functionRuntimeManager;
    private final FunctionMetaDataManager functionMetaDataManager;
    private final MembershipManager membershipManager;

    /** Consumer on the coordination topic; {@code null} until {@link #start()} has been called. */
    private ConsumerImpl<byte[]> consumer;
    private final WorkerConfig workerConfig;
    private final PulsarClient pulsarClient;

    /** Set to {@code true} only after the whole become-leader routine succeeded. */
    private volatile boolean isLeader = false;

    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
    private static final String WORKER_IDENTIFIER = "id";

    /**
     * Creates the service without subscribing to anything; call {@link #start()} to join the election.
     *
     * @param workerService            source of the {@link WorkerConfig} used by this service
     * @param pulsarClient             client used to subscribe to the coordination topic
     * @param functionAssignmentTailer tailer that is stopped on gaining and restarted on losing leadership
     * @param schedulerManager         scheduler that is initialized on gaining and closed on losing leadership
     * @param functionRuntimeManager   awaited for initialization before leadership is taken
     * @param functionMetaDataManager  awaited for initialization, then moved into or out of leader mode
     * @param membershipManager        used to check whether this worker is still the leader
     * @param errorNotifier            notified of any unexpected failure during a leadership transition
     */
    public LeaderService(WorkerService workerService,
                         PulsarClient pulsarClient,
                         FunctionAssignmentTailer functionAssignmentTailer,
                         SchedulerManager schedulerManager,
                         FunctionRuntimeManager functionRuntimeManager,
                         FunctionMetaDataManager functionMetaDataManager,
                         MembershipManager membershipManager,
                         ErrorNotifier errorNotifier) {
        this.workerConfig = workerService.getWorkerConfig();
        this.pulsarClient = pulsarClient;
        this.functionAssignmentTailer = functionAssignmentTailer;
        this.schedulerManager = schedulerManager;
        this.functionRuntimeManager = functionRuntimeManager;
        this.functionMetaDataManager = functionMetaDataManager;
        this.membershipManager = membershipManager;
        this.errorNotifier = errorNotifier;
        consumerName = String.format(
                "%s:%s:%d",
                workerConfig.getWorkerId(),
                workerConfig.getWorkerHostname(),
                workerConfig.getTlsEnabled() ? workerConfig.getWorkerPortTls() : workerConfig.getWorkerPort()
        );
    }

    /**
     * Joins the leader election by subscribing to the coordination topic.
     *
     * @throws PulsarClientException if the subscription cannot be created
     */
    public void start() throws PulsarClientException {
        // The leader service uses a `coordination` topic for leader election.
        // We don't produce any messages into this topic, we only use the `failover` subscription
        // to elect an active consumer as the leader worker. The leader worker will be responsible
        // for scheduling snapshots for FMT and doing task assignment.
        // NOTE(review): the downcast assumes the client hands back a ConsumerImpl -- confirm this
        // holds for every configuration of the coordination topic.
        consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
                .topic(workerConfig.getClusterCoordinationTopic())
                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
                .subscriptionType(SubscriptionType.Failover)
                .consumerEventListener(this)
                .property(WORKER_IDENTIFIER, consumerName)
                .consumerName(consumerName)
                .subscribe();
    }

    /**
     * Callback invoked when this worker's consumer becomes the active one, i.e. the worker is elected leader.
     *
     * <p>Does nothing if the worker already is the leader. Otherwise runs the become-leader routine and,
     * only if that routine completed successfully, triggers a scheduling round.
     *
     * @param consumer    the consumer that became active (not used)
     * @param partitionId the partition the event refers to (not used)
     */
    @Override
    public void becameActive(Consumer<?> consumer, int partitionId) {
        boolean acquiredLeadership;
        synchronized (this) {
            if (isLeader) {
                return;
            }
            log.info("Worker {} became the leader.", consumerName);
            acquiredLeadership = initializeLeadership();
        }
        // Once we become leader we need to schedule. This is skipped when the routine above
        // failed or was abandoned, because in that case this worker did not become the leader.
        // The call is made after releasing the monitor.
        if (acquiredLeadership) {
            schedulerManager.schedule();
        }
    }

    /**
     * Runs the become-leader routine. Callers must hold this object's monitor.
     *
     * <p>Any unexpected failure is logged and handed to the {@link ErrorNotifier}.
     *
     * @return {@code true} if every step succeeded and {@link #isLeader} was set; {@code false} if leadership
     *         was lost while acquiring the exclusive producers or if any step failed
     */
    private boolean initializeLeadership() {
        try {
            // Wait for worker to be initialized.
            // We need to do this because LeaderService is started
            // before FunctionMetadataManager and FunctionRuntimeManager is done initializing
            functionMetaDataManager.getIsInitialized().get();
            functionRuntimeManager.getIsInitialized().get();

            // attempt to acquire exclusive publishers to both the metadata topic and assignments topic
            // we should keep trying to acquire exclusive producers as long as we are still the leader
            Supplier<Boolean> checkIsStillLeader = WorkerUtils.getIsStillLeaderSupplier(membershipManager,
                    workerConfig.getWorkerId());
            Producer<byte[]> scheduleManagerExclusiveProducer = null;
            Producer<byte[]> functionMetaDataManagerExclusiveProducer = null;
            try {
                scheduleManagerExclusiveProducer = schedulerManager.acquireExclusiveWrite(checkIsStillLeader);
                functionMetaDataManagerExclusiveProducer = functionMetaDataManager
                        .acquireExclusiveWrite(checkIsStillLeader);
            } catch (WorkerUtils.NotLeaderAnymore e) {
                log.info("Worker {} is not leader anymore. Exiting becoming leader routine.", consumerName);
                // release whatever was acquired before leadership was lost
                if (scheduleManagerExclusiveProducer != null) {
                    scheduleManagerExclusiveProducer.close();
                }
                if (functionMetaDataManagerExclusiveProducer != null) {
                    functionMetaDataManagerExclusiveProducer.close();
                }
                return false;
            }

            // make sure scheduler is initialized because this worker
            // is the leader and may need to start computing and writing assignments
            // also creates exclusive producer for assignment topic
            schedulerManager.initialize(scheduleManagerExclusiveProducer);

            // trigger read to the end of the topic and exit
            // Since the leader can just update its in memory assignments cache directly
            functionAssignmentTailer.triggerReadToTheEndAndExit().get();
            functionAssignmentTailer.close();

            // need to move function meta data manager into leader mode
            functionMetaDataManager.acquireLeadership(functionMetaDataManagerExclusiveProducer);

            isLeader = true;
            return true;
        } catch (Throwable th) {
            log.error("Encountered error when initializing to become leader", th);
            errorNotifier.triggerError(th);
            return false;
        }
    }

    /**
     * Callback invoked when this worker's consumer stops being the active one, i.e. leadership is lost.
     *
     * <p>Does nothing if the worker was not the leader. Otherwise clears the leader flag, closes the
     * scheduler, restarts the assignment tailer and makes the function metadata manager give up
     * leadership. Any failure is logged and handed to the {@link ErrorNotifier}.
     *
     * @param consumer    the consumer that became inactive (not used)
     * @param partitionId the partition the event refers to (not used)
     */
    @Override
    public synchronized void becameInactive(Consumer<?> consumer, int partitionId) {
        if (!isLeader) {
            return;
        }
        log.info("Worker {} lost the leadership.", consumerName);
        isLeader = false;
        // when a worker has lost leadership it needs to start reading from the assignment topic again
        try {
            // stop scheduler manager since we are not the leader anymore
            // will block if a schedule invocation is in process
            schedulerManager.close();

            // starting reading from assignment topic from the last published message of the scheduler
            if (schedulerManager.getLastMessageProduced() == null) {
                functionAssignmentTailer.start();
            } else {
                functionAssignmentTailer.startFromMessage(schedulerManager.getLastMessageProduced());
            }
            functionMetaDataManager.giveupLeadership();
        } catch (Throwable th) {
            log.error("Encountered error in routine when worker lost leadership", th);
            errorNotifier.triggerError(th);
        }
    }

    /**
     * Reports whether this worker currently considers itself the leader.
     *
     * @return {@code true} between a successful become-leader routine and the next loss of leadership
     */
    public boolean isLeader() {
        return isLeader;
    }

    /**
     * Closes the coordination-topic consumer, if {@link #start()} created one.
     *
     * @throws PulsarClientException if closing the consumer fails
     */
    @Override
    public void close() throws PulsarClientException {
        if (consumer != null) {
            consumer.close();
        }
    }
}