| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.functions.worker; |
| |
| import static java.util.concurrent.TimeUnit.NANOSECONDS; |
| import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import io.netty.util.concurrent.DefaultThreadFactory; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.Supplier; |
| import java.util.stream.Collectors; |
| import lombok.Builder; |
| import lombok.Data; |
| import lombok.Getter; |
| import lombok.Setter; |
| import lombok.extern.slf4j.Slf4j; |
| import lombok.val; |
| import org.apache.commons.lang3.tuple.ImmutablePair; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.pulsar.client.admin.LongRunningProcessStatus; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.common.functions.WorkerInfo; |
| import org.apache.pulsar.common.util.ObjectMapperFactory; |
| import org.apache.pulsar.common.util.Reflections; |
| import org.apache.pulsar.functions.proto.Function; |
| import org.apache.pulsar.functions.proto.Function.Assignment; |
| import org.apache.pulsar.functions.proto.Function.FunctionDetails; |
| import org.apache.pulsar.functions.proto.Function.FunctionMetaData; |
| import org.apache.pulsar.functions.proto.Function.Instance; |
| import org.apache.pulsar.functions.utils.FunctionCommon; |
| import org.apache.pulsar.functions.worker.scheduler.IScheduler; |
| |
| /** |
| * The scheduler manager is used to compute scheduling of function instances |
| * Only the leader computes new schedulings and writes assignments to the assignment topic |
| * The lifecyle of this class is the following: |
| * 1. When worker becomes leader, this class with be initialized |
| * 2. When worker loses leadership, this class will be closed which |
| * also closes the worker's producer to the assignments topic |
| */ |
@Slf4j
public class SchedulerManager implements AutoCloseable {

    private final WorkerConfig workerConfig;
    private final ErrorNotifier errorNotifier;
    private final WorkerStatsManager workerStatsManager;
    // Executes scheduling/rebalance/drain tasks; created in initialize(), shut down in close().
    private ThreadPoolExecutor executorService;
    private final PulsarClient pulsarClient;

    @Setter
    private FunctionMetaDataManager functionMetaDataManager;

    @Setter
    private LeaderService leaderService;

    @Setter
    private MembershipManager membershipManager;

    @Setter
    private FunctionRuntimeManager functionRuntimeManager;

    // Pluggable scheduling strategy, instantiated reflectively from WorkerConfig.
    private final IScheduler scheduler;

    // Exclusive producer on the assignment topic; only held while this worker is the leader.
    private Producer<byte[]> exclusiveProducer;

    // Runs the periodic compaction of the assignment and function-metadata topics.
    private ScheduledExecutorService scheduledExecutorService;

    private final PulsarAdmin admin;

    // Fair lock serializing scheduler invocations against close().
    @Getter
    private final Lock schedulerLock = new ReentrantLock(true);

    private volatile boolean isRunning = false;

    // Set after new assignments are published; cleared once a compaction is triggered.
    AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
    private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60;
    public static final String HEARTBEAT_TENANT = "pulsar-function";
    public static final String HEARTBEAT_NAMESPACE = "heartbeat";

    // Id of the last assignment message published by this leader; null until the first publish.
    @Getter
    private MessageId lastMessageProduced = null;

    private MessageId metadataTopicLastMessage = MessageId.earliest;

    private final AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);

    private final AtomicBoolean drainInProgressFlag = new AtomicBoolean(false);
    // The list of assignments moved due to the last drain op on a leader. Used in UTs, and debugging.
    private List<Assignment> assignmentsMovedInLastDrain;

    // Possible status of a drain operation.
    enum DrainOpStatus {
        DrainNotInProgress,
        DrainInProgress,
        DrainCompleted
    }

    // A map to hold the status of recent drain operations.
    // It is of the form {workerId : DrainOpStatus}.
    // Entries are added when a drain operation starts, and removed on a periodic (when the worker is no longer seen
    // on a poll).
    private final ConcurrentHashMap<String, DrainOpStatus> drainOpStatusMap = new ConcurrentHashMap<>();

    public SchedulerManager(WorkerConfig workerConfig,
                            PulsarClient pulsarClient,
                            PulsarAdmin admin,
                            WorkerStatsManager workerStatsManager,
                            ErrorNotifier errorNotifier) {
        this.workerConfig = workerConfig;
        this.pulsarClient = pulsarClient;
        this.admin = admin;
        // The scheduler implementation is configurable; loaded via reflection with the context classloader.
        this.scheduler = Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class,
                Thread.currentThread().getContextClassLoader());
        this.workerStatsManager = workerStatsManager;
        this.errorNotifier = errorNotifier;
    }
