blob: 2942c873bca1efff5a92dff562e7f3bc1f69da65 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
/**
* Distributed process governing WAL state change.
*/
public class WalStateDistributedProcess {
/** Original propose message. */
private final WalStateProposeMessage msg;
/** Remaining nodes. */
@GridToStringInclude
private final Collection<UUID> remainingNodes;
/** Acks. */
private final Map<UUID, WalStateAckMessage> acks;
/**
* Constructor.
*
* @param msg Original propose message.
* @param remainingNodes Remaining nodes.
*/
public WalStateDistributedProcess(WalStateProposeMessage msg, Collection<UUID> remainingNodes) {
assert !F.isEmpty(remainingNodes);
this.msg = msg;
this.remainingNodes = remainingNodes;
acks = new HashMap<>(remainingNodes.size());
}
/**
* Handle node finish.
*
* @param nodeId Node ID.
* @param ack Ack message.
*/
public void onNodeFinished(UUID nodeId, WalStateAckMessage ack) {
remainingNodes.remove(nodeId);
// Log only messages from affinity nodes. Non-affinity nodes are of no interest for us since they do not have
// group context in general case and hence we cannot tell fo sure whether anything was changed or not.
if (ack.affNode())
acks.put(nodeId, ack);
}
/**
* Handle node leave.
*
* @param nodeId Node ID.
*/
public void onNodeLeft(UUID nodeId) {
remainingNodes.remove(nodeId);
}
/**
* @return {@code True} if process is completed.
*/
public boolean completed() {
return remainingNodes.isEmpty();
}
/**
* @return Operation ID.
*/
public UUID operationId() {
return msg.operationId();
}
/**
* Prepare finish message based on obtained results.
*
* @return Message.
*/
public WalStateFinishMessage prepareFinishMessage() {
assert completed();
if (acks.isEmpty()) {
// We haven't received any response from affinity nodes. Result is unknown, so throw an exception.
return new WalStateFinishMessage(msg.operationId(), msg.groupId(), msg.groupDeploymentId(), false,
"Operation result is unknown because all affinity nodes have left the grid.");
}
Map<UUID, String> errs = new HashMap<>();
// Look for errors first.
for (Map.Entry<UUID, WalStateAckMessage> ackEntry : acks.entrySet()) {
UUID nodeId = ackEntry.getKey();
WalStateAckMessage ackMsg = ackEntry.getValue();
assert ackMsg.affNode();
if (ackMsg.errorMessage() != null)
errs.put(nodeId, ackMsg.errorMessage());
}
if (!errs.isEmpty()) {
SB errMsg =
new SB("Operation failed on some nodes (please consult to node logs for more information) [");
boolean first = true;
for (Map.Entry<UUID, String> err : errs.entrySet()) {
if (first)
first = false;
else
errMsg.a(", ");
errMsg.a("[nodeId=" + err.getKey() + ", err=" + err.getValue() + ']');
}
errMsg.a(']');
return new WalStateFinishMessage(msg.operationId(), msg.groupId(), msg.groupDeploymentId(), false,
errMsg.toString());
}
// Verify results consistency. Note that non-affinity node are not present at this stage.
Boolean changed = null;
for (WalStateAckMessage ackMsg : acks.values()) {
boolean curChanged = ackMsg.changed();
if (changed == null)
changed = curChanged;
else {
if (!F.eq(curChanged, changed)) {
return new WalStateFinishMessage(msg.operationId(), msg.groupId(), msg.groupDeploymentId(),
false, "Operation result is unknown because nodes reported different results (please " +
"re-try operation).");
}
}
}
// All nodes completed operation with the same result, complete with success.
assert changed != null;
return new WalStateFinishMessage(msg.operationId(), msg.groupId(), msg.groupDeploymentId(), changed, null);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(WalStateDistributedProcess.class, this);
}
}