package org.apache.helix.controller;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Sets;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.ControllerChangeListener;
import org.apache.helix.api.listeners.CurrentStateChangeListener;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.api.listeners.ResourceConfigChangeListener;
import org.apache.helix.common.ClusterEventBlockingQueue;
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
import org.apache.helix.controller.rebalancer.StatefulRebalancer;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MaintenanceRecoveryStage;
import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.PersistAssignmentStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.ResourceValidationStage;
import org.apache.helix.controller.stages.TargetExteralViewCalcStage;
import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
import org.apache.helix.controller.stages.TopStateHandoffReportStage;
import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.controller.stages.task.TaskMessageDispatchStage;
import org.apache.helix.controller.stages.task.TaskMessageGenerationPhase;
import org.apache.helix.controller.stages.task.TaskPersistDataStage;
import org.apache.helix.controller.stages.task.TaskSchedulingStage;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.helix.HelixConstants.ChangeType;
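// A hedged usage sketch for wiring this controller to a cluster (the cluster name, instance name
// and ZK address below are illustrative; production deployments typically start the controller
// through HelixControllerMain instead of constructing this class directly):
//   HelixManager manager = HelixManagerFactory.getZKHelixManager(
//       "myCluster", "controller-0", InstanceType.CONTROLLER, "localhost:2181");
//   manager.connect();
//   GenericHelixController controller = new GenericHelixController("myCluster");
//   manager.addControllerListener(controller);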
/**
* The Cluster Controller's main goal is to keep the cluster state as close as possible to the
* Ideal State. It does this by listening to changes in cluster state and scheduling new tasks that
* move the cluster toward the best possible state. Each instance of this class can control only
* one cluster. Using the IdealState, CurrentState and Messages, for each partition it will: <br>
* 1. get the (instance, state) pairs from the IdealState, CurrentState and pending Messages <br>
* 2. compute the best possible (instance, state) pairs; this needs the data from step 1 and the
* state model constraints <br>
* 3. compute the messages/tasks needed to move from 1 to 2 <br>
* 4. select the messages that can be sent, based on the messages and state model constraints <br>
* 5. send the messages
*/
public class GenericHelixController implements IdealStateChangeListener,
LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
ControllerChangeListener, InstanceConfigChangeListener, ResourceConfigChangeListener,
ClusterConfigChangeListener {
private static final Logger logger =
LoggerFactory.getLogger(GenericHelixController.class.getName());
private static final long EVENT_THREAD_JOIN_TIMEOUT = 1000;
private static final int ASYNC_TASKS_THREADPOOL_SIZE = 10;
private final PipelineRegistry _registry;
private final PipelineRegistry _taskRegistry;
final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
// By default, do not report status until the controller is activated.
// TODO This flag should be inside ClusterStatusMonitor. When false, no MBean should be registered.
private boolean _isMonitoring = false;
private final ClusterStatusMonitor _clusterStatusMonitor;
/**
* A queue for controller events and a thread that will consume it
*/
private final ClusterEventBlockingQueue _eventQueue;
private final ClusterEventProcessor _eventThread;
private final ClusterEventBlockingQueue _taskEventQueue;
private final ClusterEventProcessor _taskEventThread;
private final Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> _asyncFIFOWorkerPool;
private long _continousRebalanceFailureCount = 0;
/**
* The _paused flag is checked by handleEvent(); when the flag is set, handleEvent() is a no-op.
* All other event handling logic stays the same while the flag is set.
*/
private boolean _paused;
private boolean _inMaintenanceMode;
/**
* The timer that periodically runs the rebalancing pipeline. The timer is started if the cluster
* config or at least one resource is configured with a rebalance timer period.
*/
Timer _periodicalRebalanceTimer = null;
long _timerPeriod = Long.MAX_VALUE;
/**
* The timer that triggers the on-demand rebalance pipeline.
*/
Timer _onDemandRebalanceTimer = null;
AtomicReference<RebalanceTask> _nextRebalanceTask = new AtomicReference<>();
/**
* A cache maintained across pipelines
*/
private final ResourceControllerDataProvider _resourceControlDataProvider;
private final WorkflowControllerDataProvider _workflowControlDataProvider;
private final ScheduledExecutorService _asyncTasksThreadPool;
/**
* A record of the last pipeline finish timestamp
*/
private long _lastPipelineEndTimestamp;
private String _clusterName;
private final Set<Pipeline.Type> _enabledPipelineTypes;
private HelixManager _helixManager;
// Since the stateful rebalancer needs to be lazily constructed when the HelixManager instance is
// ready, the GenericHelixController is not constructed with a stateful rebalancer. This wrapper
// is to avoid the complexity of handling a nullable value in the event handling process.
// TODO Create the required stateful rebalancer only when it is used by any resource.
private final StatefulRebalancerRef _rebalancerRef = new StatefulRebalancerRef() {
@Override
protected StatefulRebalancer createRebalancer(HelixManager helixManager) {
return new WagedRebalancer(helixManager);
}
};
/**
* TODO: We should get rid of this once we move to:
* 1) ZK callback should go to ClusterDataCache and trigger data cache refresh only
* 2) then ClusterDataCache.refresh triggers rebalance pipeline.
*/
/* Map of cluster -> GenericHelixController */
private static Map<String, GenericHelixController> HelixControllerFactory = new ConcurrentHashMap<>();
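/**
* Returns the controller instance registered for the given cluster in this JVM, or null if no
* controller with that cluster name has been constructed yet. A minimal sketch (the cluster name
* is illustrative):
* <pre>{@code
* GenericHelixController controller = GenericHelixController.getController("myCluster");
* }</pre>
*/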
public static GenericHelixController getController(String clusterName) {
return HelixControllerFactory.get(clusterName);
}
/**
* Default constructor that creates a default pipeline registry. This is sufficient in most cases;
* if something more specific is needed, use another constructor that lets you pass in a custom
* pipeline registry.
*/
public GenericHelixController() {
this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
createTaskRegistry(Pipeline.Type.TASK.name()));
}
public GenericHelixController(String clusterName) {
this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
createTaskRegistry(Pipeline.Type.TASK.name()), clusterName,
Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
}
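/**
* Creates a controller for the given cluster with only the specified pipeline types enabled,
* backed by the default and task pipeline registries. A minimal sketch (the cluster name is
* illustrative):
* <pre>{@code
* // enable only the resource (DEFAULT) pipeline and skip the Task Framework pipeline
* GenericHelixController controller =
*     new GenericHelixController("myCluster", Sets.newHashSet(Pipeline.Type.DEFAULT));
* }</pre>
*/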
public GenericHelixController(String clusterName, Set<Pipeline.Type> enabledPipelines) {
this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
createTaskRegistry(Pipeline.Type.TASK.name()), clusterName, enabledPipelines);
}
class RebalanceTask extends TimerTask {
final HelixManager _manager;
final ClusterEventType _clusterEventType;
private final Optional<Boolean> _shouldRefreshCacheOption;
private long _nextRebalanceTime;
public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType) {
this(manager, clusterEventType, -1);
}
public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
long nextRebalanceTime) {
this(manager, clusterEventType, nextRebalanceTime, Optional.empty());
}
public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
long nextRebalanceTime, boolean shouldRefreshCache) {
this(manager, clusterEventType, nextRebalanceTime, Optional.of(shouldRefreshCache));
}
private RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
long nextRebalanceTime, Optional<Boolean> shouldRefreshCacheOption) {
_manager = manager;
_clusterEventType = clusterEventType;
_nextRebalanceTime = nextRebalanceTime;
_shouldRefreshCacheOption = shouldRefreshCacheOption;
}
public long getNextRebalanceTime() {
return _nextRebalanceTime;
}
@Override
public void run() {
try {
if (_shouldRefreshCacheOption.orElse(
_clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
.equals(ClusterEventType.OnDemandRebalance))) {
requestDataProvidersFullRefresh();
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
Map<String, LiveInstance> liveInstanceMap =
accessor.getChildValuesMap(keyBuilder.liveInstances());
if (liveInstanceMap != null && !liveInstanceMap.isEmpty()) {
NotificationContext changeContext = new NotificationContext(_manager);
changeContext.setType(NotificationContext.Type.CALLBACK);
synchronized (_manager) {
checkLiveInstancesObservation(new ArrayList<>(liveInstanceMap.values()),
changeContext);
}
}
}
forceRebalance(_manager, _clusterEventType);
} catch (Throwable ex) {
logger.error("Time task failed. Rebalance task type: " + _clusterEventType + ", cluster: "
+ _clusterName, ex);
}
}
}
/* Trigger a rebalance pipeline */
private void forceRebalance(HelixManager manager, ClusterEventType eventType) {
NotificationContext changeContext = new NotificationContext(manager);
changeContext.setType(NotificationContext.Type.CALLBACK);
String uid = UUID.randomUUID().toString().substring(0, 8);
ClusterEvent event = new ClusterEvent(_clusterName, eventType, uid);
event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
event.addAttribute(AttributeName.changeContext.name(), changeContext);
event.addAttribute(AttributeName.eventData.name(), new ArrayList<>());
event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
enqueueEvent(_taskEventQueue, event);
enqueueEvent(_eventQueue, event.clone(uid));
logger.info(String
.format("Controller rebalance pipeline triggered with event type: %s for cluster %s",
eventType, _clusterName));
}
/**
* Starts the rebalancing timer with the specified period. If a timer is already running with a
* different period, cancel it and reschedule with the new period.
*/
void startPeriodRebalance(long period, HelixManager manager) {
if (period != _timerPeriod) {
logger.info("Controller starting periodical rebalance timer at period " + period);
if (_periodicalRebalanceTimer != null) {
_periodicalRebalanceTimer.cancel();
}
_periodicalRebalanceTimer = new Timer(true);
_timerPeriod = period;
_periodicalRebalanceTimer
.scheduleAtFixedRate(new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance),
_timerPeriod, _timerPeriod);
} else {
logger.info("Controller already has periodical rebalance timer at period " + _timerPeriod);
}
}
/**
* Stops the rebalancing timer.
*/
void stopPeriodRebalance() {
logger.info("Controller stopping periodical rebalance timer at period " + _timerPeriod);
if (_periodicalRebalanceTimer != null) {
_periodicalRebalanceTimer.cancel();
_periodicalRebalanceTimer = null;
_timerPeriod = Long.MAX_VALUE;
logger.info("Controller stopped periodical rebalance timer at period " + _timerPeriod);
}
}
/**
* Schedule a future rebalance pipeline run, delayed until the given time.
* This method is deprecated; please use the RebalanceUtil.scheduleInstantPipeline method instead.
*/
@Deprecated
public void scheduleRebalance(long rebalanceTime) {
if (_helixManager == null) {
logger.warn(
"Failed to schedule a future pipeline run for cluster " + _clusterName + " helix manager is null!");
return;
}
long current = System.currentTimeMillis();
long delay = rebalanceTime - current;
if (rebalanceTime > current) {
RebalanceTask preTask = _nextRebalanceTask.get();
if (preTask != null && preTask.getNextRebalanceTime() > current
&& preTask.getNextRebalanceTime() < rebalanceTime) {
// already have an earlier rebalance scheduled, no need to schedule again.
return;
}
RebalanceTask newTask =
new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime);
_onDemandRebalanceTimer.schedule(newTask, delay);
logger.info(
"Scheduled a future pipeline run for cluster " + _helixManager.getClusterName() + " in delay "
+ delay);
preTask = _nextRebalanceTask.getAndSet(newTask);
if (preTask != null) {
preTask.cancel();
}
}
}
/**
* Schedule an on-demand rebalance pipeline run.
* @param delay the delay in milliseconds before the pipeline runs
*/
@Deprecated
public void scheduleOnDemandRebalance(long delay) {
scheduleOnDemandRebalance(delay, true);
}
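// A hedged usage sketch for the two-argument overload below (the cluster name and delay are
// illustrative); the call is a no-op unless the controller has already seen a HelixManager:
//   GenericHelixController controller = GenericHelixController.getController("myCluster");
//   if (controller != null) {
//     controller.scheduleOnDemandRebalance(5000L, true); // refresh caches, then rebalance in ~5s
//   }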
/**
* Schedule an on-demand rebalance pipeline run.
* @param delay the delay in milliseconds before the pipeline runs
* @param shouldRefreshCache true to refresh the cache before running the rebalance
*/
public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) {
if (_helixManager == null) {
logger.error("Failed to schedule a future pipeline run for cluster {}. Helix manager is null!",
_clusterName);
return;
}
long currentTime = System.currentTimeMillis();
long rebalanceTime = currentTime + delay;
if (delay > 0) {
RebalanceTask preTask = _nextRebalanceTask.get();
if (preTask != null && preTask.getNextRebalanceTime() > currentTime
&& preTask.getNextRebalanceTime() < rebalanceTime) {
// already have an earlier rebalance scheduled, no need to schedule again.
return;
}
}
RebalanceTask newTask =
new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime,
shouldRefreshCache);
_onDemandRebalanceTimer.schedule(newTask, delay);
logger.info("Scheduled instant pipeline run for cluster {}." , _helixManager.getClusterName());
RebalanceTask preTask = _nextRebalanceTask.getAndSet(newTask);
if (preTask != null) {
preTask.cancel();
}
}
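/**
* Builds the registry for the default (resource) pipeline: a cluster data refresh stage, the data
* pre-processing stages, the rebalance stages (best-possible and intermediate state calculation,
* message generation/selection/throttling/dispatch, assignment persistence, target external view
* calculation), an external view computation stage, a backward-compatibility check, and an
* auto-exit-maintenance stage, each registered against the cluster event types that trigger it.
*/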
private static PipelineRegistry createDefaultRegistry(String pipelineName) {
logger.info("createDefaultRegistry");
synchronized (GenericHelixController.class) {
PipelineRegistry registry = new PipelineRegistry();
// cluster data cache refresh
Pipeline dataRefresh = new Pipeline(pipelineName);
dataRefresh.addStage(new ReadClusterDataStage());
// data pre-process pipeline
Pipeline dataPreprocess = new Pipeline(pipelineName);
dataPreprocess.addStage(new ResourceComputationStage());
dataPreprocess.addStage(new ResourceValidationStage());
dataPreprocess.addStage(new CurrentStateComputationStage());
dataPreprocess.addStage(new TopStateHandoffReportStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline(pipelineName);
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
rebalancePipeline.addStage(new IntermediateStateCalcStage());
// Need to add MaintenanceRecoveryStage here because MAX_PARTITIONS_PER_INSTANCE check could
// only occur after IntermediateStateCalcStage calculation
rebalancePipeline.addStage(new MaintenanceRecoveryStage());
rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
rebalancePipeline.addStage(new MessageThrottleStage());
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
rebalancePipeline.addStage(new PersistAssignmentStage());
rebalancePipeline.addStage(new TargetExteralViewCalcStage());
// external view generation
Pipeline externalViewPipeline = new Pipeline(pipelineName);
externalViewPipeline.addStage(new ExternalViewComputeStage());
// backward compatibility check
Pipeline liveInstancePipeline = new Pipeline(pipelineName);
liveInstancePipeline.addStage(new CompatibilityCheckStage());
// auto-exit maintenance mode if applicable
Pipeline autoExitMaintenancePipeline = new Pipeline(pipelineName);
autoExitMaintenancePipeline.addStage(new MaintenanceRecoveryStage());
registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, autoExitMaintenancePipeline, liveInstancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline);
registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline);
return registry;
}
}
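/**
* Builds the registry for the Task Framework pipeline. It mirrors the default registry but omits
* the external view and maintenance pipelines and uses the task scheduling, persistence, garbage
* collection, and task message generation/dispatch stages instead.
*/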
private static PipelineRegistry createTaskRegistry(String pipelineName) {
logger.info("createTaskRegistry");
synchronized (GenericHelixController.class) {
PipelineRegistry registry = new PipelineRegistry();
// cluster data cache refresh
Pipeline dataRefresh = new Pipeline(pipelineName);
dataRefresh.addStage(new ReadClusterDataStage());
// data pre-process pipeline
Pipeline dataPreprocess = new Pipeline(pipelineName);
dataPreprocess.addStage(new ResourceComputationStage());
dataPreprocess.addStage(new ResourceValidationStage());
dataPreprocess.addStage(new CurrentStateComputationStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline(pipelineName);
rebalancePipeline.addStage(new TaskSchedulingStage());
rebalancePipeline.addStage(new TaskPersistDataStage());
rebalancePipeline.addStage(new TaskGarbageCollectionStage());
rebalancePipeline.addStage(new TaskMessageGenerationPhase());
rebalancePipeline.addStage(new TaskMessageDispatchStage());
// backward compatibility check
Pipeline liveInstancePipeline = new Pipeline(pipelineName);
liveInstancePipeline.addStage(new CompatibilityCheckStage());
registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess,
rebalancePipeline);
registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess,
rebalancePipeline);
registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess,
rebalancePipeline);
registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess,
rebalancePipeline);
registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess,
rebalancePipeline);
registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline,
dataPreprocess, rebalancePipeline);
registry
.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, rebalancePipeline);
registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess,
rebalancePipeline);
registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, dataPreprocess,
rebalancePipeline);
return registry;
}
}
// TODO: refactor the constructor, as providing both registries but only enabling one looks confusing
public GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry) {
this(registry, taskRegistry, null, Sets.newHashSet(
Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
}
private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry,
final String clusterName, Set<Pipeline.Type> enabledPipelineTypes) {
_paused = false;
_enabledPipelineTypes = enabledPipelineTypes;
_registry = registry;
_taskRegistry = taskRegistry;
_lastSeenInstances = new AtomicReference<>();
_lastSeenSessions = new AtomicReference<>();
_clusterName = clusterName;
_lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
_clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);
_asyncTasksThreadPool =
Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
return new Thread(r, "HelixController-async_tasks-" + _clusterName);
}
});
_asyncFIFOWorkerPool = new HashMap<>();
initializeAsyncFIFOWorkers();
_onDemandRebalanceTimer = new Timer(true);
// initialize pipelines at the end so we have everything else prepared
if (_enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) {
logger.info("Initializing {} pipeline", Pipeline.Type.DEFAULT.name());
_resourceControlDataProvider = new ResourceControllerDataProvider(clusterName);
_eventQueue = new ClusterEventBlockingQueue();
_eventThread = new ClusterEventProcessor(_resourceControlDataProvider, _eventQueue,
"default-" + clusterName);
initPipeline(_eventThread, _resourceControlDataProvider);
logger.info("Initialized {} pipeline", Pipeline.Type.DEFAULT.name());
} else {
_eventQueue = null;
_resourceControlDataProvider = null;
_eventThread = null;
}
if (_enabledPipelineTypes.contains(Pipeline.Type.TASK)) {
logger.info("Initializing {} pipeline", Pipeline.Type.TASK.name());
_workflowControlDataProvider = new WorkflowControllerDataProvider(clusterName);
_taskEventQueue = new ClusterEventBlockingQueue();
_taskEventThread = new ClusterEventProcessor(_workflowControlDataProvider, _taskEventQueue,
"task-" + clusterName);
initPipeline(_taskEventThread, _workflowControlDataProvider);
logger.info("Initialized {} pipeline", Pipeline.Type.TASK.name());
} else {
_workflowControlDataProvider = null;
_taskEventQueue = null;
_taskEventThread = null;
}
if (clusterName != null) {
HelixControllerFactory.put(clusterName, this);
}
}
private void initializeAsyncFIFOWorkers() {
for (AsyncWorkerType type : AsyncWorkerType.values()) {
DedupEventProcessor<String, Runnable> worker =
new DedupEventProcessor<String, Runnable>(_clusterName, type.name()) {
@Override
protected void handleEvent(Runnable event) {
// TODO: retry when queue is empty and event.run() failed?
event.run();
}
};
worker.start();
_asyncFIFOWorkerPool.put(type, worker);
logger.info("Started async worker {}", worker.getName());
}
}
private void shutdownAsyncFIFOWorkers() {
for (DedupEventProcessor processor : _asyncFIFOWorkerPool.values()) {
processor.shutdown();
logger.info("Shutdown async worker {}", processor.getName());
}
}
private boolean isEventQueueEmpty(boolean taskQueue) {
if (taskQueue) {
return _taskEventQueue == null || _taskEventQueue.isEmpty();
} else {
return _eventQueue == null || _eventQueue.isEmpty();
}
}
/**
* lock-always: the caller must hold an external lock before calling; calls to handleEvent()
* must be serialized
* @param event cluster event to handle
*/
*/
private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProvider) {
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
if (manager == null) {
logger.error("No cluster manager in event:" + event.getEventType());
return;
}
// Event handling happens in a different thread from the onControllerChange processing thread.
// Thus, there are several possible conditions.
// 1. Event handled after leadership acquired. So we will have a valid rebalancer for the
// event processing.
// 2. Event handled shortly after leadership is relinquished, and the rebalancer has not been
// marked as invalid yet. So the event will be processed the same as case one.
// 3. Event is leftover from the previous session, and it is handled when the controller
// regains the leadership. The rebalancer will be reset before being used. That is the
// expected behavior so as to avoid inconsistent rebalance result.
// 4. Event handled shortly after leadership is relinquished, and the rebalancer has been marked
// as invalid. So we reset the rebalancer. But the later isLeader() check will return false and
// the pipeline will not be triggered. So the reset rebalancer won't be used before the controller
// regains leadership.
event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
_rebalancerRef.getRebalancer(manager));
if (!manager.isLeader()) {
logger.error("Cluster manager: " + manager.getInstanceName() + " is not leader for " + manager
.getClusterName() + ". Pipeline will not be invoked");
return;
}
_helixManager = manager;
// TODO If the controller is initialized with paused = true, it may not take effect immediately.
// _paused defaults to false. If any event arrives before the controllerChange event, the
// controller will be executing in un-paused mode, which might not match the config in ZK.
if (_paused) {
logger.info("Cluster " + manager.getClusterName() + " is paused. Ignoring the event:" + event
.getEventType());
return;
}
NotificationContext context = null;
if (event.getAttribute(AttributeName.changeContext.name()) != null) {
context = event.getAttribute(AttributeName.changeContext.name());
}
if (context != null) {
if (context.getType() == NotificationContext.Type.FINALIZE) {
stopPeriodRebalance();
logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getEventType());
return;
} else {
// TODO: should be in the initialization of controller.
if (_resourceControlDataProvider != null) {
checkRebalancingTimer(manager, Collections.<IdealState>emptyList(), dataProvider.getClusterConfig());
}
if (_isMonitoring) {
event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor);
}
}
}
dataProvider.setClusterEventId(event.getEventId());
event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp);
// Prepare ClusterEvent
// TODO (harry): this is a temporary workaround - after the controller is separated we should not
// have these instanceof clauses
List<Pipeline> pipelines;
boolean isTaskFrameworkPipeline = false;
if (dataProvider instanceof ResourceControllerDataProvider) {
pipelines = _registry
.getPipelinesForEvent(event.getEventType());
} else if (dataProvider instanceof WorkflowControllerDataProvider) {
pipelines = _taskRegistry
.getPipelinesForEvent(event.getEventType());
isTaskFrameworkPipeline = true;
} else {
logger.warn(String
.format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
event.getEventType(), event.getEventId()));
return;
}
event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
logger.info(String.format("START: Invoking %s controller pipeline for cluster %s event: %s %s",
manager.getClusterName(), dataProvider.getPipelineName(), event.getEventType(),
event.getEventId()));
long startTime = System.currentTimeMillis();
boolean rebalanceFail = false;
for (Pipeline pipeline : pipelines) {
event.addAttribute(AttributeName.PipelineType.name(), pipeline.getPipelineType());
try {
pipeline.handle(event);
pipeline.finish();
} catch (Exception e) {
logger.error(
"Exception while executing {} pipeline for cluster {}. Will not continue to next pipeline. Exception: {}",
dataProvider.getPipelineName(), _clusterName, Arrays.toString(e.getStackTrace()));
if (e instanceof HelixMetaDataAccessException) {
rebalanceFail = true;
// If the pipeline failed due to a read/write failure to ZooKeeper, retry the pipeline.
dataProvider.requireFullRefresh();
logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + _clusterName);
// only push a retry event when there is no pending event in the corresponding event queue.
if (isEventQueueEmpty(isTaskFrameworkPipeline)) {
_continousRebalanceFailureCount++;
long delay = getRetryDelay(_continousRebalanceFailureCount);
if (delay == 0) {
forceRebalance(manager, ClusterEventType.RetryRebalance);
} else {
_asyncTasksThreadPool
.schedule(new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay,
TimeUnit.MILLISECONDS);
}
logger.info("Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName);
}
}
_clusterStatusMonitor.reportRebalanceFailure();
break;
}
}
if (!rebalanceFail) {
_continousRebalanceFailureCount = 0;
}
_lastPipelineEndTimestamp = System.currentTimeMillis();
logger.info("END: Invoking {} controller pipeline for event {}::{} for cluster {}, took {} ms",
dataProvider.getPipelineName(), event.getEventType(), event.getEventId(), _clusterName,
_lastPipelineEndTimestamp - startTime);
if (!isTaskFrameworkPipeline) {
// report event process durations
NotificationContext notificationContext =
event.getAttribute(AttributeName.changeContext.name());
long enqueueTime = event.getCreationTime();
long zkCallbackTime;
StringBuilder sb = new StringBuilder();
if (notificationContext != null) {
zkCallbackTime = notificationContext.getCreationTime();
if (_isMonitoring) {
_clusterStatusMonitor
.updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(),
enqueueTime - zkCallbackTime);
}
sb.append(String.format("Callback time for event: %s took: %s ms\n", event.getEventType(),
enqueueTime - zkCallbackTime));
}
if (_isMonitoring) {
_clusterStatusMonitor
.updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(),
startTime - enqueueTime);
_clusterStatusMonitor
.updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(),
_lastPipelineEndTimestamp - startTime);
}
sb.append(String.format("InQueue time for event: %s took: %s ms\n", event.getEventType(),
startTime - enqueueTime));
sb.append(String.format("TotalProcessed time for event: %s took: %s ms", event.getEventType(),
_lastPipelineEndTimestamp - startTime));
logger.info(sb.toString());
}
// If event handling happens before the controller deactivates, the process may write unnecessary
// MBeans to monitoring after the monitor is disabled.
// So reset ClusterStatusMonitor according to its status after all event handling.
// TODO remove this once clusterStatusMonitor blocks any MBean registration when isMonitoring = false.
resetClusterStatusMonitor();
}
/**
* Get the delay before the next retry rebalance after a ZK read failure. A simple exponential
* backoff keeps the delay within [10ms, 1000ms]; the first few consecutive failures (up to the
* low limit of 5) retry immediately, e.g. the sixth consecutive failure yields 2^1 * 10 = 20 ms.
*/
private long getRetryDelay(long failCount) {
int lowLimit = 5;
if (failCount <= lowLimit) {
return 0;
}
long backoff = (long) (Math.pow(2, failCount - lowLimit) * 10);
return Math.min(backoff, 1000);
}
@Override
@PreFetch(enabled = false)
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onStateChange()");
notifyCaches(changeContext, ChangeType.CURRENT_STATE);
pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext, Collections
.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
logger.info("END: GenericClusterController.onStateChange()");
}
@Override
@PreFetch(enabled = false)
public void onMessage(String instanceName, List<Message> messages,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onMessage() for cluster " + _clusterName);
notifyCaches(changeContext, ChangeType.MESSAGE);
pushToEventQueues(ClusterEventType.MessageChange, changeContext,
Collections.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
if (_isMonitoring && messages != null) {
_clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
}
logger.info("END: GenericClusterController.onMessage() for cluster " + _clusterName);
}
@Override
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
logger.info("START: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName);
notifyCaches(changeContext, ChangeType.LIVE_INSTANCE);
if (liveInstances == null) {
liveInstances = Collections.emptyList();
}
// Go through the live instance list and make sure that we are observing them
// accordingly. The action is done regardless of the paused flag.
if (changeContext.getType() == NotificationContext.Type.INIT
|| changeContext.getType() == NotificationContext.Type.CALLBACK) {
checkLiveInstancesObservation(liveInstances, changeContext);
} else if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
// on finalize, should remove all message/current-state listeners
logger.info("remove message/current-state listeners. lastSeenInstances: " + _lastSeenInstances
+ ", lastSeenSessions: " + _lastSeenSessions);
liveInstances = Collections.emptyList();
checkLiveInstancesObservation(liveInstances, changeContext);
}
pushToEventQueues(ClusterEventType.LiveInstanceChange, changeContext,
Collections.<String, Object>singletonMap(AttributeName.eventData.name(), liveInstances));
logger.info(
"END: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName);
}
private void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates,
ClusterConfig clusterConfig) {
if (manager.getConfigAccessor() == null) {
logger.warn(manager.getInstanceName()
+ " config accessor doesn't exist. should be in file-based mode.");
return;
}
long minPeriod = Long.MAX_VALUE;
if (clusterConfig != null) {
long period = clusterConfig.getRebalanceTimePeriod();
if (period > 0 && minPeriod > period) {
minPeriod = period;
}
}
// TODO: resource-level rebalance does not make sense; remove it!
for (IdealState idealState : idealStates) {
long period = idealState.getRebalanceTimerPeriod();
if (period > 0 && minPeriod > period) {
minPeriod = period;
}
}
if (minPeriod != Long.MAX_VALUE) {
startPeriodRebalance(minPeriod, manager);
} else {
stopPeriodRebalance();
}
}
@Override
@PreFetch(enabled = false)
public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) {
logger.info(
"START: Generic GenericClusterController.onIdealStateChange() for cluster " + _clusterName);
notifyCaches(changeContext, ChangeType.IDEAL_STATE);
pushToEventQueues(ClusterEventType.IdealStateChange, changeContext,
Collections.<String, Object>emptyMap());
if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
HelixManager manager = changeContext.getManager();
if (manager != null) {
HelixDataAccessor dataAccessor = changeContext.getManager().getHelixDataAccessor();
checkRebalancingTimer(changeContext.getManager(), idealStates,
(ClusterConfig) dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig()));
}
}
logger.info("END: GenericClusterController.onIdealStateChange() for cluster " + _clusterName);
}
@Override
@PreFetch(enabled = false)
public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
NotificationContext changeContext) {
logger.info(
"START: GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName);
notifyCaches(changeContext, ChangeType.INSTANCE_CONFIG);
pushToEventQueues(ClusterEventType.InstanceConfigChange, changeContext,
Collections.<String, Object>emptyMap());
logger.info(
"END: GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName);
}
@Override
@PreFetch(enabled = false)
public void onResourceConfigChange(
List<ResourceConfig> resourceConfigs, NotificationContext context) {
logger.info(
"START: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName);
notifyCaches(context, ChangeType.RESOURCE_CONFIG);
pushToEventQueues(ClusterEventType.ResourceConfigChange, context,
Collections.<String, Object>emptyMap());
logger
.info("END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName);
}
@Override
@PreFetch(enabled = false)
public void onClusterConfigChange(ClusterConfig clusterConfig,
NotificationContext context) {
logger.info(
"START: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName);
notifyCaches(context, ChangeType.CLUSTER_CONFIG);
pushToEventQueues(ClusterEventType.ClusterConfigChange, context,
Collections.<String, Object>emptyMap());
logger
.info("END: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName);
}
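/**
* Routes a change notification to the data providers: a CALLBACK notification marks only the
* changed path dirty in the providers, while any other notification (or a null context) forces a
* full cache refresh.
*/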
private void notifyCaches(NotificationContext context, ChangeType changeType) {
if (context == null || context.getType() != NotificationContext.Type.CALLBACK) {
requestDataProvidersFullRefresh();
} else {
updateDataChangeInProvider(changeType, context.getPathChanged());
}
}
private void updateDataChangeInProvider(ChangeType type, String path) {
if (_resourceControlDataProvider != null) {
_resourceControlDataProvider.notifyDataChange(type, path);
}
if (_workflowControlDataProvider != null) {
_workflowControlDataProvider.notifyDataChange(type, path);
}
}
private void requestDataProvidersFullRefresh() {
if (_resourceControlDataProvider != null) {
_resourceControlDataProvider.requireFullRefresh();
}
if (_workflowControlDataProvider != null) {
_workflowControlDataProvider.requireFullRefresh();
}
}
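/**
* Wraps a change notification into a ClusterEvent and enqueues one copy onto the default event
* queue and a clone onto the task event queue (enqueueEvent silently skips a queue that is not
* enabled), tagging each copy's event id with the corresponding pipeline type.
*/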
private void pushToEventQueues(ClusterEventType eventType, NotificationContext changeContext,
Map<String, Object> eventAttributes) {
// No need for the complete UUID; the prefix should be fine
String uid = UUID.randomUUID().toString().substring(0, 8);
ClusterEvent event = new ClusterEvent(_clusterName, eventType,
String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name()));
event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
event.addAttribute(AttributeName.changeContext.name(), changeContext);
event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
for (Map.Entry<String, Object> attr : eventAttributes.entrySet()) {
event.addAttribute(attr.getKey(), attr.getValue());
}
enqueueEvent(_eventQueue, event);
enqueueEvent(_taskEventQueue,
event.clone(String.format("%s_%s", uid, Pipeline.Type.TASK.name())));
}
private void enqueueEvent(ClusterEventBlockingQueue queue, ClusterEvent event) {
if (event == null || queue == null) {
return;
}
queue.put(event);
}
@Override
public void onControllerChange(NotificationContext changeContext) {
logger.info("START: GenericClusterController.onControllerChange() for cluster " + _clusterName);
requestDataProvidersFullRefresh();
boolean controllerIsLeader;
if (changeContext == null || changeContext.getType() == NotificationContext.Type.FINALIZE) {
logger.info(
"GenericClusterController.onControllerChange() Cluster change type {} for cluster {}. Disable leadership.",
changeContext == null ? null : changeContext.getType(), _clusterName);
controllerIsLeader = false;
} else {
// double check if this controller is the leader
controllerIsLeader = changeContext.getManager().isLeader();
}
if (controllerIsLeader) {
HelixManager manager = changeContext.getManager();
HelixDataAccessor accessor = manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
_paused = updateControllerState(changeContext, pauseSignal, _paused);
_inMaintenanceMode =
updateControllerState(changeContext, maintenanceSignal, _inMaintenanceMode);
enableClusterStatusMonitor(true);
_clusterStatusMonitor.setEnabled(!_paused);
_clusterStatusMonitor.setPaused(_paused);
_clusterStatusMonitor.setMaintenance(_inMaintenanceMode);
} else {
enableClusterStatusMonitor(false);
// Note that onControllerChange is executed in parallel with the event processing thread. It
// is possible that the current WAGED rebalancer object is in use for handling callback. So
// mark the rebalancer invalid only, instead of closing it here.
// This to-be-closed WAGED rebalancer will be reset later, during a subsequent event processing
// run, if the controller becomes leader again.
_rebalancerRef.invalidateRebalancer();
}
logger.info("END: GenericClusterController.onControllerChange() for cluster " + _clusterName);
}
/**
* Go through the list of live instances in the cluster and add current-state change listeners and
* message listeners to them if they are newly added. For current-state changes, the observation is
* tied to the session id of each live instance.
*/
protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
// construct maps for current live-instances
Map<String, LiveInstance> curInstances = new HashMap<>();
Map<String, LiveInstance> curSessions = new HashMap<>();
for (LiveInstance liveInstance : liveInstances) {
curInstances.put(liveInstance.getInstanceName(), liveInstance);
curSessions.put(liveInstance.getEphemeralOwner(), liveInstance);
}
// TODO: remove the synchronization here once we move this update into dataCache.
synchronized (_lastSeenInstances) {
Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
HelixManager manager = changeContext.getManager();
Builder keyBuilder = new Builder(manager.getClusterName());
if (lastSessions != null) {
for (String session : lastSessions.keySet()) {
if (!curSessions.containsKey(session)) {
// remove current-state listener for expired session
String instanceName = lastSessions.get(session).getInstanceName();
manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
}
}
}
if (lastInstances != null) {
for (String instance : lastInstances.keySet()) {
if (!curInstances.containsKey(instance)) {
// remove message listener for disconnected instances
manager.removeListener(keyBuilder.messages(instance), this);
}
}
}
for (String session : curSessions.keySet()) {
if (lastSessions == null || !lastSessions.containsKey(session)) {
String instanceName = curSessions.get(session).getInstanceName();
try {
// add current-state listeners for new sessions
manager.addCurrentStateChangeListener(this, instanceName, session);
logger.info(manager.getInstanceName() + " added current-state listener for instance: "
+ instanceName + ", session: " + session + ", listener: " + this);
} catch (Exception e) {
logger.error("Fail to add current state listener for instance: " + instanceName
+ " with session: " + session, e);
}
}
}
for (String instance : curInstances.keySet()) {
if (lastInstances == null || !lastInstances.containsKey(instance)) {
try {
// add message listeners for new instances
manager.addMessageListener(this, instance);
logger.info(manager.getInstanceName() + " added message listener for " + instance
+ ", listener: " + this);
} catch (Exception e) {
logger.error("Fail to add message listener for instance: " + instance, e);
}
}
}
// update last-seen
_lastSeenInstances.set(curInstances);
_lastSeenSessions.set(curSessions);
}
}
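/**
* Shuts the controller down: stops the periodic rebalance timer, drains and joins both event
* processing threads, stops the async tasks thread pool and the async FIFO workers, disables the
* cluster status monitor, and closes the stateful rebalancer.
* @throws InterruptedException if interrupted while joining the event processing threads
*/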
public void shutdown() throws InterruptedException {
stopPeriodRebalance();
logger.info("Shutting down {} pipeline", Pipeline.Type.DEFAULT.name());
shutdownPipeline(_eventThread, _eventQueue);
logger.info("Shutting down {} pipeline", Pipeline.Type.TASK.name());
shutdownPipeline(_taskEventThread, _taskEventQueue);
// Shut down the async tasks thread pool and wait for termination.
_asyncTasksThreadPool.shutdownNow();
try {
_asyncTasksThreadPool.awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
logger.warn("Timeout when terminating async tasks. Some async tasks are still executing.");
}
// shutdown async workers
shutdownAsyncFIFOWorkers();
enableClusterStatusMonitor(false);
_rebalancerRef.closeRebalancer();
// TODO controller shouldn't be used in any way after shutdown.
// Need to record shutdown and throw Exception if the controller is used again.
}
private void enableClusterStatusMonitor(boolean enable) {
synchronized (_clusterStatusMonitor) {
if (_isMonitoring != enable) {
// monitoring state changed
if (enable) {
logger.info("Enable clusterStatusMonitor for cluster " + _clusterName);
// Clear old cached monitoring-related data to avoid reporting stats across different
// leadership periods
if (_resourceControlDataProvider != null) {
_resourceControlDataProvider.clearMonitoringRecords();
}
_clusterStatusMonitor.active();
} else {
logger.info("Disable clusterStatusMonitor for cluster " + _clusterName);
// Reset will be done later when _isMonitoring == false, whether or not the state has changed.
}
_isMonitoring = enable;
}
// Due to multi-threaded processing, an async thread may write to the monitor even after it is closed.
// So when it is disabled, always try to clear the monitor.
resetClusterStatusMonitor();
}
}
private void resetClusterStatusMonitor() {
synchronized (_clusterStatusMonitor) {
if (!_isMonitoring) {
_clusterStatusMonitor.reset();
}
}
}
private void shutdownPipeline(Thread thread, ClusterEventBlockingQueue queue)
throws InterruptedException {
if (queue != null) {
queue.clear();
}
if (thread != null) {
while (thread.isAlive()) {
thread.interrupt();
thread.join(EVENT_THREAD_JOIN_TIMEOUT);
}
}
}
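/**
* Reconciles the local paused/maintenance flag with the corresponding signal read from ZK.
* Entering the state is only logged; leaving it additionally enqueues a Resume event onto both
* event queues so the pipelines re-run.
*/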
private boolean updateControllerState(NotificationContext changeContext, PauseSignal signal,
boolean statusFlag) {
if (signal != null) {
// This logic records the first time the controller enters PAUSE/MAINTENANCE mode
if (!statusFlag) {
statusFlag = true;
logger.info(String.format("controller is now %s",
(signal instanceof MaintenanceSignal) ? "in maintenance mode" : "paused"));
}
} else {
if (statusFlag) {
statusFlag = false;
logger.info("controller is now resumed from paused state");
String uid = UUID.randomUUID().toString().substring(0, 8);
ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Resume,
String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name()));
event.addAttribute(AttributeName.changeContext.name(), changeContext);
event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
enqueueEvent(_eventQueue, event);
enqueueEvent(_taskEventQueue,
event.clone(String.format("%s_%s", uid, Pipeline.Type.TASK.name())));
}
}
return statusFlag;
}
// TODO: refactor this to use common/ClusterEventProcessor.
@Deprecated
private class ClusterEventProcessor extends Thread {
private final BaseControllerDataProvider _cache;
private final ClusterEventBlockingQueue _eventBlockingQueue;
private final String _processorName;
ClusterEventProcessor(BaseControllerDataProvider cache,
ClusterEventBlockingQueue eventBlockingQueue, String processorName) {
_cache = cache;
_eventBlockingQueue = eventBlockingQueue;
_processorName = processorName;
}
@Override
public void run() {
logger.info(
"START ClusterEventProcessor thread for cluster " + _clusterName + ", processor name: "
+ _processorName);
while (!isInterrupted()) {
try {
ClusterEvent newClusterEvent = _eventBlockingQueue.take();
String threadName = String.format(
"HelixController-pipeline-%s-(%s)", _processorName, newClusterEvent.getEventId());
this.setName(threadName);
handleEvent(newClusterEvent, _cache);
} catch (InterruptedException e) {
logger.warn("ClusterEventProcessor interrupted " + _processorName, e);
interrupt();
} catch (ZkInterruptedException e) {
logger
.warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName, e);
interrupt();
} catch (ThreadDeath death) {
logger.error("ClusterEventProcessor caught a ThreadDeath " + _processorName, death);
throw death;
} catch (Throwable t) {
logger.error("ClusterEventProcessor failed while running the controller pipeline "
+ _processorName, t);
}
}
logger.info("END ClusterEventProcessor thread " + _processorName);
}
}
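/**
* Wires the shared async-tasks thread pool into the data provider and starts the event-processing
* thread as a daemon. Logs a warning and returns if either the thread or the cache is null, i.e.
* when the corresponding pipeline type is disabled.
*/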
private void initPipeline(Thread eventThread, BaseControllerDataProvider cache) {
if (eventThread == null || cache == null) {
logger.warn("pipeline cannot be initialized");
return;
}
cache.setAsyncTasksThreadPool(_asyncTasksThreadPool);
eventThread.setDaemon(true);
eventThread.start();
}
/**
* A wrapper class for the stateful rebalancer instance that will be tracked in the
* GenericHelixController.
*/
private abstract class StatefulRebalancerRef<T extends StatefulRebalancer> {
private T _rebalancer = null;
private boolean _isRebalancerValid = true;
/**
* @param helixManager the HelixManager used to construct the rebalancer
* @return A new stateful rebalancer instance with initial state.
*/
protected abstract T createRebalancer(HelixManager helixManager);
/**
* Mark the current rebalancer object as invalid, which indicates it needs to be reset before
* the next use.
*/
synchronized void invalidateRebalancer() {
_isRebalancerValid = false;
}
/**
* @return A valid rebalancer object.
* If the rebalancer is no longer valid, it will be reset before returning.
* TODO: Make rebalancer volatile or make it singleton, if this method is called in multiple
* TODO: threads outside the controller object.
*/
synchronized T getRebalancer(HelixManager helixManager) {
// Lazily initialize the stateful rebalancer instance since the GenericHelixController
// instance is instantiated without the HelixManager information that is required.
if (_rebalancer == null) {
_rebalancer = createRebalancer(helixManager);
_isRebalancerValid = true;
}
// If the rebalancer exists but has been marked as invalid (due to a leadership switch), it needs
// to be reset before returning.
if (!_isRebalancerValid) {
_rebalancer.reset();
_isRebalancerValid = true;
}
return _rebalancer;
}
/**
* Proactively close the rebalancer object to release its resources.
*/
synchronized void closeRebalancer() {
if (_rebalancer != null) {
_rebalancer.close();
_rebalancer = null;
}
}
}
}