blob: 54e0f56f5d90faf94049be6d12456a3474e38fdb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cluster;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
import org.apache.ignite.internal.processors.cache.StateChangeRequest;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.STATE_PROC;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
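/**
* Cluster state processor: keeps the global cluster state (active/inactive flag and BaselineTopology),
* handles cluster state change requests received via discovery, and persists BaselineTopology and its
* history in the metastorage.
*/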
public class GridClusterStateProcessor extends GridProcessorAdapter implements IGridClusterStateProcessor, MetastorageLifecycleListener {
/** Metastorage key under which the current BaselineTopology is stored. */
private static final String METASTORE_CURR_BLT_KEY = "metastoreBltKey";
/** {@code True} if persistence is disabled (computed in {@link #start()}). */
private boolean inMemoryMode;
/**
* Compatibility mode flag. When node detects it runs in heterogeneous cluster (nodes of different versions),
* it should skip baseline topology operations.
*/
private volatile boolean compatibilityMode;
/** Current cluster state as known to this node. */
private volatile DiscoveryDataClusterState globalState;
/** History of BaselineTopologies; restored from, written to and removed from the metastorage. */
private final BaselineTopologyHistory bltHist = new BaselineTopologyHistory();
/** Local action future. */
private final AtomicReference<GridChangeGlobalStateFuture> stateChangeFut = new AtomicReference<>();
/**
* Futures of in-progress state transitions keyed by transition request ID; removed and completed when the
* matching finish message is processed.
*/
private final ConcurrentMap<UUID, GridFutureAdapter<Void>> transitionFuts = new ConcurrentHashMap<>();
/** Future initialized if node joins when cluster state change is in progress. */
private TransitionOnJoinWaitFuture joinFut;
/** Cache processor (set in {@link #cacheProcessorStarted()}). */
@GridToStringExclude
private GridCacheProcessor cacheProc;
/** Shared context. */
@GridToStringExclude
private GridCacheSharedContext<?, ?> sharedCtx;
/** Fully initialized metastorage. */
@GridToStringExclude
private ReadWriteMetastorage metastorage;
/** Marshaller for the cluster state exchanged as discovery data. */
private final JdkMarshaller marsh = new JdkMarshaller();
/**
* Listener for node left/failed events (registered in {@link #start()}). If there is a local state change
* future, the event is forwarded to it once that future's init future completes.
*/
private final GridLocalEventListener lsr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
assert evt != null;
final DiscoveryEvent e = (DiscoveryEvent)evt;
assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
final GridChangeGlobalStateFuture f = stateChangeFut.get();
if (f != null) {
// Defer handling until the local future is initialized.
f.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
f.onNodeLeft(e);
}
});
}
}
};
/**
 * Creates the processor and registers it as a metastorage lifecycle listener, so that
 * {@link #onReadyForRead} and {@link #onReadyForReadWrite} get called.
 *
 * @param ctx Kernal context.
 */
public GridClusterStateProcessor(GridKernalContext ctx) {
    super(ctx);

    // Subscribe for metastorage readiness notifications.
    ctx.internalSubscriptionProcessor()
        .registerMetastorageListener(this);
}
/**
 * Tells whether this node has detected a heterogeneous cluster (nodes of different versions joined);
 * in this mode BaselineTopology operations are skipped.
 *
 * @return {@code True} if {@link IGridClusterStateProcessor} has detected that cluster is working
 * in compatibility mode.
 */
public boolean compatibilityMode() {
    return this.compatibilityMode;
}
/**
 * {@inheritDoc}
 * <p>
 * Daemon nodes ask a server node via compute. Otherwise, during a transition the known transition result is
 * returned; if it is not known yet, either {@code false} is returned or, when {@code waitForTransition} is set,
 * the call blocks until the transition future completes.
 */
@Override public boolean publicApiActiveState(boolean waitForTransition) {
    if (ctx.isDaemon())
        return sendComputeCheckGlobalState();

    DiscoveryDataClusterState state = globalState;

    assert state != null;

    if (!state.transition())
        return state.active();

    Boolean res = state.transitionResult();

    if (res != null)
        return res;

    if (!waitForTransition)
        return false;

    GridFutureAdapter<Void> transitionFut = transitionFuts.get(state.transitionRequestId());

    if (transitionFut != null) {
        try {
            transitionFut.get();
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
    }

    // The result is published on the same state object before the transition future completes.
    res = state.transitionResult();

    assert res != null;

    return res;
}
/**
 * {@inheritDoc}
 * <p>
 * Reads the stored BaselineTopology (if any), restores its history and passes it to state restoration.
 */
@Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
    BaselineTopology restoredBlt = (BaselineTopology)metastorage.read(METASTORE_CURR_BLT_KEY);

    if (restoredBlt != null) {
        if (log.isInfoEnabled())
            U.log(log, "Restoring history for BaselineTopology[id=" + restoredBlt.id() + "]");

        bltHist.restoreHistory(metastorage, restoredBlt.id());
    }

    // Called even when nothing was stored (with null).
    onStateRestored(restoredBlt);
}
/**
 * {@inheritDoc}
 * <p>
 * Remembers the writable metastorage and, unless running in compatibility mode, persists the current
 * BaselineTopology and flushes buffered history items.
 */
@Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) throws IgniteCheckedException {
    this.metastorage = metastorage;

    if (!compatibilityMode) {
        writeBaselineTopology(globalState.baselineTopology(), null);

        bltHist.flushHistoryItems(metastorage);
    }
    else if (log.isInfoEnabled())
        log.info("BaselineTopology won't be stored as this node is running in compatibility mode");
}
/**
 * Resets branching history on current BaselineTopology and writes the updated topology to the metastorage.
 *
 * @param newBranchingHash New branching history value.
 * @throws IgniteCheckedException If write to metastore has failed.
 */
public void resetBranchingHistory(long newBranchingHash) throws IgniteCheckedException {
    globalState.baselineTopology().resetBranchingHistory(newBranchingHash);

    writeBaselineTopology(globalState.baselineTopology(), null);

    String resetMsg =
        String.format("Branching history of current BaselineTopology is reset to the value %d", newBranchingHash);

    U.log(log, resetMsg);
}
/**
 * Stores the given BaselineTopology under {@link #METASTORE_CURR_BLT_KEY} together with the previous history
 * item, or removes the stored topology and its history when {@code blt} is {@code null}. Runs under the
 * checkpoint read lock.
 *
 * @param blt BaselineTopology to store, or {@code null} to remove it.
 * @param prevBltHistItem History item of the previous topology (may be {@code null}).
 * @throws IgniteCheckedException If metastorage operation failed.
 */
private void writeBaselineTopology(BaselineTopology blt, BaselineTopologyHistoryItem prevBltHistItem) throws IgniteCheckedException {
    assert metastorage != null;

    sharedCtx.database().checkpointReadLock();

    try {
        if (blt == null) {
            if (log.isInfoEnabled())
                U.log(log, "Removing BaselineTopology and history");

            metastorage.remove(METASTORE_CURR_BLT_KEY);
            bltHist.removeHistory(metastorage);
        }
        else {
            if (log.isInfoEnabled()) {
                U.log(log, "Writing BaselineTopology[id=" + blt.id() + "]");

                if (prevBltHistItem != null)
                    U.log(log, "Writing BaselineTopologyHistoryItem[id=" + prevBltHistItem.id() + "]");
            }

            bltHist.writeHistoryItem(metastorage, prevBltHistItem);
            metastorage.write(METASTORE_CURR_BLT_KEY, blt);
        }
    }
    finally {
        sharedCtx.database().checkpointReadUnlock();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Creates the initial cluster state and subscribes to node left/failed events.
 */
@Override public void start() throws IgniteCheckedException {
    boolean persistent = CU.isPersistenceEnabled(ctx.config());

    inMemoryMode = !persistent;

    // Start first node as inactive if persistence is enabled.
    boolean activeOnStart = !persistent && ctx.config().isActiveOnStart();

    globalState = DiscoveryDataClusterState.createState(activeOnStart, null);

    ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);
}
/**
 * {@inheritDoc}
 * <p>
 * Fails a pending local state change future, if any.
 */
@Override public void onKernalStop(boolean cancel) {
    GridChangeGlobalStateFuture pending = stateChangeFut.get();

    if (pending != null) {
        IgniteCheckedException err =
            new IgniteCheckedException("Failed to wait for cluster state change, node is stopping.");

        pending.onDone(err);
    }

    super.onKernalStop(cancel);
}
/**
 * {@inheritDoc}
 * <p>
 * If a state transition is in progress, returns a future waiting for it. Otherwise, on a server node with
 * auto-activation enabled, triggers activation when the cluster is inactive and BaselineTopology is satisfied.
 */
@Override @Nullable public IgniteInternalFuture<Boolean> onLocalJoin(DiscoCache discoCache) {
    final DiscoveryDataClusterState state = globalState;

    if (state.transition()) {
        joinFut = new TransitionOnJoinWaitFuture(state, discoCache);

        return joinFut;
    }

    boolean autoActivate = !ctx.clientNode()
        && !ctx.isDaemon()
        && ctx.config().isAutoActivationEnabled()
        && !state.active()
        && isBaselineSatisfied(state.baselineTopology(), discoCache.serverNodes());

    if (autoActivate)
        changeGlobalState0(true, state.baselineTopology(), false);

    return null;
}
/**
 * Checks whether all conditions to meet BaselineTopology are satisfied for this node.
 *
 * @param blt BaselineTopology (may be {@code null}).
 * @param serverNodes Current server nodes.
 * @return {@code True} if the topology is set, contains the local node and is satisfied by {@code serverNodes}.
 */
private boolean isBaselineSatisfied(BaselineTopology blt, List<ClusterNode> serverNodes) {
    if (blt == null || blt.consistentIds() == null)
        return false;

    // Only node participating in BaselineTopology is allowed to send activation command,
    // and only when BaselineTopology is reached with this node.
    return blt.consistentIds().contains(ctx.discovery().localNode().consistentId())
        && blt.isSatisfied(serverNodes);
}
/**
 * {@inheritDoc}
 * <p>
 * Removes the node from the nodes participating in the ongoing transition. If it was the last one, the
 * transition is finished as failed and the generated finish message is returned.
 */
@Override @Nullable public ChangeGlobalStateFinishMessage onNodeLeft(ClusterNode node) {
    if (!globalState.transition())
        return null;

    Set<UUID> transitionNodes = globalState.transitionNodes();

    if (!transitionNodes.remove(node.id()) || !transitionNodes.isEmpty())
        return null;

    U.warn(log, "Failed to change cluster state, all participating nodes failed. " +
        "Switching to inactive state.");

    ChangeGlobalStateFinishMessage finishMsg =
        new ChangeGlobalStateFinishMessage(globalState.transitionRequestId(), false, false);

    onStateFinishMessage(finishMsg);

    return finishMsg;
}
/**
 * {@inheritDoc}
 * <p>
 * Finishes the current transition if the message matches its request ID; otherwise only logs a warning.
 */
@Override public void onStateFinishMessage(ChangeGlobalStateFinishMessage msg) {
    DiscoveryDataClusterState state = globalState;

    if (!msg.requestId().equals(state.transitionRequestId())) {
        U.warn(log, "Received state finish message with unexpected ID: " + msg);

        return;
    }

    log.info("Received state change finish message: " + msg.clusterActive());

    globalState = globalState.finish(msg.success());

    afterStateChangeFinished(msg.id(), msg.success());

    ctx.cache().onStateChangeFinish(msg);

    TransitionOnJoinWaitFuture waitFut = joinFut;

    if (waitFut != null)
        waitFut.onDone(false);

    GridFutureAdapter<Void> transitionFut = transitionFuts.remove(state.transitionRequestId());

    if (transitionFut != null) {
        // Publish the result on the transition state before releasing waiters (see publicApiActiveState).
        state.setTransitionResult(msg.requestId(), msg.clusterActive());

        transitionFut.onDone();
    }
}
/**
 * Hook called from {@link #onStateFinishMessage} after the global state is switched to the finished state.
 * Does nothing by default.
 *
 * @param msgId Finish message ID.
 * @param success Success flag from the finish message.
 */
protected void afterStateChangeFinished(IgniteUuid msgId, boolean success) {
    // Intentionally empty: extension point for subclasses.
}
/**
 * {@inheritDoc}
 * <p>
 * If a transition is already in progress, an applicable request fails its local future with a concurrent state
 * change error. A request that is not applicable completes its local future once the ongoing transition's future
 * completes. Otherwise an applicable request switches the cluster into a transition state: cache exchange actions
 * carrying a {@link StateChangeRequest} are attached to the message and {@code true} is returned. A request that
 * is not applicable completes its local future right away.
 *
 * @return {@code True} if a state transition was started by this message.
 */
@Override public boolean onStateChangeMessage(
AffinityTopologyVersion topVer,
ChangeGlobalStateMessage msg,
DiscoCache discoCache
) {
DiscoveryDataClusterState state = globalState;
if (log.isInfoEnabled())
U.log(log, "Received " + prettyStr(msg.activate()) + " request with BaselineTopology" +
(msg.baselineTopology() == null ? ": null"
: "[id=" + msg.baselineTopology().id() + "]"));
// Another state transition is in progress.
if (state.transition()) {
if (isApplicable(msg, state)) {
// The request would change the state: reject it while the other transition runs.
GridChangeGlobalStateFuture fut = changeStateFuture(msg);
if (fut != null)
fut.onDone(concurrentStateChangeError(msg.activate()));
}
else {
// Not applicable: complete the local future when the ongoing transition completes.
final GridChangeGlobalStateFuture stateFut = changeStateFuture(msg);
GridFutureAdapter<Void> transitionFut = transitionFuts.get(state.transitionRequestId());
if (stateFut != null && transitionFut != null) {
transitionFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
@Override public void apply(IgniteInternalFuture<Void> fut) {
try {
fut.get();
stateFut.onDone();
}
catch (Exception ex) {
stateFut.onDone(ex);
}
}
});
}
}
}
else {
if (isApplicable(msg, state)) {
ExchangeActions exchangeActions;
// Let the cache processor prepare exchange actions; on error fail the local future and do not start a transition.
try {
exchangeActions = ctx.cache().onStateChangeRequest(msg, topVer, state);
}
catch (IgniteCheckedException e) {
GridChangeGlobalStateFuture fut = changeStateFuture(msg);
if (fut != null)
fut.onDone(e);
return false;
}
// All nodes of the current topology participate in the transition.
Set<UUID> nodeIds = U.newHashSet(discoCache.allNodes().size());
for (ClusterNode node : discoCache.allNodes())
nodeIds.add(node.id());
GridChangeGlobalStateFuture fut = changeStateFuture(msg);
if (fut != null)
fut.setRemaining(nodeIds, topVer.nextMinorVersion());
if (log.isInfoEnabled())
log.info("Started state transition: " + msg.activate());
// History item of the BaselineTopology in effect before this transition.
BaselineTopologyHistoryItem bltHistItem = BaselineTopologyHistoryItem.fromBaseline(
globalState.baselineTopology());
// Removed and completed in onStateFinishMessage.
transitionFuts.put(msg.requestId(), new GridFutureAdapter<Void>());
// On deactivation the current BaselineTopology is kept.
globalState = DiscoveryDataClusterState.createTransitionState(
globalState,
msg.activate(),
msg.activate() ? msg.baselineTopology() : globalState.baselineTopology(),
msg.requestId(),
topVer,
nodeIds);
// For a forced BaselineTopology change the transition result is set right away.
if (msg.forceChangeBaselineTopology())
globalState.setTransitionResult(msg.requestId(), msg.activate());
AffinityTopologyVersion stateChangeTopVer = topVer.nextMinorVersion();
// Third argument: whether the request flips the active flag compared with the pre-transition state.
StateChangeRequest req = new StateChangeRequest(msg, bltHistItem, msg.activate() != state.active(), stateChangeTopVer);
exchangeActions.stateChangeRequest(req);
msg.exchangeActions(exchangeActions);
return true;
}
else {
// State already changed.
GridChangeGlobalStateFuture stateFut = changeStateFuture(msg);
if (stateFut != null)
stateFut.onDone();
}
}
return false;
}
/**
 * Checks whether a state change request should be applied. By default a request is applicable unless it is
 * equivalent to the current state.
 *
 * @param msg State change message.
 * @param state Current cluster state.
 * @return {@code True} if state change from message can be applied to the current state.
 */
protected boolean isApplicable(ChangeGlobalStateMessage msg, DiscoveryDataClusterState state) {
    boolean sameState = isEquivalent(msg, state);

    return !sameState;
}
/**
 * Compares the requested active flag and BaselineTopology with the given state.
 *
 * @param msg State change message.
 * @param state Current cluster state.
 * @return {@code True} if states are equivalent.
 */
protected static boolean isEquivalent(ChangeGlobalStateMessage msg, DiscoveryDataClusterState state) {
    if (msg.activate() != state.active())
        return false;

    return BaselineTopology.equals(msg.baselineTopology(), state.baselineTopology());
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the cluster state currently known to this node.
 */
@Override public DiscoveryDataClusterState clusterState() {
    DiscoveryDataClusterState cur = globalState;

    return cur;
}
/**
 * {@inheritDoc}
 * <p>
 * The pending state is active if the request activates the cluster or forces a BaselineTopology change.
 */
@Override public DiscoveryDataClusterState pendingState(ChangeGlobalStateMessage stateMsg) {
    boolean active = stateMsg.activate() || stateMsg.forceChangeBaselineTopology();

    return DiscoveryDataClusterState.createState(active, stateMsg.baselineTopology());
}
/**
 * Looks up the local state change future for the given message.
 *
 * @param msg State change message.
 * @return Local future for state change process, or {@code null} if there is none on this node.
 */
@Nullable private GridChangeGlobalStateFuture changeStateFuture(ChangeGlobalStateMessage msg) {
    UUID initiator = msg.initiatorNodeId();
    UUID reqId = msg.requestId();

    return changeStateFuture(initiator, reqId);
}
/**
 * Returns the local state change future if this node initiated the request and the future belongs to it.
 *
 * @param initiatorNode Node initiated state change process.
 * @param reqId State change request ID.
 * @return Local future for state change process, or {@code null}.
 */
@Nullable private GridChangeGlobalStateFuture changeStateFuture(UUID initiatorNode, UUID reqId) {
    assert initiatorNode != null;
    assert reqId != null;

    if (!initiatorNode.equals(ctx.localNodeId()))
        return null;

    GridChangeGlobalStateFuture fut = stateChangeFut.get();

    return fut != null && fut.requestId.equals(reqId) ? fut : null;
}
/**
 * Builds the error reported when a state change conflicts with another one in progress.
 *
 * @param activate New state.
 * @return State change error.
 */
protected IgniteCheckedException concurrentStateChangeError(boolean activate) {
    String errMsg = "Failed to " + prettyStr(activate) +
        ", because another state change operation is currently in progress: " + prettyStr(!activate);

    return new IgniteCheckedException(errMsg);
}
/**
 * {@inheritDoc}
 * <p>
 * Captures cache processor and shared context and registers the handler for state change responses.
 */
@Override public void cacheProcessorStarted() {
    cacheProc = ctx.cache();
    sharedCtx = cacheProc.context();

    CI2<UUID, GridChangeGlobalStateMessageResponse> respHnd = new CI2<UUID, GridChangeGlobalStateMessageResponse>() {
        @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
            processChangeGlobalStateResponse(nodeId, msg);
        }
    };

    sharedCtx.io().addCacheHandler(0, GridChangeGlobalStateMessageResponse.class, respHnd);
}
/**
 * {@inheritDoc}
 * <p>
 * Unregisters the response handler and event listener and fails a pending local state change, if any.
 */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
    super.stop(cancel);

    if (sharedCtx != null)
        sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class);

    ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED);

    IgniteCheckedException stopErr = new IgniteCheckedException("Node is stopping: " + ctx.igniteInstanceName());

    GridChangeGlobalStateFuture activeFut = stateChangeFut.get();

    if (activeFut != null)
        activeFut.onDone(stopErr);
}
/**
 * {@inheritDoc}
 * <p>
 * This processor exchanges discovery data under {@code STATE_PROC}.
 */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
    return STATE_PROC;
}
/**
 * {@inheritDoc}
 * <p>
 * Sends the local cluster state, marshalled with the JDK marshaller, as joining node data.
 */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
    try {
        byte[] stateBytes = marsh.marshal(globalState);

        dataBag.addJoiningNodeData(discoveryDataType().ordinal(), stateBytes);
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException(e);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Sends the cluster state to a joining node. A joiner that sent no data gets the plain
 * {@link DiscoveryDataClusterState}. Any other joiner gets the state together with the BaselineTopology history:
 * the tail after its own BaselineTopology ID if it has one, otherwise the whole history.
 */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
    if (dataBag.commonDataCollectedFor(STATE_PROC.ordinal()))
        return;

    DiscoveryDataBag.JoiningNodeDiscoveryData joiningNodeData = dataBag.newJoinerDiscoveryData(STATE_PROC.ordinal());

    if (joiningNodeData == null)
        return;

    if (!joiningNodeData.hasJoiningNodeData()) {
        //compatibility mode: old nodes don't send any data on join, so coordinator of new version
        //doesn't send BaselineTopology history, only its current globalState
        dataBag.addGridCommonData(STATE_PROC.ordinal(), globalState);

        return;
    }

    DiscoveryDataClusterState joiningNodeState = null;

    try {
        if (joiningNodeData.joiningNodeData() != null)
            joiningNodeState = marsh.unmarshal(
                (byte[])joiningNodeData.joiningNodeData(),
                U.resolveClassLoader(ctx.config()));
    }
    catch (IgniteCheckedException e) {
        // Pass the cause to the log, otherwise the reason of the failure is lost.
        U.error(log, "Failed to unmarshal disco data from joining node: " + joiningNodeData.joiningNodeId(), e);

        return;
    }

    BaselineTopologyHistory historyToSend = null;

    if (!bltHist.isEmpty()) {
        if (joiningNodeState != null && joiningNodeState.baselineTopology() != null)
            historyToSend = bltHist.tailFrom(joiningNodeState.baselineTopology().id());
        else
            historyToSend = bltHist;
    }

    dataBag.addGridCommonData(STATE_PROC.ordinal(), new BaselineStateAndHistoryData(globalState, historyToSend));
}
/**
 * {@inheritDoc}
 * <p>
 * A plain {@link DiscoveryDataClusterState} switches this node into compatibility mode. Otherwise the received
 * state is adopted and the received BaselineTopology history items are buffered for storing.
 */
@Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
    Object commonData = data.commonData();

    if (commonData instanceof DiscoveryDataClusterState) {
        //node with BaselineTopology is not allowed to join mixed cluster
        // (where some nodes don't support BaselineTopology)
        if (globalState != null && globalState.baselineTopology() != null) {
            throw new IgniteException("Node with BaselineTopology cannot join" +
                " mixed cluster running in compatibility mode");
        }

        globalState = (DiscoveryDataClusterState)commonData;
        compatibilityMode = true;

        return;
    }

    BaselineStateAndHistoryData stateDiscoData = (BaselineStateAndHistoryData)commonData;

    if (stateDiscoData == null)
        return;

    DiscoveryDataClusterState state = stateDiscoData.globalState;

    // Register a future for the ongoing transition so that it can be awaited locally.
    if (state.transition())
        transitionFuts.put(state.transitionRequestId(), new GridFutureAdapter<Void>());

    globalState = state;

    if (stateDiscoData.recentHistory != null) {
        for (BaselineTopologyHistoryItem item : stateDiscoData.recentHistory.history())
            bltHist.bufferHistoryItemForStore(item);
    }
}
/**
 * {@inheritDoc}
 * <p>
 * In in-memory mode and in compatibility mode no BaselineTopology is sent with the request.
 */
@Override public IgniteInternalFuture<?> changeGlobalState(
    final boolean activate,
    Collection<? extends BaselineNode> baselineNodes,
    boolean forceChangeBaselineTopology
) {
    if (inMemoryMode)
        return changeGlobalState0(activate, null, false);

    BaselineTopology newBlt = null;

    if (!compatibilityMode)
        newBlt = calculateNewBaselineTopology(activate, baselineNodes, forceChangeBaselineTopology);

    return changeGlobalState0(activate, newBlt, forceChangeBaselineTopology);
}
/**
 * Computes the BaselineTopology to send with a state change request.
 *
 * @param activate Whether the request activates the cluster.
 * @param baselineNodes Requested baseline nodes (may be {@code null}); client and daemon cluster nodes are dropped.
 * @param forceChangeBaselineTopology Whether BaselineTopology is being set explicitly.
 * @return New BaselineTopology, or {@code null} for a non-forced deactivation. On a non-forced activation with
 *      an existing topology, the current topology object is returned after its history is updated in place.
 */
private BaselineTopology calculateNewBaselineTopology(final boolean activate,
    Collection<? extends BaselineNode> baselineNodes,
    boolean forceChangeBaselineTopology) {
    BaselineTopology currentBlt = globalState.baselineTopology();

    // Activation bumps the topology ID; otherwise the current ID (or 0 if there is none) is kept.
    int newBltId = currentBlt == null ? 0 : (activate ? currentBlt.id() + 1 : currentBlt.id());

    if (baselineNodes != null && !baselineNodes.isEmpty()) {
        List<BaselineNode> filtered = new ArrayList<>();

        for (BaselineNode n : baselineNodes) {
            // Client and daemon cluster nodes are excluded from the requested baseline.
            if (n instanceof ClusterNode && (((ClusterNode)n).isClient() || ((ClusterNode)n).isDaemon()))
                continue;

            filtered.add(n);
        }

        baselineNodes = filtered;
    }

    if (forceChangeBaselineTopology)
        return BaselineTopology.build(baselineNodes, newBltId);

    if (!activate)
        return null;

    if (baselineNodes == null)
        baselineNodes = baselineNodes();

    if (currentBlt == null)
        return BaselineTopology.build(baselineNodes, newBltId);

    currentBlt.updateHistory(baselineNodes);

    return currentBlt;
}
/**
 * @return Server nodes reported by discovery for {@link AffinityTopologyVersion#NONE}, as baseline nodes.
 */
private Collection<BaselineNode> baselineNodes() {
    List<ClusterNode> srvNodes = ctx.discovery().serverNodes(AffinityTopologyVersion.NONE);

    return new ArrayList<BaselineNode>(srvNodes);
}
/**
 * Starts a cluster state change from this node.
 * <p>
 * Client and daemon nodes delegate the request to server nodes via compute. A server node:
 * <ul>
 * <li>fails fast when called inside an active transaction or with a locked topology version;</li>
 * <li>returns a finished future if no transition is running and the requested state is already in effect;</li>
 * <li>joins an already running local state change with the same {@code activate} flag, or fails if the flag differs;</li>
 * <li>otherwise sends a {@link ChangeGlobalStateMessage} via discovery.</li>
 * </ul>
 *
 * @param activate {@code True} to activate, {@code false} to deactivate.
 * @param blt BaselineTopology to send with the request (may be {@code null}).
 * @param forceChangeBaselineTopology Whether BaselineTopology is being set explicitly.
 * @return Future of the state change operation.
 */
private IgniteInternalFuture<?> changeGlobalState0(final boolean activate,
BaselineTopology blt, boolean forceChangeBaselineTopology) {
if (ctx.isDaemon() || ctx.clientNode()) {
GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
sendComputeChangeGlobalState(activate, blt, forceChangeBaselineTopology, fut);
return fut;
}
if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) {
return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
" cluster (must invoke the method outside of an active transaction)."));
}
DiscoveryDataClusterState curState = globalState;
// Nothing to do: requested state is already in effect.
if (!curState.transition() && curState.active() == activate && BaselineTopology.equals(curState.baselineTopology(), blt))
return new GridFinishedFuture<>();
GridChangeGlobalStateFuture startedFut = null;
GridChangeGlobalStateFuture fut = stateChangeFut.get();
// Try to install a new local future. If a not-yet-done future is already installed, leave the loop with it.
// NOTE(review): if stateChangeFut holds a future that is done but not yet cleared, this loop spins until it is
// cleared (presumably GridChangeGlobalStateFuture resets stateChangeFut on completion) - confirm.
while (fut == null || fut.isDone()) {
fut = new GridChangeGlobalStateFuture(UUID.randomUUID(), activate, ctx);
if (stateChangeFut.compareAndSet(null, fut)) {
startedFut = fut;
break;
}
else
fut = stateChangeFut.get();
}
// Another local state change is in progress.
if (startedFut == null) {
if (fut.activate != activate) {
return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) +
", because another state change operation is currently in progress: " + prettyStr(fut.activate)));
}
else
return fut;
}
// On activation with persistence, stored cache configurations are sent with the request.
List<StoredCacheData> storedCfgs = null;
if (activate && CU.isPersistenceEnabled(ctx.config())) {
try {
Map<String, StoredCacheData> cfgs = ctx.cache().context().pageStore().readCacheConfigurations();
if (!F.isEmpty(cfgs))
storedCfgs = new ArrayList<>(cfgs.values());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to read stored cache configurations: " + e, e);
startedFut.onDone(e);
return startedFut;
}
}
ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId,
ctx.localNodeId(),
storedCfgs,
activate,
blt,
forceChangeBaselineTopology,
System.currentTimeMillis());
try {
if (log.isInfoEnabled())
U.log(log, "Sending " + prettyStr(activate) + " request with BaselineTopology " + blt);
ctx.discovery().sendCustomEvent(msg);
// If the node is stopping, fail the future right away.
if (ctx.isStopping())
startedFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " +
"node is stopping."));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send global state change request: " + activate, e);
startedFut.onDone(e);
}
return wrapStateChangeFuture(startedFut, msg);
}
/**
 * {@inheritDoc}
 * <p>
 * For server nodes, checks BaselineTopology compatibility:
 * <ul>
 * <li>a node that sent no state cannot join a cluster with BaselineTopology;</li>
 * <li>a node with BaselineTopology cannot join a cluster without one;</li>
 * <li>the joining node's BaselineTopology ID must not exceed the cluster's;</li>
 * <li>the joining node's BaselineTopology must be compatible with the cluster's branching history.</li>
 * </ul>
 */
@Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag.JoiningNodeDiscoveryData discoData) {
    if (node.isClient() || node.isDaemon())
        return null;

    if (discoData.joiningNodeData() == null) {
        if (globalState.baselineTopology() != null) {
            String msg = "Node not supporting BaselineTopology" +
                " is not allowed to join the cluster with BaselineTopology";

            return new IgniteNodeValidationResult(node.id(), msg, msg);
        }

        return null;
    }

    DiscoveryDataClusterState joiningNodeState;

    try {
        // Resolve classes with the configured class loader, as collectGridNodeData() does for the same data.
        joiningNodeState = marsh.unmarshal((byte[])discoData.joiningNodeData(), U.resolveClassLoader(ctx.config()));
    }
    catch (IgniteCheckedException e) {
        String msg = "Error on unmarshalling discovery data " +
            "from node " + node.consistentId() + ": " + e.getMessage() +
            "; node is not allowed to join";

        return new IgniteNodeValidationResult(node.id(), msg, msg);
    }

    if (joiningNodeState == null || joiningNodeState.baselineTopology() == null)
        return null;

    // From here on the joining node has BaselineTopology.
    if (globalState == null || globalState.baselineTopology() == null) {
        String msg = "Node with set up BaselineTopology is not allowed to join cluster without one: " + node.consistentId();

        return new IgniteNodeValidationResult(node.id(), msg, msg);
    }

    BaselineTopology joiningNodeBlt = joiningNodeState.baselineTopology();
    BaselineTopology clusterBlt = globalState.baselineTopology();

    String recommendation = " Consider cleaning persistent storage of the node and adding it to the cluster again.";

    if (joiningNodeBlt.id() > clusterBlt.id()) {
        String msg = "BaselineTopology of joining node ("
            + node.consistentId()
            + ") is not compatible with BaselineTopology in the cluster."
            + " Joining node BlT id (" + joiningNodeBlt.id()
            + ") is greater than cluster BlT id (" + clusterBlt.id() + ")."
            + " New BaselineTopology was set on joining node with set-baseline command."
            + recommendation;

        return new IgniteNodeValidationResult(node.id(), msg, msg);
    }

    if (joiningNodeBlt.id() == clusterBlt.id()) {
        if (!clusterBlt.isCompatibleWith(joiningNodeBlt)) {
            String msg = "BaselineTopology of joining node ("
                + node.consistentId()
                + ") is not compatible with BaselineTopology in the cluster."
                + " Branching history of cluster BlT (" + clusterBlt.branchingHistory()
                + ") doesn't contain branching point hash of joining node BlT ("
                + joiningNodeBlt.branchingPointHash()
                + ")." + recommendation;

            return new IgniteNodeValidationResult(node.id(), msg, msg);
        }
    }
    else if (joiningNodeBlt.id() < clusterBlt.id()) {
        if (!bltHist.isCompatibleWith(joiningNodeBlt)) {
            String msg = "BaselineTopology of joining node ("
                + node.consistentId()
                + ") is not compatible with BaselineTopology in the cluster."
                + " BlT id of joining node (" + joiningNodeBlt.id()
                + ") less than BlT id of cluster (" + clusterBlt.id()
                + ") but cluster's BaselineHistory doesn't contain branching point hash of joining node BlT ("
                + joiningNodeBlt.branchingPointHash()
                + ")." + recommendation;

            return new IgniteNodeValidationResult(node.id(), msg, msg);
        }
    }

    return null;
}
/**
 * Extension point that lets subclasses wrap the local state change future before it is returned to the caller.
 * Returns the future unchanged by default.
 *
 * @param fut Original state change future.
 * @param msg State change message.
 * @return Wrapped state change future.
 */
protected IgniteInternalFuture<?> wrapStateChangeFuture(IgniteInternalFuture fut, ChangeGlobalStateMessage msg) {
    IgniteInternalFuture<?> res = fut;

    return res;
}
/**
 * Sends a state change request to server nodes via compute. This is the path {@code changeGlobalState0} uses
 * on client and daemon nodes.
 *
 * @param activate New cluster state.
 * @param blt BaselineTopology to send with the request (may be {@code null}).
 * @param forceBlt Whether BaselineTopology is being set explicitly.
 * @param resFut Future completed when the compute task completes, with the task's error if it failed.
 */
private void sendComputeChangeGlobalState(
    boolean activate,
    BaselineTopology blt,
    boolean forceBlt,
    final GridFutureAdapter<Void> resFut
) {
    AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();

    if (log.isInfoEnabled()) {
        log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() +
            ", topVer=" + topVer +
            ", client=" + ctx.clientNode() +
            ", daemon=" + ctx.isDaemon() + "]");
    }

    IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();

    IgniteFuture<Void> computeFut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate, blt, forceBlt));

    // Typed listener instead of the raw CI1<IgniteFuture>.
    computeFut.listen(new CI1<IgniteFuture<Void>>() {
        @Override public void apply(IgniteFuture<Void> fut) {
            try {
                fut.get();

                resFut.onDone();
            }
            catch (Exception e) {
                resFut.onDone(e);
            }
        }
    });
}
/**
 * Checks cluster state by calling {@code Ignite.active()} on a server node via compute.
 * {@code publicApiActiveState} calls this on daemon nodes.
 *
 * @return Cluster state, {@code True} if cluster active, {@code False} if inactive.
 */
private boolean sendComputeCheckGlobalState() {
    AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();

    if (log.isInfoEnabled()) {
        // Same "[key=value]" format as in sendComputeChangeGlobalState.
        log.info("Sending check cluster state request from node [id=" + ctx.localNodeId() +
            ", topVer=" + topVer +
            ", client=" + ctx.clientNode() +
            ", daemon=" + ctx.isDaemon() + "]");
    }

    IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute();

    return comp.call(new IgniteCallable<Boolean>() {
        /** Ignite instance of the executing node (injected). */
        @IgniteInstanceResource
        private Ignite ig;

        @Override public Boolean call() throws Exception {
            return ig.active();
        }
    });
}
/**
 * {@inheritDoc}
 * <p>
 * Reverts cache start after a failed activation and fails the local state change future. The individual
 * node errors are attached to that future's error as suppressed exceptions.
 */
@Override public void onStateChangeError(Map<UUID, Exception> errs, StateChangeRequest req) {
    assert !F.isEmpty(errs);

    // Revert caches start if activation request fail.
    // Reverting a failed deactivation is not implemented: https://issues.apache.org/jira/browse/IGNITE-5480
    if (req.activeChanged() && req.activate()) {
        try {
            cacheProc.onKernalStopCaches(true);
            cacheProc.stopCaches(true);

            sharedCtx.affinity().removeAllCacheInfo();

            if (!ctx.clientNode())
                sharedCtx.deactivate();
        }
        catch (Exception e) {
            U.error(log, "Failed to revert activation request changes", e);
        }
    }

    GridChangeGlobalStateFuture fut = changeStateFuture(req.initiatorNodeId(), req.requestId());

    if (fut == null)
        return;

    IgniteCheckedException err = new IgniteCheckedException(
        "Failed to " + prettyStr(req.activate()) + " cluster",
        null,
        false
    );

    for (Exception nodeErr : errs.values())
        err.addSuppressed(nodeErr);

    fut.onDone(err);
}
/**
 * Performs final activation steps. Data structures are notified synchronously before activation. Service,
 * data structures, IGFS and task processors are then activated in a closure submitted via
 * {@code runLocalSafe}.
 *
 * @param req State change request.
 */
private void onFinalActivate(final StateChangeRequest req) {
    ctx.dataStructures().onBeforeActivate();

    // NOTE(review): the closure is submitted via runLocalSafe, whose future is not awaited here. An exception
    // thrown inside it therefore presumably does not reach this method's caller - confirm.
    ctx.closure().runLocalSafe(new Runnable() {
        @Override public void run() {
            boolean client = ctx.clientNode();

            try {
                ctx.service().onUtilityCacheStarted();
                ctx.service().onActivate(ctx);
                ctx.dataStructures().onActivate(ctx);
                ctx.igfs().onActivate(ctx);
                ctx.task().onActivate(ctx);

                if (log.isInfoEnabled())
                    log.info("Successfully performed final activation steps [nodeId="
                        + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]");
            }
            catch (Exception ex) {
                throw new IgniteException(ex);
            }
        }
    });
}
/**
 * {@inheritDoc}
 * <p>
 * If the request changed the active flag, runs final activation steps (on activation) and publishes the
 * transition result. Then responds to the initiator, with an error if any of these steps failed.
 */
@Override public void onStateChangeExchangeDone(StateChangeRequest req) {
    try {
        boolean activeChanged = req.activeChanged();

        if (activeChanged && req.activate())
            onFinalActivate(req);

        if (activeChanged)
            globalState.setTransitionResult(req.requestId(), req.activate());

        sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), null);
    }
    catch (Exception ex) {
        Exception err = new IgniteCheckedException("Failed to perform final activation steps", ex);

        U.error(log, "Failed to perform final activation steps [nodeId=" + ctx.localNodeId() +
            ", client=" + ctx.clientNode() + ", topVer=" + req.topologyVersion() + "]", ex);

        sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), err);
    }
}
/**
 * Persists a changed baseline topology together with the previous history item.
 * <p>
 * When this node runs in compatibility mode nothing is written; a debug message is logged instead.
 *
 * @param blt New baseline topology.
 * @param prevBltHistItem Previous baseline topology history item.
 * @throws IgniteCheckedException If writing the baseline topology fails.
 */
@Override public void onBaselineTopologyChanged(
    BaselineTopology blt,
    BaselineTopologyHistoryItem prevBltHistItem
) throws IgniteCheckedException {
    if (compatibilityMode) {
        // The guard checks DEBUG, so the message must be emitted at DEBUG as well.
        if (log.isDebugEnabled())
            log.debug("BaselineTopology won't be stored as this node is running in compatibility mode");

        return;
    }

    writeBaselineTopology(blt, prevBltHistItem);
}
/**
 * Delivers the outcome of a state change to the node that initiated it.
 * <p>
 * If this node is the initiator, the response is processed in place. Otherwise it is sent via the system
 * pool. A send failure is logged (at debug level if the target node has left) and not rethrown.
 *
 * @param reqId Request ID.
 * @param initNodeId Initiator node ID.
 * @param ex Error to report, or {@code null} on success.
 */
private void sendChangeGlobalStateResponse(UUID reqId, UUID initNodeId, Exception ex) {
    assert reqId != null;
    assert initNodeId != null;

    GridChangeGlobalStateMessageResponse res = new GridChangeGlobalStateMessageResponse(reqId, ex);

    try {
        UUID locNodeId = ctx.localNodeId();

        if (log.isDebugEnabled())
            log.debug("Sending global state change response [nodeId=" + locNodeId +
                ", topVer=" + ctx.discovery().topologyVersionEx() + ", res=" + res + "]");

        boolean remoteInitiator = !locNodeId.equals(initNodeId);

        if (remoteInitiator)
            sharedCtx.io().send(initNodeId, res, SYSTEM_POOL);
        else
            processChangeGlobalStateResponse(locNodeId, res);
    }
    catch (ClusterTopologyCheckedException ignored) {
        if (log.isDebugEnabled())
            log.debug("Failed to send change global state response, node left [node=" + initNodeId +
                ", res=" + res + ']');
    }
    catch (IgniteCheckedException sendErr) {
        U.error(log, "Failed to send change global state response [node=" + initNodeId + ", res=" + res + ']', sendErr);
    }
}
/**
 * Routes a state change response to the currently tracked change future.
 * <p>
 * The response is dropped if no future is tracked or if it belongs to a different request. If the future's
 * init phase is still pending, handling is deferred. When that phase completes, the response is handed to
 * the system executor.
 *
 * @param nodeId Sender node ID.
 * @param msg Response message.
 */
private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) {
    assert nodeId != null;
    assert msg != null;

    UUID reqId = msg.getRequestId();

    if (log.isDebugEnabled())
        log.debug("Received activation response [requestId=" + reqId + ", nodeId=" + nodeId + "]");

    final GridChangeGlobalStateFuture fut = stateChangeFut.get();

    // Response for an untracked or stale request.
    if (fut == null || !reqId.equals(fut.requestId))
        return;

    if (fut.initFut.isDone()) {
        fut.onResponse(nodeId, msg);

        return;
    }

    fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() {
        @Override public void apply(IgniteInternalFuture<?> f) {
            // initFut is completed from discovery thread, process response from other thread.
            ctx.getSystemExecutorService().execute(new Runnable() {
                @Override public void run() {
                    fut.onResponse(nodeId, msg);
                }
            });
        }
    });
}
/**
 * Adopts a restored baseline topology as the current (inactive) state.
 * <p>
 * This only happens while the current state is inactive, not in transition and has no baseline topology.
 *
 * @param blt Restored baseline topology.
 */
private void onStateRestored(BaselineTopology blt) {
    DiscoveryDataClusterState cur = globalState;

    if (cur.active() || cur.transition() || cur.baselineTopology() != null)
        return;

    globalState = DiscoveryDataClusterState.createState(false, blt);
}
/**
 * {@inheritDoc}
 * <p>
 * This processor takes no action here.
 */
@Override public void onExchangeFinishedOnCoordinator(IgniteInternalFuture exchangeFuture, boolean hasMovingPartitions) {
    // Intentionally empty.
}
/**
 * {@inheritDoc}
 *
 * @return Always {@code true}.
 */
@Override public boolean evictionsAllowed() {
    final boolean allowed = true;

    return allowed;
}
/**
 * Maps an activation flag to the verb used in log and error messages.
 *
 * @param activate {@code true} for activation, {@code false} for deactivation.
 * @return {@code "activate"} or {@code "deactivate"}.
 */
private static String prettyStr(boolean activate) {
    if (activate)
        return "activate";

    return "deactivate";
}
/**
 * {@inheritDoc}
 *
 * @return String representation built by {@code S.toString}.
 */
@Override public String toString() {
    Class<GridClusterStateProcessor> cls = GridClusterStateProcessor.class;

    return S.toString(cls, this);
}
/**
 * Future for a cluster state change request.
 * <p>
 * The set of nodes to wait for is registered via {@link #setRemaining}, which also completes {@link #initFut}.
 * Each node is crossed off when its response arrives ({@link #onResponse}) or when it leaves
 * ({@link #onNodeLeft}). Once none remain, the future completes, with an error if any collected response
 * carried one. On completion it clears itself from {@code stateChangeFut}.
 */
private class GridChangeGlobalStateFuture extends GridFutureAdapter<Void> {
    /** Request id. */
    @GridToStringInclude
    private final UUID requestId;

    /** Activate. */
    private final boolean activate;

    /** Nodes still expected to respond. Guarded by {@link #mux}. */
    @GridToStringInclude
    private final Set<UUID> remaining = new HashSet<>();

    /** Responses received so far, keyed by sender node ID. Guarded by {@link #mux}. */
    @GridToStringInclude
    private final Map<UUID, GridChangeGlobalStateMessageResponse> responses = new HashMap<>();

    /** Context. */
    @GridToStringExclude
    private final GridKernalContext ctx;

    /** Lock protecting {@link #remaining} and {@link #responses}. */
    @GridToStringExclude
    private final Object mux = new Object();

    /** Completed by {@link #setRemaining} once the set of nodes to wait for is known. */
    @GridToStringInclude
    private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>();

    /** Grid logger. */
    @GridToStringExclude
    private final IgniteLogger log;

    /**
     * @param requestId State change request ID.
     * @param activate New cluster state.
     * @param ctx Context.
     */
    GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) {
        this.requestId = requestId;
        this.activate = activate;
        this.ctx = ctx;

        log = ctx.log(getClass());
    }

    /**
     * Stops waiting for a node that left the topology.
     *
     * @param event Discovery event for the departed node.
     */
    void onNodeLeft(DiscoveryEvent event) {
        assert event != null;

        if (isDone())
            return;

        boolean allReceived = false;

        synchronized (mux) {
            if (remaining.remove(event.eventNode().id()))
                allReceived = remaining.isEmpty();
        }

        if (allReceived)
            onAllReceived();
    }

    /**
     * Registers the nodes to wait for and completes {@link #initFut}.
     *
     * @param nodesIds Node IDs.
     * @param topVer Current topology version.
     */
    void setRemaining(Set<UUID> nodesIds, AffinityTopologyVersion topVer) {
        if (log.isDebugEnabled()) {
            log.debug("Setup remaining node [id=" + ctx.localNodeId() +
                ", client=" + ctx.clientNode() +
                ", topVer=" + topVer +
                ", nodes=" + nodesIds + "]");
        }

        synchronized (mux) {
            remaining.addAll(nodesIds);
        }

        initFut.onDone();
    }

    /**
     * Records a node's response and completes the future if it was the last one awaited.
     *
     * @param nodeId Sender node ID.
     * @param msg Activation message response.
     */
    public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) {
        assert msg != null;

        if (isDone())
            return;

        boolean allReceived = false;

        synchronized (mux) {
            if (remaining.remove(nodeId))
                allReceived = remaining.isEmpty();

            responses.put(nodeId, msg);
        }

        if (allReceived)
            onAllReceived();
    }

    /**
     * Completes the future, failing it with all response errors attached as suppressed exceptions if any
     * response carried an error.
     */
    private void onAllReceived() {
        // Copy under the lock: onResponse() may still record late responses concurrently, and iterating the
        // HashMap while it is being modified is a data race (possible ConcurrentModificationException).
        List<GridChangeGlobalStateMessageResponse> snapshot;

        synchronized (mux) {
            snapshot = new ArrayList<>(responses.values());
        }

        IgniteCheckedException e = new IgniteCheckedException();

        boolean fail = false;

        for (GridChangeGlobalStateMessageResponse r : snapshot) {
            if (r.getError() != null) {
                fail = true;

                e.addSuppressed(r.getError());
            }
        }

        if (fail)
            onDone(e);
        else
            onDone();
    }

    /** {@inheritDoc} */
    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
        if (super.onDone(res, err)) {
            // Stop routing responses to this future once it has completed.
            stateChangeFut.compareAndSet(this, null);

            return true;
        }

        return false;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridChangeGlobalStateFuture.class, this);
    }
}
/**
 * Compute job that requests a global state change on the node where it executes.
 * <p>
 * It waits for the resulting future. A checked failure is rethrown as {@link IgniteException}.
 */
private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Requested active flag. */
    private final boolean activate;

    /** Requested baseline topology, may be {@code null}. */
    private final BaselineTopology baselineTopology;

    /** Whether the baseline topology change is forced. */
    private final boolean forceChangeBaselineTopology;

    /** Ignite. */
    @IgniteInstanceResource
    private IgniteEx ig;

    /**
     * @param activate New cluster state.
     * @param blt Baseline topology to apply, or {@code null}.
     * @param forceBlt Whether to force the baseline topology change.
     */
    private ClientChangeGlobalStateComputeRequest(boolean activate, BaselineTopology blt, boolean forceBlt) {
        this.activate = activate;
        forceChangeBaselineTopology = forceBlt;
        baselineTopology = blt;
    }

    /** {@inheritDoc} */
    @Override public void run() {
        try {
            IgniteInternalFuture<?> changeFut = ig.context().state().changeGlobalState(
                activate,
                baselineTopology == null ? null : baselineTopology.currentBaseline(),
                forceChangeBaselineTopology
            );

            changeFut.get();
        }
        catch (IgniteCheckedException ex) {
            throw new IgniteException(ex);
        }
    }
}
/**
 * Compute job that reports whether the executing node's Ignite instance is active.
 */
private static class CheckGlobalStateComputeRequest implements IgniteCallable<Boolean> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Ignite. */
    @IgniteInstanceResource
    private Ignite ig;

    /** {@inheritDoc} */
    @Override public Boolean call() throws Exception {
        boolean active = ig.active();

        return active;
    }
}
/**
 * Future created for a state that is in transition.
 * <p>
 * It records which of the state's transition nodes are present in the given discovery cache. On completion
 * it resets {@code joinFut} to {@code null}.
 */
class TransitionOnJoinWaitFuture extends GridFutureAdapter<Boolean> {
    /** */
    private DiscoveryDataClusterState transitionState;

    /** Transition nodes of the state that exist in the discovery cache. */
    private final Set<UUID> transitionNodes;

    /**
     * @param state Current state; must be in transition.
     * @param discoCache Discovery data cache.
     */
    TransitionOnJoinWaitFuture(DiscoveryDataClusterState state, DiscoCache discoCache) {
        assert state.transition() : state;

        Set<UUID> present = U.newHashSet(state.transitionNodes().size());

        for (UUID id : state.transitionNodes()) {
            if (discoCache.node(id) == null)
                continue;

            present.add(id);
        }

        transitionNodes = present;
    }

    /** {@inheritDoc} */
    @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
        boolean completed = super.onDone(res, err);

        if (completed)
            joinFut = null;

        return completed;
    }
}
/**
 * Serializable holder that pairs a cluster state with a baseline topology history.
 */
private static class BaselineStateAndHistoryData implements Serializable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Cluster state. */
    private final DiscoveryDataClusterState globalState;

    /** Baseline topology history. */
    private final BaselineTopologyHistory recentHistory;

    /**
     * @param globalState Cluster state.
     * @param recentHistory Baseline topology history.
     */
    BaselineStateAndHistoryData(DiscoveryDataClusterState globalState, BaselineTopologyHistory recentHistory) {
        this.recentHistory = recentHistory;
        this.globalState = globalState;
    }
}
}