| |
| /** |
| * Acquires a exclusive producer. This method cannot return null. It can only return a valid exclusive producer |
| * or throw NotLeaderAnymore exception. |
| * |
| * @param isLeader if the worker is still the leader |
| * @return A valid exclusive producer |
| * @throws WorkerUtils.NotLeaderAnymore if the worker is no longer the leader. |
| */ |
| public Producer<byte[]> acquireExclusiveWrite(Supplier<Boolean> isLeader) throws WorkerUtils.NotLeaderAnymore { |
| // creates exclusive producer for assignment topic |
| return WorkerUtils.createExclusiveProducerWithRetry( |
| pulsarClient, |
| workerConfig.getFunctionAssignmentTopic(), |
| workerConfig.getWorkerId() + "-scheduler-manager", |
| isLeader, 10000); |
| } |
| |
| public synchronized void initialize(Producer<byte[]> exclusiveProducer) { |
| if (!isRunning) { |
| log.info("Initializing scheduler manager"); |
| this.exclusiveProducer = exclusiveProducer; |
| executorService = new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS, |
| new LinkedBlockingQueue<>(5)); |
| executorService.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("worker-scheduler-%d").build()); |
| scheduledExecutorService = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-assignment-topic-compactor")); |
| if (workerConfig.getTopicCompactionFrequencySec() > 0) { |
| scheduleCompaction(this.scheduledExecutorService, workerConfig.getTopicCompactionFrequencySec()); |
| } |
| |
| isRunning = true; |
| lastMessageProduced = null; |
| } else { |
| log.error("Scheduler Manager entered invalid state"); |
| errorNotifier.triggerError(new IllegalStateException()); |
| } |
| } |
| |
| private Future<?> scheduleInternal(Runnable runnable, String errMsg) { |
| if (!leaderService.isLeader()) { |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| try { |
| return executorService.submit(() -> { |
| schedulerLock.lock(); |
| try { |
| boolean isLeader = leaderService.isLeader(); |
| if (isLeader) { |
| try { |
| runnable.run(); |
| } catch (Throwable th) { |
| log.error("Encountered error when invoking scheduler [{}]", errMsg); |
| errorNotifier.triggerError(th); |
| } |
| } |
| } finally { |
| schedulerLock.unlock(); |
| } |
| }); |
| } catch (RejectedExecutionException e) { |
| // task queue is full so just ignore |
| log.debug("Rejected task to invoke scheduler since task queue is already full"); |
| return CompletableFuture.completedFuture(null); |
| } |
| } |
| |
| public Future<?> schedule() { |
| return scheduleInternal(() -> { |
| workerStatsManager.scheduleTotalExecTimeStart(); |
| invokeScheduler(); |
| workerStatsManager.scheduleTotalExecTimeEnd(); |
| }, "Encountered error when invoking scheduler"); |
| } |
| |
| private Future<?> rebalance() { |
| return scheduleInternal(() -> { |
| workerStatsManager.rebalanceTotalExecTimeStart(); |
| invokeRebalance(); |
| workerStatsManager.rebalanceTotalExecTimeEnd(); |
| }, "Encountered error when invoking rebalance"); |
| } |
| |
| public Future<?> rebalanceIfNotInprogress() { |
| if (rebalanceInProgress.compareAndSet(false, true)) { |
| val numWorkers = getCurrentAvailableNumWorkers(); |
| if (numWorkers <= 1) { |
| rebalanceInProgress.set(false); |
| throw new TooFewWorkersException(); |
| } |
| return rebalance(); |
| } else { |
| throw new RebalanceInProgressException(); |
| } |
| } |
| |
| private Future<?> drain(String workerId) { |
| return scheduleInternal(() -> { |
| workerStatsManager.drainTotalExecTimeStart(); |
| assignmentsMovedInLastDrain = invokeDrain(workerId); |
| workerStatsManager.drainTotalExecTimeEnd(); |
| }, "Encountered error when invoking drain"); |
| } |
| |
| public Future<?> drainIfNotInProgress(String workerId) { |
| if (drainInProgressFlag.compareAndSet(false, true)) { |
| try { |
| val availableWorkers = getCurrentAvailableWorkers(); |
| if (availableWorkers.size() <= 1) { |
| throw new TooFewWorkersException(); |
| } |
| |
| // A worker must be specified at this point. This would be set up by the caller. |
| Objects.requireNonNull(workerId); |
| |
| // [We can get stricter, and require that every drain op be followed up with a cleanup of the |
| // corresponding worker before any other drain op, so that the drainOpStatusMap should be empty |
| // at the next drain operation.] |
| if (drainOpStatusMap.containsKey(workerId)) { |
| String warnString = "Worker " + workerId |
| + " was not removed yet from SchedulerManager after previous drain op"; |
| log.warn(warnString); |
| throw new WorkerNotRemovedAfterPriorDrainException(); |
| } |
| |
| if (!availableWorkers.contains(workerId)) { |
| log.info("invokeDrain was called for a worker={} which is not currently active", workerId); |
| throw new UnknownWorkerException(); |
| } |
| |
| return drain(workerId); |
| } finally { |
| drainInProgressFlag.set(false); |
| } |
| } else { |
| throw new DrainInProgressException(); |
| } |
| } |
| |
| public LongRunningProcessStatus getDrainStatus(String workerId) { |
| long startTime = System.nanoTime(); |
| LongRunningProcessStatus status = Optional.ofNullable(workerId).map(id -> |
| Optional.ofNullable(drainOpStatusMap.get(id)).map(opStatus -> |
| switch (opStatus) { |
| case DrainCompleted -> |
| LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS); |
| case DrainInProgress -> |
| LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING); |
| case DrainNotInProgress -> |
| LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN); |
| }).orElse( |
| LongRunningProcessStatus.forError("Worker " + id + " not found in drain records") |
| ) |
| ).orElse( |
| new LongRunningProcessStatus() |
| ); |
| log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}", |
| workerId, NANOSECONDS.toSeconds (System.nanoTime() - startTime), |
| status.status, status.lastError); |
| return status; |
| } |
| |
    // The following method is used only for testing.
    // Wipes all drain records so a test can start from a clean slate.
    @VisibleForTesting
    void clearDrainOpsStatus() {
        drainOpStatusMap.clear();
        log.warn("Cleared drain op status map");
    }

    // The following method is used only for testing.
    // Forces a specific drain status for a worker, bypassing the normal drain flow.
    @VisibleForTesting
    void setDrainOpsStatus(final String workerId, final DrainOpStatus dStatus) {
        drainOpStatusMap.put(workerId, dStatus);
        log.warn("setDrainOpsStatus: updated drain status of worker {} to {}", workerId, dStatus);
    }

    // The following method is used only for testing.
    // Returns a point-in-time copy so callers cannot mutate the live map.
    @VisibleForTesting
    ConcurrentHashMap<String, DrainOpStatus> getDrainOpsStatusMap() {
        return new ConcurrentHashMap<>(drainOpStatusMap);
    }
