blob: dbc24580bcb625274816408a165a62ccd191e9eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.cluster.management.raft;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.util.IgniteUtils.capacity;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.raft.commands.ClusterNodeMessage;
import org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand;
import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
import org.apache.ignite.internal.cluster.management.raft.commands.NodesLeaveCommand;
import org.apache.ignite.internal.cluster.management.raft.commands.ReadLogicalTopologyCommand;
import org.apache.ignite.internal.cluster.management.raft.commands.ReadStateCommand;
import org.apache.ignite.internal.cluster.management.raft.commands.ReadValidatedNodesCommand;
import org.apache.ignite.internal.cluster.management.raft.commands.UpdateClusterStateCommand;
import org.apache.ignite.internal.cluster.management.raft.responses.LogicalTopologyResponse;
import org.apache.ignite.internal.cluster.management.raft.responses.ValidationErrorResponse;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
 * {@link RaftGroupListener} implementation for the CMG.
 *
 * <p>Read commands are answered from the Raft storage and the logical topology. Write commands update the stored cluster state,
 * run the two-step node join procedure (validation request followed by join-ready), and remove nodes that have left.
 */
public class CmgRaftGroupListener implements RaftGroupListener {
    private static final IgniteLogger LOG = Loggers.forClass(CmgRaftGroupListener.class);

    private final RaftStorageManager storage;

    private final LogicalTopology logicalTopology;

    private final ValidationManager validationManager;

    private final LongConsumer onLogicalTopologyChanged;

    /**
     * Creates a new instance.
     *
     * @param storage Storage in which this listener keeps its local data.
     * @param logicalTopology Logical topology that this listener updates.
     * @param onLogicalTopologyChanged Callback invoked (with the corresponding RAFT term) whenever the logical topology is changed
     *         by a join-ready or a nodes-leave command.
     */
    public CmgRaftGroupListener(
            ClusterStateStorage storage,
            LogicalTopology logicalTopology,
            LongConsumer onLogicalTopologyChanged
    ) {
        this.storage = new RaftStorageManager(storage);
        this.logicalTopology = logicalTopology;
        this.onLogicalTopologyChanged = onLogicalTopologyChanged;
        this.validationManager = new ValidationManager(this.storage, logicalTopology);
    }

