blob: de930356726479df26f78460bc0b94b2adad6086 [file] [log] [blame]
package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigChangeListener;
import org.apache.helix.AccessOption;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixTimerTask;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.MessageListener;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
import org.apache.helix.healthcheck.HealthStatsAggregationTask;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.participant.DistClusterControllerElection;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.log4j.Logger;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
/**
 * ZooKeeper-backed implementation of {@link HelixManager}.
 *
 * <p>An instance represents one cluster member of a given {@link InstanceType}. {@link #connect()}
 * creates the {@link ZkClient} plus data/config accessors and then drives
 * {@link #handleNewSession()}. Registered listeners are kept as {@link CallbackHandler}s in
 * {@link #_handlers}. On every new ZooKeeper session they are reset. They are re-initialized only
 * for participants, controller-participants, and a controller that is currently the leader.
 */
public class ZKHelixManager implements HelixManager
{
  private static final Logger logger = Logger.getLogger(ZKHelixManager.class);

  /** Number of attempts createClient() makes to connect and handle the first session. */
  private static final int RETRY_LIMIT = 3;
  /** ZooKeeper connection timeout, in milliseconds. */
  private static final int CONNECTIONTIMEOUT = 60 * 1000;
  /** Session timeout (ms) used when "zk.session.timeout" is unset, unparsable or non-positive. */
  private static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;

  private final String _clusterName;
  private final String _instanceName;
  private final String _zkConnectString;
  private ZKHelixDataAccessor _helixAccessor;
  private ConfigAccessor _configAccessor;
  protected ZkClient _zkClient;
  // guarded by "this"; every access below happens inside synchronized (this)
  protected final List<CallbackHandler> _handlers = new ArrayList<CallbackHandler>();
  private final ZkStateChangeListener _zkStateChangeListener;
  private final InstanceType _instanceType;
  volatile String _sessionId;
  private Timer _timer;
  private CallbackHandler _leaderElectionHandler;
  private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
  private final DefaultMessagingService _messagingService;
  private ZKHelixAdmin _managementTool;
  private final String _version;
  private final HelixManagerProperties _properties;
  private final StateMachineEngine _stateMachEngine;
  private int _sessionTimeout;
  private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
  private final List<HelixTimerTask> _controllerTimerTasks;
  private BaseDataAccessor<ZNRecord> _baseDataAccessor;
  List<PreConnectCallback> _preConnectCallbacks =
      new LinkedList<PreConnectCallback>();
  ZKPropertyTransferServer _transferServer = null;
  int _flappingTimeWindowMs;
  int _maxDisconnectThreshold;
  // NOTE: the constant name is misspelled ("WINDIOW") but it is public, so it is kept as-is.
  public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec
  public static final int MAX_DISCONNECT_THRESHOLD = 5;
  LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
  /** Cluster-config key that, when "true", lets an unregistered participant register itself. */
  public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";

  /**
   * Creates a manager; no ZooKeeper connection is made until {@link #connect()}.
   *
   * <p>Reads the system properties "helixmanager.flappingTimeWindow",
   * "helixmanager.maxDisconnectThreshold" and "zk.session.timeout", falling back to the class
   * defaults when a value cannot be parsed.
   *
   * @param clusterName cluster to join
   * @param instanceName instance name; when null, "&lt;canonical-host-name&gt;-&lt;instanceType&gt;"
   *          is used, or "UNKNOWN" if the local host name cannot be resolved
   * @param instanceType role of this manager in the cluster
   * @param zkConnectString ZooKeeper connect string
   */
  public ZKHelixManager(String clusterName,
                        String instanceName,
                        InstanceType instanceType,
                        String zkConnectString)
  {
    logger.info("Create a zk-based cluster manager. clusterName:" + clusterName
        + ", instanceName:" + instanceName + ", type:" + instanceType + ", zkSvr:"
        + zkConnectString);

    _flappingTimeWindowMs =
        parseIntSystemProperty("helixmanager.flappingTimeWindow",
                               "helixmanager.flappingTimeWindow",
                               FLAPPING_TIME_WINDIOW);
    _maxDisconnectThreshold =
        parseIntSystemProperty("helixmanager.maxDisconnectThreshold",
                               "helixmanager.maxDisconnectThreshold",
                               MAX_DISCONNECT_THRESHOLD);

    // a non-positive session timeout is not usable, so fall back to the default
    int sessionTimeoutInt =
        parseIntSystemProperty("zk.session.timeout", "session timeout", DEFAULT_SESSION_TIMEOUT);
    _sessionTimeout = sessionTimeoutInt > 0 ? sessionTimeoutInt : DEFAULT_SESSION_TIMEOUT;

    if (instanceName == null)
    {
      try
      {
        instanceName =
            InetAddress.getLocalHost().getCanonicalHostName() + "-"
                + instanceType.toString();
      }
      catch (UnknownHostException e)
      {
        // can ignore it
        logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable",
                    e);
        instanceName = "UNKNOWN";
      }
    }