| |
| private synchronized int getCurrentAvailableNumWorkers() { |
| return getCurrentAvailableWorkers().size(); |
| } |
| |
| private synchronized Set<String> getCurrentAvailableWorkers() { |
| Set<String> currentMembership = membershipManager.getCurrentMembership() |
| .stream().map(WorkerInfo::getWorkerId).collect(Collectors.toSet()); |
| |
| // iterate the set, instead of the concurrent hashmap |
| currentMembership.removeIf(drainOpStatusMap::containsKey); |
| return currentMembership; |
| } |
| |
| void invokeScheduler() { |
| long startTime = System.nanoTime(); |
| |
| Set<String> availableWorkers = getCurrentAvailableWorkers(); |
| |
| List<FunctionMetaData> allFunctions = functionMetaDataManager.getAllFunctionMetaData(); |
| Map<String, Function.Instance> allInstances = |
| computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged()); |
| Map<String, Map<String, Assignment>> workerIdToAssignments = functionRuntimeManager |
| .getCurrentAssignments(); |
| |
| // initialize stats collection |
| SchedulerStats schedulerStats = new SchedulerStats(workerIdToAssignments, availableWorkers); |
| |
| //delete assignments of functions and instances that don't exist anymore |
| Iterator<Map.Entry<String, Map<String, Assignment>>> it = workerIdToAssignments.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry<String, Map<String, Assignment>> workerIdToAssignmentEntry = it.next(); |
| Map<String, Assignment> functionMap = workerIdToAssignmentEntry.getValue(); |
| |
| // remove instances that don't exist anymore |
| functionMap.entrySet().removeIf(entry -> { |
| String fullyQualifiedInstanceId = entry.getKey(); |
| boolean deleted = !allInstances.containsKey(fullyQualifiedInstanceId); |
| if (deleted) { |
| Assignment assignment = entry.getValue(); |
| MessageId messageId = publishNewAssignment(assignment.toBuilder().build(), true); |
| |
| // Directly update in memory assignment cache since I am leader |
| log.info("Deleting assignment: {}", assignment); |
| functionRuntimeManager.deleteAssignment(fullyQualifiedInstanceId); |
| // update message id associated with current view of assignments map |
| lastMessageProduced = messageId; |
| // update stats |
| schedulerStats.removedAssignment(assignment); |
| } |
| return deleted; |
| }); |
| |
| // update assignment instances in case attributes of a function gets updated |
| for (Map.Entry<String, Assignment> entry : functionMap.entrySet()) { |
| String fullyQualifiedInstanceId = entry.getKey(); |
| Assignment assignment = entry.getValue(); |
| Function.Instance instance = allInstances.get(fullyQualifiedInstanceId); |
| |
| if (!assignment.getInstance().equals(instance)) { |
| functionMap.put(fullyQualifiedInstanceId, assignment.toBuilder().setInstance(instance).build()); |
| Assignment newAssignment = assignment.toBuilder().setInstance(instance).build().toBuilder().build(); |
| MessageId messageId = publishNewAssignment(newAssignment, false); |
| |
| // Directly update in memory assignment cache since I am leader |
| log.info("Updating assignment: {}", newAssignment); |
| functionRuntimeManager.processAssignment(newAssignment); |
| // update message id associated with current view of assignments map |
| lastMessageProduced = messageId; |
| //update stats |
| schedulerStats.updatedAssignment(newAssignment); |
| } |
| if (functionMap.isEmpty()) { |
| it.remove(); |
| } |
| } |
| } |
| |
| List<Assignment> currentAssignments = workerIdToAssignments |
| .entrySet() |
| .stream() |
| .filter(workerIdToAssignmentEntry -> { |
| String workerId = workerIdToAssignmentEntry.getKey(); |
| // remove assignments to workers that don't exist / died for now. |
| // wait for failure detector to unassign them in the future for re-scheduling |
| return availableWorkers.contains(workerId); |
| }) |
| .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream()) |
| .collect(Collectors.toList()); |
| |
| Pair<List<Function.Instance>, List<Assignment>> unassignedInstances = |
| getUnassignedFunctionInstances(workerIdToAssignments, allInstances); |
| |
| workerStatsManager.scheduleStrategyExecTimeStartStart(); |
| List<Assignment> assignments = |
| scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, availableWorkers); |
| workerStatsManager.scheduleStrategyExecTimeStartEnd(); |
| |
| assignments.addAll(unassignedInstances.getRight()); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("New assignments computed: {}", assignments); |
| } |
| |
| isCompactionNeeded.set(!assignments.isEmpty()); |
| |
| for (Assignment assignment : assignments) { |
| MessageId messageId = publishNewAssignment(assignment, false); |
| |
| // Directly update in memory assignment cache since I am leader |
| log.info("Adding assignment: {}", assignment); |
| functionRuntimeManager.processAssignment(assignment); |
| // update message id associated with current view of assignments map |
| lastMessageProduced = messageId; |
| // update stats |
| schedulerStats.newAssignment(assignment); |
| } |
| |
| log.info("Schedule summary - execution time: {} sec | total unassigned: {} | stats: {}\n{}", |
| (System.nanoTime() - startTime) / Math.pow(10, 9), |
| unassignedInstances.getLeft().size(), schedulerStats.getSummary(), schedulerStats); |
| } |
| |
| private void invokeRebalance() { |
| long startTime = System.nanoTime(); |
| |
| Set<String> availableWorkers = getCurrentAvailableWorkers(); |
| |
| Map<String, Map<String, Assignment>> workerIdToAssignments = functionRuntimeManager.getCurrentAssignments(); |
| |
| // initialize stats collection |
| SchedulerStats schedulerStats = new SchedulerStats(workerIdToAssignments, availableWorkers); |
| |
| // filter out assignments of workers that are not currently in the active membership |
| List<Assignment> currentAssignments = workerIdToAssignments |
| .entrySet() |
| .stream() |
| .filter(workerIdToAssignmentEntry -> { |
| String workerId = workerIdToAssignmentEntry.getKey(); |
| // remove assignments to workers that don't exist / died for now. |
| // wait for failure detector to unassign them in the future for re-scheduling |
| return availableWorkers.contains(workerId); |
| }) |
| .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream()) |
| .collect(Collectors.toList()); |
| |
| workerStatsManager.rebalanceStrategyExecTimeStart(); |
| List<Assignment> rebalancedAssignments = scheduler.rebalance(currentAssignments, availableWorkers); |
| workerStatsManager.rebalanceStrategyExecTimeEnd(); |
| |
| for (Assignment assignment : rebalancedAssignments) { |
| MessageId messageId = publishNewAssignment(assignment, false); |
| // Directly update in memory assignment cache since I am leader |
| log.info("Rebalance - new assignment: {}", assignment); |
| functionRuntimeManager.processAssignment(assignment); |
| // update message id associated with current view of assignments map |
| lastMessageProduced = messageId; |
| // update stats |
| schedulerStats.newAssignment(assignment); |
| } |
| |
| log.info("Rebalance summary - execution time: {} sec | stats: {}\n{}", |
| (System.nanoTime() - startTime) / Math.pow(10, 9), schedulerStats.getSummary(), schedulerStats); |
| |
| rebalanceInProgress.set(false); |
| } |
| |
| private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) { |
| if (executor != null) { |
| executor.scheduleWithFixedDelay(catchingAndLoggingThrowables(() -> { |
| if (leaderService.isLeader() && isCompactionNeeded.get()) { |
| compactAssignmentTopic(); |
| isCompactionNeeded.set(false); |
| } |
| }), scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS); |
| |
| executor.scheduleWithFixedDelay(catchingAndLoggingThrowables(() -> { |
| if (leaderService.isLeader() |
| && metadataTopicLastMessage.compareTo(functionMetaDataManager.getLastMessageSeen()) != 0) { |
| metadataTopicLastMessage = functionMetaDataManager.getLastMessageSeen(); |
| compactFunctionMetadataTopic(); |
| } |
| }), scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS); |
| } |
| } |
| |
    // The following method is used only for testing.
    // Returns the assignments published by the most recent drain, or null if none ran (or it
    // was cleared).
    @VisibleForTesting
    List<Assignment> getAssignmentsMovedInLastDrain() {
        return assignmentsMovedInLastDrain;
    }

    // The following method is used only for testing.
    @VisibleForTesting
    void clearAssignmentsMovedInLastDrain() {
        assignmentsMovedInLastDrain = null;
    }