    /**
     * Processes every pending read command in iteration order.
     *
     * <p>Commands of a type other than {@link ReadStateCommand}, {@link ReadLogicalTopologyCommand} or
     * {@link ReadValidatedNodesCommand} are skipped and their closures receive no result.
     */
    @Override
    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
        iterator.forEachRemaining(this::handleRead);
    }

    /** Answers a single read command through its closure. */
    private void handleRead(CommandClosure<ReadCommand> closure) {
        ReadCommand cmd = closure.command();

        if (cmd instanceof ReadStateCommand) {
            closure.result(storage.getClusterState());

            return;
        }

        if (cmd instanceof ReadLogicalTopologyCommand) {
            closure.result(new LogicalTopologyResponse(logicalTopology.getLogicalTopology()));

            return;
        }

        if (cmd instanceof ReadValidatedNodesCommand) {
            closure.result(getValidatedNodes());
        }
    }

    /**
     * Returns the union of the nodes recorded as validated in the storage and the nodes currently in the logical topology.
     */
    private HashSet<LogicalNode> getValidatedNodes() {
        Set<LogicalNode> validated = storage.getValidatedNodes();
        Set<LogicalNode> inTopology = logicalTopology.getLogicalTopology().nodes();

        // Pre-size the set so that adding both collections does not trigger a rehash.
        HashSet<LogicalNode> union = new HashSet<>(capacity(validated.size() + inTopology.size()));

        union.addAll(validated);
        union.addAll(inTopology);

        return union;
    }

    /**
     * Processes every pending write command in iteration order.
     *
     * <p>Commands of a type not handled here are skipped and their closures receive no result.
     */
    @Override
    public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
        iterator.forEachRemaining(this::handleWrite);
    }

    /** Applies a single write command and reports the outcome through its closure. */
    private void handleWrite(CommandClosure<WriteCommand> closure) {
        WriteCommand cmd = closure.command();

        if (cmd instanceof InitCmgStateCommand) {
            closure.result(initCmgState((InitCmgStateCommand) cmd));

            return;
        }

        if (cmd instanceof UpdateClusterStateCommand) {
            storage.putClusterState(((UpdateClusterStateCommand) cmd).clusterState());

            closure.result(null);

            return;
        }

        if (cmd instanceof JoinRequestCommand) {
            ValidationResult validation = validateNode((JoinRequestCommand) cmd);

            // A null result signals success; otherwise the validation error is reported back.
            ValidationErrorResponse error = null;

            if (!validation.isValid()) {
                error = new ValidationErrorResponse(validation.errorDescription());
            }

            closure.result(error);

            return;
        }

        if (cmd instanceof JoinReadyCommand) {
            Serializable outcome = completeValidation((JoinReadyCommand) cmd);

            if (outcome == null) {
                // No error was produced, so the node has been put into the topology: notify before completing the closure.
                onLogicalTopologyChanged.accept(closure.term());
            }

            closure.result(outcome);

            return;
        }

        if (cmd instanceof NodesLeaveCommand) {
            removeNodesFromLogicalTopology((NodesLeaveCommand) cmd);

            onLogicalTopologyChanged.accept(closure.term());

            closure.result(null);
        }
    }

    /**
     * Stores the cluster state carried by the command if none is stored yet; otherwise validates the command against the
     * stored state.
     *
     * @return The cluster state now in effect, or a {@link ValidationErrorResponse} if the command conflicts with the stored state.
     */
    @Nullable
    private Serializable initCmgState(InitCmgStateCommand command) {
        ClusterState current = storage.getClusterState();

        if (current == null) {
            storage.putClusterState(command.clusterState());

            return command.clusterState();
        }

        ValidationResult check = ValidationManager.validateState(
                current,
                command.node().asClusterNode(),
                command.clusterState()
        );

        if (check.isValid()) {
            return current;
        }

        return new ValidationErrorResponse(check.errorDescription());
    }

    /**
     * Handles the first step of the join procedure.
     *
     * <p>A node that is already in the logical topology under the same name and ID is accepted immediately. A node with the
     * same name but a different ID evicts the old entry from the topology and is then validated like any other node.
     */
    private ValidationResult validateNode(JoinRequestCommand command) {
        var nodeMessage = command.node();

        ClusterNode joining = nodeMessage.asClusterNode();

        Optional<LogicalNode> sameName = logicalTopology.getLogicalTopology().nodes()
                .stream()
                .filter(existing -> existing.name().equals(joining.name()))
                .findAny();

        LogicalNode stale = sameName.orElse(null);

        if (stale != null) {
            if (stale.id().equals(joining.id())) {
                return ValidationResult.successfulResult();
            }

            // Remove the previous node from the Logical Topology in case we haven't received the disappeared event yet.
            logicalTopology.removeNodes(Set.of(stale));
        }

        LogicalNode candidate = new LogicalNode(
                joining,
                nodeMessage.userAttributes(),
                nodeMessage.systemAttributes(),
                nodeMessage.storageProfiles()
        );

        return validationManager.validateNode(storage.getClusterState(), candidate, command.igniteVersion(), command.clusterTag());
    }

    /**
     * Handles the second step of the join procedure: a node that has passed validation is put into the logical topology.
     *
     * @return {@code null} if the node has been added to the topology, or a {@link ValidationErrorResponse} if it has not been
     *         validated.
     */
    @Nullable
    private Serializable completeValidation(JoinReadyCommand command) {
        var nodeMessage = command.node();

        ClusterNode node = nodeMessage.asClusterNode();

        LogicalNode candidate = new LogicalNode(
                node,
                nodeMessage.userAttributes(),
                nodeMessage.systemAttributes(),
                nodeMessage.storageProfiles()
        );

        if (!validationManager.isNodeValidated(candidate)) {
            return new ValidationErrorResponse(String.format("Node \"%s\" has not yet passed the validation step", node));
        }

        validationManager.completeValidation(candidate);

        logicalTopology.putNode(candidate);

        return null;
    }

    /**
     * Removes the nodes listed in the command from both the logical topology and the validation manager's validated nodes.
     */
    private void removeNodesFromLogicalTopology(NodesLeaveCommand command) {
        Set<ClusterNode> leaving = command.nodes().stream()
                .map(ClusterNodeMessage::asClusterNode)
                .collect(Collectors.toSet());

        Set<LogicalNode> leavingLogical = leaving.stream()
                .map(CmgRaftGroupListener::withoutAttributes)
                .collect(Collectors.toSet());

        logicalTopology.removeNodes(leavingLogical);

        validationManager.removeValidatedNodes(leavingLogical);

        if (!LOG.isInfoEnabled()) {
            return;
        }

        var names = leaving.stream().map(ClusterNode::name).collect(toList());

        LOG.info("Nodes removed from the logical topology [nodes={}]", names);
    }

    /**
     * Wraps a node into a {@link LogicalNode} with empty attributes and storage profiles. Used for nodes that are about to be
     * removed from the topology, so it is safe to leave the attributes at their default values.
     */
    private static LogicalNode withoutAttributes(ClusterNode node) {
        return new LogicalNode(node, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList());
    }

    /**
     * Starts a storage snapshot at the given path and passes the completion outcome to {@code doneClo}
     * ({@code null} when no error occurred).
     */
    @Override
    public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
        storage.snapshot(path)
                .whenComplete((ignored, error) -> doneClo.accept(error));
    }

    /**
     * Restores the storage from a snapshot and then fires a topology leap on the logical topology.
     *
     * @return {@code true} on success, {@code false} if an {@link IgniteInternalException} was thrown (the failure is logged).
     */
    @Override
    public boolean onSnapshotLoad(Path path) {
        try {
            storage.restoreSnapshot(path);

            logicalTopology.fireTopologyLeap();
        } catch (IgniteInternalException e) {
            LOG.error("Failed to restore snapshot [path={}]", path, e);

            return false;
        }

        return true;
    }

    /** Does nothing: the Raft storage lifecycle is managed by outside components. */
    @Override
    public void onShutdown() {
        // Intentionally empty.
    }

    /** Returns the storage manager used by this listener. */
    @TestOnly
    public RaftStorageManager storage() {
        return storage;
    }
}