    _clusterName = clusterName;
    _instanceName = instanceName;
    _instanceType = instanceType;
    _zkConnectString = zkConnectString;
    _zkStateChangeListener = new ZkStateChangeListener(this, _flappingTimeWindowMs, _maxDisconnectThreshold);
    _timer = null;

    _messagingService = new DefaultMessagingService(this);

    _properties =
        new HelixManagerProperties("cluster-manager-version.properties");
    _version = _properties.getVersion();

    _stateMachEngine = new HelixStateMachineEngine(this);

    // add all timer tasks
    _controllerTimerTasks = new ArrayList<HelixTimerTask>();
    if (_instanceType == InstanceType.CONTROLLER)
    {
      _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
    }
  }

  /**
   * Reads an int-valued system property.
   *
   * @param key system property name
   * @param description name used in the warning logged on a parse failure
   * @param defaultValue value used when the property is unset or cannot be parsed
   * @return the parsed value, or {@code defaultValue}
   */
  private static int parseIntSystemProperty(String key, String description, int defaultValue)
  {
    String value = System.getProperty(key, "" + defaultValue);
    try
    {
      return Integer.parseInt(value);
    }
    catch (NumberFormatException e)
    {
      logger.warn("Exception while parsing " + description + ": " + value);
      return defaultValue;
    }
  }

  /**
   * Removes and resets every handler registered for {@code listener} on {@code key}'s path.
   *
   * @return always true
   */
  @Override
  public boolean removeListener(PropertyKey key, Object listener)
  {
    logger.info("Removing listener: " + listener + " on path: " + key.getPath()
        + " from cluster: " + _clusterName + " by instance: " + _instanceName);

    synchronized (this)
    {
      List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
      for (CallbackHandler handler : _handlers)
      {
        // compare property-key path and listener reference
        if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener))
        {
          toRemove.add(handler);
        }
      }

      _handlers.removeAll(toRemove);

      // handler.reset() may modify the handlers list, so do it outside the iteration
      for (CallbackHandler handler : toRemove)
      {
        handler.reset();
      }
    }

    return true;
  }

  /**
   * Registers {@code listener} on {@code propertyKey}'s path unless the same listener is already
   * registered on that path.
   *
   * @throws HelixException if not connected, or if {@code listener} is null
   */
  private void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, EventType[] eventType)
  {
    checkConnected();

    PropertyType type = propertyKey.getType();

    synchronized (this)
    {
      for (CallbackHandler handler : _handlers)
      {
        // compare property-key path and listener reference
        if (handler.getPath().equals(propertyKey.getPath()) && handler.getListener().equals(listener))
        {
          logger.info("Listener: " + listener + " on path: " + propertyKey.getPath() + " already exists. skip adding it");
          return;
        }
      }

      CallbackHandler newHandler =
          createCallBackHandler(propertyKey, listener, eventType, changeType);

      _handlers.add(newHandler);
      logger.info("Add listener: " + listener + " for type: " + type + " to path: " + newHandler.getPath());
    }
  }

  @Override
  public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception
  {
    addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE,
                new EventType[] { EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated });
  }

  @Override
  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
  {
    addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE,
                new EventType[] { EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
  }

  @Override
  public void addConfigChangeListener(ConfigChangeListener listener)
  {
    addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
                new EventType[] { EventType.NodeChildrenChanged });
  }

  @Override
  public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
  {
    addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
                new EventType[] { EventType.NodeChildrenChanged });
  }

  /**
   * Registers a scoped config listener. Only CLUSTER, PARTICIPANT and RESOURCE scopes are
   * supported; any other scope is logged as an error and ignored.
   */
  @Override
  public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
  {
    Builder keyBuilder = new Builder(_clusterName);

    PropertyKey propertyKey = null;
    switch (scope)
    {
    case CLUSTER:
      propertyKey = keyBuilder.clusterConfigs();
      break;
    case PARTICIPANT:
      propertyKey = keyBuilder.instanceConfigs();
      break;
    case RESOURCE:
      propertyKey = keyBuilder.resourceConfigs();
      break;
    default:
      break;
    }

    if (propertyKey != null)
    {
      addListener(listener, propertyKey, ChangeType.CONFIG,
                  new EventType[] { EventType.NodeChildrenChanged });
    }
    else
    {
      logger.error("Can't add listener to config scope: " + scope);
    }
  }

  // TODO: Decide if do we still need this since we are exposing
  // ClusterMessagingService
  @Override
  public void addMessageListener(MessageListener listener, String instanceName)
  {
    addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE,
                new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
  }

  /** Registers a listener on the cluster's controller message queue. */
  void addControllerMessageListener(MessageListener listener)
  {
    addListener(listener, new Builder(_clusterName).controllerMessages(), ChangeType.MESSAGES_CONTROLLER,
                new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
  }

  @Override
  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
                                            String instanceName,
                                            String sessionId)
  {
    addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId), ChangeType.CURRENT_STATE,
                new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
  }

  @Override
  public void addHealthStateChangeListener(HealthStateChangeListener listener,
                                           String instanceName)
  {
    addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
                new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
  }

  @Override
  public void addExternalViewChangeListener(ExternalViewChangeListener listener)
  {
    addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
                new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
  }

  @Override
  public void addControllerListener(ControllerChangeListener listener)
  {
    addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER,
                new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
  }

  @Override
  public HelixDataAccessor getHelixDataAccessor()
  {
    checkConnected();
    return _helixAccessor;
  }

  @Override
  public ConfigAccessor getConfigAccessor()
  {
    checkConnected();
    return _configAccessor;
  }

  @Override
  public String getClusterName()
  {
    return _clusterName;
  }

  @Override
  public String getInstanceName()
  {
    return _instanceName;
  }

  /**
   * Connects to ZooKeeper and sets up this instance for its role. No-op (with a warning) if
   * already connected. On failure, the manager is disconnected and the exception is rethrown.
   */
  @Override
  public void connect() throws Exception
  {
    logger.info("ClusterManager.connect()");
    if (_zkStateChangeListener.isConnected())
    {
      logger.warn("Cluster manager " + _clusterName + " " + _instanceName
          + " already connected");
      return;
    }

    try
    {
      createClient(_zkConnectString);
      _messagingService.onConnected();
    }
    catch (Exception e)
    {
      // pass e as the throwable so the stack trace is logged, not just e.toString()
      logger.error("fail to connect " + _instanceName + " to cluster " + _clusterName, e);
      disconnect();
      throw e;
    }
  }

  @Override
  public void disconnect()
  {
    if (!isConnected())
    {
      logger.error("ClusterManager " + _instanceName + " already disconnected");
      return;
    }
    disconnectInternal();
  }

  /**
   * Tears down handlers, timers, health collection and the ZooKeeper client without checking
   * {@link #isConnected()}.
   */
  void disconnectInternal()
  {
    // This function can be called when the connection are in bad state(e.g. flapping),
    // in which isConnected() could be false and we want to disconnect from cluster.
    logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from "
        + _clusterName);

    // shutdown thread pool first to avoid reset() being invoked in the middle of state
    // transition
    _messagingService.getExecutor().shutdown();
    resetHandlers();

    _helixAccessor.shutdown();

    if (_leaderElectionHandler != null)
    {
      _leaderElectionHandler.reset();
    }

    if (_participantHealthCheckInfoCollector != null)
    {
      _participantHealthCheckInfoCollector.stop();
    }

    if (_timer != null)
    {
      _timer.cancel();
      _timer = null;
    }

    if (_instanceType == InstanceType.CONTROLLER)
    {
      stopTimerTasks();
    }

    // unsubscribe accessor from controllerChange
    _zkClient.unsubscribeAll();

    _zkClient.close();

    // HACK seems that zkClient is not sending DISCONNECT event
    _zkStateChangeListener.disconnect();
    logger.info("Cluster manager: " + _instanceName + " disconnected");
  }

  @Override
  public String getSessionId()
  {
    checkConnected();
    return _sessionId;
  }

  @Override
  public boolean isConnected()
  {
    return _zkStateChangeListener.isConnected();
  }

  /** Not tracked by this implementation; always returns -1. */
  @Override
  public long getLastNotificationTime()
  {
    return -1;
  }

  /**
   * Creates this instance's live-instance node for the current session, merging in any
   * extra fields from the {@link LiveInstanceInfoProvider}. It also ensures the current-state
   * parent path for the session exists.
   *
   * @throws HelixException if the live-instance node cannot be created
   */
  private void addLiveInstance()
  {
    LiveInstance liveInstance = new LiveInstance(_instanceName);
    liveInstance.setSessionId(_sessionId);
    liveInstance.setHelixVersion(_version);
    liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());

    if (_liveInstanceInfoProvider != null)
    {
      logger.info("invoking _liveInstanceInfoProvider");
      ZNRecord additionalLiveInstanceInfo = _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
      if (additionalLiveInstanceInfo != null)
      {
        additionalLiveInstanceInfo.merge(liveInstance.getRecord());
        ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
        liveInstance = new LiveInstance(mergedLiveInstance);
        logger.info("liveInstance content :" + _instanceName + " " + liveInstance.toString());
      }
    }

    logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:"
        + _sessionId);
    Builder keyBuilder = _helixAccessor.keyBuilder();
    if (!_helixAccessor.createProperty(keyBuilder.liveInstance(_instanceName),
                                       liveInstance))
    {
      String errorMsg =
          "Fail to create live instance node after waiting, so quit. instance:"
              + _instanceName;
      logger.warn(errorMsg);
      throw new HelixException(errorMsg);
    }

    String currentStatePathParent =
        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
                                   _clusterName,
                                   _instanceName,
                                   getSessionId());

    if (!_zkClient.exists(currentStatePathParent))
    {
      _zkClient.createPersistent(currentStatePathParent);
      logger.info("Creating current state path " + currentStatePathParent);
    }
  }

  /** Schedules the ZK path-data dump task on a daemon timer, once per timer lifetime. */
  private void startStatusUpdatedumpTask()
  {
    long initialDelay = 30 * 60 * 1000;
    long period = 120 * 60 * 1000;
    int timeThresholdNoChange = 180 * 60 * 1000;

    if (_timer == null)
    {
      _timer = new Timer(true);
      _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(this,
                                                        _zkClient,
                                                        timeThresholdNoChange),
                                 initialDelay,
                                 period);
    }
  }

  /**
   * Builds the ZkClient and accessors, then handles the first session. Non-Helix failures are
   * retried up to {@link #RETRY_LIMIT} times; a {@link HelixException} is rethrown immediately.
   */
  private void createClient(String zkServers) throws Exception
  {
    String propertyStorePath =
        PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);

    // by default use ZNRecordStreamingSerializer except for paths within the property
    // store which expects raw byte[] serialization/deserialization
    PathBasedZkSerializer zkSerializer =
        ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer())
                               .serialize(propertyStorePath, new ByteArraySerializer())
                               .build();

    _zkClient = new ZkClient(zkServers, _sessionTimeout, CONNECTIONTIMEOUT, zkSerializer);

    ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
        new ZkBaseDataAccessor<ZNRecord>(_zkClient);

    // participants cache their current states, controllers cache external views
    if (_instanceType == InstanceType.PARTICIPANT)
    {
      String curStatePath =
          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
                                     _clusterName,
                                     _instanceName);
      _baseDataAccessor =
          new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
                                                Arrays.asList(curStatePath));
    }
    else if (_instanceType == InstanceType.CONTROLLER)
    {
      String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
                                                      _clusterName);
      _baseDataAccessor =
          new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
                                                Arrays.asList(extViewPath));
    }
    else
    {
      _baseDataAccessor = baseDataAccessor;
    }

    _helixAccessor =
        new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
    _configAccessor = new ConfigAccessor(_zkClient);
    int retryCount = 0;

    _zkClient.subscribeStateChanges(_zkStateChangeListener);
    while (retryCount < RETRY_LIMIT)
    {
      try
      {
        _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
        _zkStateChangeListener.handleStateChanged(KeeperState.SyncConnected);
        _zkStateChangeListener.handleNewSession();
        break;
      }
      catch (HelixException e)
      {
        logger.error("fail to createClient.", e);
        throw e;
      }
      catch (Exception e)
      {
        retryCount++;

        logger.error("fail to createClient. retry " + retryCount, e);
        if (retryCount == RETRY_LIMIT)
        {
          throw e;
        }
      }
    }
  }

  /**
   * @throws HelixException if {@code listener} is null
   */
  private CallbackHandler createCallBackHandler(PropertyKey propertyKey,
                                                Object listener,
                                                EventType[] eventTypes,
                                                ChangeType changeType)
  {
    if (listener == null)
    {
      throw new HelixException("Listener cannot be null");
    }
    return new CallbackHandler(this, _zkClient, propertyKey, listener, eventTypes, changeType);
  }

  /**
   * Invoked whenever a new ZooKeeper session is created.
   *
   * <p>Case 1: the manager is a participant. It carries over current states, adds the live
   * instance, and registers message listeners.
   *
   * <p>Case 2: the manager is a controller that was leader before. It runs leader election. If it
   * becomes leader again, it re-initializes the ideal-state, current-state and other listeners;
   * otherwise it becomes standby.
   *
   * <p>Case 3: the manager is a controller that was NOT leader before. It runs leader election.
   * If it becomes leader, it initializes those listeners; otherwise it stays standby.
   *
   * @throws HelixException if the cluster structure is missing, or if the instance is not set up
   *           and participant auto-join is not allowed
   */
  protected void handleNewSession()
  {
    waitUntilZkConnected();

    ZkConnection zkConnection = ((ZkConnection) _zkClient.getConnection());

    synchronized (this)
    {
      _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
    }
    _baseDataAccessor.reset();

    // reset all handlers so they have a chance to unsubscribe zk changes from zkclient
    // abandon all callback-handlers added in expired session
    resetHandlers();

    logger.info("Handling new session, session id:" + _sessionId + ", instance:"
        + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);

    logger.info(zkConnection.getZookeeper());

    if (!ZKUtil.isClusterSetup(_clusterName, _zkClient))
    {
      throw new HelixException("Initial cluster structure is not set up for cluster:"
          + _clusterName);
    }

    if (!ZKUtil.isInstanceSetup(_zkClient, _clusterName, _instanceName, _instanceType))
    {
      // the cluster config is only consulted when the instance actually needs to auto-join
      if (!isParticipantAutoJoinAllowed())
      {
        throw new HelixException("Initial cluster structure is not set up for instance:"
            + _instanceName + " instanceType:" + _instanceType);
      }
      autoJoinInstance();
    }

    if (_instanceType == InstanceType.PARTICIPANT
        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
    {
      handleNewSessionAsParticipant();
    }

    if (_instanceType == InstanceType.CONTROLLER
        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
    {
      handleNewSessionAsController();
    }

    if (_instanceType == InstanceType.PARTICIPANT
        || _instanceType == InstanceType.CONTROLLER_PARTICIPANT
        || (_instanceType == InstanceType.CONTROLLER && isLeader()))
    {
      initHandlers();
    }
  }

  /** Blocks, logging an error after every timeout, until the ZkClient reports it is connected. */
  private void waitUntilZkConnected()
  {
    boolean isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
    while (!isConnected)
    {
      logger.error("Could NOT connect to zk server in " + CONNECTIONTIMEOUT + "ms. zkServer: "
          + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: "
          + _clusterName);
      isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
    }
  }

  /**
   * Reads {@link #ALLOW_PARTICIPANT_AUTO_JOIN} from the cluster-scope config.
   *
   * @return true only if the config value parses as "true". A missing value, or any failure
   *         while reading it, yields false.
   */
  private boolean isParticipantAutoJoinAllowed()
  {
    boolean autoJoin = false;
    try
    {
      HelixConfigScope scope =
          new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(getClusterName())
                                                                  .build();
      autoJoin = Boolean.parseBoolean(getConfigAccessor().get(scope, ALLOW_PARTICIPANT_AUTO_JOIN));
      logger.info(ALLOW_PARTICIPANT_AUTO_JOIN + " for cluster " + _clusterName + " is " + autoJoin);
    }
    catch (Exception e)
    {
      // best effort: an unreadable config is treated as "auto-join not allowed"
      logger.warn("Fail to read " + ALLOW_PARTICIPANT_AUTO_JOIN + " from config of cluster "
          + _clusterName + ", assume false", e);
    }
    return autoJoin;
  }

  /**
   * Registers this instance in the cluster. The host and port are derived from an instance name
   * of the form "host_port"; without an underscore the whole name is the host and the port is "".
   */
  private void autoJoinInstance()
  {
    logger.info("Auto joining instance " + _instanceName);
    InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
    String hostName = _instanceName;
    String port = "";
    int lastPos = _instanceName.lastIndexOf("_");
    if (lastPos > 0)
    {
      hostName = _instanceName.substring(0, lastPos);
      port = _instanceName.substring(lastPos + 1);
    }
    instanceConfig.setHostName(hostName);
    instanceConfig.setPort(port);
    instanceConfig.setInstanceEnabled(true);
    getClusterManagmentTool().addInstance(_clusterName, instanceConfig);
  }

  /**
   * Controller-side session setup. Listens on the controller message queue and registers the
   * default controller, scheduler and participant-error message handler factories. It then
   * (re)starts the leader-election handler.
   */
  private void handleNewSessionAsController()
  {
    addControllerMessageListener(_messagingService.getExecutor());
    MessageHandlerFactory defaultControllerMsgHandlerFactory =
        new DefaultControllerMessageHandlerFactory();
    _messagingService.getExecutor()
                     .registerMessageHandlerFactory(defaultControllerMsgHandlerFactory.getMessageType(),
                                                    defaultControllerMsgHandlerFactory);
    MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
        new DefaultSchedulerMessageHandlerFactory(this);
    _messagingService.getExecutor()
                     .registerMessageHandlerFactory(defaultSchedulerMsgHandlerFactory.getMessageType(),
                                                    defaultSchedulerMsgHandlerFactory);
    MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
        new DefaultParticipantErrorMessageHandlerFactory(this);
    _messagingService.getExecutor()
                     .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
                                                    defaultParticipantErrorMessageHandlerFactory);

    if (_leaderElectionHandler != null)
    {
      _leaderElectionHandler.reset();
      _leaderElectionHandler.init();
    }
    else
    {
      _leaderElectionHandler =
          createCallBackHandler(new Builder(_clusterName).controller(),
                                new DistClusterControllerElection(_zkConnectString),
                                new EventType[] { EventType.NodeChildrenChanged,
                                    EventType.NodeDeleted, EventType.NodeCreated },
                                ChangeType.CONTROLLER);
    }
  }

  /**
   * Participant-side session setup. The steps are:
   * <ul>
   * <li>wait out a stale live-instance node (failing if it persists),</li>
   * <li>run the pre-connect callbacks,</li>
   * <li>add the live instance,</li>
   * <li>carry over previous current states,</li>
   * <li>register message and controller listeners,</li>
   * <li>start health reporting.</li>
   * </ul>
   *
   * @throws HelixException if another live instance with the same name is still present after
   *           waiting
   */
  private void handleNewSessionAsParticipant()
  {
    // In case there is a live instance record on zookeeper
    Builder keyBuilder = _helixAccessor.keyBuilder();

    if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
    {
      logger.warn("Found another instance with same instanceName: " + _instanceName
          + " in cluster " + _clusterName);
      // Wait for a while, in case previous storage node exits unexpectedly
      // and its liveinstance
      // still hangs around until session timeout happens
      try
      {
        Thread.sleep(_sessionTimeout + 5000);
      }
      catch (InterruptedException e)
      {
        // NOTE(review): the interrupt flag is not restored here; restoring it would likely make
        // the ZooKeeper calls below fail. Confirm the intended behavior before changing.
        logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.",
                    e);
      }

      if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
      {
        String errorMessage =
            "instance " + _instanceName + " already has a liveinstance in cluster "
                + _clusterName;
        logger.error(errorMessage);
        throw new HelixException(errorMessage);
      }
    }

    // Invoke the PreConnectCallbacks
    for (PreConnectCallback callback : _preConnectCallbacks)
    {
      callback.onPreConnect();
    }

    addLiveInstance();
    carryOverPreviousCurrentState();

    // In case the cluster manager is running as a participant, setup message
    // listener
    _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
                                                    _stateMachEngine);
    addMessageListener(_messagingService.getExecutor(), _instanceName);
    addControllerListener(_helixAccessor);

    ScheduledTaskStateModelFactory stStateModelFactory = new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
    _stateMachEngine.registerStateModelFactory(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);

    if (_participantHealthCheckInfoCollector == null)
    {
      _participantHealthCheckInfoCollector =
          new ParticipantHealthReportCollectorImpl(this, _instanceName);
      _participantHealthCheckInfoCollector.start();
    }
    // start the participant health check timer, also create zk path for health
    // check info
    String healthCheckInfoPath =
        _helixAccessor.keyBuilder().healthReports(_instanceName).getPath();
    if (!_zkClient.exists(healthCheckInfoPath))
    {
      _zkClient.createPersistent(healthCheckInfoPath, true);
      logger.info("Creating healthcheck info path " + healthCheckInfoPath);
    }
  }

  @Override
  public void addPreConnectCallback(PreConnectCallback callback)
  {
    logger.info("Adding preconnect callback");
    _preConnectCallbacks.add(callback);
  }

  /** Calls reset() on every registered handler; the handlers stay registered. */
  private void resetHandlers()
  {
    synchronized (this)
    {
      if (_handlers != null)
      {
        // get a copy of the list and iterate over the copy list
        // in case handler.reset() will modify the original handler list
        List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
        tmpHandlers.addAll(_handlers);

        for (CallbackHandler handler : tmpHandlers)
        {
          handler.reset();
          logger.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
        }
      }
    }
  }

  /** Calls init() on every handler registered at the time of the call. */
  private void initHandlers()
  {
    synchronized (this)
    {
      if (_handlers != null)
      {
        // may add new currentState and message listeners during init()
        // so make a copy and iterate over the copy
        List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
        tmpHandlers.addAll(_handlers);
        for (CallbackHandler handler : tmpHandlers)
        {
          handler.init();
          logger.info("init handler: " + handler.getPath() + ", " + handler.getListener());
        }
      }
    }
  }

  /**
   * Returns whether this connected CONTROLLER is named in the cluster's leader node. Other
   * instance types always return false.
   */
  @Override
  public boolean isLeader()
  {
    if (!isConnected() || _instanceType != InstanceType.CONTROLLER)
    {
      return false;
    }

    Builder keyBuilder = _helixAccessor.keyBuilder();
    LiveInstance leader = _helixAccessor.getProperty(keyBuilder.controllerLeader());

    // TODO need check sessionId also, but in distributed mode, leader's sessionId is not equal
    // to the leader znode's sessionId field which is the sessionId of the
    // controller_participant that successfully creates the leader node
    return leader != null && _instanceName.equals(leader.getInstanceName());
  }

  /**
   * Carries over current states from previous sessions. A state is reset to its model's initial
   * state in the current session only when it does not already exist there. Current states from
   * previous sessions are then deleted.
   *
   * <p>States whose state-model reference is missing, or refers to an undefined state model, are
   * skipped (and still deleted).
   */
  private void carryOverPreviousCurrentState()
  {
    Builder keyBuilder = _helixAccessor.keyBuilder();
    List<String> sessions = _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName));

    // carry-over
    for (String session : sessions)
    {
      if (session.equals(_sessionId))
      {
        continue;
      }

      List<CurrentState> lastCurStates =
          _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName, session));

      for (CurrentState lastCurState : lastCurStates)
      {
        logger.info("Carrying over old session: " + session + ", resource: "
            + lastCurState.getId() + " to current session: " + _sessionId);
        String stateModelDefRef = lastCurState.getStateModelDefRef();
        if (stateModelDefRef == null)
        {
          logger.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
              + lastCurState);
          continue;
        }
        StateModelDefinition stateModel =
            _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
        if (stateModel == null)
        {
          // previously this dereferenced null and aborted the whole session setup
          logger.error("skip carry-over because state model definition " + stateModelDefRef
              + " doesn't exist. previous current-state: " + lastCurState);
          continue;
        }
        String curStatePath = keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName()).getPath();
        _helixAccessor.getBaseDataAccessor().update(curStatePath,
            new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState), AccessOption.PERSISTENT);
      }
    }

    // remove previous current states
    for (String session : sessions)
    {
      if (session.equals(_sessionId))
      {
        continue;
      }

      String path =
          _helixAccessor.keyBuilder()
                        .currentStates(_instanceName, session)
                        .getPath();
      logger.info("Removing current states from previous sessions. path: " + path);
      _zkClient.deleteRecursive(path);
    }
  }

  /** Lazily creates the property store over the cluster's HELIX_PROPERTYSTORE path. */
  @Override
  public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
  {
    checkConnected();

    if (_helixPropertyStore == null)
    {
      String path =
          PropertyPathConfig.getPath(PropertyType.HELIX_PROPERTYSTORE, _clusterName);

      _helixPropertyStore =
          new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkClient),
                                             path,
                                             null);
    }

    return _helixPropertyStore;
  }

  /**
   * Returns an admin bound to the current ZkClient. A new {@link ZKHelixAdmin} is created on
   * every call while the client is non-null.
   */
  @Override
  public synchronized HelixAdmin getClusterManagmentTool()
  {
    checkConnected();
    if (_zkClient != null)
    {
      _managementTool = new ZKHelixAdmin(_zkClient);
    }
    else
    {
      logger.error("Couldn't get ZKClusterManagementTool because zkClient is null");
    }

    return _managementTool;
  }

  @Override
  public ClusterMessagingService getMessagingService()
  {
    // The caller can register message handler factories on messaging service before the
    // helix manager is connected. Thus we do not do connected check here.
    return _messagingService;
  }

  @Override
  public ParticipantHealthReportCollector getHealthReportCollector()
  {
    checkConnected();
    return _participantHealthCheckInfoCollector;
  }

  @Override
  public InstanceType getInstanceType()
  {
    return _instanceType;
  }

  /** @throws HelixException if this manager is not connected */
  private void checkConnected()
  {
    if (!isConnected())
    {
      throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
    }
  }

  @Override
  public String getVersion()
  {
    return _version;
  }

  @Override
  public HelixManagerProperties getProperties()
  {
    return _properties;
  }

  @Override
  public StateMachineEngine getStateMachineEngine()
  {
    return _stateMachEngine;
  }

  // TODO: rename this and not expose this function as part of interface
  @Override
  public void startTimerTasks()
  {
    for (HelixTimerTask task : _controllerTimerTasks)
    {
      task.start();
    }
    startStatusUpdatedumpTask();
  }

  @Override
  public void stopTimerTasks()
  {
    for (HelixTimerTask task : _controllerTimerTasks)
    {
      task.stop();
    }
  }

  @Override
  public void setLiveInstanceInfoProvider(
      LiveInstanceInfoProvider liveInstanceInfoProvider)
  {
    _liveInstanceInfoProvider = liveInstanceInfoProvider;
  }
}