| |
    /**
     * Drains all function instances off {@code workerId}: records the drain as in progress,
     * removes the worker from the available set, reschedules instances not already placed on the
     * remaining workers, and publishes the resulting assignments.
     *
     * @param workerId the worker to drain (already validated by drainIfNotInProgress)
     * @return the assignments published by this drain, or null if the scheduler threw
     */
    List<Assignment> invokeDrain(String workerId) {

        long startTime = System.nanoTime();

        Set<String> availableWorkers = getCurrentAvailableWorkers();

        // workerIdToAssignments is a map of the form {workerId : {FullyQualifiedInstanceId : Assignment}}
        Map<String, Map<String, Assignment>> workerIdToAssignments = functionRuntimeManager.getCurrentAssignments();

        // initialize stats collection
        SchedulerStats schedulerStats = new SchedulerStats(workerIdToAssignments, availableWorkers);

        boolean drainSuccessful = false;
        List<Assignment> postDrainAssignments = null;

        try {
            // NOTE(review): if an exception escapes below, the status stays DrainInProgress until
            // the worker leaves the membership and updateWorkerDrainMap() removes it — confirm
            // that is the intended recovery path.
            drainOpStatusMap.put(workerId, DrainOpStatus.DrainInProgress);

            availableWorkers.remove(workerId);

            List<FunctionMetaData> allFunctions = functionMetaDataManager.getAllFunctionMetaData();
            Map<String, Function.Instance> allInstances =
                    computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged());

            // The assignments that were not on the worker being drained don't need to change.
            val activeWorkersAssignmentsMap = new HashMap<String, Map<String, Assignment>>();
            List<Assignment> assignmentsOnActiveWorkers = workerIdToAssignments
                    .entrySet()
                    .stream()
                    .filter(workerIdToAssignmentEntry -> {
                        if (workerIdToAssignmentEntry.getKey().compareTo(workerId) != 0) {
                            // side effect: collect the per-worker maps of every non-drained worker
                            activeWorkersAssignmentsMap.put(workerIdToAssignmentEntry.getKey(),
                                    workerIdToAssignmentEntry.getValue());
                            return true;
                        }

                        return false;
                    })
                    .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
                    .collect(Collectors.toList());

            // Everything not placed on an active worker — including all instances that were on the
            // drained worker — needs (re)assignment.
            Pair<List<Function.Instance>, List<Assignment>> instancesToAssign =
                    getUnassignedFunctionInstances(activeWorkersAssignmentsMap, allInstances);

            // NOTE(review): drainTotalExecTimeStart/End are also invoked by drain() around this
            // whole method; this inner pair appears to start the same timer twice — confirm
            // against WorkerStatsManager semantics.
            workerStatsManager.drainTotalExecTimeStart();
            // Try to schedule the instances on the workers remaining available after "workerId" is removed.
            try {
                postDrainAssignments =
                        scheduler.schedule(instancesToAssign.getLeft(), assignmentsOnActiveWorkers, availableWorkers);
            } catch (Exception e) {
                log.info("invokeDrain: Got exception from schedule: ", e);
            }
            workerStatsManager.drainTotalExecTimeEnd();

            if (postDrainAssignments != null) {
                for (Assignment assignment : postDrainAssignments) {
                    MessageId messageId = publishNewAssignment(assignment, false);
                    // Directly update in memory assignment cache since I am leader
                    functionRuntimeManager.processAssignment(assignment);
                    // update message id associated with current view of assignments map
                    lastMessageProduced = messageId;
                    // update stats
                    schedulerStats.newAssignment(assignment);
                }
            }
            drainOpStatusMap.put(workerId, DrainOpStatus.DrainCompleted);
            drainSuccessful = true;
        } finally {
            log.info("Draining worker {} was {}successful; summary [] - execution time: {} sec | stats: {}\n{}",
                    workerId, drainSuccessful ? "" : "un",
                    (System.nanoTime() - startTime) / Math.pow(10, 9),
                    schedulerStats.getSummary(), schedulerStats);
        }
        return postDrainAssignments;
    }
| |
| private void compactAssignmentTopic() { |
| if (this.admin != null) { |
| try { |
| this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic()); |
| } catch (PulsarAdminException e) { |
| log.error("Failed to trigger compaction", e); |
| scheduledExecutorService.schedule(this::compactAssignmentTopic, DEFAULT_ADMIN_API_BACKOFF_SEC, |
| TimeUnit.SECONDS); |
| } |
| } |
| } |
| |
| protected synchronized int updateWorkerDrainMap() { |
| long startTime = System.nanoTime(); |
| int numRemovedWorkerIds = 0; |
| |
| if (drainOpStatusMap.size() > 0) { |
| val currentMembership = membershipManager.getCurrentMembership() |
| .stream().map(WorkerInfo::getWorkerId).collect(Collectors.toSet()); |
| val removeWorkerIds = new ArrayList<String>(); |
| |
| for (String workerId : drainOpStatusMap.keySet()) { |
| if (!currentMembership.contains(workerId)) { |
| removeWorkerIds.add(workerId); |
| } |
| } |
| for (String workerId : removeWorkerIds) { |
| drainOpStatusMap.remove(workerId); |
| } |
| numRemovedWorkerIds = removeWorkerIds.size(); |
| } |
| |
| if (numRemovedWorkerIds > 0) { |
| log.info("cleanupWorkerDrainMap removed {} stale workerIds in {} sec", |
| numRemovedWorkerIds, (System.nanoTime() - startTime) / Math.pow(10, 9)); |
| } |
| |
| return numRemovedWorkerIds; |
| } |
| |
| private void compactFunctionMetadataTopic() { |
| if (this.admin != null) { |
| try { |
| this.admin.topics().triggerCompaction(workerConfig.getFunctionMetadataTopic()); |
| } catch (PulsarAdminException e) { |
| log.error("Failed to trigger compaction", e); |
| scheduledExecutorService.schedule(this::compactFunctionMetadataTopic, DEFAULT_ADMIN_API_BACKOFF_SEC, |
| TimeUnit.SECONDS); |
| } |
| } |
| } |
| |
| private MessageId publishNewAssignment(Assignment assignment, boolean deleted) { |
| try { |
| String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()); |
| // publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id |
| // message |
| return exclusiveProducer.newMessage().key(fullyQualifiedInstanceId) |
| .value(deleted ? "".getBytes() : assignment.toByteArray()).send(); |
| } catch (Exception e) { |
| log.error("Failed to {} assignment update {}", assignment, deleted ? "send" : "deleted", e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions, |
| boolean externallyManagedRuntime) { |
| Map<String, Function.Instance> functionInstances = new HashMap<>(); |
| for (FunctionMetaData functionMetaData : allFunctions) { |
| for (Function.Instance instance : computeInstances(functionMetaData, externallyManagedRuntime)) { |
| functionInstances.put(FunctionCommon.getFullyQualifiedInstanceId(instance), instance); |
| } |
| } |
| return functionInstances; |
| } |
| |
| static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData, |
| boolean externallyManagedRuntime) { |
| List<Function.Instance> functionInstances = new LinkedList<>(); |
| if (!externallyManagedRuntime) { |
| int instances = functionMetaData.getFunctionDetails().getParallelism(); |
| for (int i = 0; i < instances; i++) { |
| functionInstances.add(Function.Instance.newBuilder() |
| .setFunctionMetaData(functionMetaData) |
| .setInstanceId(i) |
| .build()); |
| } |
| } else { |
| functionInstances.add(Function.Instance.newBuilder() |
| .setFunctionMetaData(functionMetaData) |
| .setInstanceId(-1) |
| .build()); |
| } |
| return functionInstances; |
| } |
| |
| private Pair<List<Function.Instance>, List<Assignment>> getUnassignedFunctionInstances( |
| Map<String, Map<String, Assignment>> currentAssignments, Map<String, Function.Instance> functionInstances) { |
| |
| List<Function.Instance> unassignedFunctionInstances = new LinkedList<>(); |
| List<Assignment> heartBeatAssignments = new ArrayList<>(); |
| Map<String, Assignment> assignmentMap = new HashMap<>(); |
| if (currentAssignments != null) { |
| for (Map<String, Assignment> entry : currentAssignments.values()) { |
| assignmentMap.putAll(entry); |
| } |
| } |
| |
| for (Map.Entry<String, Function.Instance> instanceEntry : functionInstances.entrySet()) { |
| String fullyQualifiedInstanceId = instanceEntry.getKey(); |
| Function.Instance instance = instanceEntry.getValue(); |
| String heartBeatWorkerId = checkHeartBeatFunction(instance); |
| if (heartBeatWorkerId != null) { |
| heartBeatAssignments |
| .add(Assignment.newBuilder().setInstance(instance).setWorkerId(heartBeatWorkerId).build()); |
| continue; |
| } |
| if (!assignmentMap.containsKey(fullyQualifiedInstanceId)) { |
| unassignedFunctionInstances.add(instance); |
| } |
| } |
| return ImmutablePair.of(unassignedFunctionInstances, heartBeatAssignments); |
| } |
| |
| @Override |
| public synchronized void close() { |
| log.info("Closing scheduler manager"); |
| // make sure we are not closing while a scheduling is being calculated |
| schedulerLock.lock(); |
| try { |
| isRunning = false; |
| |
| if (scheduledExecutorService != null) { |
| scheduledExecutorService.shutdown(); |
| } |
| |
| if (executorService != null) { |
| executorService.shutdown(); |
| } |
| |
| if (exclusiveProducer != null) { |
| try { |
| exclusiveProducer.close(); |
| } catch (PulsarClientException e) { |
| log.warn("Failed to shutdown scheduler manager assignment producer", e); |
| } |
| } |
| } finally { |
| schedulerLock.unlock(); |
| } |
| } |
| |
| static String checkHeartBeatFunction(Instance funInstance) { |
| if (funInstance.getFunctionMetaData() != null |
| && funInstance.getFunctionMetaData().getFunctionDetails() != null) { |
| FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails(); |
| return HEARTBEAT_TENANT.equals(funDetails.getTenant()) |
| && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? funDetails.getName() : null; |
| } |
| return null; |
| } |
| |
    /** Thrown when a rebalance is requested while another rebalance is still in flight. */
    public static class RebalanceInProgressException extends RuntimeException {
    }

    /** Thrown when a drain is requested while another drain is still being initiated. */
    public static class DrainInProgressException extends RuntimeException {
    }

    /** Thrown when an operation needs more than one available worker but fewer are present. */
    public static class TooFewWorkersException extends RuntimeException {
    }

    /** Thrown when a drain targets a worker that is not in the current active membership. */
    public static class UnknownWorkerException extends RuntimeException {
    }

    /** Thrown when a worker is drained again before its previous drain record was cleaned up. */
    public static class WorkerNotRemovedAfterPriorDrainException extends RuntimeException {
    }
| |
    /**
     * Collects per-worker statistics for a single scheduler/rebalance/drain pass: how many
     * assignments each worker started and ended with, and how many instances were added, removed
     * or updated. Rendered into the summary log line at the end of each pass.
     */
    private static class SchedulerStats {
        @Builder
        @Data
        private static class WorkerStats {
            private int originalNumAssignments;
            private int finalNumAssignments;
            private int instancesAdded;
            private int instancesRemoved;
            private int instancesUpdated;
            // false for workers that still own assignments but are no longer in the membership
            private boolean alive;
        }

        // {workerId : stats accumulated during this pass}
        private final Map<String, WorkerStats> workerStatsMap = new HashMap<>();

        // {fullyQualifiedInstanceId : workerId it was assigned to at the start of the pass}
        private final Map<String, String> instanceToWorkerId = new HashMap<>();

        public SchedulerStats(Map<String, Map<String, Assignment>> workerIdToAssignments, Set<String> workers) {

            // Seed stats for every live worker from its current assignments (if any).
            for (String workerId : workers) {
                WorkerStats.WorkerStatsBuilder workerStats = WorkerStats.builder().alive(true);
                Map<String, Assignment> assignmentMap = workerIdToAssignments.get(workerId);
                if (assignmentMap != null) {
                    workerStats.originalNumAssignments(assignmentMap.size());
                    workerStats.finalNumAssignments(assignmentMap.size());

                    for (String fullyQualifiedInstanceId : assignmentMap.keySet()) {
                        instanceToWorkerId.put(fullyQualifiedInstanceId, workerId);
                    }
                } else {
                    workerStats.originalNumAssignments(0);
                    workerStats.finalNumAssignments(0);
                }

                workerStatsMap.put(workerId, workerStats.build());
            }

            // workers with assignments that are dead
            for (Map.Entry<String, Map<String, Assignment>> entry : workerIdToAssignments.entrySet()) {
                String workerId = entry.getKey();
                Map<String, Assignment> assignmentMap = entry.getValue();
                if (!workers.contains(workerId)) {
                    WorkerStats workerStats = WorkerStats.builder()
                            .alive(false)
                            .originalNumAssignments(assignmentMap.size())
                            .finalNumAssignments(assignmentMap.size())
                            .build();
                    workerStatsMap.put(workerId, workerStats);
                }
            }
        }

        // Records the deletion of an assignment from its worker.
        public void removedAssignment(Assignment assignment) {
            String workerId = assignment.getWorkerId();
            WorkerStats stats = workerStatsMap.get(workerId);
            Objects.requireNonNull(stats);

            stats.instancesRemoved++;
            stats.finalNumAssignments--;
        }

        // Records a new assignment; when the instance was previously assigned elsewhere, the move
        // is also counted as a removal on the old worker.
        public void newAssignment(Assignment assignment) {
            String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
            String newWorkerId = assignment.getWorkerId();
            String oldWorkerId = instanceToWorkerId.get(fullyQualifiedInstanceId);
            if (oldWorkerId != null) {
                WorkerStats oldWorkerStats = workerStatsMap.get(oldWorkerId);
                Objects.requireNonNull(oldWorkerStats);

                oldWorkerStats.instancesRemoved++;
                oldWorkerStats.finalNumAssignments--;
            }

            WorkerStats newWorkerStats = workerStatsMap.get(newWorkerId);
            Objects.requireNonNull(newWorkerStats);

            newWorkerStats.instancesAdded++;
            newWorkerStats.finalNumAssignments++;
        }

        // Records an in-place update of an assignment (instance attributes changed, same worker).
        public void updatedAssignment(Assignment assignment) {
            String workerId = assignment.getWorkerId();
            WorkerStats stats = workerStatsMap.get(workerId);
            Objects.requireNonNull(stats);

            stats.instancesUpdated++;
        }

        // Returns the aggregate added/updated/removed counts as a compact JSON-style string.
        public String getSummary() {
            int totalAdded = 0;
            int totalUpdated = 0;
            int totalRemoved = 0;

            for (Map.Entry<String, WorkerStats> entry : workerStatsMap.entrySet()) {
                WorkerStats workerStats = entry.getValue();
                totalAdded += workerStats.instancesAdded;
                totalUpdated += workerStats.instancesUpdated;
                totalRemoved += workerStats.instancesRemoved;
            }

            return String.format("{\"Added\": %d, \"Updated\": %d, \"removed\": %d}", totalAdded, totalUpdated,
                    totalRemoved);
        }

        // Pretty-printed JSON of the per-worker stats map, used in the summary log lines.
        @Override
        public String toString() {
            try {
                return ObjectMapperFactory.getMapper().writer().withDefaultPrettyPrinter()
                        .writeValueAsString(workerStatsMap);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
    }
| } |